Conde Distinct Times Out During Left Join

Tengo una tabla de productos y subproductos. Es una mesa bastante pequeña en 122 filas totales y sólo 3 columnas:

| backend | sub_product | product |
|---------|-------------|---------|
| conn_go | go_update   | prod_go |
| conn_go | go_delete   | prod_go |
| conn_go | go_edit     | prod_go |
| conn_rv | rv_update   | prod_rv |
| conn_mb | mb_update   | prod_mb |
| conn_mb | mb_delete   | prod_mb |
| conn_mb | mb_edit     | prod_mb |
| conn_pr | pr_update   | prod_pr |
| conn_pr | pr_edit     | prod_pr |
| conn_ct | pol_edit    | null    |
....

Luego tengo esta tabla de uso de particiones diarias, que registra cuántas veces un usuario ha utilizado cada subproducto. Esta tabla es mucho más grande (186GB, 247MB por partición, 4.6M filas por partición):

| backend | yyyy_mm_dd | sub_product | x_id | user_id | count |
|---------|------------|-------------|------|---------|-------|
| conn_go | 2020-12-15 | go_update   | 10   | 3422    | 1     |
| conn_go | 2020-12-15 | go_delete   | 10   | 23445   | 2     |
| conn_go | 2020-12-15 | go_edit     | 10   | 2243    | 2     |
| conn_rv | 2020-12-15 | rv_update   | 10   | 245342  | 1     |
| conn_mb | 2020-12-15 | mb_update   | 11   | 5464    | 3     |
| conn_mb | 2020-12-15 | mb_delete   | 11   | 1424    | 2     |
| conn_mb | 2020-12-15 | mb_edit     | 11   | 21454   | 2     |
| conn_pr | 2020-12-15 | pr_update   | 12   | 224525  | 1     |
| conn_pr | 2020-12-15 | pr_edit     | 12   | 22424   | 1     |

Si user_id no ha usado sub_product en un día entonces no habrá fila para eso user_id / sub_product / yyyy_mm_dd combinación en la usage mesa. Me gustaría contar, por x_id, el recuento distintivo de user_idque han usado cada uno product. A continuación se muestra la salida que estoy buscando, basada en los datos de muestra anteriores.

| x_id | product | usage |
|------|---------|-------|
| 10   | prod_go | 3     |
| 10   | prod_rv | 1     |
| 10   | prod_mb | 0     |
| 10   | prop_pr | 0     |
| 10   | null    | 0     |
| 11   | prod_go | 0     |
| 11   | prod_rv | 0     |
| 11   | prod_mb | 3     |
| 11   | prop_pr | 0     |
| 11   | null    | 0     |
| 12   | prod_go | 0     |
| 12   | prod_rv | 0     |
| 12   | prod_mb | 0     |
| 12   | prop_pr | 1     |
| 12   | null    | 1     |

Lo que he intentado:

import pyspark.sql
from pyspark.sql import functions as sf

products = (
    spark.table('my_schema.products')
    .select('backend', 'sub_product', 'product')
)

usage = (
    spark.table('my_schema.usage')
    #.where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
    .where(sf.col('count') > 0)
    .where(sf.col('x_id').isNotNull())
    .select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'count')

)

agg = (
    products
    .join(usage, on = ['backend', 'sub_product'], how = 'left')
    .drop('count')
    .groupby('x_id', 'product')
    .agg(
        sf.countDistinct('user_id')
    )
)

agg.show(30,False)

El Python anterior se ejecuta durante bastante tiempo (30+mins) y eventualmente se agota:

org.apache.spark.SparkException: Job abortó debido al fracaso del escenario: La tarea 157 en el estadio 136.0 falló 10 veces, el fracaso más reciente: Tareas perdidas 157.9 en la etapa 136.0: EjecutorLostFailure (ejecutor 3068 expulsado por una de las tareas de ejecución) Razón: Container marcado como fallado: Estado de salida: 143. Diagnósticos: Container asesinado bajo petición. Código de salida es 143 Container salió con un código de salida no cero 143 Killed by external signal

También, si descomiendo el where cláusula entonces obtengo otro error:

"expresión del filtro 'my_schema.usage.yyyy_mm_dd'de tipo cuerda no es un booleano

Q. ¿Hay alguna manera de optimizar mejor mi código para obtener mi salida deseada?


EDIT: Usar una transmisión se unió enormemente a la unión y me permite obtener una cuenta agregada. Sin embargo, mi código todavía no produce la salida prevista desde arriba:

agg = (
    usage
    .join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
    .drop('count')
    .where(sf.col('product').isNotNull())
    .groupby('x_id', 'product')
    .agg(
        sf.countDistinct('user_id').alias('usage_ever')
    )

)

agg.orderBy('x_id', 'product').show(500,False)

Problema: En mi salida, no tengo una lista de productos por x_id i.e. Estoy perdiendo filas cuando x_id no tiene usuarios usando un producto en particular (me gustaría ver 0 en tales casos). Mirando los datos de la muestra, mi salida no muestra una fila para x_id = 10, producto = prod_mb y uso = 0.

Pregunta hecha hace 3 años, 4 meses, 28 días - Por codechampion


