¿Cómo poner fin a la computación de larga duración ( tarea atada de CPU) en Python utilizando asincio y concurrent.futures.ProcessPoolExecutor?

Pregunta similar (pero la respuesta no funciona para mí): Cómo cancelar subprocesos de larga duración que funcionan usando concurrent.futures.ProcessPoolExecutor?

A diferencia de la pregunta anterior y de la solución proporcionada, en mi caso la computación en sí es bastante larga (limitada por la CPU) y no se puede ejecutar en un bucle para comprobar si ha ocurrido algún evento.

Versión reducida del código siguiente:

import asyncio
import concurrent.futures as futures
import time

class Simulator:
    def __init__(self):
        self._loop = None
        self._lmz_executor = None
        self._tasks = []
        self._max_execution_time = time.monotonic() + 60
        self._long_running_tasks = []

    def initialise(self):
        # Initialise the main asyncio loop
        self._loop = asyncio.get_event_loop()
        self._loop.set_default_executor(
            futures.ThreadPoolExecutor(max_workers=3))

        # Run separate processes of long computation task
        self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3)

    def run(self):
        self._tasks.extend(
            [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]]
        )

        try:
            # Gather bot reasoner tasks
            _reasoner_tasks = asyncio.gather(*self._tasks)
            # Send the reasoner tasks to main monitor task
            asyncio.gather(self.sample_main_loop(_reasoner_tasks))
            self._loop.run_forever()
        except KeyboardInterrupt:
            pass
        finally:
            self._loop.close()

    async def sample_main_loop(self, reasoner_tasks):
        """This is the main monitor task"""
        await asyncio.wait_for(reasoner_tasks, None)
        for task in self._long_running_tasks:
            try:
                await asyncio.wait_for(task, 10)
            except asyncio.TimeoutError:
                print("Oops. Some long operation timed out.")
                task.cancel()  # Doesn't cancel and has no effect
                task.set_result(None)  # Doesn't seem to have an effect

        self._lmz_executor.shutdown()
        self._loop.stop()
        print('And now I am done. Yay!')

    async def bot_reasoning_loop(self, bot):
        import math

        _exec_count = 0
        _sleepy_time = 15
        _max_runs = math.floor(self._max_execution_time / _sleepy_time)

        self._long_running_tasks.append(
            self._loop.run_in_executor(
                    self._lmz_executor, really_long_process, _sleepy_time))

        while time.monotonic() < self._max_execution_time:
            print("Bot#{}: thinking for {}s. Run {}/{}".format(
                    bot, _sleepy_time, _exec_count, _max_runs))
            await asyncio.sleep(_sleepy_time)
            _exec_count += 1

        print("Bot#{} Finished Thinking".format(bot))

def really_long_process(sleepy_time):
    print("I am a really long computation.....")
    _large_val = 9729379273492397293479237492734 ** 344323
    print("I finally computed this large value: {}".format(_large_val))

if __name__ == "__main__":
    sim = Simulator()
    sim.initialise()
    sim.run()

La idea es que hay un bucle de simulación principal que corre y monitorea tres hilos de bot. Cada uno de estos hilos de bot luego realizar algunos razonamientos, pero también iniciar un proceso de fondo realmente largo utilizando ProcessPoolExecutor, que puede terminar corriendo más tiempo su propio umbral/tiempo de ejecución máximo para razonar en las cosas.

Como puedes ver en el código anterior, intenté .cancel() estas tareas cuando se produce un tiempo. Aunque esto no es realmente cancelar la computación real, que sigue sucediendo en el fondo y el asyncio el bucle no termina hasta después de que todo el largo cálculo de funcionamiento haya terminado.

¿Cómo puedo terminar estas computaciones largas con CPU dentro de un método?

Otras preguntas similares de SO, pero no necesariamente relacionadas o útiles:

  1. Asyncio: ¿Es posible cancelar un futuro dirigido por un Ejecutor?
  2. Cómo terminar una única tarea asinc en multiprocesamiento si esa única tarea asinc supera un tiempo límite en Python
  3. Multiprocesamiento asincrónico con una piscina de trabajadores en Python: ¿cómo seguir yendo después del tiempo libre?

