Cómo escribir streaming DataFrame en múltiples sumideros en Spark Estructurado Streaming

Tengo un conjunto de reglas SQL que necesito aplicar en un marco de datos de streaming dentro foreachBatch(). Después de aplicar esas reglas, el marco de datos resultante/filtrado debe ser escrito a múltiples destinos como "delta" y "cosmos DB".

A continuación se muestra lo que he probado: Usando ese marco de datos estático procedente de forEachBatch() método, estoy tratando de crear una vista temporal como abajo.

df.writeStream
  .format("delta")
  .foreachBatch(writeToDelta _)
  .outputMode("update")
  .start()

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
    microBatchOutputDF.createOrReplaceTempView("testTable")
}

Pero mientras ejecuta el código, su presentación como tabla o vista 'testTable' no se encuentra.

¿Es posible crear una tabla temp/view utilizando el marco de datos estático en la secuencia estructurada de chispa?

¿O cómo puedo escribir a múltiples sumideros?

Pregunta hecha hace 3 años, 4 meses, 28 días - Por quantumcoder403a


4 Respuestas:

  • From the comments clarifying OPs question:

    "Tengo un conjunto de reglas SQL que necesito aplicar en el marco de datos dentro paraEachBatch(). Después de aplicar las reglas, el marco de datos resultante/filtrado se escribirá a múltiples destinos como delta y cosmos DB."

    El foreachBatch le permite

    • Reutilizar las fuentes de datos existentes para lotes
    • Escribe a múltiples ubicaciones

    En su caso entiendo que desea aplicar diferentes transformaciones en su marco de datos de streaming y escribirlo a múltiples ubicaciones. Puedes hacerlo como abajo:

    df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    
      // persist dataframe in case you are reusing it multiple times
      batchDF.persist()
    
      // apply SQL logic using `selectExpr` or just the DataFrame API
      val deltaBatchDf = batchDF.selectExpr("") 
      val cosmosBatchDf = batchDF.selectExpr("") 
    
      // write to multiple sinks like you would do with batch DataFrames
      // add more locations if required
      deltaBatchDf.write.format("delta").options(...).save(...)
      cosmosBatchDf.write.format("cosmos").options(...).save(...)
    
      // free memory
      batchDF.unpersist()
    }
    
    

    Respondida el Dec 18, 2020 a las 06:23 - por debugdynamo

    Votos positivos: 0 | Votos negativos: 0

  • Acabo de tener el mismo problema. Creé una vista temporal con el marco de datos recibido como parámetro, pero a pesar de que no se lanzaron excepciones o advertencias, cuando intenté preguntar esa tabla en una declaración sql, Spark no pudo encontrar la tabla. Comprobé el objeto SparkSession fuera y dentro de la función y fue el mismo caso. El problema - creo - es el contexto de visibilidad de la tabla o algo así. Lo que hice para arreglar fue conseguir la SparkSession del marco de datos recibido y utilizarlo para ejecutar la declaración sql. De esta manera, Spark encontró la mesa perfectamente bien.

    Respondida el Dec 18, 2020 a las 06:28 - por coderconqueror

    Votos positivos: 0 | Votos negativos: 0

  • En Spark Structured Streaming, no es posible crear una vista temporal directamente en un DataFrame de un batch en el método foreachBatch(), ya que los datos en un batch de streaming no están disponibles para ser consultados como en un DataFrame estático.

    Para escribir en múltiples sumideros en un DataFrame de streaming en Spark, puedes hacerlo usando la función writeStream con múltiples llamadas a foreachBatch para escribir en diferentes destinos. Aquí tienes un ejemplo de cómo hacerlo:

    scala
    def writeToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
        // Aplicar reglas SQL
        val filteredDF = microBatchOutputDF.filter("column_name = value")
    
        // Escribir en Delta
        filteredDF.write.format("delta").save("path_to_delta_folder")
    
        // Escribir en Cosmos DB
        filteredDF.write.format("cosmos.oltp").option("endpoint", "url").option("masterkey", "key").option("database", "db").option("collection", "col").save()
    }
    
    df.writeStream
      .foreachBatch(writeToDelta _)
      .outputMode("update")
      .start()
    

    En este ejemplo, dentro de la función writeToDelta, puedes aplicar tus reglas SQL al DataFrame de streaming microBatchOutputDF, filtrar los datos según sea necesario y luego escribir en diferentes sumideros como Delta y Cosmos DB.

    Es importante tener en cuenta que para escribir en Cosmos DB, necesitarás establecer las opciones adecuadas como el punto final (endpoint), clave maestra (masterkey), base de datos (database) y colección (collection).

    Recuerda ajustar el código según las necesidades de tu aplicación y los requisitos de los sumideros a los que quieres escribir.

    Respondida el Dec 19, 2020 a las 06:24 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Creación de una vista temporal a partir de un DataFrame estático en Spark Structured Streaming

    Crear una vista temporal a partir de un DataFrame estático en Spark Structured Streaming no es posible directamente. El método createOrReplaceTempView solo está disponible para DataFrames que son el resultado de una consulta o transformación en tiempo real.

    Escritura en múltiples sumideros

    Para escribir en múltiples sumideros en Spark Structured Streaming, puedes utilizar el método foreachBatchWithState. Este método toma una función que se invoca para cada microlote de datos entrantes. Dentro de esta función, puedes escribir los datos en múltiples sumideros utilizando el método write.

    Aquí tienes un ejemplo de cómo escribir en sumideros Delta y Cosmos DB utilizando foreachBatchWithState:

    scala
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.streaming.StreamingQueryListener
    
    val query = df.writeStream
      .foreachBatchWithState((batchId: Long, microBatchOutputDF: DataFrame) => {
        // Escribe en Delta
        microBatchOutputDF.write
          .format("delta")
          .mode(SaveMode.Append)
          .save("path/to/delta")
    
        // Escribe en Cosmos DB
        microBatchOutputDF.write
          .format("com.microsoft.azure.cosmosdb.spark")
          .mode(SaveMode.Append)
          .option("endpoint", "https://.documents.azure.com:443/")
          .option("masterKey", "")
          .option("database", "")
          .option("collection", "")
          .save()
      })
      .addStreamingListener(new StreamingQueryListener {
        override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = {
          println("Query started")
        }
    
        override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = {
          println(s"Query progress: ${queryProgress.progress}")
        }
    
        override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = {
          println("Query terminated")
        }
      })
      .start()
    

    Nota:

    El método foreachBatchWithState requiere que la función que proporciones sea determinista y libre de efectos secundarios. El método addStreamingListener es opcional y se puede utilizar para registrar un oyente para eventos de consulta, como inicio, progreso y terminación.

    Respondida el Dec 19, 2020 a las 06:25 - por Gemini

    Votos positivos: 0 | Votos negativos: 0