Aggregation after sort(), persist() and limit() in Spark

Estoy tratando de conseguir la suma de una columna de la parte superior n filas en un DataFrame persistido. Por alguna razón, lo siguiente no funciona:

val df = df0.sort(col("colB").desc).persist()
df.limit(2).agg(sum("colB")).show()

Muestra un número aleatorio que es claramente inferior a la suma de los dos primeros. El número cambia de carrera a ejecución. Llamada() en el limit()'ed DF muestra consistentemente los dos valores principales correctos:

df.limit(2).show()

Es como si sort() no se aplica antes de la agregación. ¿Es un bicho en Spark? Supongo que se espera que persist() pierde la clasificación, pero ¿por qué funciona con show() ¿Y esto debería ser documentado en alguna parte?

Pregunta hecha hace 3 años, 4 meses, 29 días - Por nodenomad


3 Respuestas:

  • Vea los planes de consulta a continuación. agg resultados en un intercambio (4a línea en plan físico) que elimina la clasificación, mientras que show no resulta en ningún intercambio, así que se mantiene la clasificación.

    scala> df.limit(2).agg(sum("colB")).explain()
    == Physical Plan ==
    *(2) HashAggregate(keys=[], functions=[sum(cast(colB#4 as bigint))])
    +- *(2) HashAggregate(keys=[], functions=[partial_sum(cast(colB#4 as bigint))])
       +- *(2) GlobalLimit 2
          +- Exchange SinglePartition, true, [id=#95]
             +- *(1) LocalLimit 2
                +- *(1) ColumnarToRow
                   +- InMemoryTableScan [colB#4]
                         +- InMemoryRelation [colB#4], StorageLevel(disk, memory, deserialized, 1 replicas)
                               +- *(1) Sort [colB#4 DESC NULLS LAST], true, 0
                                  +- Exchange rangepartitioning(colB#4 DESC NULLS LAST, 200), true, [id=#7]
                                     +- LocalTableScan [colB#4]
    
    
    scala> df.limit(2).explain()
    == Physical Plan ==
    CollectLimit 2
    +- *(1) ColumnarToRow
       +- InMemoryTableScan [colB#4]
             +- InMemoryRelation [colB#4], StorageLevel(disk, memory, deserialized, 1 replicas)
                   +- *(1) Sort [colB#4 DESC NULLS LAST], true, 0
                      +- Exchange rangepartitioning(colB#4 DESC NULLS LAST, 200), true, [id=#7]
                         +- LocalTableScan [colB#4]
    

    Pero si persiste el marco de datos limitado, no habrá ningún intercambio para la agregación, por lo que eso podría hacer el truco:

    val df1 = df.limit(2).persist()
    
    scala> df1.agg(sum("colB")).explain()
    == Physical Plan ==
    *(1) HashAggregate(keys=[], functions=[sum(cast(colB#4 as bigint))])
    +- *(1) HashAggregate(keys=[], functions=[partial_sum(cast(colB#4 as bigint))])
       +- *(1) ColumnarToRow
          +- InMemoryTableScan [colB#4]
                +- InMemoryRelation [colB#4], StorageLevel(disk, memory, deserialized, 1 replicas)
                      +- CollectLimit 2
                         +- *(1) ColumnarToRow
                            +- InMemoryTableScan [colB#4]
                                  +- InMemoryRelation [colB#4], StorageLevel(disk, memory, deserialized, 1 replicas)
                                        +- *(1) Sort [colB#4 DESC NULLS LAST], true, 0
                                           +- Exchange rangepartitioning(colB#4 DESC NULLS LAST, 200), true, [id=#7]
                                              +- LocalTableScan [colB#4]
    

    En cualquier caso, es mejor utilizar funciones de ventana para asignar números de fila y resumir las filas si su número de fila cumple con una determinada condición (por ejemplo, row_number <= 2). Esto dará lugar a un resultado determinista. Por ejemplo,

    df0.withColumn(
        "rn",
        row_number().over(Window.orderBy($"colB".desc))
    ).filter("rn <= 2").agg(sum("colB"))
    

    Respondida el Dec 17, 2020 a las 07:58 - por scriptsculptor52b5

    Votos positivos: 0 | Votos negativos: 0

  • El problema que estás experimentando se debe a la forma en que Spark realiza las operaciones en un DataFrame persistido. Cuando persistes un DataFrame en Spark, los datos se almacenan en memoria o en disco para uso posterior. Sin embargo, Spark puede optar por reorganizar los datos almacenados en función de las operaciones posteriores que se realicen en el DataFrame.

    En tu caso, al aplicar el método persist() al DataFrame luego de ordenarlo con sort(), Spark puede reorganizar los datos en la memoria de una manera que afecta la funcionalidad de agg(sum()). Esto significa que los datos ordenados pueden no estar disponibles para ser agregados de la manera que esperas.

    Para resolver este problema, puedes intentar aplicar persist() después de realizar la agregación. Por ejemplo, puedes realizar la ordenación, limitar el número de filas y luego realizar la agregación antes de persistir el DataFrame:

    scala
    val df = df0.sort(col("colB").desc).limit(2).agg(sum("colB")).persist()
    df.show()
    

    De esta manera, primero ordenas, limitas y agregas los datos y luego los persistes en la memoria.

    Recuerda que Spark puede optimizar internamente cómo se ejecutan las operaciones, por lo que a veces puede ser necesario ajustar el orden de las operaciones para obtener el resultado deseado.

    Respondida el Dec 18, 2020 a las 07:59 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El problema que estás encontrando es causado por la llamada persist() en tu código. Cuando persistes un DataFrame, Spark materializa todos sus datos en la memoria, lo que puede desencadenar una reorganización de los datos y perder cualquier ordenamiento previo.

    Para resolver este problema, puedes utilizar la función cache() en su lugar, que es similar a persist() pero mantiene el orden de los datos. Aquí tienes un ejemplo modificado de tu código que debería funcionar:

    
    val df = df0.sort(col("colB").desc).cache()
    df.limit(2).agg(sum("colB")).show()
    

    Otra solución alternativa es utilizar la función takeOrdered() para obtener las filas superiores sin perder el ordenamiento:

    
    val top2Rows = df0.sort(col("colB").desc).takeOrdered(2)
    val sum = top2Rows.map(_.getLong(1)).sum
    println(sum)
    

    Ten en cuenta que takeOrdered() es una acción y desencadenará el cómputo del DataFrame, por lo que puede ser menos eficiente que usar cache() si planeas realizar más operaciones en el DataFrame.

    Respondida el Dec 18, 2020 a las 08:07 - por Gemini

    Votos positivos: 0 | Votos negativos: 0