MQTT记录(概述,docker部署,基于spring-integration-mqtt实现消息订阅与发布,客户端工具测试)
    		       		warning:
    		            这篇文章距离上次修改已过433天,其中的内容可能已经有所变动。
    		        
        		                
                以下是一个使用Spring Integration MQTT实现消息发布和订阅的简单示例。
首先,添加Spring Integration MQTT的依赖:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.1</version>
</dependency>接下来,配置Spring Integration MQTT消息通道:
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {
 
    @Value("${mqtt.broker.url}")
    private String brokerUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Bean
    public MqttPahoClientFactory mqttClient() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{brokerUrl});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
 
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }
 
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClient(), defaultTopic);
        adapter.setCompletionTimeout(5000);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClient());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        return messageHandler;
    }
}在上述配置中,我们定义了MQTT客户端工厂、输入和输出消息通道,以及用于订阅默认主题的MqttPahoMessageDrivenChannelAdapter和用于发布消息的MqttPahoMessageHandler。
最后,你可以使用以下方式发送和接收消息:
@Component
public class MqttService {
 
    @Autowired
    private MessageChannel mqttOutputChannel;
 
    public void sendMessage(String payload) {
        Message<String> message = MessageBuilder.withPayload(payload).build();
 
评论已关闭