PySpark: Multiply 2 tablas por los nombres de columna, suma, y añadir una constante

Tengo dos marcos de datos, tratando de multiplicar múltiples columnas según los nombres de las columnas, resumir el total de las columnas de destino, y luego añadir una constante como los valores finales. El caso de finalidad/uso de esta cuestión es aplicar un resultado de coeficiente de regresión a un marco de datos.

El primer marco de datos:

IDABC
AA123
BB254
CC341

El segundo marco de datos (columnas de objetivo " constante. i.e. the coefficient table):

CABconstante
78910

Mi salida esperada:

IDABCfinal_number
AA8182157
BB16452899
CC2436777

Para explicar aún más el cálculo del final_number: Tome la identificación AA como ejemplo. (1x8)+(2x9)+(3x7)+10 = 57

Como tengo más de 20 variables en las dos tablas, trato de no usar el código como df.withColumn("A", 8*F.col("A")) uno a uno. Gracias por su entrada con antelación.

Pregunta hecha hace 3 años, 4 meses, 28 días - Por scriptsphinx


3 Respuestas:

  • Use comprensiones de lista para evitar la necesidad de nombrar columnas manualmente. Tenga en cuenta que NO from pyspark.sql.functions import * - porque quiero mantener el sum función de Python.

    import pyspark.sql.functions as F
    
    output = df1.join(
        df2.toDF(*[i+'_2' for i in df2.columns])
    ).select(
        'ID',
        *[(F.col(i) * F.col(i + '_2')).alias(i) for i in df1.columns[1:]],
        (sum([F.col(i) * F.col(i + '_2') for i in df1.columns[1:]]) + F.col('constant_2')).alias('final_number')
    )
    
    output.show()
    +---+---+---+---+------------+
    | ID|  A|  B|  C|final_number|
    +---+---+---+---+------------+
    | AA|  8| 18| 21|          57|
    | BB| 16| 45| 28|          99|
    | CC| 24| 36|  7|          77|
    +---+---+---+---+------------+
    

    Respondida el Dec 18, 2020 a las 15:43 - por techinnovator

    Votos positivos: 0 | Votos negativos: 0

  • Para lograr el cálculo que deseas realizar en PySpark de manera eficiente, puedes utilizar las funciones multiply, sum, y lit de la librería pyspark.sql.functions. A continuación te muestro un ejemplo de cómo hacerlo:

    python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, sum, multiply
    
    

    Inicializar la sesión de Spark

    spark = SparkSession.builder.appName("Multiply and Add Example").getOrCreate()

    Crear los DataFrames de ejemplo

    data1 = [("AA", 1, 2, 3), ("BB", 2, 5, 4), ("CC", 3, 4, 1)] columns1 = ["ID", "A", "B", "C"] df1 = spark.createDataFrame(data1, columns1) data2 = [(7, 8, 9, 10)] columns2 = ["C", "A", "B", "constant"] df2 = spark.createDataFrame(data2, columns2)

    Multiplicar las columnas correspondientes y sumar el resultado

    for column in columns1[1:]:

    Iterar desde el segundo elemento en adelante

    df1 = df1.withColumn(column, multiply(col(column), col(columns2[column])).alias(column))

    Sumar las columnas multiplicadas

    df1 = df1.withColumn("final_number", sum(*[col(column) for column in columns1[1:]]) + lit(columns2.constant))

    Mostrar el resultado

    df1.show()

    Este código tomará los dos DataFrames de ejemplo, multiplicará las columnas correspondientes basándose en los nombres de las columnas, sumará los resultados y añadirá la constante final. Recuerda adaptar el código según tus necesidades específicas en caso de tener más de 20 variables. Espero que esta solución te ayude a resolver tu problema. ¡Déjame saber si tienes alguna otra pregunta!

    Respondida el Dec 19, 2020 a las 15:43 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Solución de PySpark:

    python
    import pyspark.sql.functions as F
    
    

    Cargar los marcos de datos

    df1 = spark.createDataFrame( [("AA", 1, 2, 3), ("BB", 2, 5, 4), ("CC", 3, 4, 1)], ["ID", "A", "B", "C"], ) df2 = spark.createDataFrame( [("C", 7, 8, 9, 10)], ["C", "A", "B", "constante"], )

    Unir los marcos de datos por la columna "C"

    joined_df = df1.join(df2, df1.C == df2.C, "inner")

    Obtener los nombres de las columnas objetivo

    target_cols = [col for col in joined_df.schema.names if col in ["A", "B", "C"]]

    Multiplicar las columnas objetivo por los coeficientes

    multiplied_df = joined_df.select([F.col(col) F.col(f"{col}_coeff") for col in target_cols])

    Sumar las columnas multiplicadas

    total_df = multiplied_df.groupBy("ID").sum()

    Añadir la constante

    final_df = total_df.withColumn("final_number", F.col("sum") + F.col("constante"))

    Mostrar el resultado

    final_df.show()

    Resultado:

    | ID | A | B | C | final_number | |---|---|---|---|---| | AA | 8 | 18 | 21 | 57 | | BB | 16 | 45 | 28 | 99 | | CC | 24 | 36 | 7 | 77 |

    Respondida el Dec 19, 2020 a las 15:44 - por Gemini

    Votos positivos: 0 | Votos negativos: 0