【实战】Spring Cloud Stream 3.1+整合Kafka
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface KafkaStreams {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
在配置类中定义输入和输出通道的绑定:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding({KafkaStreams.class})
public class KafkaStreamProcessor {
@StreamListener(KafkaStreams.INPUT)
@SendTo(KafkaStreams.OUTPUT)
public String process(String input) {
// 对输入的消息进行处理,并返回处理结果
return "Processed: " + input;
}
}
在application.yml
或application.properties
中配置Kafka binder:
spring.cloud.stream.bindings.input.destination=my-input-topic
spring.cloud.stream.bindings.input.group=my-consumer-group
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.bindings.output.destination=my-output-topic
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
这个简单的例子展示了如何使用Spring Cloud Stream和Kafka进行消息的接收和发送。在这个例子中,我们定义了一个接口KafkaStreams
来声明输入和输出通道,然后在配置类KafkaStreamProcessor
中使用@EnableBinding
注解绑定这些通道,并使用@StreamListener
注解来监听输入通道上的消息,使用@SendTo
注解来将处理后的消息发送到输出通道。最后,在配置文件中指定了Kafka的broker地址和Zookeeper节点信息。
评论已关闭