No reiniciar la fuente de akka que se crea a partir de la respuesta Htp

He creado al cliente WS en el marco de juego2.6 que se conecta a REST API. Esta API de reposo está enviando constantemente fragmentos/stream de datos. Pero entonces durante pocos segundos el servidor REST API baja y se reinicia de nuevo. He escrito al cliente para reiniciar el flujo pero parece que no es capaz de reiniciarlo.

    def employesCompany() = Action.async { implicit request: Request[AnyContent] =>
    val client =   ws.underlying
    
    val wsrequest: WSRequest = ws.url(playConfiguration.get[String]("employes.service.url"))

    val futureResponse: Future[WSResponse] = wsrequest.stream()

    futureResponse.flatMap { response =>
      val source: Source[ByteString, Any] = response.bodyAsSource
      val publisher = source.toMat(Sink.asPublisher(true))(Keep.right).run()
      val flow = Flow[ByteString].map(res => res.utf8String)
      val ns = Source.fromPublisher(publisher).via(flow)
      val restartSource = RestartSource.withBackoff(Duration.apply(1, "sec"), Duration.apply(3, "sec"), 0.2) { () =>
        ns.map {
          elem =>
            println(elem)
            elem
        }
      }
     
      Future.successful(Ok.chunked(restartSource via EventSource.flow).as(ContentTypes.EVENT_STREAM))
    }
  }

Estoy enfrentando el error siguiente:

ERROR] [12/20/2020 18:35:27.585] [play-dev-mode-akka.actor.default-dispatcher-8] [RestartWithBackoffSource(akka://play-dev-mode)] **Restarting graph due to failure
java.io.IOException: An existing connection was forcibly closed by the remote host**
        at sun.nio.ch.SocketDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at play.shaded.ahc.io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:368)
        at play.shaded.ahc.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:891)
        at play.shaded.ahc.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:277)
        at play.shaded.ahc.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
        at play.shaded.ahc.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
        at play.shaded.ahc.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at play.shaded.ahc.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)

Pregunta hecha hace 3 años, 4 meses, 25 días - Por compilerhero


3 Respuestas:

  • El problema es que la solicitud ya se realiza cuando usted llama wsrequest.stream(), pero desea que se realice más tarde cuando se materialice la Fuente. Por lo tanto, necesita retrasar la creación de la Future por no llamar wsrequest.stream que Akka Stream lo haga.

    No he trabajado con Akka Stream en un tiempo, pero algo así debería funcionar:

        val responseStream = Source.lazyFuture(() => wsrequest.stream())
    
        responseStream.flatMapConcat { response =>
          // the rest like before
    
    

    Respondida el Dec 20, 2020 a las 14:14 - por bytebarde61c

    Votos positivos: 0 | Votos negativos: 0

  • El error que estás viendo indica que la conexión está siendo cerrada de forma abrupta por el servidor remoto. Esto puede suceder cuando el servidor se reinicia o hay algún problema de red.

    En tu código, estás utilizando RestartSource.withBackoff para reiniciar el flujo en caso de algún fallo, pero parece que el reinicio no está funcionando correctamente.

    Una posible solución para evitar reiniciar la conexión cada vez que el servidor se reinicie es mantener la conexión abierta y manejar los errores de forma más elegante. Puedes intentar capturar el error y volver a intentar la conexión en lugar de reiniciar todo el flujo.

    Aquí te dejo un ejemplo de cómo podrías modificar tu código para manejar los errores de forma más flexible:

    scala
        val restartSource = RestartSource.onFailuresWithBackoff(
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.2
        ) { () =>
          Source.fromFutureSource(wsrequest.stream().map { response =>
            val source: Source[ByteString, Any] = response.bodyAsSource
            val publisher = source.toMat(Sink.asPublisher(true))(Keep.right).run()
            val flow = Flow[ByteString].map(res => res.utf8String)
            val ns = Source.fromPublisher(publisher).via(flow)
    
            ns.map { elem =>
              println(elem)
              elem
            }
          })
        }
    
        Future.successful(Ok.chunked(restartSource.via(EventSource.flow)).as(ContentTypes.EVENT_STREAM))
    

    Con esta modificación, el RestartSource solo intentará reiniciar la conexión en caso de errores persistentes, en lugar de hacerlo cada vez que se produce un fallo.

    Espero que esta información te ayude a resolver el problema. Si sigues experimentando dificultades, no dudes en consultarme.

    Respondida el Dec 21, 2020 a las 14:12 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El error que estás experimentando indica que la conexión entre tu cliente y el servidor REST API se está cerrando inesperadamente. Hay algunas cosas que puedes verificar para resolver este problema:

    • Configura el manejador de errores: Agrega un manejador de errores al flujo `restartSource` para capturar el error `IOException`. Esto te permitirá registrar el error y potencialmente reiniciar el flujo si es necesario.
    • Verifica la configuración de tiempo de espera: Asegúrate de que los tiempos de espera de conexión y lectura estén configurados correctamente en la biblioteca de clientes HTTP. Si los tiempos de espera son demasiado cortos, la conexión puede cerrarse prematuramente.
    • Examina la API REST: Verifica que la API REST esté devolviendo códigos de estado HTTP correctos. Los códigos de estado de error pueden indicar que el servidor está experimentando problemas y que la conexión debe cerrarse.
    • Habilita el registro del lado del servidor: Habilita el registro del lado del servidor para la API REST para ver si hay alguna indicación de por qué se están cerrando las conexiones.
    • Usa un balanceador de carga: Si es posible, usa un balanceador de carga frente a la API REST para distribuir el tráfico y evitar sobrecargar cualquier servidor individual.
    • Intenta con un cliente HTTP diferente: Intenta usar una biblioteca de clientes HTTP diferente para ver si el problema persiste. Esto puede ayudar a descartar problemas específicos de la biblioteca.
    • Verifica la configuración de seguridad: Asegúrate de que se hayan configurado correctamente los certificados SSL y otras configuraciones de seguridad, si corresponde.
    • Examina los cortafuegos: Verifica que los cortafuegos no estén bloqueando la comunicación entre el cliente y el servidor.
    • Prueba de rendimiento: Realiza pruebas de rendimiento para simular el tráfico de producción y ver si el problema se reproduce bajo carga.
    1. Contacta al proveedor de la API REST: Si todo lo demás falla, comunícate con el proveedor de la API REST para obtener asistencia y descartar cualquier problema en su extremo.

    Respondida el Dec 21, 2020 a las 14:16 - por Gemini

    Votos positivos: 0 | Votos negativos: 0