Cómo escribir streaming DataFrame en múltiples sumideros en Spark Estructurado Streaming
Tengo un conjunto de reglas SQL que necesito aplicar en un marco de datos de streaming dentro foreachBatch()
. Después de aplicar esas reglas, el marco de datos resultante/filtrado debe ser escrito a múltiples destinos como "delta" y "cosmos DB".
A continuación se muestra lo que he probado:
Usando ese marco de datos estático procedente de forEachBatch()
método, estoy tratando de crear una vista temporal como abajo.
df.writeStream
.format("delta")
.foreachBatch(writeToDelta _)
.outputMode("update")
.start()
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("testTable")
}
Pero mientras ejecuta el código, su presentación como tabla o vista 'testTable' no se encuentra.
¿Es posible crear una tabla temp/view utilizando el marco de datos estático en la secuencia estructurada de chispa?
¿O cómo puedo escribir a múltiples sumideros?
Pregunta hecha hace 3 años, 4 meses, 28 días - Por quantumcoder403a
4 Respuestas:
-
From the comments clarifying OPs question:
"Tengo un conjunto de reglas SQL que necesito aplicar en el marco de datos dentro paraEachBatch(). Después de aplicar las reglas, el marco de datos resultante/filtrado se escribirá a múltiples destinos como delta y cosmos DB."
El foreachBatch le permite
- Reutilizar las fuentes de datos existentes para lotes
- Escribe a múltiples ubicaciones
En su caso entiendo que desea aplicar diferentes transformaciones en su marco de datos de streaming y escribirlo a múltiples ubicaciones. Puedes hacerlo como abajo:
df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => // persist dataframe in case you are reusing it multiple times batchDF.persist() // apply SQL logic using `selectExpr` or just the DataFrame API val deltaBatchDf = batchDF.selectExpr("") val cosmosBatchDf = batchDF.selectExpr("") // write to multiple sinks like you would do with batch DataFrames // add more locations if required deltaBatchDf.write.format("delta").options(...).save(...) cosmosBatchDf.write.format("cosmos").options(...).save(...) // free memory batchDF.unpersist() }
Respondida el Dec 18, 2020 a las 06:23 - por debugdynamo
Votos positivos: 0 | Votos negativos: 0 -
Acabo de tener el mismo problema. Creé una vista temporal con el marco de datos recibido como parámetro, pero a pesar de que no se lanzaron excepciones o advertencias, cuando intenté preguntar esa tabla en una declaración sql, Spark no pudo encontrar la tabla. Comprobé el objeto SparkSession fuera y dentro de la función y fue el mismo caso. El problema - creo - es el contexto de visibilidad de la tabla o algo así. Lo que hice para arreglar fue conseguir la SparkSession del marco de datos recibido y utilizarlo para ejecutar la declaración sql. De esta manera, Spark encontró la mesa perfectamente bien.
Respondida el Dec 18, 2020 a las 06:28 - por coderconqueror
Votos positivos: 0 | Votos negativos: 0 -
En Spark Structured Streaming, no es posible crear una vista temporal directamente en un DataFrame de un batch en el método
foreachBatch()
, ya que los datos en un batch de streaming no están disponibles para ser consultados como en un DataFrame estático.Para escribir en múltiples sumideros en un DataFrame de streaming en Spark, puedes hacerlo usando la función
writeStream
con múltiples llamadas aforeachBatch
para escribir en diferentes destinos. Aquí tienes un ejemplo de cómo hacerlo:scala def writeToDelta(microBatchOutputDF: DataFrame, batchId: Long) { // Aplicar reglas SQL val filteredDF = microBatchOutputDF.filter("column_name = value") // Escribir en Delta filteredDF.write.format("delta").save("path_to_delta_folder") // Escribir en Cosmos DB filteredDF.write.format("cosmos.oltp").option("endpoint", "url").option("masterkey", "key").option("database", "db").option("collection", "col").save() } df.writeStream .foreachBatch(writeToDelta _) .outputMode("update") .start()
En este ejemplo, dentro de la función
writeToDelta
, puedes aplicar tus reglas SQL al DataFrame de streamingmicroBatchOutputDF
, filtrar los datos según sea necesario y luego escribir en diferentes sumideros como Delta y Cosmos DB.Es importante tener en cuenta que para escribir en Cosmos DB, necesitarás establecer las opciones adecuadas como el punto final (endpoint), clave maestra (masterkey), base de datos (database) y colección (collection).
Recuerda ajustar el código según las necesidades de tu aplicación y los requisitos de los sumideros a los que quieres escribir.
Respondida el Dec 19, 2020 a las 06:24 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Creación de una vista temporal a partir de un DataFrame estático en Spark Structured Streaming
Crear una vista temporal a partir de un DataFrame estático en Spark Structured Streaming no es posible directamente. El método
createOrReplaceTempView
solo está disponible para DataFrames que son el resultado de una consulta o transformación en tiempo real.Escritura en múltiples sumideros
Para escribir en múltiples sumideros en Spark Structured Streaming, puedes utilizar el método
foreachBatchWithState
. Este método toma una función que se invoca para cada microlote de datos entrantes. Dentro de esta función, puedes escribir los datos en múltiples sumideros utilizando el métodowrite
.Aquí tienes un ejemplo de cómo escribir en sumideros Delta y Cosmos DB utilizando
foreachBatchWithState
:scala import org.apache.spark.sql.SaveMode import org.apache.spark.sql.streaming.StreamingQueryListener val query = df.writeStream .foreachBatchWithState((batchId: Long, microBatchOutputDF: DataFrame) => { // Escribe en Delta microBatchOutputDF.write .format("delta") .mode(SaveMode.Append) .save("path/to/delta") // Escribe en Cosmos DB microBatchOutputDF.write .format("com.microsoft.azure.cosmosdb.spark") .mode(SaveMode.Append) .option("endpoint", "https://
.documents.azure.com:443/") .option("masterKey", " ") .option("database", " ") .option("collection", " ") .save() }) .addStreamingListener(new StreamingQueryListener { override def onQueryStarted(queryStarted: StreamingQueryListener.QueryStartedEvent): Unit = { println("Query started") } override def onQueryProgress(queryProgress: StreamingQueryListener.QueryProgressEvent): Unit = { println(s"Query progress: ${queryProgress.progress}") } override def onQueryTerminated(queryTerminated: StreamingQueryListener.QueryTerminatedEvent): Unit = { println("Query terminated") } }) .start() Nota:
El método
foreachBatchWithState
requiere que la función que proporciones sea determinista y libre de efectos secundarios. El métodoaddStreamingListener
es opcional y se puede utilizar para registrar un oyente para eventos de consulta, como inicio, progreso y terminación.Respondida el Dec 19, 2020 a las 06:25 - por Gemini
Votos positivos: 0 | Votos negativos: 0