Cómo resolver el error java.io. NoSerializable ¿Excepción trabajando en el marco de datos?

def URLEnc(input: String): String = {
    URLEncoder.encode(input, "UTF-8")
  }

  val URLEncUDF: UserDefinedFunction = udf(URLEnc(_: String))
val file = spark.read.format("xml")
    .option("rootTag", "channel").option("rowTag", "item")
    .load("path")
 where file is of xml format 

val file1 = file.withColumn("description", URLEncUDF(col("g:description")))

los registros se ven como abajo:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:434)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:826)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:803)
    at AIFeed.(AIFeed.scala:16)
    at AIFeed$.delayedEndpoint$AIFeed$1(AIFeed.scala:113)
    at AIFeed$delayedInit$body.apply(AIFeed.scala:112)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at AIFeed$.main(AIFeed.scala:112)
    at AIFeed.main(AIFeed.scala)
Caused by: java.io.NotSerializableException: AIFeed
Serialization stack:
    - object not serializable (class: AIFeed, value: AIFeed@5bccef9f)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class FeedFunction, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic FeedFunction.$anonfun$URLEncUDF$1:(LFeedFunction;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class FeedFunction$$Lambda$275/1443173326, FeedFunction$$Lambda$275/1443173326@51e94b7d)
    - element of array (index: 5)
    - array (class [Ljava.lang.Object;, size 6)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089@565a6af)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 45 more
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.4:34511 in memory (size: 2.9 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkContext: Invoking stop() from shutdown hook
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.4:34511 in memory (size: 23.7 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkUI: Stopped Spark web UI at http://192.168.1.4:4040
20/12/16 17:55:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/16 17:55:15 INFO MemoryStore: MemoryStore cleared
20/12/16 17:55:15 INFO BlockManager: BlockManager stopped
20/12/16 17:55:15 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/16 17:55:15 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/16 17:55:15 INFO SparkContext: Successfully stopped SparkContext
20/12/16 17:55:15 INFO ShutdownHookManager: Shutdown hook called

Pregunta hecha hace 3 años, 5 meses, 0 días - Por csscraftsman


3 Respuestas:

  • Usted puede ver desde el rastro de la pila que Spark es incapaz de serializar AIFeed, que es presumiblemente la clase que encierra el código que publicaste.

    Esto es porque tu UDF confía en URLEnc, que es un método de esa clase, y así tiene una referencia a la clase de clausura. Spark está tratando de serializar toda tu clase de conductor, que no es serializable. A veces simplemente puedes marcar la clase como serializable para arreglar esto.

    Usted generalmente puede arreglar esto más simplemente haciendo URLEnc una función más que un método, por ejemplo, en lugar de:

    def someMeth(a:Int): Int = a + 1
    

    uso:

    val someFunc = (a: Int) => a + 1
    

    Respondida el Dec 16, 2020 a las 12:42 - por Brian

    Votos positivos: 0 | Votos negativos: 0

  • El error "Task not serializable" generalmente se produce cuando se intenta serializar una función o clase que no es serializable en Spark. En este caso, parece que la clase AIFeed que contiene la lógica de tu código no es serializable, lo cual está causando el error al intentar ejecutar la tarea en Spark.

    Para resolver este problema, puedes intentar mover la definición de la función URLEnc y la creación del UserDefinedFunction dentro del cuerpo del objeto AIFeed o de una función local dentro del main en lugar de tenerlos a nivel de clase. De esta forma, la serialización se gestionará de manera más efectiva.

    Aquí tienes un ejemplo de cómo podrías estructurar tu código para evitar el error de serialización:

    scala
    object AIFeed {
      def URLEnc(input: String): String = {
        URLEncoder.encode(input, "UTF-8")
      }
    
      def main(args: Array[String]): Unit = {
        // Crear SparkSession y otras configuraciones
    
        import org.apache.spark.sql.functions.udf
    
        val URLEncUDF = udf((input: String) => URLEnc(input))
    
        val file = spark.read.format("xml")
          .option("rootTag", "channel").option("rowTag", "item")
          .load("path")
    
        val file1 = file.withColumn("description", URLEncUDF(col("g:description")))
    
        // Resto de la lógica de tu programa
    
        spark.stop()
      }
    }
    

    Al mover la definición de la función y el UserDefinedFunction dentro de la función main, deberías poder evitar el error de serialización y ejecutar tu código correctamente en Spark.

    Respondida el Dec 17, 2020 a las 12:42 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El error java.io.NotSerializableException ocurre cuando intentas enviar una función que no es serializable a un nodo de trabajo. En este caso, la función URLEncUDF no es serializable porque hace referencia a la clase AIFeed, que no es serializable.

    Para resolver este problema, puedes hacer que la clase AIFeed sea serializable implementando la interfaz Serializable. Alternativamente, puedes definir la función URLEncUDF dentro de la clase AIFeed y hacer que la clase AIFeed sea un singleton. Esto garantizará que solo haya una instancia de la clase AIFeed y que sea serializable.

    A continuación, se muestra un ejemplo de cómo hacer que la clase AIFeed sea serializable:

    scala
    class AIFeed extends Serializable {
    
      def URLEnc(input: String): String = {
        URLEncoder.encode(input, "UTF-8")
      }
    
      val URLEncUDF: UserDefinedFunction = udf(URLEnc(_: String))
    
      ...
    }
    

    Una vez que hayas hecho que la clase AIFeed sea serializable, podrás utilizar la función URLEncUDF en tu código sin encontrar el error java.io.NotSerializableException.

    Respondida el Dec 17, 2020 a las 12:45 - por Gemini

    Votos positivos: 0 | Votos negativos: 0