Resample and groupby on big dask array with xarray - using map_blocks?

Tengo un flujo de trabajo personalizado, que requiere usar resample para llegar a una frecuencia temporal superior, aplicando un ufunc, y groupby + mean para calcular el resultado final.

Me gustaría aplicar esto a un gran xarray dataset, que está respaldado por un chunked dask array. Para computación, me gustaría usar dask.distributed.

Sin embargo, cuando aplique esto al conjunto de datos completo, el número de tareas se dispara, abrumando al cliente y probablemente también el cronograma y los trabajadores si se presenta.

El xarray docs explicar:

Haga su indexación espacial y temporal (por ejemplo .sel() o .isel() temprano) en el oleoducto, especialmente antes de llamar resample() o groupby(). La agrupación y el rasampling activa un poco de computación en todos los bloques, que en teoría debe comunicarse con la indexación, pero esta optimización todavía no se ha implementado en dask.

Pero realmente necesito aplicar esto a la completo eje temporal.

¿Cómo implementar mejor esto?

Mi enfoque era usar map_blocks, para aplicar esta función para cada pedazo individualmente como para mantener al individuo xarray sub-datasets lo suficientemente pequeño.

Esto parece funcionar a pequeña escala, pero cuando uso el conjunto de datos completo, los trabajadores se quedan sin memoria y mueren rápidamente.

Mirando el panel, la función que estoy aplicando a la matriz se ejecuta múltiples veces del número de pedazos que tengo. ¿No deberían alinearse estos dos números?

Entonces mis preguntas son:

  • ¿Este enfoque es válido?
  • ¿Cómo podría implementar este flujo de trabajo de otra manera, además de implementar manualmente el resample y groupby parte y ponerlo en ufunc?
  • ¿Alguna idea sobre las cuestiones de rendimiento a escala (específicamente el número de ejecuciones vs chunks)?

Aquí hay un pequeño ejemplo que imita el flujo de trabajo y muestra el número de ejecuciones vs pedazos:

from time import sleep

import dask
from dask.distributed import Client, LocalCluster
import numpy as np
import pandas as pd
import xarray as xr

def ufunc(x):
    # computation
    sleep(2)
    return x

def fun(x):
    # upsample to higher res
    x = x.resample(time="1h").asfreq().fillna(0)
    
    # apply function
    x = xr.apply_ufunc(ufunc, x, input_core_dims=[["time"]], output_core_dims=[['time']], dask="parallelized")
    
    # average over dates
    x['time'] = x.time.dt.strftime("%Y-%m-%d")
    x = x.groupby("time").mean()

    return x

def create_xrds(shape):
    ''' helper function to create dataset'''
    x,y,t = shape

    tv = pd.date_range(start="1970-01-01", periods=t)
    
    ds = xr.Dataset({
    "band": xr.DataArray(
        dask.array.zeros(shape, dtype="int16"),
        dims=['x', 'y', 'time'],
        coords={"x": np.arange(0, x), "y": np.arange(0, y), "time": tv})
    })
    
    return ds


# set up distributed

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

ds = create_xrds((500,500,500)).chunk({"x": 100, "y": 100, "time": -1})

# create template

template = ds.copy()
template['time'] = template.time.dt.strftime("%Y-%m-%d")

# map fun to blocks
ds_out = xr.map_blocks(fun, ds, template=template)

# persist

ds_out.persist()

Utilizando el ejemplo anterior, así es como el array de dask (25 pedazos) parece:

enter image description here

Pero la función fun se ejecuta 125 veces:

enter image description here

Pregunta hecha hace 3 años, 5 meses, 0 días - Por pixelpioneerbbb5


