Recuperar mensajes de redis stream

Tengo una aplicación NodeJS que está usando Redis stream (library 'ioredis') para pasar información. El problema es que cuando agrego un mensaje a un flujo y trato de recuperarlo, tengo que bajar mucho nivel de Arrays:

      const message = await redis.xreadgroup('GROUP', orderGroup, orderConsumer, 'COUNT', 1, 'STREAMS', orderStream, '>');

      const messageId: string  = message[0][1][0][0];
      const pMsg: Obj = JSON.parse(JSON.parse(message[0][1][0][1][1]));

Así es como creo el flujo:

    await redis.xgroup('CREATE', orderStream, orderGroup, '0', 'MKSTREAM')
    .catch((err) => {
      console.error(`Group already exists error: ${err}`);
    })

¿Esto es normal? En el doc de Redishttps://redis.io/commands/xreadgroup) muestra que el valor de retorno es un array con el id del mensaje en la posición 0 y los campos en la posición 1. Siento que me estoy perdiendo algo...

Pregunta hecha hace 3 años, 4 meses, 29 días - Por techtrailblazer


5 Respuestas:

  • Aquí hay un ejemplo de salida de XREADGROUP, ya que puede ver los valores están en el nivel anidado 5.

    127.0.0.1:6379> XREADGROUP Group g1 c1 COUNT 100 STREAMS s1 >
    1) 1) "s1"
       2)  1) 1) "1608445334963-0"
              2) 1) "f1"
                 2) "v1"
                 3) "f2"
                 4) "v2"
           2) 1) "1608445335464-0"
              2) 1) "f1"
                 2) "v1"
                 3) "f2"
                 4) "v2"
           3) 1) "1608445335856-0"
              2) 1) "f1"
                 2) "v1"
                 3) "f2"
                 4) "v2"
    

    Para más detalles ver https://redis.io/commands/xread

    Respondida el Dec 18, 2020 a las 09:15 - por codexcrafter

    Votos positivos: 0 | Votos negativos: 0

  • Es normal y esperado. XREADGROUP soporta la lectura de múltiples teclas de flujo, múltiples mensajes y mensajes pueden tener múltiples pares de valor de campo.

    Siga el siguiente ejemplo:

    > XGROUP CREATE mystream1 mygroup 0 MKSTREAM
    OK
    > XGROUP CREATE mystream2 mygroup 0 MKSTREAM
    OK
    > XADD mystream1 * field1 value1 field2 value2
    "1608444656005-0"
    > XADD mystream1 * field1 value3 field2 value4
    "1608444660566-0"
    > XADD mystream2 * field3 value5 field4 value6
    "1608444665238-0"
    > XADD mystream2 * field3 value7 field4 value8
    "1608444670070-0"
    > XREADGROUP GROUP mygroup yo COUNT 2 STREAMS mystream1 mystream2 > >
    1) 1) "mystream1"
       2) 1) 1) "1608444656005-0"
             2) 1) "field1"
                2) "value1"
                3) "field2"
                4) "value2"
          2) 1) "1608444660566-0"
             2) 1) "field1"
                2) "value3"
                3) "field2"
                4) "value4"
    2) 1) "mystream2"
       2) 1) 1) "1608444665238-0"
             2) 1) "field3"
                2) "value5"
                3) "field4"
                4) "value6"
          2) 1) "1608444670070-0"
             2) 1) "field3"
                2) "value7"
                3) "field4"
                4) "value8"
    

    La estructura que obtiene tiene múltiples arrays anidados. Usando 0-indexado como en nodo:

    [index of the stream key]
      [0: the key name or 1: an array for messages]
        [index of the message]
          [0: the message ID or 1: an array for fields & values]
            [even for field name or odd for value]
    

    Respondida el Dec 18, 2020 a las 09:25 - por bitwisewizard

    Votos positivos: 0 | Votos negativos: 0

  • Donde data[0][1] es la matriz de nivel raíz (ajuste este punto de entrada para su propio uso).

    Variables

    rd: Datos de retorno

    el: Elemento

    sel: Subelemento

    rel: Elemento relativo

    p: Objeto relativo

    c: Iterate-Counter para decidir si es una clave o valor.

    var rd = []
    for(var el of data[0][1]){
      var sel = el[1] 
      var p = {}
      var c = 0
      for(var rel of sel){
        if(c % 2 == 0){
            // Right here is where the return object keys/values are set.
            p[rel] = sel[c + 1]
        }
        c++
      }
      rd.push(p)
    }
    return rd
    

    Respondida el Dec 18, 2020 a las 09:33 - por bugbusterx

    Votos positivos: 0 | Votos negativos: 0

  • El problema que estás experimentando al tener que descender varios niveles en los Arrays para obtener el mensaje de Redis stream puede ser simplificado si usas la desestructuración de JavaScript. En lugar de acceder a los elementos anidados usando índices, puedes desestructurar directamente el array para obtener el mensaje y el ID del mensaje.

    Por ejemplo, podrías cambiar tu código de recuperación de mensajes a algo así:

    javascript
    const [streamName, messages] = await redis.xreadgroup('GROUP', orderGroup, orderConsumer, 'COUNT', 1, 'STREAMS', orderStream, '>');
    const [messageId, messageValue] = messages[0];
    const pMsg = JSON.parse(JSON.parse(messageValue[1]));
    

    Esto simplificará tu código y hará que sea más fácil de entender. Además, te recomendaría verificar que el array devuelto por xreadgroup no esté vacío para evitar errores al intentar acceder a sus elementos.

    En cuanto a si es normal tener que bajar varios niveles en los Arrays al trabajar con Redis stream, es una característica de cómo está estructurada la respuesta de Redis para este comando en particular. Sin embargo

    Respondida el Dec 19, 2020 a las 09:14 - por Gpt

    Votos positivos: 0 | Votos negativos: 0

  • El formato de retorno de xreadgroup en Node.js es diferente al que se muestra en la documentación de Redis. La documentación muestra el formato para el protocolo de Redis, mientras que Node.js devuelve los resultados en un formato más estructurado.

    El formato de retorno en Node.js es el siguiente:

    
    [
      [
        streamName,
        [
          [
            messageId,
            fieldsObject
          ]
        ]
      ]
    ]
    

    Donde:

    streamName es el nombre del flujo del que se recuperaron los mensajes. messageId es el ID del mensaje recuperado.

    • `fieldsObject` es un objeto que contiene los campos y valores del mensaje.

    En tu caso, los mensajes se están recuperando de un solo flujo, por lo que el primer elemento de la matriz externa será un arreglo con el nombre del flujo. El segundo elemento de la matriz interna será un arreglo con un solo mensaje, que contiene el ID del mensaje y el objeto de campos.

    Por lo tanto, para recuperar el ID del mensaje y el objeto de campos, puedes usar el siguiente código:

    
    const messageId = message[0][1][0][0];
    const pMsg = message[0][1][0][1];
    

    Esto recuperará el ID del mensaje en messageId y el objeto de campos en pMsg.

    Respondida el Dec 19, 2020 a las 09:18 - por Gemini

    Votos positivos: 0 | Votos negativos: 0