Golang 整合RocketMQ
为了在Go语言中整合RocketMQ,你需要使用rocketmq-go客户端。以下是一个简单的例子,展示了如何发送和接收消息。
首先,通过以下命令安装rocketmq-go客户端:
go get github.com/apache/rocketmq-go/v2@latest
以下是发送消息和接收消息的简单例子:
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-go/v2"
"github.com/apache/rocketmq-go/v2/consumer"
"github.com/apache/rocketmq-go/v2/primitive"
)
func main() {
// 1. 创建消息生产者
producer, err := rocketmq.NewProducer(
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(2),
)
if err != nil {
fmt.Println(err)
return
}
// 2. 启动消息生产者
err = producer.Start()
if err != nil {
fmt.Println(err)
return
}
// 3. 创建消息消费者
consumer, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithConsumerGroup("group"),
)
if err != nil {
fmt.Println(err)
return
}
// 4. 订阅主题和标签
err = consumer.Subscribe(primitive.Topic("topic"), consumer.MessageSelector{
Type: primitive.TAG,
Expression: "tagA || tagB",
})
if err != nil {
fmt.Println(err)
return
}
// 5. 启动消费者进行消息消费
err = consumer.Start(context.Background(), func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
fmt.Printf("receive message: %s\n", msg.Body)
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err)
return
}
// 6. 发送消息
msg := &primitive.Message{
Topic: "topic",
Body: []byte("Hello RocketMQ!"),
}
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("send message id: %s\n", res.MsgID)
}
在这个例子中,我们创建了一个消息生产者用于发送消息,以及一个消息消费者用于接收消息。我们通过producer.SendSync
发送了一条消息,并且注册了一个回调函数到consumer.Start
来处理接收到的消息。
确保RocketMQ的nameserver地址是正确的,并且RocketMQ服务已经启动。这个例子假设你已经有了一个RocketMQ的nameserver在运行,并且正确配置了主题(topic)和标签(tag)。
评论已关闭