Spark dataset escribe en 2 directorios diferentes

Tengo un conjunto de datos en chispa partida por una columna ip. Ahora quiero dividir este conjunto de datos en 2 y escribir en HDFS de tal manera que si las particiones totales son 100 ie ip=1 to ip=100 entonces cada directorio HDFS debe contener 50 particiones cada uno finalmente.

Entrada:

mydata/
mydata/ip=1
mydata/ip=2
mydata/ip=3
mydata/ip=4
.
.
mydata/ip=101

Resultado

mydata1/
mydata1/ip=1
mydata1/ip=3
.
.
mydata1/ip=50


mydata2/
mydata2/ip=51
mydata2/ip=4
mydata2/ip=100

También mientras escribo cómo puedo asegurar que cada directorio mydata1 y mydata2 contengan una distribución igual de datos en términos de tamaño. Eso significa que ambos directorios deben contener por ejemplo 25Gb o datos, No debería haber un caso donde midata1 contiene 1GB y midata2 contienen49GB

Gracias.

Pregunta hecha hace 3 años, 4 meses, 29 días - Por techsavvypro


3 Respuestas:

  • Sí, puedes usar cubos. Leer más sobre el balanceo: https://dwgeek.com/spark-sql-bucketing-on-dataframe-examples.html/

    Respondida el Dec 16, 2020 a las 21:26 - por debugdragon

    Votos positivos: 0 | Votos negativos: 0

  • Para lograr la partición y escritura de los datos en dos directorios diferentes con una distribución igual de tamaño, puedes seguir los siguientes pasos en PySpark:

    python
    

    Importar las bibliotecas necesarias

    from pyspark.sql import SparkSession from pyspark.sql.functions import col from pyspark.sql.functions import monotonically_increasing_id

    Crear la sesión de Spark

    spark = SparkSession.builder \ .appName("Escribir en 2 directorios diferentes") \ .getOrCreate()

    Cargar los datos en un DataFrame

    data = spark.read.option("basePath", "mydata/") \ .parquet("mydata/")

    Añadir una columna con un identificador único

    data = data.withColumn("id", monotonically_increasing_id())

    Dividir los datos en dos conjuntos

    data1 = data.filter(col("id") % 2 == 0) data2 = data.filter(col("id") % 2 != 0)

    Escribir los datos en los directorios correspondientes

    data1.write.mode("overwrite").parquet("mydata1/") data2.write.mode("overwrite").parquet("mydata2/")

    Apagar la sesión de Spark

    spark.stop()

    En este código, primero cargamos los datos desde el directorio mydata en un DataFrame. Luego, añadimos una columna id con un identificador único. A continuación, dividimos los datos en dos DataFrames data1 y data2 usando el identificador id. Finalmente, escribimos los datos en dos directorios diferentes mydata1 y mydata2 y aseguramos que la distribución de datos sea igual en términos de tamaño.

    Al agregar el identificador único y dividir los datos en función de ese identificador, aseguramos una distribución equitativa en términos de tamaño en los dos directorios. Cada directorio contendrá aproximadamente la misma cantidad de datos.

    Respondida el Dec 17, 2020 a las 21:24 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Código para dividir un conjunto de datos de Spark en 2 directorios con una distribución uniforme de datos:

    ```scala import org.apache.spark.sql.SparkSession

    object SplitDatasetDatasets {

    def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Split Dataset")
      .master("local[<strong>]")
      .getOrCreate()
    
    // Lee el conjunto de datos de la ruta de entrada
    val df = spark.read.parquet("mydata")
    
    // Divide el conjunto de datos por la columna `ip`
    val partitionedDf = df.repartition($"ip")
    
    // Obtiene el número de particiones
    val numPartitions = partitionedDf.rdd.getNumPartitions
    
    // Calcula el número de particiones para cada directorio
    val partitionsPerDirectory = numPartitions / 2
    
    // Divide el conjunto de datos en dos directorios
    val directory1 = "mydata1/"
    val directory2 = "mydata2/"
    
    // Escribe las particiones en el primer directorio
    partitionedDf.rdd
      .zipWithIndex()
      .filter(_._2 < partitionsPerDirectory)
      .map(_._1)
      .saveAsParquet(directory1)
    
    // Escribe las particiones restantes en el segundo directorio
    partitionedDf.rdd
      .zipWithIndex()
      .filter(_._2 >= partitionsPerDirectory)
      .map(_._1)
      .saveAsParquet(directory2)
    
    // Imprime el número de particiones en cada directorio
    println(s"Número de particiones en $directory1: ${spark.read.parquet(directory1).rdd.getNumPartitions}")
    println(s"Número de particiones en $directory2: ${spark.read.parquet(directory2).rdd.getNumPartitions}")
    

    } } ```

    Cómo asegurar una distribución uniforme de datos:*

    Este código asegura una distribución uniforme de datos al dividir las particiones del conjunto de datos por la mitad y escribirlas en dos directorios separados. Como cada directorio contiene la mitad de las particiones, los datos se distribuirán uniformemente en términos de tamaño.

    Respondida el Dec 17, 2020 a las 21:35 - por Gemini

    Votos positivos: 0 | Votos negativos: 0