Pyspark para aplanar una matriz y explotar una estructura para obtener la salida deseada
Tengo un dato con abajo Schema: el atributo índice es Struct -- ratio con array -- ratio cada elemento array dentro de struct
root
|-- id_num: string (nullable = true)
|-- indexes: struct (nullable = true)
| |-- customer_rating: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
| |-- reputation: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- low_value_reason: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
| |-- visibility: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- data_sufficiency_indicator: boolean (nullable = true)
| | | |-- low_value_reason: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- value: double (nullable = true)
| | | |-- version: string (nullable = true)
Quiero traducir el esquema en el siguiente formato y obtener los valores de datos en las columnas correspondientes
root
|-- id_num: string (nullable = true)
|-- indexes_type: string (nullable = true) --> this field hold indexes struct elements as a row
|-- data_sufficiency_indicator: boolean (nullable = true)
|-- value: double (nullable = true)
|-- version: string (nullable = true)
|-- low_value_reason: string (nullable = true) --> each element in the array becomes a new row
Aquí están los datos de entrada de muestra en formato json:
{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
Producto previsto
id_num | indexes_type | version | data_sufficiency_indicator | value | low_value_reason
==============================================================================================================
9999 visibility 2.0 true 2.16 low scores from reviews_and_visits
9999 visibility 2.0 true 2.16 low scores from online_presence
9999 customer_rating 2.0 false
9999 reputation 2.0 false
8888 visibility 2.0 true 2.71 low scores from reviews_and_visits
8888 visibility 2.0 true 2.71 low scores from online_presence
8888 customer_rating 2.0 false
7898 visibility 2.0 true 3.06
7898 customer_rating 2.0 false
7898 reputation 2.0 false
Cualquier ayuda en este maletín es muy apreciada. También es posible obtener la salida sin endurecer los valores struct en el código, ya que pueden extenderse más allá de lo que está en el ejemplo.
Pregunta hecha hace 3 años, 4 meses, 29 días - Por codecrafty
3 Respuestas:
-
Puedes establecer la columna
indexes
como MapType en lugar de StructType especificando explícitamente el esquema al cargar el marco de datos conspark.read.json()
, ver abajo:schema = "id_num string,indexes map
,value:double,version:string>>>" df = spark.read.json("/path/to/jsons", schema=schema) df.printSchema() root |-- id_num: string (nullable = true) |-- indexes: map (nullable = true) | |-- key: string | |-- value: array (valueContainsNull = true) | | |-- element: struct (containsNull = true) | | | |-- data_sufficiency_indicator: boolean (nullable = true) | | | |-- low_value_reason: array (nullable = true) | | | | |-- element: string (containsNull = true) | | | |-- value: double (nullable = true) | | | |-- version: string (nullable = true) y luego hacer seleccionar y explosion_outer/inline_outer varias veces para obtener el resultado deseado:
df_new = df.selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)") \ .selectExpr("*", "inline_outer(vals)") \ .selectExpr( "id_num", "indexes_type", "version", "data_sufficiency_indicator", "value", "explode_outer(low_value_reason) as low_value_reason" ) df_new.show(truncate=False) +------+---------------+-------+--------------------------+-----+----------------------------------+ |id_num|indexes_type |version|data_sufficiency_indicator|value|low_value_reason | +------+---------------+-------+--------------------------+-----+----------------------------------+ |1234 |visibility |2.0 |true |2.16 |low scores from reviews_and_visits| |1234 |visibility |2.0 |true |2.16 |low scores from online_presence | |1234 |customer_rating|2.0 |false |null |null | |1234 |reputation |2.0 |false |null |null | |5678 |visibility |2.0 |true |2.71 |low scores from reviews_and_visits| |5678 |visibility |2.0 |true |2.71 |low scores from online_presence | |5678 |customer_rating|2.0 |false |null |null | |9876 |visibility |2.0 |true |3.06 |null | |9876 |customer_rating|2.0 |false |null |null | |9876 |reputation |2.0 |false |null |null | +------+---------------+-------+--------------------------+-----+----------------------------------+
BTW. Cambié.
data_id
aid_num
en tu muestra JSON que se supone que es tu tipo. si no, sólo añadirdata_id string
en el esquema y luego utilizarcoalesce(id_num,data_id)
para conseguir la finalid_num
columna.Por otro lado, también puede intentar utilizar
from_json
/to_json
función después de cargar el marco de datos sin especificar esquema, ver un ejemplo similar Aquí..Respondida el Dec 17, 2020 a las 00:06 - por devdynamo
Votos positivos: 0 | Votos negativos: 0 -
Para aplanar la estructura de datos y obtener la salida deseada en PySpark, puedes seguir los siguientes pasos:
- Cargar los datos en formato JSON en un DataFrame de PySpark.
- Utilizar la función `explode` para aplanar la estructura de arrays dentro del campo `indexes`.
- Seleccionar las columnas deseadas y renombrarlas según el formato esperado.
- Manejar el campo `low_value_reason` para convertir los elementos de la lista en nuevas filas.
- Mostrar el DataFrame resultante con la estructura requerida.
A continuación te muestro un ejemplo de código para realizar este proceso:
python from pyspark.sql import SparkSession from pyspark.sql.functions import explode
Crear la sesión de Spark
spark = SparkSession.builder.appName("Flatten Data").getOrCreate()Cargar los datos en un DataFrame
data = spark.read.json("ruta_del_archivo.json")Aplanar la estructura de arrays dentro de 'indexes'
flatten_data = data.select("id_num", explode("indexes").alias("indexes_type", "indexes_values"))Seleccionar y renombrar las columnas deseadas
result_data = flatten_data.select( "id_num", "indexes_type", "indexes_values.version", "indexes_values.data_sufficiency_indicator", "indexes_values.value", explode("indexes_values.low_value_reason").alias("low_value_reason") )Mostrar el DataFrame resultante
result_data.show()Con este código, deberías poder obtener el resultado deseado a partir de los datos de entrada proporcionados. Recuerda ajustar la ruta del archivo JSON y renombrar las columnas según sea necesario en tu caso específico. ¡Espero que esta información te sea útil! Si tienes alguna otra pregunta o necesitas más ayuda, no dudes en decírmelo.
Respondida el Dec 18, 2020 a las 00:07 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
python from pyspark.sql import functions as F from pyspark.sql.types import
Define the input schema
input_schema = StructType([ StructField("id_num", StringType(), True), StructField("indexes", StructType([ StructField("customer_rating", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True), StructField("reputation", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("low_value_reason", ArrayType(StringType()), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True), StructField("visibility", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("low_value_reason", ArrayType(StringType()), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True) ])), True) ])Create a DataFrame from the input data
input_df = spark.read.json(sc.parallelize([ {"id_num": "1234", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 2.16, "low_value_reason": ["low scores from reviews_and_visits", "low scores from online_presence"]}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}], "reputation": [{"version": "2.0", "data_sufficiency_indicator": False}]}}, {"data_id": "5678", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 2.71, "low_value_reason": ["low scores from reviews_and_visits", "low scores from online_presence"]}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}]}}, {"data_id": "9876", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 3.06}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}], "reputation": [{"version": "2.0", "data_sufficiency_indicator": False}]}} ]), input_schema)Define the desired schema
output_schema = StructType([ StructField("id_num", StringType(), True), StructField("indexes_type", StringType(), True), StructField("data_sufficiency_indicator", BooleanType(), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True), StructField("low_value_reason", StringType(), True) ])Flatten the input DataFrame
flattened_df = input_df.select(input_df.id_num, F.explode(F.arrays_zip([F.array([lit(k) for k in input_df.indexes.dtype]]) + [input_df.indexes[k]] for k in input_df.indexes.dtype])).alias("indexes"))Extract the desired columns from the flattened DataFrame
output_df = flattened_df.select(flattened_df.id_num, flattened_df.indexes.col1.alias("indexes_type"), flattened_df.indexes.col2.col2.alias("data_sufficiency_indicator"), flattened_df.indexes.col2.col3.alias("value"), flattened_df.indexes.col2.col4.alias("version"), F.array_join(flattened_df.indexes.col2.col5, ", ").alias("low_value_reason"))Cast the output DataFrame to the desired schema
output_df = output_df.select([F.col(c).cast(output_schema[c].dataType) for c in output_df.columns])Show the output DataFrame
output_df.show()Respondida el Dec 18, 2020 a las 00:15 - por Gemini
Votos positivos: 0 | Votos negativos: 0