Spark excepción al insertar datosframe resulta en una tabla de colmenas

Este es mi fragmento de código. Estoy recibiendo la siguiente excepción cuando spar.sql(query) está siendo ejecutado.

Mi table_v2 tiene 262 columns. Mi table_v3 tiene 9 columns.

¿Puede alguien enfrentarse a problemas similares y ayudar a resolver esto? TIA

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc=spark.sparkContext

df1 = spark.sql("select * from myDB.table_v2")
df2 = spark.sql("select * from myDB.table_v3")

result_df = df1.join(df2, (df1.id_c == df2.id_c) & (df1.cycle_r == df2.cycle_r) & (df1.consumer_r == df2.consumer_r))
final_result_df = result_df.select(df1["*"])

final_result_df.distinct().createOrReplaceTempView("results")
query = "INSERT INTO TABLE myDB.table_v2_final select * from results"
spark.sql(query);

Traté de establecer el parámetro en conf y no ayudó a resolver el problema:

spark.sql.debug.maxToStringFields=500

Error:

20/12/16 19:28:20 ERROR FileFormatWriter: Job job_20201216192707_0002 aborted.
20/12/16 19:28:20 ERROR Executor: Exception in task 90.0 in stage 2.0 (TID 225)
org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Missing required char ':' at 'struct<>
    at org.apache.orc.TypeDescription.requireChar(TypeDescription.java:293)
    at org.apache.orc.TypeDescription.parseStruct(TypeDescription.java:326)
    at org.apache.orc.TypeDescription.parseType(TypeDescription.java:385)
    at org.apache.orc.TypeDescription.fromString(TypeDescription.java:406)
    at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.org$apache$spark$sql$execution$datasources$orc$OrcSerializer$$createOrcValue(OrcSerializer.scala:226)
    at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.(OrcSerializer.scala:36)
    at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.(OrcOutputWriter.scala:36)
    at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:108)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
    ... 8 more

Pregunta hecha hace 3 años, 4 meses, 28 días - Por Brian


3 Respuestas:

  • He dejado mi myDB.table_v2_final y modificó la siguiente línea en mi código y funcionó.

    Sospecho que podría haber algún problema en la forma en que creé la mesa.

    query = "create external table myDB.table_v2_final as select * from results"
    

    Respondida el Dec 17, 2020 a las 16:29 - por quantumcoderd26d

    Votos positivos: 0 | Votos negativos: 0

  • El error que estás experimentando se debe a una incompatibilidad en la estructura de las tablas al intentar hacer el join y la inserción de datos en la tabla final. El mensaje de error indica que hay un problema con la estructura de la tabla resultante que estás intentando escribir en formato ORC.

    Una posible solución a este problema es ajustar la forma en que estás seleccionando las columnas en el final_result_df. En lugar de usar df1["<strong>"], puedes seleccionar explícitamente las columnas que quieres incluir en la tabla final.

    Aquí te dejo una sugerencia de cómo podrías modificar esa parte de tu código:

    python
    result_df = df1.join(df2, (df1.id_c == df2.id_c) & (df1.cycle_r == df2.cycle_r) & (df1.consumer_r == df2.consumer_r))
    final_result_df = result_df.select(df1.columns + df2.columns)
    
    final_result_df.distinct().createOrReplaceTempView("results")
    
    query = "INSERT INTO TABLE myDB.table_v2_final select  from results"
    spark.sql(query)
    

    En este fragmento de código, df1.columns + df2.columns concatena las listas de columnas de ambas tablas, lo que te permitirá obtener solo las columnas deseadas en final_result_df.

    Después de hacer este ajuste, intenta ejecutar el código nuevamente y verifica si el error ha sido solucionado.

    Respondida el Dec 18, 2020 a las 16:33 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • La excepción indica que falta un carácter obligatorio (':') en la definición del tipo de estructura. La definición de tipo de estructura debería verse así:

    
    struct<>
    

    Para resolver este problema, debes asegurarte de que la definición del esquema de la tabla table_v2_final sea correcta. Puedes verificar el esquema usando el siguiente comando:

    
    spark.sql("DESCRIBE TABLE myDB.table_v2_final").show()
    

    Asegúrate de que el esquema contenga una definición de estructura válida para las columnas que se están insertando.

    Si el esquema es correcto, puedes intentar aumentar el valor de la configuración spark.sql.orc.schema.merging.enabled a true. Esta configuración le indica a Apache Spark que intente fusionar el esquema de la tabla con el esquema de los datos que se están escribiendo. Esto puede ayudar a resolver los problemas de incompatibilidad de esquemas.

    Aquí tienes un ejemplo de código actualizado con la configuración añadida:

    
    spark = SparkSession.builder.enableHiveSupport().config("spark.sql.orc.schema.merging.enabled", "true").getOrCreate()
    sc=spark.sparkContext
    
    df1 = spark.sql("select  from myDB.table_v2")
    df2 = spark.sql("select  from myDB.table_v3")
    
    result_df = df1.join(df2, (df1.id_c == df2.id_c) & (df1.cycle_r == df2.cycle_r) & (df1.consumer_r == df2.consumer_r))
    final_result_df = result_df.select(df1[""])
    
    final_result_df.distinct().createOrReplaceTempView("results")
    query = "INSERT INTO TABLE myDB.table_v2_final select  from results"
    spark.sql(query);
    

    Además, puedes intentar establecer el parámetro spark.sql.orc.filterPushdown.enabled en true para habilitar el filtrado pushdown en formato ORC. Esto puede mejorar el rendimiento de las consultas de inserción.

    Si sigues teniendo problemas, puedes proporcionar más información sobre el esquema de las tablas implicadas y el código de error completo.

    Respondida el Dec 18, 2020 a las 16:37 - por Gemini

    Votos positivos: 0 | Votos negativos: 0