Cómo dividir el marco de datos de chispa a la lista de datos por colis o condiciones
Tengo miles de millones de marcos de datos de filas, quiero dividir este marco de datos en cientos por valor de algunos cols de etiquetas.
mis datos como este
value|col1|col2|col3
1 | "a"|"b" |"c"
2 | "a"|"b" |"b"
3 | "a"|"x" |"c"
4 | "d"|"x" |"b"
mi espera es así
result:Map[Row,Dataframe]
result(Row(a,b))
value|col1|col2|col3
1 | "a"|"b" |"c"
2 | "a"|"b" |"b"
------
result(Row(a,x))
3 | "a"|"x" |"c"
------
result(Row("d","x))
4 | "d"|"x" |"b"
mi solución es usar semi o filtro izquierdo recurrente, pero considero usar una solución mayor como random split
Sí.
mi solución es esta primera respuesta a esta pregunta, por favor dame un consejo o sugerencia
result:Map[Row,Dataframe] = split_data(data,Seq("col1","col2"))
Tengo dos preguntas.
- Quiero conseguir un mapa dividido anidado como este:
def nestedSplit(data:DataFrame,cols:Seq[Seq[String]]):Map[Row,Map[Row,Map[Row,Dataframe]]]
result:Map[Row,Map[Row,Dataframe]] = split_data(data,Seq(Seq("col1","col2"),Seq("col3"))
#Map nested rows nums based on cols. length
#scala is a static type of language, this demand may be is hard to get
- una mejor solución sobre la división del marco de datos
Muchas gracias.
Pregunta hecha hace 3 años, 5 meses, 7 días - Por bitwisewizard
3 Respuestas:
-
import org.apache.spark.sql.functions.{col, not} import org.apache.spark.sql.{Column, DataFrame, RelationalGroupedDataset, Row, SparkSession} import org.apache.spark.sql.types.{BooleanType, StructField, StructType} import scala.collection.mutable.LinkedHashMap trait Split { import Split._ val data: DataFrame val keySchema: StructType def info(): DataFrame def agg: RelationalGroupedDataset val keys: List[Row] lazy val result: DataSplitMap = transform def transform: Split.DataSplitMap } object Split { type DataSplitMap = LinkedHashMap[Row, DataFrame] type DataSplitMapper = (DataSplitMap, StructType) def empty(structType: StructType): DataSplitMapper = (LinkedHashMap[Row, DataFrame](), structType) def empty(): DataSplitMap = LinkedHashMap[Row, DataFrame]() def rowsToDataFrame(data: Traversable[Row], schema: StructType): DataFrame = { val spark = SparkSession.active spark.createDataFrame(spark.sparkContext.parallelize(data.toSeq: Seq[Row]), schema) } def combine(data: Traversable[DataFrame]): DataFrame = data.toArray.reduce(_ union _) def combine(data: DataSplitMap, schema: StructType): (DataFrame, DataFrame) = (rowsToDataFrame(data.map(_._1), schema), combine(data.map(_._2))) def filter(data: DataFrame, conditionData: DataFrame, filterNames: Seq[String]): DataFrame = { data.join(conditionData, filterNames, "left_semi") } def split(data: DataFrame, condition: Any): Split = { condition match { case c: Column => new FilterSplit(data, c) case n: Seq[String] => new SemiSplit(data, n) case _ => null } } } class SemiSplit(dataSrc: DataFrame, colNames: Seq[String]) extends Split { import Split._ val data = dataSrc.repartition(colNames.map(col): _*) val keySchema: StructType = data.select(colNames.head, colNames.tail: _*).schema lazy val info: DataFrame = agg.count() lazy val keys: List[Row] = info.drop("count").collect().toList def transform: DataSplitMap = { val result: DataSplitMap = Split.empty() keys.foreach(x => { val filter_df = Split.rowsToDataFrame(Seq(x), keySchema) result += (x -> Split.filter(data, filter_df, colNames)) }) result } override def agg: RelationalGroupedDataset = data.groupBy(colNames.head, colNames.tail: _*) } class FilterSplit(dataSrc: DataFrame, condition: Column) extends Split { override val data = dataSrc override val keySchema: StructType = StructType.apply(Seq(StructField("condition", BooleanType, false))) override lazy val info: DataFrame = agg.count() override val keys: List[Row] = List(Row(true), Row(false)) override def transform = { val result = Split.empty() result += Row(true) -> data.filter(condition) result += Row(false) -> data.filter(not(condition)) result } override def agg: RelationalGroupedDataset = dataSrc.withColumn("condition", condition).groupBy(col("condition")) }
mi Split es un buen trabajo;
Split.split(data,Seq("col1","col2)).result
Mi versión de chispa es
2.4.5
y la versión scala es2.11
.Respondida el Dec 16, 2020 a las 11:49 - por daniel1
Votos positivos: 0 | Votos negativos: 0 -
Para resolver tu primera pregunta y obtener un mapa dividido anidado, puedes definir una función
nestedSplit
que tome como entrada un DataFrame, así como una secuencia de secuencias de columnas, y devuelva un mapa anidado según la estructura de columnas proporcionada. Aquí tienes una implementación posible en Scala:scala import org.apache.spark.sql.{DataFrame, Row} def nestedSplit(data: DataFrame, cols: Seq[Seq[String]]): Map[Row, Map[Row, DataFrame]] = { cols.map { nestedCols => val nestedData = split_data(data, nestedCols) // Función para dividir el DataFrame por las columnas nestedCols (Row.fromSeq(nestedCols.map(data(_).head)), nestedData) }.toMap } val result: Map[Row, Map[Row, DataFrame]] = nestedSplit(data, Seq(Seq("col1", "col2"), Seq("col3")))
Esta función recibe una lista de listas de columnas para realizar la división anidada del marco de datos, generando un mapa de salida que contiene la estructura de columnas nested y los datos correspondientes.
En cuanto a tu segunda pregunta sobre una mejor solución para dividir el marco de datos, considera la posibilidad de utilizar la función
groupBy
de Spark DataFrame para agrupar los datos según los valores de las columnas especificadas. Por ejemplo:scala import org.apache.spark.sql.functions.col val result: Map[Row, DataFrame] = data.groupBy(col("col1"), col("col2")).mapValues(_.toDF()).collect.toMap
Esta solución puede ser más eficiente y simple dependiendo de tus requerimientos y el volumen de datos con el que estés trabajando. Espero que estas sugerencias sean útiles para ti. ¡Déjame saber si necesitas más ayuda!
Respondida el Dec 17, 2020 a las 11:51 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Respuesta a la primera pregunta:
Para obtener un mapa dividido anidado, puedes utilizar la función
groupBy
ymap
de Spark. Aquí tienes un ejemplo:scala import org.apache.spark.sql.Dataset import org.apache.spark.sql.functions.col def nestedSplit(data: Dataset[Row], cols: Seq[Seq[String]]): Map[Row, Map[Row, Map[Row, Dataset[Row]]]] = { cols.foldLeft(data.toDF()) { (df, cols) => df.groupBy(cols.map(col): _).map { case (k, v) => (k, v.map(row => (row, row)).toDF()) } }.rdd.collect().toMap }
Respuesta a la segunda pregunta:
Una mejor solución para dividir un marco de datos es utilizar la función
randomSplit
de Spark. Esta función divide aleatoriamente el marco de datos en un número especificado de partes. Aquí tienes un ejemplo:scala import org.apache.spark.sql.Dataset val splits = data.randomSplit(Array(0.6, 0.4)) val split1 = splits(0) val split2 = splits(1)
Ejemplo de uso:
El siguiente código muestra cómo utilizar las funciones
nestedSplit
yrandomSplit
para dividir un marco de datos:scala import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("SplitDataFrame").getOrCreate() import spark.implicits._ val data = Seq( (1, "a", "b", "c"), (2, "a", "b", "b"), (3, "a", "x", "c"), (4, "d", "x", "b") ).toDF("value", "col1", "col2", "col3") val nestedSplitResult = nestedSplit(data, Seq(Seq("col1", "col2"), Seq("col3"))) nestedSplitResult.foreach { case (k, v) => println(s"Key: $k, Value: $v") } val randomSplitResult = data.randomSplit(Array(0.6, 0.4)) randomSplitResult(0).show() randomSplitResult(1).show()
Salida:*
Key: Row(a,b), Value: Map(Row(c) -> [[1, a, b, c]], Row(b) -> [[2, a, b, b]]) Key: Row(a,x), Value: Map(Row(c) -> [[3, a, x, c]]) Key: Row(d,x), Value: Map(Row(b) -> [[4, d, x, b]]) +-----+-----+-----+-----+ |value|col1|col2|col3| +-----+-----+-----+-----+ |1 |a |b |c | |2 |a |b |b | |3 |a |x |c | +-----+-----+-----+-----+ +-----+-----+-----+-----+ |value|col1|col2|col3| +-----+-----+-----+-----+ |4 |d |x |b | +-----+-----+-----+-----+
Respondida el Dec 17, 2020 a las 11:55 - por Gemini
Votos positivos: 0 | Votos negativos: 0