go使用logrus进行日志收集并异步发布日志到kafka
package main
import (
"github.com/sirupsen/logrus"
"github.com/bshuster-repo/logrus-kafka-hook"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 配置 Kafka 连接参数
kafkaCfg := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
}
// 配置 logrus-kafka-hook
hook, err := logrustkafka.NewKafkaHook(kafkaCfg, &logruskafka.Config{
Topic: "logs",
Async: true, // 异步模式
RequiredAcks: kafka.RequireAllInSync, // 等待所有副本都成功收到数据才返回
ProducerIface: &kafka.Producer{}, // 可以注入自定义的 Producer 实例
SuccessQueueSize: 10000, // 成功发送的日志队列长度
ErrorQueueSize: 10000, // 发送失败的日志队列长度
ErrorHandler: func(err error, m *kafka.Message) {
logrus.Errorf("Failed to send message: %s", err)
},
})
if err != nil {
logrus.Panicf("Failed to create kafka hook: %s", err)
}
// 将 hook 添加到 logrus
logrus.AddHook(hook)
// 使用 logrus 记录日志
logrus.WithFields(logrus.Fields{
"animal": "walrus",
}).Info("A walrus appears")
// 确保所有日志都已经发送
hook.Flush()
}
这段代码首先配置了 Kafka 连接参数,然后创建了一个 logrus-kafka-hook
,并将其添加到 logrus。之后,使用 logrus 记录一条日志,并调用 Flush
确保所有日志都已经发送。这个例子展示了如何将 logrus 与 Kafka 集成,并实现日志的异步发送。
评论已关闭