English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

Stream Redis

O Redis Stream é o 5versão 0, nova estrutura de dados adicionada.

O Redis Stream é usado principalmente para fila de mensagens (MQ, Message Queue), o Redis em si já tem um Redis publish-subscribe (pub/sub) é usado para implementar funcionalidades de fila de mensagens, mas ele tem a desvantagem de que as mensagens não podem ser persistidas, se ocorrer interrupção de rede, falha do Redis, etc., as mensagens serão perdidas.

Em termos simples, o pub/sub) pode distribuir mensagens, mas não pode registrar mensagens históricas.

O Redis Stream oferece funcionalidades de persistência de mensagens e cópia de segurança mestre-e-esclavo, permitindo que qualquer cliente acesse dados de qualquer momento e lembre-se da posição de acesso de cada cliente, além de garantir que as mensagens não sejam perdidas.

A estrutura do Redis Stream é como mostrado a seguir, ele tem uma lista de mensagens, que une todas as mensagens adicionadas, cada mensagem tem um ID único e o conteúdo correspondente:

Cada Stream tem um nome único, que é a chave do Redis, ele é criado automaticamente quando usamos pela primeira vez o comando xadd para anexar mensagens.

Análise da imagem acima:

  • Grupo de Consumidores :Grupo de consumo, criado com o comando XGROUP CREATE, um grupo de consumo pode ter múltiplos consumidores (Consumer).

  • last_delivered_id :Cursor, cada grupo de consumo terá um cursor last_delivered_id, e qualquer consumidor que leia mensagens fará com que o cursor last_delivered_id avance.

  • pending_ids :O estado do consumidor (Consumer), que serve para manter os IDs não confirmados do consumidor. pending_ids registra as mensagens lidas pelo cliente, mas ainda não ack (caractere de reconhecimento: caractere de reconhecimento).

Comandos relacionados ao fila de mensagens:

  • XADD - Adicionar mensagem ao final

  • XTRIM - Aparar o fluxo, limitar o comprimento

  • XDEL - Excluir mensagens

  • XLEN - Obter o número de elementos contidos no fluxo, ou seja, o comprimento das mensagens

  • XRANGE - Obter lista de mensagens, automaticamente filtrando mensagens excluídas

  • XREVRANGE -  Obter lista de mensagens em ordem inversa, ID de maior para menor

  • XREAD - Obter lista de mensagens de forma bloqueada ou não bloqueada

Comandos relacionados ao grupo de consumidores:

  • XGROUP CREATE - Criar grupo de consumidores

  • XREADGROUP GRUPO - Ler mensagens do grupo de consumidores

  • XACK - Marcar mensagem como "tratada"

  • XGROUP SETID - Definir novo ID da mensagem de entrega final para o grupo de consumidores

  • XGROUP DELCONSUMER - Excluir consumidor

  • XGROUP DESTROY - Excluir grupo de consumidores

  • XPENDING - Exibir informações relacionadas a mensagens pendentes

  • XCLAIM - Transferir a propriedade de pertencimento da mensagem

  • XINFO - Ver informações do fluxo e do grupo de consumidores

  • XINFO GROUPS - Imprimir informações do grupo de consumidores

  • XINFO STREAM - Imprimir informações do fluxo

XADD

Usar XADD para adicionar mensagens à fila, se a fila especificada não existir, cria uma fila, a sintaxe do formato do XADD é:

XADD chave ID campo valor [campo valor ...]
  • key : Nome da fila, se não existir, será criada

  • ID : ID da mensagem, usamos * representa o gerado pelo redis, pode ser personalizado, mas deve garantir a crescente por conta própria.

  • valor do campo : Registro.

redis> XADD mystream * nome Sara sobrenome O'Connor
"1601372323627-0"
redis> XADD mystream * campo1 valor1 campo2 valor2 campo3 valor3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) ""1601372323627-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "O'Connor"
2) 1) ""1601372323627-1"
   2) 1) "campo"1"
      2) "valor"1"
      3) "campo"2"
      4) "valor"2"
      5) "campo"3"
      6) "valor"3"
redis>

XTRIM

Usar XTRIM para aparar o fluxo, limitar o comprimento, a sintaxe do formato é:

XTRIM chave MAXLEN [~] contagem
  • key : Nome da fila

  • MAXLEN : Comprimento

  • count :数量

127.0.0.1:6379> XADD mystream * campo1 A campo2 B campo3 C campo4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(inteiro) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) ""1601372434568-0"
   2) 1) "campo"1"
      2) "A"
      3) "campo"2"
      4) "B"
      5) "campo"3"
      6) "C"
      7) "campo"4"
      8) "D"
127.0.0.1:6379> 
redis>

XDEL

Usar XDEL para excluir mensagens, a sintaxe do formato é:

XDEL chave ID [ID ...]
  • key: Nome da fila

  • ID :消息 ID

> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) ""1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) ""3"

XLEN

Usar XLEN para obter o número de elementos contidos no fluxo, ou seja, o comprimento das mensagens, a sintaxe do formato é:

XLEN chave
  • key: Nome da fila

redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>

XRANGE

使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XRANGE key start end [COUNT count]
  • key :队列名

  • start :开始值, - 表示最小值

  • end :结束值, + 表示最大值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) ""1601372577811-0"
   2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) ""1601372577811-1"
   2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>

XREVRANGE

使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:

XREVRANGE key end start [COUNT count]
  • key :队列名

  • end :结束值, + 表示最大值

  • start :开始值, - 表示最小值

  • count :数量

redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) ""1601372731459-3"
   2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>

XREAD

使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  • count :数量

  • milissegundos :可选,阻塞毫秒数,没有设置就是非阻塞模式

  • key :队列名

  • id :消息 ID

# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
   2) 1) 1) 1526984818136-0
         2) 1) "duration"
            2) ""1532"
            3) "event"-id"
            4) ""5"
            5) "user"-id"
            6) ""7782813"
      2) 1) 1526999352406-0
         2) 1) "duration"
            2) ""812"
            3) "event"-id"
            4) ""9"
            5) "user"-id"
            6) ""388234"
2) 1) "writers"
   2) 1) 1) 1526985676425-0
         2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
         2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"

XGROUP CREATE

使用 XGROUP CREATE 创建消费者组,语法格式:

XGROUP [CREATE key groupname id-ou-$] [SETID key groupname id]-ou-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  • key : Nome da fila, se não existir, será criada

  • groupname : Nome do grupo.

  • $ : Indica que seja consumido a partir do final, aceita apenas novas mensagens, todas as mensagens do Stream atual serão ignoradas.

Consumir a partir do início:

XGROUP CREATE mystream consumidor-grupo-name 0-0

Consumir a partir do final:

XGROUP CREATE mystream consumidor-grupo-name $

XREADGROUP GRUPO

Usar XREADGROUP GRUPO para ler mensagens do grupo de consumo, formato da sintaxe:

XREADGROUP GRUPO grupo consumidor [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key [...]] ID [ID [...]]
  • grupo : Nome do grupo de consumo

  • consumidor : Nome do consumidor.

  • count : Quantidade de leitura.

  • milissegundos : Milissegundos de bloqueio.

  • key : Nome da fila.

  • ID : ID da mensagem.

XREADGROUP GRUPO consumidor-grupo-name consumidor-name CONTAR 1 STREAMS mystream >