obtener nombre de archivo y tiempo de modificación/creación de archivos como (key, valor) par en RDD utilizando pyspark

Tengo carpetas con muchos archivos (por ejemplo, más de 100k), algunos archivos pequeños (menos de 1kb) y algunos archivos grandes (por ejemplo, varios MB).

Me gustaría utilizar pyspark y escanear todos los archivos bajo estas carpetas, por ejemplo "C:\Xiang". Los nombres de los archivos son, por ejemplo, Carpeta 1:

C:\Xiang\fold1\filename1.txt
C:\Xiang\fold1\filename2.txt
C:\Xiang\fold1\filename3.txt
C:\Xiang\fold1\filename1_.meta.txt
C:\Xiang\fold1\filename2_.meta.txt
...

"fold2", "fold3", ... tienen una estructura similar.

Me gustaría escanear todos los archivos bajo estas carpetas y obtener el tiempo de modificación de cada archivo. Idealmente, se puede guardar en un RDD, con par como (key, valor) con clave el nombre de archivo (por ejemplo C:\Xiang\filename1.txt) y valorar el tiempo de modificación (por ejemplo, 2020-12-16 13:40). Para que pueda realizar más operaciones en estos archivos, por ejemplo filtrar por el tiempo de modificación y abrir los archivos seleccionados. ...

¿Alguna idea?

Pregunta hecha hace 3 años, 4 meses, 29 días - Por codecraftsman


3 Respuestas:

  • Uso pathlib para obtener la última vez modificada y el mapa en su rdd de nombres de archivos:

    import os
    import pathlib
    
    rdd = sc.parallelize(os.listdir("C:\Xiang"))  # try slash if backslash doesn't work
    rdd2 = rdd.keyBy(lambda x: x).map(lambda f: (f[0], pathlib.Path(f[1]).stat().st_mtime))
    

    Respondida el Dec 16, 2020 a las 12:50 - por coderchampion

    Votos positivos: 0 | Votos negativos: 0

  • Para lograr esto en PySpark, puedes seguir los siguientes pasos:

    • Importa las bibliotecas necesarias de PySpark:
    python
    from pyspark import SparkContext
    
    • Inicializa el SparkContext:
    python
    sc = SparkContext("local", "GetFileModificationTime")
    
    • Crea una función para obtener el nombre de archivo y su tiempo de modificación:
    python
    import os
    
    def get_file_modification_time(file_path):
        modification_time = os.path.getmtime(file_path)
        return (file_path, modification_time)
    
    • Utiliza la función `wholeTextFiles` de PySpark para leer todos los archivos en las carpetas especificadas y mapearlos con la función anterior:
    python
    files_rdd = sc.wholeTextFiles("C:/Xiang//")
    modification_times_rdd = files_rdd.map(lambda x: get_file_modification_time(x[0]))
    
    • Puedes convertir el RDD a un diccionario si deseas trabajar con él de esa forma:
    python
    modification_times_dict = modification_times_rdd.collectAsMap()
    

    Ahora, en modification_times_dict tendrás un diccionario donde la clave es el nombre del archivo completo y el valor es el tiempo de modificación. Puedes filtrar, realizar operaciones o cualquier otra manipulación que necesites en base a estos datos.

    Espero que esta solución te sea útil. ¡Déjame saber si necesitas más ayuda!

    Respondida el Dec 17, 2020 a las 12:50 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Paso 1: Crea un RDD de rutas de archivo

    python
    import os
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("FileMetadata").getOrCreate()
    rdd = spark.sparkContext.wholeTextFiles("C:\\Xiang\\*\\")
    

    Paso 2: Parsear el nombre del archivo y el tiempo de modificación

    python
    def parse_file_info(line):
        filename, content = line
        mtime = os.path.getmtime(filename)  

    Obtiene el tiempo de modificación de la ruta del archivo

    return (filename, mtime) rdd = rdd.map(parse_file_info)

    Paso 3: Almacenar como pares (clave, valor)

    python
    from pyspark.sql.types import StringType, LongType
    schema = StringType(), LongType()
    
    rdd = rdd.map(lambda x: x).toDF(schema)
    rdd.createOrReplaceTempView("fileMetadata")
    

    Ejemplo de uso:

    Puedes consultar la tabla fileMetadata para obtener el nombre del archivo y el tiempo de modificación:

    python
    spark.sql("SELECT  FROM fileMetadata ORDER BY mtime DESC").show()
    

    También puedes filtrar por tiempo de modificación:

    python
    min_mtime = "2020-12-16 13:40"
    spark.sql(f"SELECT  FROM fileMetadata WHERE mtime > '{min_mtime}'").show()
    

    Nota:

    El uso de wholeTextFiles puede no ser eficiente para archivos grandes. Considera usar binaryFiles si los archivos son demasiado grandes. Si necesitas el tiempo de creación del archivo, utiliza os.path.getctime(filename) en lugar de os.path.getmtime(filename).

    Respondida el Dec 17, 2020 a las 12:54 - por Gemini

    Votos positivos: 0 | Votos negativos: 0