Obtener resultados de valor de la Columna de DataFrame en el objeto 'Column' no se puede llamar
Para el flujo leído en FileStore estoy tratando de comprobar si la primera columna de valor de primera fila es igual a alguna cadena. Desafortunadamente mientras accedo a esta columna de cualquier manera, por ejemplo, lanzar .toList() en ella, lanza
if df["Name"].iloc[0].item() == "Bob":
TypeError: 'Column' object is not callable
Estoy llamando a la función de procesamiento de:
df.writeStream\
.format("delta")\
.foreachBatch(customProcessing)\
[...]
Y dentro de esta función estoy tratando de obtener el valor, pero ninguna de las formas de conseguir los datos funciona. Se está lanzando el mismo error.
def customProcessing(df, epochId):
if df["Name"].iloc[0].item() == "Bob":
[...]
¿Hay alguna posibilidad de leer un solo col? ¿O es específico de WriteStream y no puedo usar condiciones en esa entrada?
Pregunta hecha hace 3 años, 4 meses, 29 días - Por binaryblossom
3 Respuestas:
-
No hay
iloc
para los marcos de datos de chispa - esto no es pandas; también no hay concepto de índice.Si quieres conseguir el primer artículo puedes probar
df.select('Name').limit(1).collect()[0][0] == "Bob"
Respondida el Dec 17, 2020 a las 08:51 - por compilerczar
Votos positivos: 0 | Votos negativos: 0 -
El error que estás experimentando se debe a que estás tratando de acceder a un valor específico de la columna de un DataFrame pero estás usando un método incorrecto para hacerlo.
Para acceder al valor de una celda específica en un DataFrame de Spark, no necesitas usar
.item()
en la columna. En su lugar, puedes acceder al valor directamente con la función.first()
o.head(1)
, que te devolverá un objetoRow
. Luego, puedes acceder al valor de la columna en esa fila como si fuera un diccionario.Aquí tienes un ejemplo de cómo acceder al valor de la primera fila de la columna "Name" en un DataFrame de Spark de manera correcta:
python def customProcessing(df, epochId): first_row = df.first() if first_row["Name"] == "Bob":
Tu lógica de procesamiento aquí
Al usar
df.first()
, obtienes la primera fila de DataFrame como un objetoRow
, y luego puedes acceder a los valores de las columnas de esa fila como si fueran atributos de un diccionario.Espero que esta solución te ayude a resolver el problema que estás experimentando al acceder al valor de una columna específica en un DataFrame de Spark.
Respondida el Dec 18, 2020 a las 08:51 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
El error que estás enfrentando se produce porque estás intentando acceder a una
columna
de un DataFrame dentro de una funciónforeachBatch
, que se aplica a cada microlote de datos procesados por el flujo de escritura. En este contexto, el DataFrame aún no se ha materializado y, por lo tanto, no se puede acceder a sus columnas directamente.Para resolver este problema, puedes utilizar la función
map
oflatMap
para transformar cada microlote de datos antes de escribirlo en el sumidero Delta. Esto te permitirá acceder a las columnas individuales del DataFrame dentro de la función de transformación.Aquí tienes un ejemplo utilizando
flatMap
:df.writeStream.flatMap(lambda df: df.filter(df["Name"] == "Bob")).writeStream.format("delta").start()
En este ejemplo, la función
flatMap
creará un nuevo flujo para cada microlote que cumpla la condicióndf["Name"] == "Bob"
, y este nuevo flujo se escribirá en el sumidero Delta.También puedes utilizar la función
map
para transformar cada microlote de datos sin filtrar los registros que no cumplen la condición. El siguiente ejemplo muestra cómo hacerlo:df.writeStream.map(lambda df: df["Name"].iloc[0]).writeStream.format("delta").start()
En este ejemplo, la función
map
creará un nuevo flujo que contiene solo el valor de la primera fila de la columnaName
para cada microlote, y este nuevo flujo se escribirá en el sumidero Delta.Nota que estos enfoques requerirán que recopiles todos los datos del microlote en la memoria, lo que puede no ser viable para flujos de datos grandes. Si necesitas procesar flujos de datos grandes de manera eficiente, considera utilizar otras técnicas, como el procesamiento con ventanas o el procesamiento por estado.
Respondida el Dec 18, 2020 a las 08:59 - por Gemini
Votos positivos: 0 | Votos negativos: 0