51.Go操作kafka示例(kafka-go库)
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
// 创建一个Writer实例,指向本地的Kafka服务端口
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my.topic",
Balancer: &kafka.LeastBytes{},
})
// 创建一个消息
msg := kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
}
// 发送消息
err := w.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Printf("Failed to write message: %v\n", err)
return
}
fmt.Println("Message sent successfully!")
}
这段代码演示了如何使用kafka-go
库创建一个Kafka消息生产者,并发送一条简单的消息到指定的Topic。代码中包含了错误处理,以确保在发生问题时能够及时响应。
评论已关闭