Go操作Kafka之kafka-go
warning:
这篇文章距离上次修改已过188天,其中的内容可能已经有所变动。
以下是使用kafka-go
包创建生产者和消费者的简单示例代码。
生产者:
package main
import (
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
})
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("key"),
Value: []byte("value"),
},
)
if err != nil {
fmt.Printf("Failed to write message: %v\n", err)
}
}
消费者:
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "my-topic",
GroupID: "my-group",
})
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Printf("Failed to read message: %v\n", err)
break
}
fmt.Printf("Message on %s: key = %s, value = %s, timestamp = %v\n",
m.Topic, m.Key, m.Value, m.Time)
}
}
确保您已经安装了kafka-go
包,并且Kafka服务器正在运行在localhost
的9092
端口。
生产者代码向名为my-topic
的主题发送一条消息。
消费者代码从同一个主题读取消息,并打印出来。注意,这里的消费者使用了一个群组IDmy-group
,在实际应用中,群组ID应该是唯一的。
评论已关闭