package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// 生产者
func NewAsyncProducer(addrs []string, topic string) (sarama.AsyncProducer, error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
}
// 处理成功投递的消息
go func(p sarama.AsyncProducer) {
for suc := range p.Successes() {
fmt.Printf("消息成功投递到分区 %d 偏移量 %d\n", suc.Partition, suc.Offset)
}
}(producer)
return producer, nil
}
// 消费者
func NewConsumer(addrs []string, topic string, partition int32) (sarama.Consumer, error) {
consumer, err := sarama.NewConsumer(addrs, nil)
if err != nil {
return nil, err
}
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {
return nil, err
}
// 处理消息
go func(pc sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("消息分区 %d 偏移量 %d 值: %s\n", msg.Partition, msg.Offset, string(msg.Value))
}
}(partitionConsumer)
return consumer, nil
}
func main() {
// Kafka 集群地址
addrs := []string{"127.0.0.1:9092"}
// 主题
topic := "my_topic"
// 创建异步生产者
producer, err := NewAsyncProducer(addrs, topic)
if err != nil {
panic(err)
}
// 创建消费者
consumer, err := NewConsumer(addrs, topic, int32(0))
if err != nil {
panic(err)
}
defer func() {
consumer.Close()
}()
// 生产者发送消息
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder("Hello Kafka!"),
}
producer.Input() <- msg
// 等待程序结束
// 实际应用中,这里可能需要一个优雅退出的逻辑
select {}
}
这段代码首先定义了一个异步生产者的创建函数NewAsyncProducer
,它配置了生产者,并且启动了一个goroutine来处理成功投递的消息。然后定义了一个消费者的创建函数NewConsumer
,它启动了一个goroutine来处理接收到的消息。最后,在main
函数中创建了生产者和消费者,并发送了一条消息。这个例子展示了如何使用sarama包来连接Kafka并进行消息的生产和消费。