RedisMQ
RedisMQ: 基于 Redis 实现的自研轻量消息队列
https://git.yuelaigroup.com:8000/henry/redismq

📚 前言
使用此 SDK 进行实践前,建议先行了解与 Redis Streams 有关的特性
Redis Streams
🖥️ 接入 SOP
用户需要先行完成 Topic 和 Consumer Group 的创建
创建 Topic:my_test_topic
Text Only |
---|
| 127.0.0.1:6379> XADD my_test_topic * first_key first_val
"1692066364494-0"
|
创建 Consumer Group:my_test_group
Text Only |
---|
| 127.0.0.1:6379> XGROUP CREATE my_test_topic my_test_group 0-0
OK
|
当然,这可以直接调用 InitMQ
函数完成
Go |
---|
| import (
"redismq"
)
func main(){
// ...
redismq.InitMQ(ctx, client, "my_test_topic", "my_test_group", "0-0")
}
|
构造 Redis 客户端实例
Go |
---|
| import "redismq/redis"
func main(){
client := redis.NewClient(Address, Password)
// ...
}
|
启动生产者 Producer
Go |
---|
| import (
"context"
"redismq"
)
func main(){
// ...
producer := redismq.NewProducer(redisClient, redismq.WithMsgQueueLen(10))
ctx := context.Background()
msgId, err := producer.SendMsg(ctx, topic, "test_k", "test_v")
}
|
启动消费者 Consumer
Go |
---|
| import (
"redismq"
)
func main(){
// ...
// 构造并启动消费者
consumer, _ := redismq.NewConsumer(redisClient, topic, consumerGroup, consumerId, callbackFunc,
// 每条消息最多重试 3 次
redismq.WithMaxRetryLimit(3),
// 每轮接收消息的超时时间为 3 s
redismq.WithReceiveTimeout(3*time.Second),
// 注入自定义实现的死信队列
redismq.WithDeadLetterMailbox(demoDeadLetterMailbox))
go consumer.Run()
defer consumer.Stop()
}
|
🐧 使用示例
完整的使用示例代码可以参考 Package Example。