第一节:kafka golang sarama初体验,2024年最新最新出炉
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有的follower同步完成
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新消息随机分配到分区
config.Producer.Return.Successes = true // 成功交付的消息列表
// 构造消息
msg := &sarama.ProducerMessage{}
msg.Topic = "go-kafka-test"
msg.Value = sarama.StringEncoder("Hello Kafka!")
// 生产者客户端
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
panic(err)
}
fmt.Printf("partition:%d, offset:%d\n", pid, offset)
}
这段代码使用了sarama库来连接Kafka服务器,并发送了一条简单的消息。首先,它配置了生产者,然后创建了一个消息并指定了主题,最后通过NewSyncProducer函数连接到Kafka服务器,并使用SendMessage方法发送了这条消息。如果发送成功,它会打印出消息被分配到的分区和偏移量。
评论已关闭