Cómo resolver el error java.io. NoSerializable ¿Excepción trabajando en el marco de datos?
def URLEnc(input: String): String = {
URLEncoder.encode(input, "UTF-8")
}
val URLEncUDF: UserDefinedFunction = udf(URLEnc(_: String))
val file = spark.read.format("xml")
.option("rootTag", "channel").option("rowTag", "item")
.load("path")
where file is of xml format
val file1 = file.withColumn("description", URLEncUDF(col("g:description")))
los registros se ven como abajo:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:434)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at org.apache.spark.sql.Dataset.show(Dataset.scala:826)
at org.apache.spark.sql.Dataset.show(Dataset.scala:803)
at AIFeed.(AIFeed.scala:16)
at AIFeed$.delayedEndpoint$AIFeed$1(AIFeed.scala:113)
at AIFeed$delayedInit$body.apply(AIFeed.scala:112)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at AIFeed$.main(AIFeed.scala:112)
at AIFeed.main(AIFeed.scala)
Caused by: java.io.NotSerializableException: AIFeed
Serialization stack:
- object not serializable (class: AIFeed, value: AIFeed@5bccef9f)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class FeedFunction, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic FeedFunction.$anonfun$URLEncUDF$1:(LFeedFunction;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class FeedFunction$$Lambda$275/1443173326, FeedFunction$$Lambda$275/1443173326@51e94b7d)
- element of array (index: 5)
- array (class [Ljava.lang.Object;, size 6)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2116/996471089@565a6af)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 45 more
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.4:34511 in memory (size: 2.9 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkContext: Invoking stop() from shutdown hook
20/12/16 17:55:15 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.4:34511 in memory (size: 23.7 KiB, free: 1407.3 MiB)
20/12/16 17:55:15 INFO SparkUI: Stopped Spark web UI at http://192.168.1.4:4040
20/12/16 17:55:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/12/16 17:55:15 INFO MemoryStore: MemoryStore cleared
20/12/16 17:55:15 INFO BlockManager: BlockManager stopped
20/12/16 17:55:15 INFO BlockManagerMaster: BlockManagerMaster stopped
20/12/16 17:55:15 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/12/16 17:55:15 INFO SparkContext: Successfully stopped SparkContext
20/12/16 17:55:15 INFO ShutdownHookManager: Shutdown hook called
Pregunta hecha hace 3 años, 5 meses, 7 días - Por csscraftsman
3 Respuestas:
-
Usted puede ver desde el rastro de la pila que Spark es incapaz de serializar
AIFeed
, que es presumiblemente la clase que encierra el código que publicaste.Esto es porque tu UDF confía en
URLEnc
, que es un método de esa clase, y así tiene una referencia a la clase de clausura. Spark está tratando de serializar toda tu clase de conductor, que no es serializable. A veces simplemente puedes marcar la clase comoserializable
para arreglar esto.Usted generalmente puede arreglar esto más simplemente haciendo
URLEnc
una función más que un método, por ejemplo, en lugar de:def someMeth(a:Int): Int = a + 1
uso:
val someFunc = (a: Int) => a + 1
Respondida el Dec 16, 2020 a las 12:42 - por Brian
Votos positivos: 0 | Votos negativos: 0 -
El error "Task not serializable" generalmente se produce cuando se intenta serializar una función o clase que no es serializable en Spark. En este caso, parece que la clase
AIFeed
que contiene la lógica de tu código no es serializable, lo cual está causando el error al intentar ejecutar la tarea en Spark.Para resolver este problema, puedes intentar mover la definición de la función
URLEnc
y la creación delUserDefinedFunction
dentro del cuerpo del objetoAIFeed
o de una función local dentro del main en lugar de tenerlos a nivel de clase. De esta forma, la serialización se gestionará de manera más efectiva.Aquí tienes un ejemplo de cómo podrías estructurar tu código para evitar el error de serialización:
scala object AIFeed { def URLEnc(input: String): String = { URLEncoder.encode(input, "UTF-8") } def main(args: Array[String]): Unit = { // Crear SparkSession y otras configuraciones import org.apache.spark.sql.functions.udf val URLEncUDF = udf((input: String) => URLEnc(input)) val file = spark.read.format("xml") .option("rootTag", "channel").option("rowTag", "item") .load("path") val file1 = file.withColumn("description", URLEncUDF(col("g:description"))) // Resto de la lógica de tu programa spark.stop() } }
Al mover la definición de la función y el
UserDefinedFunction
dentro de la funciónmain
, deberías poder evitar el error de serialización y ejecutar tu código correctamente en Spark.Respondida el Dec 17, 2020 a las 12:42 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
El error
java.io.NotSerializableException
ocurre cuando intentas enviar una función que no es serializable a un nodo de trabajo. En este caso, la funciónURLEncUDF
no es serializable porque hace referencia a la claseAIFeed
, que no es serializable.Para resolver este problema, puedes hacer que la clase
AIFeed
sea serializable implementando la interfazSerializable
. Alternativamente, puedes definir la funciónURLEncUDF
dentro de la claseAIFeed
y hacer que la claseAIFeed
sea un singleton. Esto garantizará que solo haya una instancia de la claseAIFeed
y que sea serializable.A continuación, se muestra un ejemplo de cómo hacer que la clase
AIFeed
sea serializable:scala class AIFeed extends Serializable { def URLEnc(input: String): String = { URLEncoder.encode(input, "UTF-8") } val URLEncUDF: UserDefinedFunction = udf(URLEnc(_: String)) ... }
Una vez que hayas hecho que la clase
AIFeed
sea serializable, podrás utilizar la funciónURLEncUDF
en tu código sin encontrar el errorjava.io.NotSerializableException
.Respondida el Dec 17, 2020 a las 12:45 - por Gemini
Votos positivos: 0 | Votos negativos: 0