Pregunta hecha hace 5 años, 6 meses, 27 días - Por bitcraftsman


3 Respuestas:

  • ¿Cómo puedo terminar estas computaciones largas con CPU dentro de un método?

    El enfoque que probaste no funciona porque los futuros devueltos por ProcessPoolExecutor no son cancelables. Aunque Asyncio's run_in_executor Intente. para propagar la cancelación, es simplemente ignorado por Future.cancel una vez que la tarea comience a ejecutar.

    No hay razón fundamental para eso. A diferencia de los hilos, los procesos pueden terminarse de forma segura, por lo que sería perfectamente posible ProcessPoolExecutor.submit para devolver un futuro cuyo cancel rescindió el proceso correspondiente. Asyncio coroutines tiene semántica de cancelación bien definida y puede hacer uso de ella automáticamente. Desafortunadamente, ProcessPoolExecutor.submit devuelve un regular concurrent.futures.Future, que asume el mínimo común denominador de los ejecutores subyacentes, y trata un futuro en ejecución tan intocable.

    Como resultado, para cancelar las tareas ejecutadas en subprocesos, se debe eludir el ProcessPoolExecutor y gestionar los propios procesos. El desafío es cómo hacer esto sin aplicar la mitad de multiprocessing. Una opción ofrecida por la biblioteca estándar es (ab)use multiprocessing.Pool para este propósito, ya que apoya el cierre fiable de los procesos obreros. A CancellablePool podría funcionar como sigue:

    • En lugar de generar un número fijo de procesos, se ha reducido un número fijo de piscinas de un trabajador.
    • Assign tasks to pools from an asyncio coroutine. Si la coroutina es cancelada mientras espera la tarea de terminar en el otro proceso, terminada la piscina de un solo proceso y crear una nueva.
    • Ya que todo está coordinado desde el único hilo asincio, no te preocupes por las condiciones de raza como matar accidentalmente un proceso que ya ha comenzado a ejecutar otra tarea. (Esto tendría que evitarse si uno apoyara la cancelación en ProcessPoolExecutor)

    He aquí una muestra de aplicación de esa idea:

    import asyncio
    import multiprocessing
    
    class CancellablePool:
        def __init__(self, max_workers=3):
            self._free = {self._new_pool() for _ in range(max_workers)}
            self._working = set()
            self._change = asyncio.Event()
    
        def _new_pool(self):
            return multiprocessing.Pool(1)
    
        async def apply(self, fn, *args):
            """
            Like multiprocessing.Pool.apply_async, but:
             * is an asyncio coroutine
             * terminates the process if cancelled
            """
            while not self._free:
                await self._change.wait()
                self._change.clear()
            pool = usable_pool = self._free.pop()
            self._working.add(pool)
    
            loop = asyncio.get_event_loop()
            fut = loop.create_future()
            def _on_done(obj):
                loop.call_soon_threadsafe(fut.set_result, obj)
            def _on_err(err):
                loop.call_soon_threadsafe(fut.set_exception, err)
            pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)
    
            try:
                return await fut
            except asyncio.CancelledError:
                pool.terminate()
                usable_pool = self._new_pool()
            finally:
                self._working.remove(pool)
                self._free.add(usable_pool)
                self._change.set()
    
        def shutdown(self):
            for p in self._working | self._free:
                p.terminate()
            self._free.clear()
    

    Un caso de prueba minimalista que muestra cancelación:

    def really_long_process():
        print("I am a really long computation.....")
        large_val = 9729379273492397293479237492734 ** 344323
        print("I finally computed this large value: {}".format(large_val))
    
    async def main():
        loop = asyncio.get_event_loop()
        pool = CancellablePool()
    
        tasks = [loop.create_task(pool.apply(really_long_process))
                 for _ in range(5)]
        for t in tasks:
            try:
                await asyncio.wait_for(t, 1)
            except asyncio.TimeoutError:
                print('task timed out and cancelled')
        pool.shutdown()
    
    asyncio.get_event_loop().run_until_complete(main())
    

    Observe cómo el uso de la CPU nunca excede 3 núcleos, y cómo comienza a caer cerca del final de la prueba, indicando que los procesos se están terminando como se espera.

    Para aplicarlo al código de la pregunta, haga self._lmz_executor un caso de CancellablePool y cambio self._loop.run_in_executor(...) a self._loop.create_task(self._lmz_executor.apply(...)).

    Respondida el Oct 22, 2018 a las 01:29 - por bytebard66b1

    Votos positivos: 0 | Votos negativos: 0

  • Para poder cancelar las tareas de larga duración en un ProcessPoolExecutor en Python usando asyncio, podemos utilizar el método terminate() de la clase concurrent.futures.ProcessPoolExecutor para forzar la finalización de los procesos en ejecución.

    Aquí te dejo un ejemplo de cómo podrías modificar tu código para manejar la cancelación de las tareas de larga duración:

    python
    import asyncio
    import concurrent.futures as futures
    import time
    
    class Simulator:
        def __init__(self):
            self._loop = None
            self._lmz_executor = None
            self._tasks = []
            self._max_execution_time = time.monotonic() + 60
            

    Lista para almacenar las futuras tareas de larga duración

    self._long_running_processes = [] def initialise(self):

    Inicializar el bucle principal de asyncio

    self._loop = asyncio.get_event_loop() self._loop.set_default_executor( futures.ThreadPoolExecutor(max_workers=3))

    Inicializar el ProcessPoolExecutor para las tareas de larga duración

    self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3) def run(self): self._tasks.extend( [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]] ) try:

    Recolectar las tareas de razón de los bots

    _reasoner_tasks = asyncio.gather(self._tasks)

    Enviar las tareas de razón al bucle principal

    asyncio.gather(self.sample_main_loop(_reasoner_tasks)) self._loop.run_forever() except KeyboardInterrupt: pass finally: self._loop.close() async def sample_main_loop(self, reasoner_tasks): """Este es el bucle principal de monitoreo""" await asyncio.wait_for(reasoner_tasks, None)

    Cancelar las tareas de larga duración

    for process in self._long_running_processes: process.cancel()

    Terminar el ProcessPoolExecutor

    self._lmz_executor.shutdown(wait=False) self._loop.stop() print('¡Y ahora he terminado!') async def bot_reasoning_loop(self, bot): import math _exec_count = 0 _sleepy_time = 15 _max_runs = math.floor(self._max_execution_time / _sleepy_time)

    Ejecutar la tarea de larga duración en el ProcessPoolExecutor

    process = self._lmz_executor.submit(really_long_process, _sleepy_time) self._long_running_processes.append(process) while time.monotonic() < self._max_execution_time: print("Bot

    {}: pensando por {}s. Ejecución {}/{}".format(

    bot, _sleepy_time, _exec_count, _max_runs)) await asyncio.sleep(_sleepy_time) _exec_count += 1 print("Bot

    {} ha terminado de pensar".format(bot))

    def really_long_process(sleepy_time): print("Soy un cálculo realmente largo.....") _large_val = 9729379273492397293479237492734
    * 344323 print("Finalmente calculé este valor grande: {}".format(_large_val)) if __name__ == "__main__": sim = Simulator() sim.initialise() sim.run()

    En este código, he agregado una lista _long_running_processes para almacenar las tareas de larga duración que se están ejecutando en el ProcessPoolExecutor. En el método sample_main_loop, cancelamos todas las tareas de larga duración almacenadas en esa lista antes de terminar y cerrar el ProcessPoolExecutor.

    Espero que esta solución te ayude a cancelar las tareas de larga duración de manera adecuada en tu aplicación. ¡Déjame saber si tienes alguna otra pregunta!

    Respondida el Oct 23, 2018 a las 01:31 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El problema radica en que las tareas de larga duración se ejecutan en un proceso separado (usando ProcessPoolExecutor) y no están sujetas a la cancelación deasyncio. Para cancelar estas tareas, debes utilizar métodos específicos para la comunicación entre procesos.

    Aquí tienes una forma de hacerlo utilizando multiprocessing.Manager y multiprocessing.Event:

    1. Crea un evento para indicar la cancelación:

    python
    import multiprocessing.Manager
    import multiprocessing.Event
    
    manager = Manager()
    cancel_event = manager.Event()
    

    2. Pasa el evento a las tareas de larga duración:

    Agrega el evento cancel_event como argumento a la función really_long_process:

    python
    def really_long_process(sleepy_time, cancel_event):
        ...
        

    Comprueba el evento de cancelación en bucle

    while not cancel_event.is_set(): ...

    3. Cancela las tareas:

    Cuando necesites cancelar las tareas de larga duración, establece el evento cancel_event:

    python
    

    En la tarea de monitorización principal

    cancel_event.set()

    Código modificado:

    python
    import asyncio
    import concurrent.futures as futures
    import time
    import multiprocessing
    import multiprocessing.Manager
    import multiprocessing.Event
    
    class Simulator:
        def __init__(self):
            self._loop = None
            self._lmz_executor = None
            self._tasks = []
            self._max_execution_time = time.monotonic() + 60
            self._long_running_tasks = []
    
        def initialise(self):
            

    Inicializa el bucle asyncio principal

    self._loop = asyncio.get_event_loop() self._loop.set_default_executor( futures.ThreadPoolExecutor(max_workers=3))

    Crea un administrador de comunicación entre procesos

    manager = multiprocessing.Manager()

    Crea el evento de cancelación

    self._cancel_event = manager.Event()

    Ejecuta procesos separados de la tarea de cómputo de larga duración

    self._lmz_executor = futures.ProcessPoolExecutor(max_workers=3) def run(self): self._tasks.extend( [self.bot_reasoning_loop(bot_id) for bot_id in [1, 2, 3]] ) try:

    Reúne las tareas del razonador de bots

    _reasoner_tasks = asyncio.gather(self._tasks)

    Envía las tareas del razonador a la tarea principal del monitor

    asyncio.gather(self.sample_main_loop(_reasoner_tasks)) self._loop.run_forever() except KeyboardInterrupt: pass finally: self._loop.close() async def sample_main_loop(self, reasoner_tasks): """Esta es la tarea principal del monitor""" await asyncio.wait_for(reasoner_tasks, None) for task in self._long_running_tasks: try: await asyncio.wait_for(task, 10) except asyncio.TimeoutError: print("Oops. Alguna operación larga ha excedido el tiempo máximo.")

    Establece el evento de cancelación

    self._cancel_event.set() self._lmz_executor.shutdown() self._loop.stop() print('And now I am done. Yay!') async def bot_reasoning_loop(self, bot): import math _exec_count = 0 _sleepy_time = 15 _max_runs = math.floor(self._max_execution_time / _sleepy_time)

    Añade el evento de cancelación a la tarea de larga duración

    self._long_running_tasks.append( self._loop.run_in_executor( self._lmz_executor, really_long_process, _sleepy_time, self._cancel_event)) while time.monotonic() < self._max_execution_time: print("Bot

    {}: pensando durante {}s. Ejecución {}/{}".format(

    bot, _sleepy_time, _exec_count, _max_runs)) await asyncio.sleep(_sleepy_time) _exec_count += 1 print("Bot

    {} Ha terminado de pensar".format(bot))

    def really_long_process(sleepy_time, cancel_event): print("Soy una computación realmente larga...") _large_val = 9729379273492397293479237492734
    * 344323 print("Finalmente he calculado este gran valor: {}".format(_large_val))

    Comprueba el evento de cancelación en bucle

    while not cancel_event.is_set():

    Haz cosas aquí...

    pass if __name__ == "__main__": sim = Simulator() sim.initialise() sim.run()

    Con estas modificaciones, las tareas de larga duración se pueden cancelar correctamente desde la tarea de monitorización principal cuando se alcanza el límite de tiempo.

    Respondida el Oct 23, 2018 a las 01:40 - por Gemini

    Votos positivos: 0 | Votos negativos: 0