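To integrate Apache Pulsar into a Spring Boot project, you need to:
- Add the Pulsar client dependency to your pom.xml file.
- Configure the Pulsar client.
- Create a Pulsar producer and consumer.
Here is a simple example: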
Step 1: Add the Pulsar client dependency
<!-- Pulsar Client -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.9.1</version>
</dependency>
Step 2: Configure the Pulsar client
Add the Pulsar settings to application.properties (or the equivalent entries in application.yml):
# Pulsar configuration
pulsar.service-url=pulsar://localhost:6650
pulsar.listener-name=listener_name
pulsar.operation-timeout-ms=1000
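With only the pulsar-client dependency on the classpath, these are custom keys: they take effect only where application code reads them, and the configuration class in Step 3 reads just pulsar.service-url. The following is a minimal sketch, using only java.util.Properties, of how the three keys could be parsed into typed values. The class name PulsarSettings and the fallback defaults are assumptions made for this illustration and are not part of Pulsar or Spring.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical holder for the three custom keys shown above. */
final class PulsarSettings {
    final String serviceUrl;
    final String listenerName;      // null when the key is absent
    final int operationTimeoutMs;

    private PulsarSettings(String serviceUrl, String listenerName, int operationTimeoutMs) {
        this.serviceUrl = serviceUrl;
        this.listenerName = listenerName;
        this.operationTimeoutMs = operationTimeoutMs;
    }

    /** Parses properties-format text; the fallback values are assumptions of this sketch. */
    static PulsarSettings parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String url = props.getProperty("pulsar.service-url", "pulsar://localhost:6650");
        String listener = props.getProperty("pulsar.listener-name");
        int timeout = Integer.parseInt(
                props.getProperty("pulsar.operation-timeout-ms", "30000").trim());
        return new PulsarSettings(url, listener, timeout);
    }

    public static void main(String[] args) {
        PulsarSettings s = parse(
                "pulsar.service-url=pulsar://localhost:6650\n"
              + "pulsar.operation-timeout-ms=1000\n");
        System.out.println(s.serviceUrl + " timeout=" + s.operationTimeoutMs + "ms");
    }
}
```

In a Spring Boot application the same values would more commonly be injected with @Value, as Step 3 does for the service URL; the sketch only makes the key-to-type mapping explicit.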
Step 3: Create the Pulsar producer and consumer
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {

    @Value("${pulsar.service-url}")
    private String serviceUrl;

    // One shared client for the whole application
    @Bean
    public PulsarClient pulsarClient() throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();
    }

    // Schema.STRING sends and receives message payloads as UTF-8 strings
    @Bean
    public Producer<String> producer(PulsarClient pulsarClient) throws PulsarClientException {
        return pulsarClient.newProducer(Schema.STRING)
                .topic("my-topic")
                .create();
    }

    @Bean
    public Consumer<String> consumer(PulsarClient pulsarClient) throws PulsarClientException {
        return pulsarClient.newConsumer(Schema.STRING)
                .topic("my-topic")
                .subscriptionName("my-subscription")
                .subscribe();
    }
}
This configuration class defines the pulsarClient, producer, and consumer beans, so Spring Boot creates and manages them when the application starts.
Using the producer and consumer
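import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PulsarController {

    @Autowired
    private Producer<String> producer;

    @Autowired
    private Consumer<String> consumer;

    @GetMapping("/send")
    public void sendMessage() throws PulsarClientException {
        // send() blocks until the broker confirms the message
        producer.send("Hello, Pulsar!");
    }

    @PostConstruct
    public void receiveMessages() {
        // receiveAsync() delivers one message per call, so request the next one after each
        consumer.receiveAsync().thenAccept(message -> {
            System.out.println("Received message: " + message.getValue());
            // Acknowledge asynchronously; acknowledge() throws a checked exception inside a lambda
            consumer.acknowledgeAsync(message);
            receiveMessages();
        });
    }
}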
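receiveAsync() returns a future that completes with a single message, so continuous consumption means requesting the next message after each one is handled. Below is a minimal, broker-free sketch of that pattern. InMemorySource and ReceiveLoop are hypothetical stand-ins written for this illustration; they are not part of the Pulsar API, and the null "end of queue" signal is an assumption of the sketch (a real consumer simply waits for the next message).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical in-memory stand-in for a consumer; not part of the Pulsar API. */
final class InMemorySource {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    final List<String> acknowledged = new ArrayList<>();

    InMemorySource(List<String> messages) {
        pending.addAll(messages);
    }

    /** Completes with the next message, or with null once the queue is drained. */
    CompletableFuture<String> receiveAsync() {
        return CompletableFuture.completedFuture(pending.poll());
    }

    /** Records the acknowledgement and completes immediately. */
    CompletableFuture<Void> acknowledgeAsync(String message) {
        acknowledged.add(message);
        return CompletableFuture.completedFuture(null);
    }
}

final class ReceiveLoop {
    /** Handles one message, acknowledges it, then asks for the next one. */
    static void receiveNext(InMemorySource source, List<String> handled) {
        source.receiveAsync().thenAccept(message -> {
            if (message == null) {
                return; // sketch-only signal that nothing is left
            }
            handled.add(message);
            source.acknowledgeAsync(message)
                  .thenRun(() -> receiveNext(source, handled));
        });
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(List.of("first", "second"));
        List<String> handled = new ArrayList<>();
        receiveNext(source, handled);
        System.out.println("Handled: " + handled);
    }
}
```

Chaining the next request onto the acknowledgement keeps at most one message in flight; whether to wait for the acknowledgement before receiving again is a design choice, and a listener registered on the consumer builder is another common way to get continuous delivery.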