Pyspark para aplanar una matriz y explotar una estructura para obtener la salida deseada

Tengo un dato con abajo Schema: el atributo índice es Struct -- ratio con array -- ratio cada elemento array dentro de struct

root
 |-- id_num: string (nullable = true)
 |-- indexes: struct (nullable = true)
 |    |-- customer_rating: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- reputation: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |-- visibility: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
 |    |    |    |-- low_value_reason: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- value: double (nullable = true)
 |    |    |    |-- version: string (nullable = true)

Quiero traducir el esquema en el siguiente formato y obtener los valores de datos en las columnas correspondientes

root
 |-- id_num: string (nullable = true)
 |-- indexes_type: string (nullable = true)    --> this field hold indexes struct elements as a row
 |-- data_sufficiency_indicator: boolean (nullable = true)
 |-- value: double (nullable = true)
 |-- version: string (nullable = true)
 |-- low_value_reason: string (nullable = true)  --> each element in the array becomes a new row

Aquí están los datos de entrada de muestra en formato json:

{"id_num":"1234","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.16,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"5678","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":2.71,"low_value_reason":["low scores from reviews_and_visits","low scores from online_presence"]}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}]}}
{"data_id":"9876","indexes":{"visibility":[{"version":"2.0","data_sufficiency_indicator":true,"value":3.06}],"customer_rating":[{"version":"2.0","data_sufficiency_indicator":false}],"reputation":[{"version":"2.0","data_sufficiency_indicator":false}]}}

Producto previsto

id_num  |   indexes_type    |   version |   data_sufficiency_indicator | value  |   low_value_reason
==============================================================================================================
9999        visibility          2.0             true                    2.16        low scores from reviews_and_visits
9999        visibility          2.0             true                    2.16        low scores from online_presence
9999        customer_rating     2.0             false
9999        reputation          2.0             false
8888        visibility          2.0             true                    2.71        low scores from reviews_and_visits  
8888        visibility          2.0             true                    2.71        low scores from online_presence
8888        customer_rating     2.0             false
7898        visibility          2.0             true                    3.06
7898        customer_rating     2.0             false       
7898        reputation          2.0             false

Cualquier ayuda en este maletín es muy apreciada. También es posible obtener la salida sin endurecer los valores struct en el código, ya que pueden extenderse más allá de lo que está en el ejemplo.

Pregunta hecha hace 3 años, 4 meses, 29 días - Por codecrafty


3 Respuestas:

  • Puedes establecer la columna indexes como MapType en lugar de StructType especificando explícitamente el esquema al cargar el marco de datos con spark.read.json(), ver abajo:

    schema = "id_num string,indexes map,value:double,version:string>>>"
    
    df = spark.read.json("/path/to/jsons", schema=schema)
    
    df.printSchema()
    root
     |-- id_num: string (nullable = true)
     |-- indexes: map (nullable = true)
     |    |-- key: string
     |    |-- value: array (valueContainsNull = true)
     |    |    |-- element: struct (containsNull = true)
     |    |    |    |-- data_sufficiency_indicator: boolean (nullable = true)
     |    |    |    |-- low_value_reason: array (nullable = true)
     |    |    |    |    |-- element: string (containsNull = true)
     |    |    |    |-- value: double (nullable = true)
     |    |    |    |-- version: string (nullable = true)
    

    y luego hacer seleccionar y explosion_outer/inline_outer varias veces para obtener el resultado deseado:

    df_new = df.selectExpr("id_num", "explode_outer(indexes) as (indexes_type, vals)") \
        .selectExpr("*", "inline_outer(vals)") \
        .selectExpr(
            "id_num",
            "indexes_type",
            "version",
            "data_sufficiency_indicator",
            "value",
            "explode_outer(low_value_reason) as low_value_reason"
        )
    
    df_new.show(truncate=False)
    +------+---------------+-------+--------------------------+-----+----------------------------------+
    |id_num|indexes_type   |version|data_sufficiency_indicator|value|low_value_reason                  |
    +------+---------------+-------+--------------------------+-----+----------------------------------+
    |1234  |visibility     |2.0    |true                      |2.16 |low scores from reviews_and_visits|
    |1234  |visibility     |2.0    |true                      |2.16 |low scores from online_presence   |
    |1234  |customer_rating|2.0    |false                     |null |null                              |
    |1234  |reputation     |2.0    |false                     |null |null                              |
    |5678  |visibility     |2.0    |true                      |2.71 |low scores from reviews_and_visits|
    |5678  |visibility     |2.0    |true                      |2.71 |low scores from online_presence   |
    |5678  |customer_rating|2.0    |false                     |null |null                              |
    |9876  |visibility     |2.0    |true                      |3.06 |null                              |
    |9876  |customer_rating|2.0    |false                     |null |null                              |
    |9876  |reputation     |2.0    |false                     |null |null                              |
    +------+---------------+-------+--------------------------+-----+----------------------------------+
    

    BTW. Cambié. data_id a id_num en tu muestra JSON que se supone que es tu tipo. si no, sólo añadir data_id string en el esquema y luego utilizar coalesce(id_num,data_id) para conseguir la final id_num columna.

    Por otro lado, también puede intentar utilizar from_json/to_json función después de cargar el marco de datos sin especificar esquema, ver un ejemplo similar Aquí..

    Respondida el Dec 17, 2020 a las 00:06 - por devdynamo

    Votos positivos: 0 | Votos negativos: 0

  • Para aplanar la estructura de datos y obtener la salida deseada en PySpark, puedes seguir los siguientes pasos:

    • Cargar los datos en formato JSON en un DataFrame de PySpark.
    • Utilizar la función `explode` para aplanar la estructura de arrays dentro del campo `indexes`.
    • Seleccionar las columnas deseadas y renombrarlas según el formato esperado.
    • Manejar el campo `low_value_reason` para convertir los elementos de la lista en nuevas filas.
    • Mostrar el DataFrame resultante con la estructura requerida.

    A continuación te muestro un ejemplo de código para realizar este proceso:

    python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode
    
    

    Crear la sesión de Spark

    spark = SparkSession.builder.appName("Flatten Data").getOrCreate()

    Cargar los datos en un DataFrame

    data = spark.read.json("ruta_del_archivo.json")

    Aplanar la estructura de arrays dentro de 'indexes'

    flatten_data = data.select("id_num", explode("indexes").alias("indexes_type", "indexes_values"))

    Seleccionar y renombrar las columnas deseadas

    result_data = flatten_data.select( "id_num", "indexes_type", "indexes_values.version", "indexes_values.data_sufficiency_indicator", "indexes_values.value", explode("indexes_values.low_value_reason").alias("low_value_reason") )

    Mostrar el DataFrame resultante

    result_data.show()

    Con este código, deberías poder obtener el resultado deseado a partir de los datos de entrada proporcionados. Recuerda ajustar la ruta del archivo JSON y renombrar las columnas según sea necesario en tu caso específico. ¡Espero que esta información te sea útil! Si tienes alguna otra pregunta o necesitas más ayuda, no dudes en decírmelo.

    Respondida el Dec 18, 2020 a las 00:07 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • python
    from pyspark.sql import functions as F
    from pyspark.sql.types import 
    
    

    Define the input schema

    input_schema = StructType([ StructField("id_num", StringType(), True), StructField("indexes", StructType([ StructField("customer_rating", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True), StructField("reputation", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("low_value_reason", ArrayType(StringType()), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True), StructField("visibility", ArrayType(StructType([ StructField("data_sufficiency_indicator", BooleanType(), True), StructField("low_value_reason", ArrayType(StringType()), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True) ])), True) ])), True) ])

    Create a DataFrame from the input data

    input_df = spark.read.json(sc.parallelize([ {"id_num": "1234", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 2.16, "low_value_reason": ["low scores from reviews_and_visits", "low scores from online_presence"]}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}], "reputation": [{"version": "2.0", "data_sufficiency_indicator": False}]}}, {"data_id": "5678", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 2.71, "low_value_reason": ["low scores from reviews_and_visits", "low scores from online_presence"]}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}]}}, {"data_id": "9876", "indexes": {"visibility": [{"version": "2.0", "data_sufficiency_indicator": True, "value": 3.06}], "customer_rating": [{"version": "2.0", "data_sufficiency_indicator": False}], "reputation": [{"version": "2.0", "data_sufficiency_indicator": False}]}} ]), input_schema)

    Define the desired schema

    output_schema = StructType([ StructField("id_num", StringType(), True), StructField("indexes_type", StringType(), True), StructField("data_sufficiency_indicator", BooleanType(), True), StructField("value", DoubleType(), True), StructField("version", StringType(), True), StructField("low_value_reason", StringType(), True) ])

    Flatten the input DataFrame

    flattened_df = input_df.select(input_df.id_num, F.explode(F.arrays_zip(
    [F.array([lit(k) for k in input_df.indexes.dtype]]) + [input_df.indexes[k]] for k in input_df.indexes.dtype])).alias("indexes"))

    Extract the desired columns from the flattened DataFrame

    output_df = flattened_df.select(flattened_df.id_num, flattened_df.indexes.col1.alias("indexes_type"), flattened_df.indexes.col2.col2.alias("data_sufficiency_indicator"), flattened_df.indexes.col2.col3.alias("value"), flattened_df.indexes.col2.col4.alias("version"), F.array_join(flattened_df.indexes.col2.col5, ", ").alias("low_value_reason"))

    Cast the output DataFrame to the desired schema

    output_df = output_df.select([F.col(c).cast(output_schema[c].dataType) for c in output_df.columns])

    Show the output DataFrame

    output_df.show()

    Respondida el Dec 18, 2020 a las 00:15 - por Gemini

    Votos positivos: 0 | Votos negativos: 0