3 Respuestas:

  • Mirando el panel, la función que estoy aplicando a la matriz se ejecuta múltiples veces del número de pedazos que tengo. ¿No deberían alinearse estos dos números?

    Esto es engañoso debido a una desafortunada elección hecha al hacer el gráfico. El número incluye tareas que hacen un bloque del Dataset de entrada (uno por variable por punto) " para el Dataset de salida, así como tareas que aplican la función. Esto se arreglará pronto (https://github.com/pydata/xarray/pull/5007)

    Respondida el Dec 16, 2020 a las 09:45 - por apiartisan

    Votos positivos: 0 | Votos negativos: 0

  • En el flujo de trabajo que estás describiendo, parece que estás teniendo problemas con la escalabilidad y el rendimiento al aplicar la función fun a un gran conjunto de datos respaldado por un array de dask. Aquí hay algunas sugerencias que podrían ayudarte a mejorar la eficiencia de tu código:

    • Evita realizar operaciones costosas innecesarias en cada chunk individualmente: En tu función `fun`, estás realizando operaciones como el resample y el groupby en cada chunk de datos. Estas operaciones pueden ser costosas y pueden no ser necesarias en cada chunk individual. Podrías considerar preprocesar los datos en un paso anterior, antes de aplicar la función a los chunks.
    • Divide el dataset en chunks más pequeños: En lugar de tener un solo chunk grande para el eje temporal, podrías dividir el eje temporal en chunks más pequeños para reducir la cantidad de memoria necesaria para cada cálculo. Esto podría ayudar a evitar que los trabajadores de `dask` se queden sin memoria al procesar los datos.
    • Asegúrate de que estás utilizando el paralelismo de `dask` de manera efectiva: Revisa la configuración de `dask` que estás utilizando y asegúrate de que estás aprovechando al máximo el paralelismo que `dask` ofrece. Puedes ajustar parámetros como el número de workers e hilos para optimizar el rendimiento de tu código.
    • Considera cambiar la estrategia de cálculo: En lugar de utilizar `map_blocks`, podrías considerar otras estrategias de cálculo como `map_overlap` u operaciones en bloques directamente sobre el array de `dask`. Esto podría ayudar a reducir la cantidad de tareas generadas y mejorar la eficiencia de la computación.
    • Experimenta con diferentes configuraciones y optimizaciones: Puedes probar diferentes enfoques y configuraciones para ver cuál funciona mejor para tu caso específico. También puedes consultar la documentación de `dask` y `xarray` para obtener más información sobre las mejores prácticas y técnicas de optimización.

    Implementar estos consejos debería ayudarte a mejorar la eficiencia y el rendimiento de tu código al aplicar el resample y groupby en un gran conjunto de datos respaldado por un array de dask utilizando xarray.

    Respondida el Dec 17, 2020 a las 09:40 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • Validez del enfoque

    Sí, el uso de map_blocks para aplicar la función a cada bloque individualmente es un enfoque válido para manejar conjuntos de datos grandes con limitaciones de memoria.

    Implementación alternativa

    Además de map_blocks, puedes considerar las siguientes alternativas:

    chunked_apply: Esta función de Dask permite aplicar una función a bloques de un array de Dask. A diferencia de map_blocks, chunked_apply mantiene las fronteras del bloque y los resultados se pueden alinear con los bloques originales. Implementación manual: Puedes implementar manualmente las operaciones de resampleo y agrupación dividiendo el eje temporal en bloques más pequeños, aplicando las operaciones en cada bloque y luego combinando los resultados.

    Problemas de rendimiento

    Las siguientes razones pueden explicar las discrepancias entre el número de ejecuciones y el número de fragmentos:

    Computaciones superpuestas: La función fun realiza operaciones en múltiples bloques de tiempo. Esto puede resultar en que la misma función se ejecute varias veces para bloques que se superponen en el eje temporal. Paralelización: Dask distribuye las tareas de forma paralela, por lo que es posible que se ejecuten varias instancias de la función simultáneamente, lo que lleva a un número total de ejecuciones mayor que el número de fragmentos. Overhead: Las operaciones de Dask y Xarray implican cierto overhead, lo que puede aumentar el número de tareas generadas.

    Recomendaciones

    Para mejorar el rendimiento, considera lo siguiente:

    Ajuste de los fragmentos temporales: Aumenta el tamaño de los fragmentos temporales para reducir el número de tareas y el tiempo de ejecución general. Optimización de la función: Optimiza la función fun para mejorar su eficiencia y reducir el tiempo de cómputo. Uso de chunked_apply: Explora el uso de chunked_apply para mantener las fronteras del bloque y evitar cómputos superpuestos. Paralelización eficiente: Ajusta la configuración del clúster de Dask para optimizar la paralelización y minimizar las ejecuciones redundantes.

    Ejemplo actualizado*

    El siguiente ejemplo utiliza chunked_apply para implementar el flujo de trabajo:

    python
    from dask.array.chunk import chunk_dataset
    from dask.dataframe.multi import concat
    
    def preprocess_chunk(chunk):
        

    Operaciones en el bloque de tiempo actual

    chunk = chunk.resample(time="1h").asfreq().fillna(0) chunk = xr.apply_ufunc(ufunc, chunk, input_core_dims=[["time"]], output_core_dims=[['time']], dask="parallelized") chunk['time'] = chunk.time.dt.strftime("%Y-%m-%d") return chunk

    Divide el eje temporal en bloques más pequeños

    chunks = chunk_dataset(ds, "time", 50)

    Aplica la función a cada bloque

    preprocessed = concat([preprocess_chunk(chunk) for chunk in chunks])

    Agrupa y calcula la media

    result = preprocessed.groupby("time").mean()

    Respondida el Dec 17, 2020 a las 09:46 - por Gemini

    Votos positivos: 0 | Votos negativos: 0