English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
O Redis Stream é o 5versão 0, nova estrutura de dados adicionada.
O Redis Stream é usado principalmente para fila de mensagens (MQ, Message Queue), o Redis em si já tem um Redis publish-subscribe (pub/sub) é usado para implementar funcionalidades de fila de mensagens, mas ele tem a desvantagem de que as mensagens não podem ser persistidas, se ocorrer interrupção de rede, falha do Redis, etc., as mensagens serão perdidas.
Em termos simples, o pub/sub) pode distribuir mensagens, mas não pode registrar mensagens históricas.
O Redis Stream oferece funcionalidades de persistência de mensagens e cópia de segurança mestre-e-esclavo, permitindo que qualquer cliente acesse dados de qualquer momento e lembre-se da posição de acesso de cada cliente, além de garantir que as mensagens não sejam perdidas.
A estrutura do Redis Stream é como mostrado a seguir, ele tem uma lista de mensagens, que une todas as mensagens adicionadas, cada mensagem tem um ID único e o conteúdo correspondente:
Cada Stream tem um nome único, que é a chave do Redis, ele é criado automaticamente quando usamos pela primeira vez o comando xadd para anexar mensagens.
Análise da imagem acima:
Grupo de Consumidores :Grupo de consumo, criado com o comando XGROUP CREATE, um grupo de consumo pode ter múltiplos consumidores (Consumer).
last_delivered_id :Cursor, cada grupo de consumo terá um cursor last_delivered_id, e qualquer consumidor que leia mensagens fará com que o cursor last_delivered_id avance.
pending_ids :O estado do consumidor (Consumer), que serve para manter os IDs não confirmados do consumidor. pending_ids registra as mensagens lidas pelo cliente, mas ainda não ack (caractere de reconhecimento: caractere de reconhecimento).
Comandos relacionados ao fila de mensagens:
XADD - Adicionar mensagem ao final
XTRIM - Aparar o fluxo, limitar o comprimento
XDEL - Excluir mensagens
XLEN - Obter o número de elementos contidos no fluxo, ou seja, o comprimento das mensagens
XRANGE - Obter lista de mensagens, automaticamente filtrando mensagens excluídas
XREVRANGE - Obter lista de mensagens em ordem inversa, ID de maior para menor
XREAD - Obter lista de mensagens de forma bloqueada ou não bloqueada
Comandos relacionados ao grupo de consumidores:
XGROUP CREATE - Criar grupo de consumidores
XREADGROUP GRUPO - Ler mensagens do grupo de consumidores
XACK - Marcar mensagem como "tratada"
XGROUP SETID - Definir novo ID da mensagem de entrega final para o grupo de consumidores
XGROUP DELCONSUMER - Excluir consumidor
XGROUP DESTROY - Excluir grupo de consumidores
XPENDING - Exibir informações relacionadas a mensagens pendentes
XCLAIM - Transferir a propriedade de pertencimento da mensagem
XINFO - Ver informações do fluxo e do grupo de consumidores
XINFO GROUPS - Imprimir informações do grupo de consumidores
XINFO STREAM - Imprimir informações do fluxo
Usar XADD para adicionar mensagens à fila, se a fila especificada não existir, cria uma fila, a sintaxe do formato do XADD é:
XADD chave ID campo valor [campo valor ...]
key : Nome da fila, se não existir, será criada
ID : ID da mensagem, usamos * representa o gerado pelo redis, pode ser personalizado, mas deve garantir a crescente por conta própria.
valor do campo : Registro.
redis> XADD mystream * nome Sara sobrenome O'Connor "1601372323627-0" redis> XADD mystream * campo1 valor1 campo2 valor2 campo3 valor3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) ""1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "O'Connor" 2) 1) ""1601372323627-1" 2) 1) "campo"1" 2) "valor"1" 3) "campo"2" 4) "valor"2" 5) "campo"3" 6) "valor"3" redis>
Usar XTRIM para aparar o fluxo, limitar o comprimento, a sintaxe do formato é:
XTRIM chave MAXLEN [~] contagem
key : Nome da fila
MAXLEN : Comprimento
count :数量
127.0.0.1:6379> XADD mystream * campo1 A campo2 B campo3 C campo4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (inteiro) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) ""1601372434568-0" 2) 1) "campo"1" 2) "A" 3) "campo"2" 4) "B" 5) "campo"3" 6) "C" 7) "campo"4" 8) "D" 127.0.0.1:6379> redis>
Usar XDEL para excluir mensagens, a sintaxe do formato é:
XDEL chave ID [ID ...]
key: Nome da fila
ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) ""1" 2) 1) 1538561701744-0 2) 1) "c" 2) ""3"
Usar XLEN para obter o número de elementos contidos no fluxo, ou seja, o comprimento das mensagens, a sintaxe do formato é:
XLEN chave
key: Nome da fila
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
使用 XRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XRANGE key start end [COUNT count]
key :队列名
start :开始值, - 表示最小值
end :结束值, + 表示最大值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) ""1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) ""1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息 ,语法格式:
XREVRANGE key end start [COUNT count]
key :队列名
end :结束值, + 表示最大值
start :开始值, - 表示最小值
count :数量
redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) ""1601372731459-3" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie" redis>
使用 XREAD 以阻塞或非阻塞方式获取消息列表 ,语法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count :数量
milissegundos :可选,阻塞毫秒数,没有设置就是非阻塞模式
key :队列名
id :消息 ID
# 从 Stream 头部读取两条消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) 1526984818136-0 2) 1) "duration" 2) ""1532" 3) "event"-id" 4) ""5" 5) "user"-id" 6) ""7782813" 2) 1) 1526999352406-0 2) 1) "duration" 2) ""812" 3) "event"-id" 4) ""9" 5) "user"-id" 6) ""388234" 2) 1) "writers" 2) 1) 1) 1526985676425-0 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) 1526985685298-0 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
使用 XGROUP CREATE 创建消费者组,语法格式:
XGROUP [CREATE key groupname id-ou-$] [SETID key groupname id]-ou-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key : Nome da fila, se não existir, será criada
groupname : Nome do grupo.
$ : Indica que seja consumido a partir do final, aceita apenas novas mensagens, todas as mensagens do Stream atual serão ignoradas.
Consumir a partir do início:
XGROUP CREATE mystream consumidor-grupo-name 0-0
Consumir a partir do final:
XGROUP CREATE mystream consumidor-grupo-name $
Usar XREADGROUP GRUPO para ler mensagens do grupo de consumo, formato da sintaxe:
XREADGROUP GRUPO grupo consumidor [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key [...]] ID [ID [...]]
grupo : Nome do grupo de consumo
consumidor : Nome do consumidor.
count : Quantidade de leitura.
milissegundos : Milissegundos de bloqueio.
key : Nome da fila.
ID : ID da mensagem.
XREADGROUP GRUPO consumidor-grupo-name consumidor-name CONTAR 1 STREAMS mystream >