Obtener resultados de valor de la Columna de DataFrame en el objeto 'Column' no se puede llamar

Para el flujo leído en FileStore estoy tratando de comprobar si la primera columna de valor de primera fila es igual a alguna cadena. Desafortunadamente mientras accedo a esta columna de cualquier manera, por ejemplo, lanzar .toList() en ella, lanza

    if df["Name"].iloc[0].item() == "Bob":
TypeError: 'Column' object is not callable

Estoy llamando a la función de procesamiento de:

df.writeStream\
  .format("delta")\
  .foreachBatch(customProcessing)\
[...]

Y dentro de esta función estoy tratando de obtener el valor, pero ninguna de las formas de conseguir los datos funciona. Se está lanzando el mismo error.

    def customProcessing(df, epochId):
      
      if df["Name"].iloc[0].item() == "Bob":
[...]

¿Hay alguna posibilidad de leer un solo col? ¿O es específico de WriteStream y no puedo usar condiciones en esa entrada?

Pregunta hecha hace 3 años, 4 meses, 29 días - Por binaryblossom


3 Respuestas:

  • No hay iloc para los marcos de datos de chispa - esto no es pandas; también no hay concepto de índice.

    Si quieres conseguir el primer artículo puedes probar

    df.select('Name').limit(1).collect()[0][0] == "Bob"
    

    Respondida el Dec 17, 2020 a las 08:51 - por compilerczar

    Votos positivos: 0 | Votos negativos: 0

  • El error que estás experimentando se debe a que estás tratando de acceder a un valor específico de la columna de un DataFrame pero estás usando un método incorrecto para hacerlo.

    Para acceder al valor de una celda específica en un DataFrame de Spark, no necesitas usar .item() en la columna. En su lugar, puedes acceder al valor directamente con la función .first() o .head(1), que te devolverá un objeto Row. Luego, puedes acceder al valor de la columna en esa fila como si fuera un diccionario.

    Aquí tienes un ejemplo de cómo acceder al valor de la primera fila de la columna "Name" en un DataFrame de Spark de manera correcta:

    python
    def customProcessing(df, epochId):
        first_row = df.first()
    
        if first_row["Name"] == "Bob":
            

    Tu lógica de procesamiento aquí

    Al usar df.first(), obtienes la primera fila de DataFrame como un objeto Row, y luego puedes acceder a los valores de las columnas de esa fila como si fueran atributos de un diccionario.

    Espero que esta solución te ayude a resolver el problema que estás experimentando al acceder al valor de una columna específica en un DataFrame de Spark.

    Respondida el Dec 18, 2020 a las 08:51 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El error que estás enfrentando se produce porque estás intentando acceder a una columna de un DataFrame dentro de una función foreachBatch, que se aplica a cada microlote de datos procesados por el flujo de escritura. En este contexto, el DataFrame aún no se ha materializado y, por lo tanto, no se puede acceder a sus columnas directamente.

    Para resolver este problema, puedes utilizar la función map o flatMap para transformar cada microlote de datos antes de escribirlo en el sumidero Delta. Esto te permitirá acceder a las columnas individuales del DataFrame dentro de la función de transformación.

    Aquí tienes un ejemplo utilizando flatMap:

    
    df.writeStream.flatMap(lambda df: df.filter(df["Name"] == "Bob")).writeStream.format("delta").start()
    

    En este ejemplo, la función flatMap creará un nuevo flujo para cada microlote que cumpla la condición df["Name"] == "Bob", y este nuevo flujo se escribirá en el sumidero Delta.

    También puedes utilizar la función map para transformar cada microlote de datos sin filtrar los registros que no cumplen la condición. El siguiente ejemplo muestra cómo hacerlo:

    
    df.writeStream.map(lambda df: df["Name"].iloc[0]).writeStream.format("delta").start()
    

    En este ejemplo, la función map creará un nuevo flujo que contiene solo el valor de la primera fila de la columna Name para cada microlote, y este nuevo flujo se escribirá en el sumidero Delta.

    Nota que estos enfoques requerirán que recopiles todos los datos del microlote en la memoria, lo que puede no ser viable para flujos de datos grandes. Si necesitas procesar flujos de datos grandes de manera eficiente, considera utilizar otras técnicas, como el procesamiento con ventanas o el procesamiento por estado.

    Respondida el Dec 18, 2020 a las 08:59 - por Gemini

    Votos positivos: 0 | Votos negativos: 0