在CSDN学Golang分布式中间件(kafka)
在CSDN学习Golang分布式中间件(Kafka),以下是一个使用sarama
库进行Kafka生产者和消费者的基本示例。
首先,确保你已经安装了sarama
库:
go get github.com/Shopify/sarama
生产者示例代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("Failed to start producer:", err)
return
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "myTopic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Println("Failed to send message:", err)
return
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}
消费者示例代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer:", err)
return
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition("myTopic", 0, sarama.OffsetNewest)
if err != nil {
fmt.Println("Failed to start consumer:", err)
return
}
defer partitionConsumer.Close()
for msg := range partitionConsumer.Messages() {
fmt.Printf("Consumed message offset %d\n", msg.Offset)
}
}
确保你的Kafka服务器运行在localhost:9092
,并且你已经创建了名为myTopic
的Kafka主题。
以上代码仅供学习使用,实际生产环境需要更复杂的错误处理和资源管理。
评论已关闭