package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"time"
)
func main() {
// 创建RocketMQ Producer
producer, err := rocketmq.NewProducer(
producer.WithGroupName("test_group"),
producer.WithNameServer([]string{"127.0.0.1:9876"}),
)
if err != nil {
fmt.Println(err)
return
}
// 启动Producer
err = producer.Start()
if err != nil {
fmt.Println(err)
return
}
// 创建RocketMQ Consumer
consumer, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("test_group"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
if err != nil {
fmt.Println(err)
return
}
// 订阅主题
err = consumer.Subscribe(
"TopicTest",
consumer.MessageSelector{},
func(context context.Context, msg primitive.Message) (consumer.ConsumeResult, error) {
fmt.Printf("Received message: %s\n", msg.Body)
return consumer.ConsumeSuccess, nil
},
)
if err != nil {
fmt.Println(err)
return
}
// 启动Consumer
err = consumer.Start()
if err != nil {
fmt.Println(err)
return
}
// 发送消息
msg := &primitive.Message{
Topic: "TopicTest",
Body: []byte("Hello RocketMQ"),
}
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("Send message success, result:%v\n", res)
}
// 等待一段时间以便Consumer接收和处理消息
time.Sleep(10 * time.Second)
// 关闭Producer和Consumer
err = producer.Shutdown()
if err != nil {
fmt.Println(err)
}
err = consumer.Shutdown()
if err != nil {
fmt.Println(err)
}
}
这段代码展示了如何在Go语言中创建和启动RocketMQ的Producer和Consumer,并且如何发送和接收消息。代码中包含了错误处理,确保在出错时能够打印错误信息并优雅地关闭资源。