Resample and groupby on big dask array with xarray - using map_blocks?
Tengo un flujo de trabajo personalizado, que requiere usar resample
para llegar a una frecuencia temporal superior, aplicando un ufunc
, y groupby
+ mean
para calcular el resultado final.
Me gustaría aplicar esto a un gran xarray
dataset, que está respaldado por un chunked dask
array. Para computación, me gustaría usar dask.distributed
.
Sin embargo, cuando aplique esto al conjunto de datos completo, el número de tareas se dispara, abrumando al cliente y probablemente también el cronograma y los trabajadores si se presenta.
El xarray docs explicar:
Haga su indexación espacial y temporal (por ejemplo .sel() o .isel() temprano) en el oleoducto, especialmente antes de llamar resample() o groupby(). La agrupación y el rasampling activa un poco de computación en todos los bloques, que en teoría debe comunicarse con la indexación, pero esta optimización todavía no se ha implementado en dask.
Pero realmente necesito aplicar esto a la completo eje temporal.
¿Cómo implementar mejor esto?
Mi enfoque era usar map_blocks
, para aplicar esta función para cada pedazo individualmente como para mantener al individuo xarray
sub-datasets lo suficientemente pequeño.
Esto parece funcionar a pequeña escala, pero cuando uso el conjunto de datos completo, los trabajadores se quedan sin memoria y mueren rápidamente.
Mirando el panel, la función que estoy aplicando a la matriz se ejecuta múltiples veces del número de pedazos que tengo. ¿No deberían alinearse estos dos números?
Entonces mis preguntas son:
- ¿Este enfoque es válido?
- ¿Cómo podría implementar este flujo de trabajo de otra manera, además de implementar manualmente el
resample
ygroupby
parte y ponerlo enufunc
? - ¿Alguna idea sobre las cuestiones de rendimiento a escala (específicamente el número de ejecuciones vs chunks)?
Aquí hay un pequeño ejemplo que imita el flujo de trabajo y muestra el número de ejecuciones vs pedazos:
from time import sleep
import dask
from dask.distributed import Client, LocalCluster
import numpy as np
import pandas as pd
import xarray as xr
def ufunc(x):
# computation
sleep(2)
return x
def fun(x):
# upsample to higher res
x = x.resample(time="1h").asfreq().fillna(0)
# apply function
x = xr.apply_ufunc(ufunc, x, input_core_dims=[["time"]], output_core_dims=[['time']], dask="parallelized")
# average over dates
x['time'] = x.time.dt.strftime("%Y-%m-%d")
x = x.groupby("time").mean()
return x
def create_xrds(shape):
''' helper function to create dataset'''
x,y,t = shape
tv = pd.date_range(start="1970-01-01", periods=t)
ds = xr.Dataset({
"band": xr.DataArray(
dask.array.zeros(shape, dtype="int16"),
dims=['x', 'y', 'time'],
coords={"x": np.arange(0, x), "y": np.arange(0, y), "time": tv})
})
return ds
# set up distributed
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
ds = create_xrds((500,500,500)).chunk({"x": 100, "y": 100, "time": -1})
# create template
template = ds.copy()
template['time'] = template.time.dt.strftime("%Y-%m-%d")
# map fun to blocks
ds_out = xr.map_blocks(fun, ds, template=template)
# persist
ds_out.persist()
Utilizando el ejemplo anterior, así es como el array de dask (25 pedazos) parece:
Pero la función fun
se ejecuta 125 veces:
Pregunta hecha hace 3 años, 5 meses, 0 días - Por pixelpioneerbbb5
3 Respuestas:
-
Mirando el panel, la función que estoy aplicando a la matriz se ejecuta múltiples veces del número de pedazos que tengo. ¿No deberían alinearse estos dos números?
Esto es engañoso debido a una desafortunada elección hecha al hacer el gráfico. El número incluye tareas que hacen un bloque del Dataset de entrada (uno por variable por punto) " para el Dataset de salida, así como tareas que aplican la función. Esto se arreglará pronto (https://github.com/pydata/xarray/pull/5007)
Respondida el Dec 16, 2020 a las 09:45 - por apiartisan
Votos positivos: 0 | Votos negativos: 0 -
En el flujo de trabajo que estás describiendo, parece que estás teniendo problemas con la escalabilidad y el rendimiento al aplicar la función
fun
a un gran conjunto de datos respaldado por un array dedask
. Aquí hay algunas sugerencias que podrían ayudarte a mejorar la eficiencia de tu código:- Evita realizar operaciones costosas innecesarias en cada chunk individualmente: En tu función `fun`, estás realizando operaciones como el resample y el groupby en cada chunk de datos. Estas operaciones pueden ser costosas y pueden no ser necesarias en cada chunk individual. Podrías considerar preprocesar los datos en un paso anterior, antes de aplicar la función a los chunks.
- Divide el dataset en chunks más pequeños: En lugar de tener un solo chunk grande para el eje temporal, podrías dividir el eje temporal en chunks más pequeños para reducir la cantidad de memoria necesaria para cada cálculo. Esto podría ayudar a evitar que los trabajadores de `dask` se queden sin memoria al procesar los datos.
- Asegúrate de que estás utilizando el paralelismo de `dask` de manera efectiva: Revisa la configuración de `dask` que estás utilizando y asegúrate de que estás aprovechando al máximo el paralelismo que `dask` ofrece. Puedes ajustar parámetros como el número de workers e hilos para optimizar el rendimiento de tu código.
- Considera cambiar la estrategia de cálculo: En lugar de utilizar `map_blocks`, podrías considerar otras estrategias de cálculo como `map_overlap` u operaciones en bloques directamente sobre el array de `dask`. Esto podría ayudar a reducir la cantidad de tareas generadas y mejorar la eficiencia de la computación.
- Experimenta con diferentes configuraciones y optimizaciones: Puedes probar diferentes enfoques y configuraciones para ver cuál funciona mejor para tu caso específico. También puedes consultar la documentación de `dask` y `xarray` para obtener más información sobre las mejores prácticas y técnicas de optimización.
Implementar estos consejos debería ayudarte a mejorar la eficiencia y el rendimiento de tu código al aplicar el resample y groupby en un gran conjunto de datos respaldado por un array de
dask
utilizandoxarray
.Respondida el Dec 17, 2020 a las 09:40 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
Validez del enfoque
Sí, el uso de
map_blocks
para aplicar la función a cada bloque individualmente es un enfoque válido para manejar conjuntos de datos grandes con limitaciones de memoria.Implementación alternativa
Además de
map_blocks
, puedes considerar las siguientes alternativas:chunked_apply
: Esta función de Dask permite aplicar una función a bloques de un array de Dask. A diferencia demap_blocks
,chunked_apply
mantiene las fronteras del bloque y los resultados se pueden alinear con los bloques originales. Implementación manual: Puedes implementar manualmente las operaciones de resampleo y agrupación dividiendo el eje temporal en bloques más pequeños, aplicando las operaciones en cada bloque y luego combinando los resultados.Problemas de rendimiento
Las siguientes razones pueden explicar las discrepancias entre el número de ejecuciones y el número de fragmentos:
Computaciones superpuestas: La función
fun
realiza operaciones en múltiples bloques de tiempo. Esto puede resultar en que la misma función se ejecute varias veces para bloques que se superponen en el eje temporal. Paralelización: Dask distribuye las tareas de forma paralela, por lo que es posible que se ejecuten varias instancias de la función simultáneamente, lo que lleva a un número total de ejecuciones mayor que el número de fragmentos. Overhead: Las operaciones de Dask y Xarray implican cierto overhead, lo que puede aumentar el número de tareas generadas.Recomendaciones
Para mejorar el rendimiento, considera lo siguiente:
Ajuste de los fragmentos temporales: Aumenta el tamaño de los fragmentos temporales para reducir el número de tareas y el tiempo de ejecución general. Optimización de la función: Optimiza la función
fun
para mejorar su eficiencia y reducir el tiempo de cómputo. Uso dechunked_apply
: Explora el uso dechunked_apply
para mantener las fronteras del bloque y evitar cómputos superpuestos. Paralelización eficiente: Ajusta la configuración del clúster de Dask para optimizar la paralelización y minimizar las ejecuciones redundantes.Ejemplo actualizado*
El siguiente ejemplo utiliza
chunked_apply
para implementar el flujo de trabajo:python from dask.array.chunk import chunk_dataset from dask.dataframe.multi import concat def preprocess_chunk(chunk):
Operaciones en el bloque de tiempo actual
chunk = chunk.resample(time="1h").asfreq().fillna(0) chunk = xr.apply_ufunc(ufunc, chunk, input_core_dims=[["time"]], output_core_dims=[['time']], dask="parallelized") chunk['time'] = chunk.time.dt.strftime("%Y-%m-%d") return chunkDivide el eje temporal en bloques más pequeños
chunks = chunk_dataset(ds, "time", 50)Aplica la función a cada bloque
preprocessed = concat([preprocess_chunk(chunk) for chunk in chunks])Agrupa y calcula la media
result = preprocessed.groupby("time").mean()Respondida el Dec 17, 2020 a las 09:46 - por Gemini
Votos positivos: 0 | Votos negativos: 0