4 Respuestas:

  • Tal error generalmente significa que su gran tabla excede la memoria del ejecutante durante el tipo de shuffle así que el ejecutante fue perdido. Es este caso con una pequeña mesa que debe unirse a la pequeña mesa al gran usin Broacast HashJoin:

    from pyspark.sql.functions import broadcast
    
    usage.join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
    

    Acerca de cuando el operador:

    .where(sf.col('yyyy_mm_dd')).between('2018-08-11', '2020-01-20')
    

    debería ser:

    .where(sf.col('yyyy_mm_dd').between('2018-08-11', '2020-01-20'))
    

    Respondida el Dec 18, 2020 a las 11:44 - por binarymaestro

    Votos positivos: 0 | Votos negativos: 0

  • Basado en la respuesta de @valex, pude llegar a esto - que parece funcionar bastante bien. Probablemente se puede reducir / simplificar:

    import pyspark.sql
    from pyspark.sql import functions as sf
    from pyspark.sql.functions import broadcast
    
    products = (
        spark.table('my_schema.products')
        .select('backend', 'sub_product', 'product')
    )
    
    usage = (
        spark.table('my_schema.usage')
        .where(sf.col('yyyy_mm_dd').between('2018-08-06', '2020-01-20'))
        .where(sf.col('count') > 0)
        .where(sf.col('x_id').isNotNull())
        .select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'product')
    
    )
    
    agg = (
        usage
        .join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
        .drop('count')
        .where(sf.col('product').isNotNull())
        .groupby('x_id', 'product')
        .agg(
            sf.countDistinct('user_id').alias('usage_ever')
        )
    
    )
    
    distinct_products = (
        spark.table('my_schema.products')
        .select('product').distinct()
    )
    
    ids = (
        spark.table('my_schema.all_ids')
        .select(sf.col('id').alias('x_id'))
    )
    
    cross_join = (
        ids
        .crossJoin(distinct_products)
    )
    
    final = (
        cross_join
        .join(agg, on = ['x_id', 'product'], how = 'left_outer')
    )
    final.fillna(0, subset=['usage_ever']).orderBy('x_id', 'product').show(500,False)
    

    Respondida el Dec 18, 2020 a las 11:52 - por devdoodle

    Votos positivos: 0 | Votos negativos: 0

  • El problema que estás experimentando es que al hacer un left join entre las tablas de productos y uso, estás perdiendo las filas en las que no hay coincidencia con un producto en particular.

    Una posible solución para obtener la salida deseada sería generar un DataFrame que incluya todas las combinaciones posibles de x_id y productos y luego realizar un left join con el resultado de tu unión anterior. Esto te permitirá obtener filas con un uso de 0 cuando no haya coincidencias.

    Aquí te dejo un posible enfoque para lograr esto:

    python
    from pyspark.sql.functions import col, countDistinct
    
    

    Crea un DataFrame con todas las combinaciones posibles de x_id y productos

    all_combinations = ( usage.select('x_id', 'product') .distinct() .crossJoin(products.select('product')) .withColumnRenamed('product', 'product_comb') )

    Realiza un left join entre las combinaciones y el DataFrame anterior

    agg_complete = ( all_combinations .join(agg, (col('product_comb') == col('product')) & (col('x_id') == col('x_id')), how='left') .fillna(0, subset=['usage']) .select(col('x_id'), col('product_comb').alias('product'), col('usage')) .orderBy('x_id', 'product') ) agg_complete.show(500, False)

    Con este enfoque, deberías poder obtener una salida que incluye todas las combinaciones de x_id y productos, mostrando un uso de 0 cuando no haya datos disponibles.

    Espero que esta solución te ayude a resolver tu problema. ¡Déjame saber si tienes alguna otra pregunta!

    Respondida el Dec 19, 2020 a las 11:45 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Hay un par de cosas que puedes hacer para optimizar tu código:

    1. Usa índices:

    Asegúrate de que hay índices en las columnas backend y sub_product en ambas tablas. Esto acelerará significativamente la unión.

    2. Usa una transmisión se unió:

    Una transmisión se unió es más eficiente para unir dos conjuntos de datos cuando uno de ellos es mucho más grande que el otro. En este caso, usage es mucho más grande que products. Usa broadcast(products) para crear una transmisión de datos de products que luego se puede unir a usage.

    3. Filtra después de unir:

    Filtra los datos después de unirlos en lugar de antes. Esto evitará que se procesen filas innecesarias.

    4. Usa agregación parcial:

    En lugar de contar distintos user_id para cada combinación de x_id y product, puedes usar agregación parcial para contar distintos user_id para cada x_id. Esto reducirá la cantidad de datos que necesitas unir.

    5. Utiliza la distribución de datos:

    Distribuye los datos uniformemente entre los ejecutores para evitar cuellos de botella. Puedes usar el repartition() método para repartir los datos. Aquí tienes un ejemplo de código optimizado:

    python
    import pyspark.sql
    from pyspark.sql import functions as sf
    
    products = (
        spark.table('my_schema.products')
        .select('backend', 'sub_product', 'product')
        .repartition('backend', 'sub_product')
    )
    
    usage = (
        spark.table('my_schema.usage')
        .where(sf.col('count') > 0)
        .where(sf.col('x_id').isNotNull())
        .select('yyyy_mm_dd', 'x_id', 'user_id', 'backend', 'sub_product', 'count')
    
    )
    
    agg = (
        usage
        .join(broadcast(products), on = ['backend', 'sub_product'], how = 'left')
        .drop('count')
        .filter(sf.col('product').isNotNull())
        .groupBy('x_id')
        .agg(
            sf.countDistinct('user_id')
        )
    )
    
    agg.show(30,False)
    

    Además de estas optimizaciones, también deberías considerar usar un clúster de Spark con más recursos (por ejemplo, más nodos o más memoria) para manejar el conjunto de datos más grande.

    Respondida el Dec 19, 2020 a las 11:46 - por Gemini

    Votos positivos: 0 | Votos negativos: 0