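Integrating Spring Boot with EMQX (over the MQTT protocol) mainly involves the following steps:
- Add the Spring Integration MQTT dependency.
- Configure the MQTT connection parameters.
- Create an MQTT message listener.
- Send and receive MQTT messages.
The following is a simple example:
- Add the dependency to your pom.xml: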
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.1</version>
</dependency>
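- Configure the MQTT connection parameters in application.properties. These spring.mqtt.* keys are custom names that the configuration class reads with @Value; Spring Boot does not bind them automatically: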
spring.mqtt.username=admin
spring.mqtt.password=public
spring.mqtt.url=tcp://localhost:1883
spring.mqtt.client.id=clientId
spring.mqtt.default.topic=testTopic
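Because these keys are custom, nothing validates them before the MQTT client tries to connect. The following is a minimal, standard-library-only sketch of a fail-fast check on the same keys. The class name `MqttPropsCheck` and its helper methods are hypothetical, and accepting only `tcp://` and `ssl://` is an assumption of this sketch, not a restriction stated in the original.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not part of Spring or Paho.
class MqttPropsCheck {

    // Loads key=value pairs written in application.properties syntax from a string.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the value of a required key, failing fast if it is missing or blank.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    // Parses the broker URL. Assumption: this sketch accepts only tcp:// and ssl://.
    static URI parseBrokerUri(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        if (!"tcp".equals(scheme) && !"ssl".equals(scheme)) {
            throw new IllegalArgumentException("Unsupported scheme in: " + url);
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Host and port are required in: " + url);
        }
        return uri;
    }

    public static void main(String[] args) {
        Properties props = load(
                "spring.mqtt.url=tcp://localhost:1883\n"
              + "spring.mqtt.client.id=clientId\n");
        URI broker = parseBrokerUri(require(props, "spring.mqtt.url"));
        System.out.println(broker.getHost() + ":" + broker.getPort());
        System.out.println(require(props, "spring.mqtt.client.id"));
    }
}
```

In a real application the same checks could run from a startup hook, so a mistyped URL surfaces as a clear error instead of a connection failure later.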
- Create the MQTT configuration class:
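import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.messaging.MessageChannel;

@Configuration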
public class MqttConfig {
 
    @Value("${spring.mqtt.username}")
    private String username;
 
    @Value("${spring.mqtt.password}")
    private String password;
 
    @Value("${spring.mqtt.url}")
    private String url;
 
    @Value("${spring.mqtt.client.id}")
    private String clientId;
 
    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;
 
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{url});
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
 
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(), defaultTopic);
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
}
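The sending code further down autowires a channel named `mqttOutboundChannel`, which the configuration class above does not define. A minimal sketch of the outbound side follows, assuming the same property keys and the `mqttClientFactory` bean from above. The class name `MqttOutboundConfig`, the bean name `mqttOutbound`, and the `-outbound` client ID suffix are hypothetical choices for illustration.

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

// Hypothetical companion to MqttConfig; the beans could equally live in MqttConfig itself.
@Configuration
public class MqttOutboundConfig {

    @Value("${spring.mqtt.client.id}")
    private String clientId;

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    // Channel that application code sends outgoing messages to.
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Publishes every message arriving on mqttOutboundChannel to the broker.
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
        // Assumption: a distinct client ID, since a broker normally drops an
        // existing connection when a second one connects with the same ID.
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(clientId + "-outbound", mqttClientFactory);
        handler.setAsync(true);                // do not block the sender while publishing
        handler.setDefaultTopic(defaultTopic); // used when a message carries no topic header
        handler.setDefaultQos(1);
        return handler;
    }
}
```

With this in place, a message sent to `mqttOutboundChannel` is published to the default topic unless the message headers name another one.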
- Create a listener to receive messages:
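import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class MqttReceiver {

    // Requires WebSocket/STOMP messaging to be configured; remove if not forwarding.
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    // Messages from the inbound adapter arrive on mqttInputChannel.
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void receiveMessage(String payload) {
        System.out.println("Received Message: " + payload);
        // Process further here, e.g. forward the message via SimpMessagingTemplate.
    }
}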
- Method for sending messages:
@Autowired
private MqttPahoClientFactory mqttClientFactory;
 
@Autowired
private MessageChannel mqttOutboundChannel;
 
public void sendMessage(S