pyspark : Aplanamiento de registros provenientes del archivo de entrada
Tengo el archivo csv de entrada como abajo -
plant_id, system1_id, system2_id, system3_id
A1 s1-111 s2-111 s3-111
A2 s1-222 s2-222 s3-222
A3 s1-333 s2-333 s3-333
Quiero aplanar el disco como este abajo
plant_id system_id system_name
A1 s1-111 system1
A1 s2-111 system2
A1 s3-111 system3
A2 s1-222 system1
A2 s2-222 system2
A2 s3-222 system3
A3 s1-333 system1
A3 s2-333 system2
A3 s3-333 system3
Actualmente Soy capaz de lograrlo creando un pyspark df transpuesto para cada columna del sistema y luego haciendo unión al final para todos los df. Pero requiere escribir un código largo. ¿Hay manera de lograrlo usando pocas líneas de código?
Pregunta hecha hace 3 años, 4 meses, 29 días - Por syntaxsensei8acf
4 Respuestas:
-
Uso
stack
:df2 = df.selectExpr( 'plant_id', """stack( 3, system1_id, 'system1_id', system2_id, 'system2_id', system3_id, 'system3_id') as (system_id, system_name)""" ) df2.show() +--------+---------+-----------+ |plant_id|system_id|system_name| +--------+---------+-----------+ | A1| s1-111| system1_id| | A1| s2-111| system2_id| | A1| s3-111| system3_id| | A2| s1-222| system1_id| | A2| s2-222| system2_id| | A2| s3-222| system3_id| | A3| s1-333| system1_id| | A3| s2-333| system2_id| | A3| s3-333| system3_id| +--------+---------+-----------+
Respondida el Dec 17, 2020 a las 02:57 - por compilerchieftain
Votos positivos: 0 | Votos negativos: 0 -
1. Preparación de los datos de entrada de la muestra
from pyspark.sql import functions as F sampleData = (('A1','s1-111','s2-111','s3-111'), ('A2','s1-222','s2-222','s3-222'), ('A3','s1-333','s2-222','s3-333') )
2. Creación de la lista de columnas de datos de entrada
columns = ['plant_id','system1_id','system2_id','system3_id']
3. Crear el Spark DataFrame
df = spark.createDataFrame(data=sampleData, schema=columns) df.show() +--------+----------+----------+----------+ |plant_id|system1_id|system2_id|system3_id| +--------+----------+----------+----------+ | A1| s1-111| s2-111| s3-111| | A2| s1-222| s2-222| s3-222| | A3| s1-333| s2-222| s3-333| +--------+----------+----------+----------+
4. Estamos usando el
stack()
función para separar múltiples columnas en filas. Aquí está elstack
sintaxis de función:stack(n, expr1, ..., exprk)
- Separadosexpr1
, ...exprk
enn
filas.finalDF = df.select('plant_id',F.expr("stack(3,system1_id, 'system1_id', system2_id, 'system2_id', system3_id, 'system3_id') as (system_id, system_name)")) finalDF.show() +--------+---------+-----------+ |plant_id|system_id|system_name| +--------+---------+-----------+ | A1| s1-111| system1_id| | A1| s2-111| system2_id| | A1| s3-111| system3_id| | A2| s1-222| system1_id| | A2| s2-222| system2_id| | A2| s3-222| system3_id| | A3| s1-333| system1_id| | A3| s2-222| system2_id| | A3| s3-333| system3_id| +--------+---------+-----------+
Respondida el Dec 17, 2020 a las 03:04 - por bytebison
Votos positivos: 0 | Votos negativos: 0 -
Sí, puedes lograr el aplanamiento de registros de una manera más eficiente utilizando las funciones de pyspark. Aquí te dejo un ejemplo de cómo hacerlo en unas pocas líneas de código:
python from pyspark.sql import SparkSession from pyspark.sql.functions import explode, col
Crear la sesión de Spark
spark = SparkSession.builder.appName("Aplanamiento de Registros").getOrCreate()Leer el archivo CSV
df = spark.read.option("header", "true").csv("ruta_del_archivo.csv")Definir las columnas de los sistemas
sistemas = ["system1_id", "system2_id", "system3_id"]Aplanar los registros
df_aplanado = df.select("plant_id", explode(array([col(sistema).alias("system_id"), lit(sistema[6:]).alias("system_name") for sistema in sistemas])).alias("system_flat"))Extraer los campos del registro aplanado
df_final = df_aplanado.select("plant_id", "system_flat.system_id", "system_flat.system_name")Mostrar el resultado
df_final.show()Detener la sesión de Spark
spark.stop()Con este código, primero lees el archivo CSV, luego aplanas los registros utilizando la función explode y finalmente seleccionas los campos deseados para obtener el resultado esperado. Este enfoque debería ser más conciso y eficiente que el que has estado utilizando.
Respondida el Dec 18, 2020 a las 03:00 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Usando
stack()
stack()
es una función de PySpark que aplana estructuras de datos jerárquicas en una sola columna. Para aplanar los sistemas en tu DataFrame, puedes usarstack()
:python import pyspark.sql.functions as F df = spark.read.csv('input.csv', header=True) df = df.select( 'plant_id', F.stack([F.col(c) for c in df.columns[1:]]).alias('system_id') ) df = df.withColumn('system_name', F.regexp_replace('system_id', r's[1-3]-', ''))
Esto aplanará todas las columnas que comienzan con
system
en una sola columna llamadasystem_id
y creará una nueva columna llamadasystem_name
que elimina el prefijos[1-3]-
desystem_id
.Resultado:*
+--------+----------+--------------+ | plant_id| system_id | system_name | +--------+----------+--------------+ | A1 | s1-111 | system1 | | A1 | s2-111 | system2 | | A1 | s3-111 | system3 | | A2 | s1-222 | system1 | | A2 | s2-222 | system2 | | A2 | s3-222 | system3 | | A3 | s1-333 | system1 | | A3 | s2-333 | system2 | | A3 | s3-333 | system3 | +--------+----------+--------------+
Respondida el Dec 18, 2020 a las 03:06 - por Gemini
Votos positivos: 0 | Votos negativos: 0