跳转至

RedisMQ

RedisMQ: 基于 Redis 实现的自研轻量消息队列

https://git.yuelaigroup.com:8000/henry/redismq

39eea620-8c4d-4216-9b6d-29ba066fc435

📚 前言

使用此 SDK 进行实践前,建议先行了解与 Redis Streams 有关的特性

Redis Streams

🖥️ 接入 SOP

用户需要先行完成 Topic 和 Consumer Group 的创建

创建 Topic:my_test_topic

Text Only
127.0.0.1:6379> XADD my_test_topic * first_key first_val
"1692066364494-0"

创建 Consumer Group:my_test_group

Text Only
127.0.0.1:6379> XGROUP CREATE my_test_topic my_test_group 0-0
OK

当然,这可以直接调用 InitMQ 函数完成

Go
1
2
3
4
5
6
7
import (
    "redismq"
)
func main(){
    // ...
    redismq.InitMQ(ctx, client, "my_test_topic", "my_test_group", "0-0")
}

构造 Redis 客户端实例

Go
1
2
3
4
5
import "redismq/redis"
func main(){
    client := redis.NewClient(Address, Password)
    // ...
}

启动生产者 Producer

Go
import (
    "context"
    "redismq"
)
func main(){
    // ...
    producer := redismq.NewProducer(redisClient, redismq.WithMsgQueueLen(10))
    ctx := context.Background()
    msgId, err := producer.SendMsg(ctx, topic, "test_k", "test_v")
}

启动消费者 Consumer

Go
import (
    "redismq"
)
func main(){
    // ...
    // 构造并启动消费者
    consumer, _ := redismq.NewConsumer(redisClient, topic, consumerGroup, consumerId, callbackFunc,
        // 每条消息最多重试 3 次
        redismq.WithMaxRetryLimit(3),
        // 每轮接收消息的超时时间为 3 s
        redismq.WithReceiveTimeout(3*time.Second),
        // 注入自定义实现的死信队列
        redismq.WithDeadLetterMailbox(demoDeadLetterMailbox))

    go consumer.Run()
    defer consumer.Stop()
}

🐧 使用示例

完整的使用示例代码可以参考 Package Example。