Spark Scala - Winsorize DataFrame columns within groups

Estoy preprocesando datos para entradas de aprendizaje automático, una columna de valor objetivo, llámalo "precio" tiene muchos outliers, y en lugar de winsorizing precio sobre todo el conjunto Quiero winsorize dentro de grupos etiquetados por "product_categoría". Hay otras características, product_category es sólo una etiqueta de precio relevante.

Hay una función Scala stat que funciona muy bien:

df_data.stat.approxQuantile("price", Array(0.01, 0.99), 0.00001)
// res19: Array[Double] = Array(3.13, 318.54)

Desafortunadamente, no soporta computar los quantiles dentro de los grupos. Tampoco soporta las particiones de ventana.

df_data
    .groupBy("product_category")
    .approxQuantile($"price", Array(0.01, 0.99), 0.00001)

// error: value approxQuantile is not a member of
//   org.apache.spark.sql.RelationalGroupedDataset

¿Cuál es la mejor manera de calcular decir el p01 y p99 dentro de grupos de un marco de datos de chispa, con el propósito de reemplazar valores más allá de ese rango, ie winsorizing?

Mi esquema de conjunto de datos se puede imaginar así, y sus más de 20MM filas con Appx 10K diferentes etiquetas para "product_categoría", por lo que el rendimiento también es una preocupación.

df_data and a winsorized price column:
+---------+------------------+--------+---------+
|   item  | product_category |  price | pr_winz |
+---------+------------------+--------+---------+
| I000001 |     XX11         |   1.99 |   5.00  |
| I000002 |     XX11         |  59.99 |  59.99  |
| I000003 |     XX11         |1359.00 | 850.00  |
+---------+------------------+--------+---------+
supposing p01 = 5.00, p99 = 850.00 for this product_category 

Pregunta hecha hace 3 años, 4 meses, 29 días - Por quantumcoder403a


