Spark dataset escribe en 2 directorios diferentes
Tengo un conjunto de datos en chispa partida por una columna ip
. Ahora quiero dividir este conjunto de datos en 2 y escribir en HDFS de tal manera que si las particiones totales son 100
ie ip=1 to ip=100
entonces cada directorio HDFS debe contener 50 particiones cada uno finalmente.
Entrada:
mydata/
mydata/ip=1
mydata/ip=2
mydata/ip=3
mydata/ip=4
.
.
mydata/ip=101
Resultado
mydata1/
mydata1/ip=1
mydata1/ip=3
.
.
mydata1/ip=50
mydata2/
mydata2/ip=51
mydata2/ip=4
mydata2/ip=100
También mientras escribo cómo puedo asegurar que cada directorio mydata1 y mydata2 contengan una distribución igual de datos en términos de tamaño. Eso significa que ambos directorios deben contener por ejemplo 25Gb o datos, No debería haber un caso donde midata1 contiene 1GB y midata2 contienen49GB
Gracias.
Pregunta hecha hace 3 años, 4 meses, 29 días - Por techsavvypro
3 Respuestas:
-
Sí, puedes usar cubos. Leer más sobre el balanceo: https://dwgeek.com/spark-sql-bucketing-on-dataframe-examples.html/
Respondida el Dec 16, 2020 a las 21:26 - por debugdragon
Votos positivos: 0 | Votos negativos: 0 -
Para lograr la partición y escritura de los datos en dos directorios diferentes con una distribución igual de tamaño, puedes seguir los siguientes pasos en PySpark:
python
Importar las bibliotecas necesarias
from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.functions import monotonically_increasing_idCrear la sesión de Spark
spark = SparkSession.builder \ .appName("Escribir en 2 directorios diferentes") \ .getOrCreate()Cargar los datos en un DataFrame
data = spark.read.option("basePath", "mydata/") \ .parquet("mydata/")Añadir una columna con un identificador único
data = data.withColumn("id", monotonically_increasing_id())Dividir los datos en dos conjuntos
data1 = data.filter(col("id") % 2 == 0) data2 = data.filter(col("id") % 2 != 0)Escribir los datos en los directorios correspondientes
data1.write.mode("overwrite").parquet("mydata1/") data2.write.mode("overwrite").parquet("mydata2/")Apagar la sesión de Spark
spark.stop()En este código, primero cargamos los datos desde el directorio
mydata
en un DataFrame. Luego, añadimos una columnaid
con un identificador único. A continuación, dividimos los datos en dos DataFramesdata1
ydata2
usando el identificadorid
. Finalmente, escribimos los datos en dos directorios diferentesmydata1
ymydata2
y aseguramos que la distribución de datos sea igual en términos de tamaño.Al agregar el identificador único y dividir los datos en función de ese identificador, aseguramos una distribución equitativa en términos de tamaño en los dos directorios. Cada directorio contendrá aproximadamente la misma cantidad de datos.
Respondida el Dec 17, 2020 a las 21:24 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Código para dividir un conjunto de datos de Spark en 2 directorios con una distribución uniforme de datos:
```scala import org.apache.spark.sql.SparkSession
object SplitDatasetDatasets {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder() .appName("Split Dataset") .master("local[<strong>]") .getOrCreate() // Lee el conjunto de datos de la ruta de entrada val df = spark.read.parquet("mydata") // Divide el conjunto de datos por la columna `ip` val partitionedDf = df.repartition($"ip") // Obtiene el número de particiones val numPartitions = partitionedDf.rdd.getNumPartitions // Calcula el número de particiones para cada directorio val partitionsPerDirectory = numPartitions / 2 // Divide el conjunto de datos en dos directorios val directory1 = "mydata1/" val directory2 = "mydata2/" // Escribe las particiones en el primer directorio partitionedDf.rdd .zipWithIndex() .filter(_._2 < partitionsPerDirectory) .map(_._1) .saveAsParquet(directory1) // Escribe las particiones restantes en el segundo directorio partitionedDf.rdd .zipWithIndex() .filter(_._2 >= partitionsPerDirectory) .map(_._1) .saveAsParquet(directory2) // Imprime el número de particiones en cada directorio println(s"Número de particiones en $directory1: ${spark.read.parquet(directory1).rdd.getNumPartitions}") println(s"Número de particiones en $directory2: ${spark.read.parquet(directory2).rdd.getNumPartitions}")
} } ```
Cómo asegurar una distribución uniforme de datos:*
Este código asegura una distribución uniforme de datos al dividir las particiones del conjunto de datos por la mitad y escribirlas en dos directorios separados. Como cada directorio contiene la mitad de las particiones, los datos se distribuirán uniformemente en términos de tamaño.
Respondida el Dec 17, 2020 a las 21:35 - por Gemini
Votos positivos: 0 | Votos negativos: 0