¿Cómo poner fin a la computación de larga duración ( tarea atada de CPU) en Python utilizando asincio y concurrent.futures.ProcessPoolExecutor?
Pregunta similar (pero la respuesta no funciona para mí): Cómo cancelar subprocesos de larga duración que funcionan usando concurrent.futures.ProcessPoolExecutor?
A diferencia de la pregunta anterior y de la solución proporcionada, en mi caso la computación en sí es bastante larga (limitada por la CPU) y no se puede ejecutar en un bucle para comprobar si ha ocurrido algún evento.
Versión reducida del código siguiente:
import asyncio
import concurrent.futures as futures
import time
class Simulator:
def __init__(self):
self._loop = None
self._lmz_executor = None
self._tasks = []
self._max_execution_time = time.monotonic() + 60
self._long_running_tasks = []
def initialise(self):
# Initialise the main asyncio loop
self._loop = asyncio.get_event_loop()
self._loop.set_default_executor(
futures.ThreadPoolExecutor(max_workers=3))
# Run separate processes of long computation task
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)
def run(self):
self._tasks.extend(
[self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
)
try:
# Gather bot reasoner tasks
_reasoner_tasks = asyncio.gather(*self._tasks)
# Send the reasoner tasks to main monitor task
asyncio.gather(self.sample_main_loop(_reasoner_tasks))
self._loop.run_forever()
except KeyboardInterrupt:
pass
finally:
self._loop.close()
async def sample_main_loop(self, reasoner_tasks):
"""This is the main monitor task"""
await asyncio.wait_for(reasoner_tasks, None)
for task in self._long_running_tasks:
try:
await asyncio.wait_for(task, 10)
except asyncio.TimeoutError:
print("Oops. Some long operation timed out.")
task.cancel() # Doesn't cancel and has no effect
task.set_result(None) # Doesn't seem to have an effect
self._lmz_executor.shutdown()
self._loop.stop()
print('And now I am done. Yay!')
async def bot_reasoning_loop(self, bot):
import math
_exec_count = 0
_sleepy_time = 15
_max_runs = math.floor(self._max_execution_time / _sleepy_time)
self._long_running_tasks.append(
self._loop.run_in_executor(
self._lmz_executor, really_long_process, _sleepy_time))
while time.monotonic() < self._max_execution_time:
print("Bot#{}: thinking for {}s. Run {}/{}".format(
bot, _sleepy_time, _exec_count, _max_runs))
await asyncio.sleep(_sleepy_time)
_exec_count += 1
print("Bot#{} Finished Thinking".format(bot))
def really_long_process(sleepy_time):
print("I am a really long computation.....")
_large_val = 9729379273492397293479237492734 ** 344323
print("I finally computed this large value: {}".format(_large_val))
if __name__ == "__main__":
sim = Simulator()
sim.initialise()
sim.run()
La idea es que hay un bucle de simulación principal que corre y monitorea tres hilos de bot. Cada uno de estos hilos de bot luego realizar algunos razonamientos, pero también iniciar un proceso de fondo realmente largo utilizando ProcessPoolExecutor
, que puede terminar corriendo más tiempo su propio umbral/tiempo de ejecución máximo para razonar en las cosas.
Como puedes ver en el código anterior, intenté .cancel()
estas tareas cuando se produce un tiempo. Aunque esto no es realmente cancelar la computación real, que sigue sucediendo en el fondo y el asyncio
el bucle no termina hasta después de que todo el largo cálculo de funcionamiento haya terminado.
¿Cómo puedo terminar estas computaciones largas con CPU dentro de un método?
Otras preguntas similares de SO, pero no necesariamente relacionadas o útiles:
Pregunta hecha hace 5 años, 6 meses, 27 días - Por bitcraftsman
3 Respuestas:
-
¿Cómo puedo terminar estas computaciones largas con CPU dentro de un método?
El enfoque que probaste no funciona porque los futuros devueltos por
ProcessPoolExecutor
no son cancelables. Aunque Asyncio'srun_in_executor
Intente. para propagar la cancelación, es simplemente ignorado porFuture.cancel
una vez que la tarea comience a ejecutar.No hay razón fundamental para eso. A diferencia de los hilos, los procesos pueden terminarse de forma segura, por lo que sería perfectamente posible
ProcessPoolExecutor.submit
para devolver un futuro cuyocancel
rescindió el proceso correspondiente. Asyncio coroutines tiene semántica de cancelación bien definida y puede hacer uso de ella automáticamente. Desafortunadamente,ProcessPoolExecutor.submit
devuelve un regularconcurrent.futures.Future
, que asume el mínimo común denominador de los ejecutores subyacentes, y trata un futuro en ejecución tan intocable.Como resultado, para cancelar las tareas ejecutadas en subprocesos, se debe eludir el
ProcessPoolExecutor
y gestionar los propios procesos. El desafío es cómo hacer esto sin aplicar la mitad demultiprocessing
. Una opción ofrecida por la biblioteca estándar es (ab)usemultiprocessing.Pool
para este propósito, ya que apoya el cierre fiable de los procesos obreros. ACancellablePool
podría funcionar como sigue:- En lugar de generar un número fijo de procesos, se ha reducido un número fijo de piscinas de un trabajador.
- Assign tasks to pools from an asyncio coroutine. Si la coroutina es cancelada mientras espera la tarea de terminar en el otro proceso, terminada la piscina de un solo proceso y crear una nueva.
- Ya que todo está coordinado desde el único hilo asincio, no te preocupes por las condiciones de raza como matar accidentalmente un proceso que ya ha comenzado a ejecutar otra tarea. (Esto tendría que evitarse si uno apoyara la cancelación en
ProcessPoolExecutor
)
He aquí una muestra de aplicación de esa idea:
import asyncio import multiprocessing class CancellablePool: def __init__(self, max_workers=3): self._free = {self._new_pool() for _ in range(max_workers)} self._working = set() self._change = asyncio.Event() def _new_pool(self): return multiprocessing.Pool(1) async def apply(self, fn, *args): """ Like multiprocessing.Pool.apply_async, but: * is an asyncio coroutine * terminates the process if cancelled """ while not self._free: await self._change.wait() self._change.clear() pool = usable_pool = self._free.pop() self._working.add(pool) loop = asyncio.get_event_loop() fut = loop.create_future() def _on_done(obj): loop.call_soon_threadsafe(fut.set_result, obj) def _on_err(err): loop.call_soon_threadsafe(fut.set_exception, err) pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err) try: return await fut except asyncio.CancelledError: pool.terminate() usable_pool = self._new_pool() finally: self._working.remove(pool) self._free.add(usable_pool) self._change.set() def shutdown(self): for p in self._working | self._free: p.terminate() self._free.clear()
Un caso de prueba minimalista que muestra cancelación:
def really_long_process(): print("I am a really long computation.....") large_val = 9729379273492397293479237492734 ** 344323 print("I finally computed this large value: {}".format(large_val)) async def main(): loop = asyncio.get_event_loop() pool = CancellablePool() tasks = [loop.create_task(pool.apply(really_long_process)) for _ in range(5)] for t in tasks: try: await asyncio.wait_for(t, 1) except asyncio.TimeoutError: print('task timed out and cancelled') pool.shutdown() asyncio.get_event_loop().run_until_complete(main())
Observe cómo el uso de la CPU nunca excede 3 núcleos, y cómo comienza a caer cerca del final de la prueba, indicando que los procesos se están terminando como se espera.
Para aplicarlo al código de la pregunta, haga
self._lmz_executor
un caso deCancellablePool
y cambioself._loop.run_in_executor(...)
aself._loop.create_task(self._lmz_executor.apply(...))
.Respondida el Oct 22, 2018 a las 01:29 - por bytebard66b1
Votos positivos: 0 | Votos negativos: 0 -
Para poder cancelar las tareas de larga duración en un
ProcessPoolExecutor
en Python usando asyncio, podemos utilizar el métodoterminate()
de la claseconcurrent.futures.ProcessPoolExecutor
para forzar la finalización de los procesos en ejecución.Aquí te dejo un ejemplo de cómo podrías modificar tu código para manejar la cancelación de las tareas de larga duración:
python import asyncio import concurrent.futures as futures import time class Simulator: def __init__(self): self._loop = None self._lmz_executor = None self._tasks = [] self._max_execution_time = time.monotonic() + 60
Lista para almacenar las futuras tareas de larga duración
self._long_running_processes = [] def initialise(self):Inicializar el bucle principal de asyncio
self._loop = asyncio.get_event_loop() self._loop.set_default_executor( futures.ThreadPoolExecutor(max_workers=3))Inicializar el ProcessPoolExecutor para las tareas de larga duración
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3) def run(self): self._tasks.extend( [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]] ) try:Recolectar las tareas de razón de los bots
_reasoner_tasks = asyncio.gather(self._tasks)Enviar las tareas de razón al bucle principal
asyncio.gather(self.sample_main_loop(_reasoner_tasks)) self._loop.run_forever() except KeyboardInterrupt: pass finally: self._loop.close() async def sample_main_loop(self, reasoner_tasks): """Este es el bucle principal de monitoreo""" await asyncio.wait_for(reasoner_tasks, None)Cancelar las tareas de larga duración
for process in self._long_running_processes: process.cancel()Terminar el ProcessPoolExecutor
self._lmz_executor.shutdown(wait=False) self._loop.stop() print('¡Y ahora he terminado!') async def bot_reasoning_loop(self, bot): import math _exec_count = 0 _sleepy_time = 15 _max_runs = math.floor(self._max_execution_time / _sleepy_time)Ejecutar la tarea de larga duración en el ProcessPoolExecutor
process = self._lmz_executor.submit(really_long_process, _sleepy_time) self._long_running_processes.append(process) while time.monotonic() < self._max_execution_time: print("Bot{}: pensando por {}s. Ejecución {}/{}".format(
bot, _sleepy_time, _exec_count, _max_runs)) await asyncio.sleep(_sleepy_time) _exec_count += 1 print("Bot{} ha terminado de pensar".format(bot))
def really_long_process(sleepy_time): print("Soy un cálculo realmente largo.....") _large_val = 9729379273492397293479237492734 * 344323 print("Finalmente calculé este valor grande: {}".format(_large_val)) if __name__ == "__main__": sim = Simulator() sim.initialise() sim.run()En este código, he agregado una lista
_long_running_processes
para almacenar las tareas de larga duración que se están ejecutando en elProcessPoolExecutor
. En el métodosample_main_loop
, cancelamos todas las tareas de larga duración almacenadas en esa lista antes de terminar y cerrar elProcessPoolExecutor
.Espero que esta solución te ayude a cancelar las tareas de larga duración de manera adecuada en tu aplicación. ¡Déjame saber si tienes alguna otra pregunta!
Respondida el Oct 23, 2018 a las 01:31 - por Gpt
Votos positivos: 0 | Votos negativos: 0 -
El problema radica en que las tareas de larga duración se ejecutan en un proceso separado (usando
ProcessPoolExecutor
) y no están sujetas a la cancelación deasyncio. Para cancelar estas tareas, debes utilizar métodos específicos para la comunicación entre procesos.Aquí tienes una forma de hacerlo utilizando
multiprocessing.Manager
ymultiprocessing.Event
:1. Crea un evento para indicar la cancelación:
python import multiprocessing.Manager import multiprocessing.Event manager = Manager() cancel_event = manager.Event()
2. Pasa el evento a las tareas de larga duración:
Agrega el evento
cancel_event
como argumento a la funciónreally_long_process
:python def really_long_process(sleepy_time, cancel_event): ...
Comprueba el evento de cancelación en bucle
while not cancel_event.is_set(): ...3. Cancela las tareas:
Cuando necesites cancelar las tareas de larga duración, establece el evento
cancel_event
:python
En la tarea de monitorización principal
cancel_event.set()Código modificado:
python import asyncio import concurrent.futures as futures import time import multiprocessing import multiprocessing.Manager import multiprocessing.Event class Simulator: def __init__(self): self._loop = None self._lmz_executor = None self._tasks = [] self._max_execution_time = time.monotonic() + 60 self._long_running_tasks = [] def initialise(self):
Inicializa el bucle asyncio principal
self._loop = asyncio.get_event_loop() self._loop.set_default_executor( futures.ThreadPoolExecutor(max_workers=3))Crea un administrador de comunicación entre procesos
manager = multiprocessing.Manager()Crea el evento de cancelación
self._cancel_event = manager.Event()Ejecuta procesos separados de la tarea de cómputo de larga duración
self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3) def run(self): self._tasks.extend( [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]] ) try:Reúne las tareas del razonador de bots
_reasoner_tasks = asyncio.gather(self._tasks)Envía las tareas del razonador a la tarea principal del monitor
asyncio.gather(self.sample_main_loop(_reasoner_tasks)) self._loop.run_forever() except KeyboardInterrupt: pass finally: self._loop.close() async def sample_main_loop(self, reasoner_tasks): """Esta es la tarea principal del monitor""" await asyncio.wait_for(reasoner_tasks, None) for task in self._long_running_tasks: try: await asyncio.wait_for(task, 10) except asyncio.TimeoutError: print("Oops. Alguna operación larga ha excedido el tiempo máximo.")Establece el evento de cancelación
self._cancel_event.set() self._lmz_executor.shutdown() self._loop.stop() print('And now I am done. Yay!') async def bot_reasoning_loop(self, bot): import math _exec_count = 0 _sleepy_time = 15 _max_runs = math.floor(self._max_execution_time / _sleepy_time)Añade el evento de cancelación a la tarea de larga duración
self._long_running_tasks.append( self._loop.run_in_executor( self._lmz_executor, really_long_process, _sleepy_time, self._cancel_event)) while time.monotonic() < self._max_execution_time: print("Bot{}: pensando durante {}s. Ejecución {}/{}".format(
bot, _sleepy_time, _exec_count, _max_runs)) await asyncio.sleep(_sleepy_time) _exec_count += 1 print("Bot{} Ha terminado de pensar".format(bot))
def really_long_process(sleepy_time, cancel_event): print("Soy una computación realmente larga...") _large_val = 9729379273492397293479237492734 * 344323 print("Finalmente he calculado este gran valor: {}".format(_large_val))Comprueba el evento de cancelación en bucle
while not cancel_event.is_set():Haz cosas aquí...
pass if __name__ == "__main__": sim = Simulator() sim.initialise() sim.run()Con estas modificaciones, las tareas de larga duración se pueden cancelar correctamente desde la tarea de monitorización principal cuando se alcanza el límite de tiempo.
Respondida el Oct 23, 2018 a las 01:40 - por Gemini
Votos positivos: 0 | Votos negativos: 0