MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)
以下是一个使用Spring Integration MQTT实现消息发布和订阅的简单示例。
首先,添加Spring Integration MQTT的依赖:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.1</version>
</dependency>
接下来,配置Spring Integration MQTT消息通道:
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {
@Value("${mqtt.broker.url}")
private String brokerUrl;
@Value("${mqtt.client.id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.default.topic}")
private String defaultTopic;
@Bean
public MqttPahoClientFactory mqttClient() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{brokerUrl});
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel mqttOutputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClient(), defaultTopic);
adapter.setCompletionTimeout(5000);
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClient());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
}
在上述配置中,我们定义了MQTT客户端工厂、输入和输出消息通道,以及用于订阅默认主题的MqttPahoMessageDrivenChannelAdapter
和用于发布消息的MqttPahoMessageHandler
。
最后,你可以使用以下方式发送和接收消息:
@Component
public class MqttService {
@Autowired
private MessageChannel mqttOutputChannel;
public void sendMessage(String payload) {
Message<String> message = MessageBuilder.withPayload(payload).build();
评论已关闭