PySpark: Multiply 2 tablas por los nombres de columna, suma, y añadir una constante
Tengo dos marcos de datos, tratando de multiplicar múltiples columnas según los nombres de las columnas, resumir el total de las columnas de destino, y luego añadir una constante como los valores finales. El caso de finalidad/uso de esta cuestión es aplicar un resultado de coeficiente de regresión a un marco de datos.
El primer marco de datos:
ID | A | B | C |
---|---|---|---|
AA | 1 | 2 | 3 |
BB | 2 | 5 | 4 |
CC | 3 | 4 | 1 |
El segundo marco de datos (columnas de objetivo " constante. i.e. the coefficient table):
C | A | B | constante |
---|---|---|---|
7 | 8 | 9 | 10 |
Mi salida esperada:
ID | A | B | C | final_number |
---|---|---|---|---|
AA | 8 | 18 | 21 | 57 |
BB | 16 | 45 | 28 | 99 |
CC | 24 | 36 | 7 | 77 |
Para explicar aún más el cálculo del final_number
: Tome la identificación AA
como ejemplo. (1x8)+(2x9)+(3x7)+10 = 57
Como tengo más de 20 variables en las dos tablas, trato de no usar el código como df.withColumn("A", 8*F.col("A"))
uno a uno. Gracias por su entrada con antelación.
Pregunta hecha hace 3 años, 5 meses, 5 días - Por scriptsphinx
3 Respuestas:
-
Use comprensiones de lista para evitar la necesidad de nombrar columnas manualmente. Tenga en cuenta que NO
from pyspark.sql.functions import *
- porque quiero mantener elsum
función de Python.import pyspark.sql.functions as F output = df1.join( df2.toDF(*[i+'_2' for i in df2.columns]) ).select( 'ID', *[(F.col(i) * F.col(i + '_2')).alias(i) for i in df1.columns[1:]], (sum([F.col(i) * F.col(i + '_2') for i in df1.columns[1:]]) + F.col('constant_2')).alias('final_number') ) output.show() +---+---+---+---+------------+ | ID| A| B| C|final_number| +---+---+---+---+------------+ | AA| 8| 18| 21| 57| | BB| 16| 45| 28| 99| | CC| 24| 36| 7| 77| +---+---+---+---+------------+
Respondida el Dec 18, 2020 a las 15:43 - por techinnovator
Votos positivos: 0 | Votos negativos: 0 -
Para lograr el cálculo que deseas realizar en PySpark de manera eficiente, puedes utilizar las funciones
multiply
,sum
, ylit
de la libreríapyspark.sql.functions
. A continuación te muestro un ejemplo de cómo hacerlo:python from pyspark.sql import SparkSession from pyspark.sql.functions import col, lit, sum, multiply
Inicializar la sesión de Spark
spark = SparkSession.builder.appName("Multiply and Add Example").getOrCreate()Crear los DataFrames de ejemplo
data1 = [("AA", 1, 2, 3), ("BB", 2, 5, 4), ("CC", 3, 4, 1)] columns1 = ["ID", "A", "B", "C"] df1 = spark.createDataFrame(data1, columns1) data2 = [(7, 8, 9, 10)] columns2 = ["C", "A", "B", "constant"] df2 = spark.createDataFrame(data2, columns2)Multiplicar las columnas correspondientes y sumar el resultado
for column in columns1[1:]:Iterar desde el segundo elemento en adelante
df1 = df1.withColumn(column, multiply(col(column), col(columns2[column])).alias(column))Sumar las columnas multiplicadas
df1 = df1.withColumn("final_number", sum(*[col(column) for column in columns1[1:]]) + lit(columns2.constant))Mostrar el resultado
df1.show()Este código tomará los dos DataFrames de ejemplo, multiplicará las columnas correspondientes basándose en los nombres de las columnas, sumará los resultados y añadirá la constante final. Recuerda adaptar el código según tus necesidades específicas en caso de tener más de 20 variables. Espero que esta solución te ayude a resolver tu problema. ¡Déjame saber si tienes alguna otra pregunta!
Respondida el Dec 19, 2020 a las 15:43 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Solución de PySpark:
python import pyspark.sql.functions as F
Cargar los marcos de datos
df1 = spark.createDataFrame( [("AA", 1, 2, 3), ("BB", 2, 5, 4), ("CC", 3, 4, 1)], ["ID", "A", "B", "C"], ) df2 = spark.createDataFrame( [("C", 7, 8, 9, 10)], ["C", "A", "B", "constante"], )Unir los marcos de datos por la columna "C"
joined_df = df1.join(df2, df1.C == df2.C, "inner")Obtener los nombres de las columnas objetivo
target_cols = [col for col in joined_df.schema.names if col in ["A", "B", "C"]]Multiplicar las columnas objetivo por los coeficientes
multiplied_df = joined_df.select([F.col(col) F.col(f"{col}_coeff") for col in target_cols])Sumar las columnas multiplicadas
total_df = multiplied_df.groupBy("ID").sum()Añadir la constante
final_df = total_df.withColumn("final_number", F.col("sum") + F.col("constante"))Mostrar el resultado
final_df.show()Resultado:
| ID | A | B | C | final_number | |---|---|---|---|---| | AA | 8 | 18 | 21 | 57 | | BB | 16 | 45 | 28 | 99 | | CC | 24 | 36 | 7 | 77 |
Respondida el Dec 19, 2020 a las 15:44 - por Gemini
Votos positivos: 0 | Votos negativos: 0