package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// 初始化Kafka消费者
func NewKafkaConsumer(brokers []string, groupID string, topics []string) (sarama.ConsumerGroup, error) {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
// 创建消费者实例
consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
if err != nil {
return nil, err
}
return consumer, nil
}
func main() {
brokers := []string{"localhost:9092"} // Kafka 集群地址
groupID := "my-group" // 消费者组ID
topics := []string{"my-topic"} // 需要消费的主题
// 初始化Kafka消费者
consumer, err := NewKafkaConsumer(brokers, groupID, topics)
if err != nil {
log.Fatalf("Failed to start consumer: %s", err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Printf("Failed to close consumer: %s", err)
}
}()
// 监听操作系统信号
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
// 消费者处理逻辑
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case msg, ok := <-consumer.Messages():
if !ok {
log.Println("Consumer closed.")
return
}
fmt.Printf("Message topic: %s, partition: %d, offset: %d, key: %s, value: %s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
case err := <-consumer.Errors():
log.Printf("Error: %s\n", err)
case ntf := <-consumer.Notifications():
log.Printf("Rebalanced: %+v\n", ntf)
case <-signals:
log.Println("Received shutdown signal, exiting...")
return
}
}
}()
wg.Wait()
}
这段代码演示了如何在Go语言中使用sarama库创建一个简单的Kafka消费者,并监听特定的主题。它使用了sarama-cluster库来简化消费者的使用,并处理了操作系统的信号以优雅地关闭消费者。这是分布式系统中常见的Kafka消费者模式,对于学习分布式消息队列和Go语言的开发者来说,具有很好的教育价值。