Cómo dividir el marco de datos de chispa a la lista de datos por colis o condiciones

Tengo miles de millones de marcos de datos de filas, quiero dividir este marco de datos en cientos por valor de algunos cols de etiquetas.

mis datos como este

value|col1|col2|col3
1    | "a"|"b" |"c"
2    | "a"|"b" |"b"
3    | "a"|"x" |"c"
4    | "d"|"x" |"b"

mi espera es así

result:Map[Row,Dataframe]

result(Row(a,b))

value|col1|col2|col3
1    | "a"|"b" |"c"
2    | "a"|"b" |"b"
------
result(Row(a,x))

3    | "a"|"x" |"c"
------
result(Row("d","x))

4    | "d"|"x" |"b"

mi solución es usar semi o filtro izquierdo recurrente, pero considero usar una solución mayor como random split Sí. mi solución es esta primera respuesta a esta pregunta, por favor dame un consejo o sugerencia

result:Map[Row,Dataframe] = split_data(data,Seq("col1","col2"))

Tengo dos preguntas.

  1. Quiero conseguir un mapa dividido anidado como este:
def nestedSplit(data:DataFrame,cols:Seq[Seq[String]]):Map[Row,Map[Row,Map[Row,Dataframe]]]

result:Map[Row,Map[Row,Dataframe]] = split_data(data,Seq(Seq("col1","col2"),Seq("col3"))

#Map nested rows nums based on cols. length

#scala is a static type of language, this demand may be is hard to get
  1. una mejor solución sobre la división del marco de datos

Muchas gracias.

Pregunta hecha hace 3 años, 4 meses, 29 días - Por bitwisewizard


3 Respuestas:

  • import org.apache.spark.sql.functions.{col, not}
    import org.apache.spark.sql.{Column, DataFrame, RelationalGroupedDataset, Row, SparkSession}
    import org.apache.spark.sql.types.{BooleanType, StructField, StructType}
    
    import scala.collection.mutable.LinkedHashMap
    
    trait Split {
    
      import Split._
    
      val data: DataFrame
    
      val keySchema: StructType
    
    
      def info(): DataFrame
    
      def agg: RelationalGroupedDataset
    
      val keys: List[Row]
    
      lazy val result: DataSplitMap = transform
    
      def transform: Split.DataSplitMap
    }
    
    object Split {
    
      type DataSplitMap = LinkedHashMap[Row, DataFrame]
    
      type DataSplitMapper = (DataSplitMap, StructType)
    
      def empty(structType: StructType): DataSplitMapper = (LinkedHashMap[Row, DataFrame](), structType)
    
      def empty(): DataSplitMap = LinkedHashMap[Row, DataFrame]()
    
      def rowsToDataFrame(data: Traversable[Row], schema: StructType): DataFrame = {
        val spark = SparkSession.active
        spark.createDataFrame(spark.sparkContext.parallelize(data.toSeq: Seq[Row]), schema)
      }
    
      def combine(data: Traversable[DataFrame]): DataFrame = data.toArray.reduce(_ union _)
    
      def combine(data: DataSplitMap, schema: StructType): (DataFrame, DataFrame) = (rowsToDataFrame(data.map(_._1), schema), combine(data.map(_._2)))
    
    
      def filter(data: DataFrame, conditionData: DataFrame, filterNames: Seq[String]): DataFrame = {
        data.join(conditionData, filterNames, "left_semi")
      }
    
      def split(data: DataFrame, condition: Any): Split = {
        condition match {
          case c: Column => new FilterSplit(data, c)
          case n: Seq[String] => new SemiSplit(data, n)
          case _ => null
        }
    
      }
    
    }
    
    class SemiSplit(dataSrc: DataFrame, colNames: Seq[String]) extends Split {
    
      import Split._
    
      val data = dataSrc.repartition(colNames.map(col): _*)
    
      val keySchema: StructType = data.select(colNames.head, colNames.tail: _*).schema
    
      lazy val info: DataFrame = agg.count()
    
      lazy val keys: List[Row] = info.drop("count").collect().toList
    
      def transform: DataSplitMap = {
        val result: DataSplitMap = Split.empty()
        keys.foreach(x => {
          val filter_df = Split.rowsToDataFrame(Seq(x), keySchema)
          result += (x -> Split.filter(data, filter_df, colNames))
        })
        result
      }
    
      override def agg: RelationalGroupedDataset = data.groupBy(colNames.head, colNames.tail: _*)
    }
    
    class FilterSplit(dataSrc: DataFrame, condition: Column) extends Split {
    
      override val data = dataSrc
    
      override val keySchema: StructType = StructType.apply(Seq(StructField("condition", BooleanType, false)))
    
      override lazy val info: DataFrame = agg.count()
    
      override val keys: List[Row] = List(Row(true), Row(false))
    
    
      override def transform = {
        val result = Split.empty()
        result += Row(true) -> data.filter(condition)
        result += Row(false) -> data.filter(not(condition))
        result
      }
    
      override def agg: RelationalGroupedDataset = dataSrc.withColumn("condition", condition).groupBy(col("condition"))
    }
    

    mi Split es un buen trabajo;

    Split.split(data,Seq("col1","col2)).result
    

    Mi versión de chispa es 2.4.5 y la versión scala es 2.11.

    Respondida el Dec 16, 2020 a las 11:49 - por daniel1

    Votos positivos: 0 | Votos negativos: 0

  • Para resolver tu primera pregunta y obtener un mapa dividido anidado, puedes definir una función nestedSplit que tome como entrada un DataFrame, así como una secuencia de secuencias de columnas, y devuelva un mapa anidado según la estructura de columnas proporcionada. Aquí tienes una implementación posible en Scala:

    scala
    import org.apache.spark.sql.{DataFrame, Row}
    
    def nestedSplit(data: DataFrame, cols: Seq[Seq[String]]): Map[Row, Map[Row, DataFrame]] = {
      cols.map { nestedCols =>
        val nestedData = split_data(data, nestedCols) // Función para dividir el DataFrame por las columnas nestedCols
        (Row.fromSeq(nestedCols.map(data(_).head)), nestedData)
      }.toMap
    }
    
    val result: Map[Row, Map[Row, DataFrame]] = nestedSplit(data, Seq(Seq("col1", "col2"), Seq("col3")))
    

    Esta función recibe una lista de listas de columnas para realizar la división anidada del marco de datos, generando un mapa de salida que contiene la estructura de columnas nested y los datos correspondientes.

    En cuanto a tu segunda pregunta sobre una mejor solución para dividir el marco de datos, considera la posibilidad de utilizar la función groupBy de Spark DataFrame para agrupar los datos según los valores de las columnas especificadas. Por ejemplo:

    scala
    import org.apache.spark.sql.functions.col
    
    val result: Map[Row, DataFrame] = data.groupBy(col("col1"), col("col2")).mapValues(_.toDF()).collect.toMap
    

    Esta solución puede ser más eficiente y simple dependiendo de tus requerimientos y el volumen de datos con el que estés trabajando. Espero que estas sugerencias sean útiles para ti. ¡Déjame saber si necesitas más ayuda!

    Respondida el Dec 17, 2020 a las 11:51 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Respuesta a la primera pregunta:

    Para obtener un mapa dividido anidado, puedes utilizar la función groupBy y map de Spark. Aquí tienes un ejemplo:

    scala
    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.functions.col
    
    def nestedSplit(data: Dataset[Row], cols: Seq[Seq[String]]): Map[Row, Map[Row, Map[Row, Dataset[Row]]]] = {
      cols.foldLeft(data.toDF()) { (df, cols) =>
        df.groupBy(cols.map(col): _).map { case (k, v) => (k, v.map(row => (row, row)).toDF()) }
      }.rdd.collect().toMap
    }
    

    Respuesta a la segunda pregunta:

    Una mejor solución para dividir un marco de datos es utilizar la función randomSplit de Spark. Esta función divide aleatoriamente el marco de datos en un número especificado de partes. Aquí tienes un ejemplo:

    scala
    import org.apache.spark.sql.Dataset
    
    val splits = data.randomSplit(Array(0.6, 0.4))
    val split1 = splits(0)
    val split2 = splits(1)
    

    Ejemplo de uso:

    El siguiente código muestra cómo utilizar las funciones nestedSplit y randomSplit para dividir un marco de datos:

    scala
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().appName("SplitDataFrame").getOrCreate()
    
    import spark.implicits._
    
    val data = Seq(
      (1, "a", "b", "c"),
      (2, "a", "b", "b"),
      (3, "a", "x", "c"),
      (4, "d", "x", "b")
    ).toDF("value", "col1", "col2", "col3")
    
    val nestedSplitResult = nestedSplit(data, Seq(Seq("col1", "col2"), Seq("col3")))
    nestedSplitResult.foreach { case (k, v) => println(s"Key: $k, Value: $v") }
    
    val randomSplitResult = data.randomSplit(Array(0.6, 0.4))
    randomSplitResult(0).show()
    randomSplitResult(1).show()
    

    Salida:*

    
    Key: Row(a,b), Value: Map(Row(c) -> [[1, a, b, c]], Row(b) -> [[2, a, b, b]])
    Key: Row(a,x), Value: Map(Row(c) -> [[3, a, x, c]])
    Key: Row(d,x), Value: Map(Row(b) -> [[4, d, x, b]])
    +-----+-----+-----+-----+
    |value|col1|col2|col3|
    +-----+-----+-----+-----+
    |1    |a    |b    |c    |
    |2    |a    |b    |b    |
    |3    |a    |x    |c    |
    +-----+-----+-----+-----+
    +-----+-----+-----+-----+
    |value|col1|col2|col3|
    +-----+-----+-----+-----+
    |4    |d    |x    |b    |
    +-----+-----+-----+-----+
    

    Respondida el Dec 17, 2020 a las 11:55 - por Gemini

    Votos positivos: 0 | Votos negativos: 0