3 Respuestas:

  • Esto es lo que se me ocurrió, después de luchar con la documentación (hay dos funciones approx_percentile y percentile_approx que aparentemente hacen lo mismo).

    No pude averiguar cómo implementar esto excepto como una expresión chispa sql, no seguro exactamente por qué agrupar sólo funciona allí. ¿Sospecho por su parte de Hive?

    Spark DataFrame Winsorizor
    • Probada en DF en 10 a 100MM filas rango
    // Winsorize function, groupable by columns list
    // low/hi element of [0,1]
    // precision: integer in [1, 1E7-ish], in practice use 100 or 1000 for large data, smaller is faster/less accurate
    // group_col: comma-separated list of column names
    import org.apache.spark.sql._
    
    def grouped_winzo(df: DataFrame, winz_col: String, group_col: String, low: Double, hi: Double, precision: Integer): DataFrame = {
        df.createOrReplaceTempView("df_table")
        
        spark.sql(s"""
        select distinct 
        *
        , percentile_approx($winz_col, $low, $precision) over(partition by $group_col) p_low
        , percentile_approx($winz_col, $hi, $precision) over(partition by $group_col) p_hi
        
        from df_table
        """)
        .withColumn(winz_col + "_winz", expr(s"""
            case when $winz_col <= p_low then p_low
                 when $winz_col >= p_hi then p_hi
                 else $winz_col end"""))
        .drop(winz_col, "p_low", "p_hi")
        
    }
    
    // winsorize the price column of a dataframe at the p01 and p99 
    // percentiles, grouped by 'product_category' column.
    
    val df_winsorized = grouped_winzo(
       df_data
       , "price"
       , "product_category"
       , 0.01, 0.99, 1000)
    

    Respondida el Dec 17, 2020 a las 06:38 - por csharpsorcerer

    Votos positivos: 0 | Votos negativos: 0

  • Para lograr winsorizar el DataFrame dentro de grupos en Spark Scala, puedes seguir los siguientes pasos:

    • Calcular los percentiles dentro de cada grupo utilizando la función `percentile_approx` de Spark.
    • Luego, unir los percentiles calculados a los datos originales y aplicar la función de winsorización en una columna nueva.

    Aquí tienes un ejemplo de cómo puedes lograrlo:

    scala
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    // Calcular los percentiles dentro de cada grupo
    val percentilesDF = df_data
      .groupBy("product_category")
      .agg(
        percentile_approx(col("price"), lit(0.01)).as("p01"),
        percentile_approx(col("price"), lit(0.99)).as("p99")
      )
    
    // Unir los percentiles al DataFrame original
    val joinedDF = df_data.join(percentilesDF, Seq("product_category"), "left")
    
    // Aplicar la winsorización y crear una nueva columna
    val winsorizedDF = joinedDF.withColumn("pr_winz", when(col("price") < col("p01"), col("p01"))
      .when(col("price") > col("p99"), col("p99"))
      .otherwise(col("price")))
    
    // Mostrar el resultado
    winsorizedDF.show()
    

    En este código, calculamos los percentiles p01 y p99 dentro de cada grupo de product_category, luego unimos esos percentiles con el DataFrame original. Finalmente, aplicamos la winsorización a la columna price basada en los percentiles calculados, creando una nueva columna pr_winz con los precios winsorizados.

    Espero que esta solución te sea de ayuda. ¡Déjame saber si tienes alguna otra pregunta!

    Respondida el Dec 18, 2020 a las 06:33 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Aproximación basada en ventanas

    Una forma de lograr la winsorización dentro de grupos utilizando Spark Scala es mediante el uso de ventanas. Aquí tienes un ejemplo:

    scala
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    
    // Crea una ventana para cada categoría de producto
    val windowSpec = Window.partitionBy("product_category")
    
    // Calcula los cuantiles p01 y p99 dentro de cada ventana
    val df_quantiles = df_data
      .withColumn("p01", percentile_approx($"price", 0.01, windowSpec))
      .withColumn("p99", percentile_approx($"price", 0.99, windowSpec))
    
    // Realiza la winsorización reemplazando los valores fuera del rango
    val df_winsorized = df_data
      .withColumn("pr_winz", when($"price" < $"p01", $"p01")
        .when($"price" > $"p99", $"p99")
        .otherwise($"price"))
    

    Aproximación basada en UDF

    Otra opción es utilizar una función definida por el usuario (UDF) para calcular los cuantiles y realizar la winsorización. Aquí tienes un ejemplo de UDF que calcula el p01 y el p99:

    scala
    import org.apache.spark.sql.functions.udf
    
    // Define la UDF para calcular los cuantiles
    val quantileUDF = udf((values: Seq[Double], p: Double) => {
      val sortedValues = values.sorted
      val index = (p  (sortedValues.length - 1)).toInt
      sortedValues(index)
    })
    
    // Aplica la UDF para calcular los cuantiles y realizar la winsorización
    val df_winsorized = df_data
      .groupBy("product_category")
      .agg(quantileUDF(collect_list($"price"), 0.01) as "p01",
            quantileUDF(collect_list($"price"), 0.99) as "p99")
      .withColumn("pr_winz", when($"price" < $"p01", $"p01")
        .when($"price" > $"p99", $"p99")
        .otherwise($"price"))
    

    Recomendaciones de rendimiento

    Optimización de la memoria: Para mejorar el rendimiento de la memoria, puedes reducir el número de particiones aumentando el número de particiones al agrupar por la columna "product_category" mediante df_data.repartition($"product_category", 1000). Paralelización: Utiliza un mayor número de ejecutores y núcleos para paralelizar el procesamiento. Almacenamiento en caché: Almacena en caché el DataFrame después de la winsorización para evitar volver a calcular los valores.

    Respondida el Dec 18, 2020 a las 06:47 - por Gemini

    Votos positivos: 0 | Votos negativos: 0