SpringBoot项目整合Kafka+es+logstash+kibana日志收集
整合Spring Boot项目与Kafka、Elasticsearch (ES)、Logstash和Kibana (ELK) 用于日志收集,你需要按以下步骤操作:
配置Kafka:
在
application.properties
或application.yml
中配置Kafka生产者和消费者的基本属性。配置Elasticsearch:
在
application.properties
或application.yml
中配置Elasticsearch的基本属性。配置Logstash:
编辑Logstash配置文件,以从Kafka主题读取消息,并将其转发到Elasticsearch。
配置Kibana:
在Kibana配置中指定Elasticsearch作为数据源,以便可视化和搜索日志。
在Spring Boot中集成:
- 使用
spring-kafka
依赖与Kafka集成。 - 使用
spring-data-elasticsearch
依赖与Elasticsearch集成。 - 在Spring Boot应用程序中,创建一个Aspect或Interceptor,用于捕获应用程序日志,并将其发送到Kafka。
- 使用
以下是一个简化的示例:
1. 添加依赖 (pom.xml
):
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2. Kafka配置 (application.yml
):
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
producer:
value-serializer: org.apache.kafka.common.serialization.StringSerializer
3. Elasticsearch配置 (application.yml
):
spring:
data:
elasticsearch:
cluster-name: my-cluster
cluster-nodes: localhost:9300
4. Logstash配置 (logstash.conf
):
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["my-topic"]
}
}
filter {
# 根据需要添加过滤器配置
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "my-logs-%{+YYYY.MM.dd}"
}
}
5. 日志集成:
@Aspect
@Component
public class LoggingAspect {
private final KafkaTemplate<String, String> kafkaTemplate;
public LoggingAspect(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Around("execution(* com.yourpackage..*.*(..))")
public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
// 记录日志信息
String logMessage = ...;
// 发送到Kafka
kafkaTemplate.send
评论已关闭