Springboot對(duì)接mqtt的項(xiàng)目實(shí)踐
在Spring Boot中對(duì)接MQTT協(xié)議,可以使用Eclipse Paho客戶端和Spring Integration MQTT模塊。以下是詳細(xì)實(shí)現(xiàn)步驟:
1. 添加依賴
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Integration MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Eclipse Paho MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
2. 配置MQTT連接參數(shù)
# application.yml mqtt: broker-url: tcp://localhost:1883 username: admin password: password client-id: spring-boot-client default-topic: test/topic timeout: 30 keepalive: 60 completion-timeout: 30000
3. MQTT配置類
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttConfig {
@Autowired
private MqttProperties mqttProperties;
// MQTT連接配置
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepalive());
options.setAutomaticReconnect(true);
options.setCleanSession(true);
return options;
}
// MQTT客戶端工廠
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(mqttConnectOptions());
return factory;
}
// 出站消息通道(用于發(fā)送消息)
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(mqttProperties.getClientId() + "-producer", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}
// 出站通道
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
// 入站消息適配器(用于接收消息)
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "-consumer",
mqttClientFactory(), mqttProperties.getDefaultTopic());
adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}
// 入站通道
@Bean
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}
// 入站消息處理器
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
String payload = (String) message.getPayload();
System.out.println("Received message from topic: " + topic + ", payload: " + payload);
// 處理接收到的消息
processMessage(topic, payload);
}
};
}
}
4. 配置屬性類
@ConfigurationProperties(prefix = "mqtt")
@Component
@Data
public class MqttProperties {
private String brokerUrl;
private String username;
private String password;
private String clientId;
private String defaultTopic;
private int timeout;
private int keepalive;
private int completionTimeout;
}
5. MQTT服務(wù)類
@Service
public class MqttService {
@Autowired
private MessageChannel mqttOutboundChannel;
// 發(fā)送消息到指定主題
public void sendMessage(String topic, String message) {
mqttOutboundChannel.send(MessageBuilder.withPayload(message)
.setHeader("mqtt_topic", topic)
.build());
}
// 發(fā)送消息到默認(rèn)主題
public void sendMessage(String message) {
mqttOutboundChannel.send(MessageBuilder.withPayload(message).build());
}
// 發(fā)送帶QoS的消息
public void sendMessage(String topic, String message, int qos) {
mqttOutboundChannel.send(MessageBuilder.withPayload(message)
.setHeader("mqtt_topic", topic)
.setHeader("mqtt_qos", qos)
.build());
}
}
6. 消息處理器
@Component
public class MqttMessageProcessor {
private static final Logger logger = LoggerFactory.getLogger(MqttMessageProcessor.class);
public void processMessage(String topic, String payload) {
logger.info("Processing MQTT message - Topic: {}, Payload: {}", topic, payload);
// 根據(jù)不同的主題進(jìn)行不同的處理
switch (topic) {
case "test/topic":
handleTestTopic(payload);
break;
case "sensor/data":
handleSensorData(payload);
break;
default:
handleDefaultMessage(topic, payload);
}
}
private void handleTestTopic(String payload) {
logger.info("處理測(cè)試主題消息: {}", payload);
// 具體的業(yè)務(wù)邏輯
}
private void handleSensorData(String payload) {
logger.info("處理傳感器數(shù)據(jù): {}", payload);
try {
// 解析JSON數(shù)據(jù)等操作
// ObjectMapper mapper = new ObjectMapper();
// SensorData data = mapper.readValue(payload, SensorData.class);
} catch (Exception e) {
logger.error("解析傳感器數(shù)據(jù)失敗", e);
}
}
private void handleDefaultMessage(String topic, String payload) {
logger.info("處理默認(rèn)消息 - Topic: {}, Payload: {}", topic, payload);
}
}
7. 控制器示例
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttService mqttService;
@PostMapping("/publish")
public ResponseEntity<String> publishMessage(@RequestParam String topic,
@RequestParam String message) {
try {
mqttService.sendMessage(topic, message);
return ResponseEntity.ok("Message published successfully");
} catch (Exception e) {
return ResponseEntity.status(500).body("Failed to publish message: " + e.getMessage());
}
}
@PostMapping("/publish/default")
public ResponseEntity<String> publishToDefaultTopic(@RequestParam String message) {
try {
mqttService.sendMessage(message);
return ResponseEntity.ok("Message published to default topic");
} catch (Exception e) {
return ResponseEntity.status(500).body("Failed to publish message: " + e.getMessage());
}
}
}
8. 主應(yīng)用類
@SpringBootApplication
@EnableConfigurationProperties
public class MqttApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class, args);
}
}
9. 測(cè)試MQTT服務(wù)
可以使用MQTT.fx或其他MQTT客戶端工具進(jìn)行測(cè)試:
- 啟動(dòng)Spring Boot應(yīng)用
- 使用MQTT客戶端訂閱主題
test/topic - 調(diào)用API發(fā)送消息:
curl -X POST "http://localhost:8080/mqtt/publish?topic=test/topic&message=Hello MQTT"
主要特性
- 自動(dòng)重連: 配置了自動(dòng)重連機(jī)制
- QoS支持: 支持不同的服務(wù)質(zhì)量等級(jí)
- 多主題訂閱: 可以訂閱多個(gè)主題
- 異步處理: 消息發(fā)送支持異步模式
- 配置靈活: 通過(guò)配置文件管理連接參數(shù)
這樣你就實(shí)現(xiàn)了一個(gè)完整的Spring Boot MQTT集成方案,可以方便地進(jìn)行消息的發(fā)布和訂閱。
到此這篇關(guān)于Springboot對(duì)接mqtt的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)Springboot對(duì)接mqtt內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot集成mqtt的實(shí)踐開(kāi)發(fā)
- springboot整合mqtt的詳細(xì)圖文教程
- SpringBoot實(shí)現(xiàn)MQTT消息發(fā)送和接收方式
- springboot集成mqtt超級(jí)詳細(xì)步驟
- SpringBoot集成MQTT實(shí)現(xiàn)交互服務(wù)通信
- springboot使用EMQX(MQTT協(xié)議)的實(shí)現(xiàn)
- SpringBoot集成mqtt的多模塊項(xiàng)目配置詳解
- SpringBoot2.0集成MQTT消息推送功能實(shí)現(xiàn)
- SpringBoot項(xiàng)目接入MQTT的詳細(xì)指南
相關(guān)文章
如何利用Java爬蟲(chóng)獲取蘇寧易購(gòu)商品詳情
蘇寧易購(gòu)作為中國(guó)領(lǐng)先的電商平臺(tái)之一,提供了豐富的商品信息,本文將介紹如何使用Java語(yǔ)言開(kāi)發(fā)爬蟲(chóng),獲取蘇寧易購(gòu)商品的詳細(xì)信息,感興趣的朋友一起看看吧2024-12-12
Java開(kāi)發(fā)中請(qǐng)求頭的概念與寫法代碼示例
本文介紹了Java開(kāi)發(fā)中請(qǐng)求頭(RequestHeaders)的用途、組成和常見(jiàn)字段,并提供了使用HttpURLConnection、HttpClient(Java11及以上)和ApacheHttpClient發(fā)送帶有請(qǐng)求頭的HTTP請(qǐng)求的代碼示例,感興趣的朋友跟隨小編一起看看吧2025-12-12
深入講解java線程與synchronized關(guān)鍵字
Java 中多線程的同步依靠的是對(duì)象鎖機(jī)制,synchronized關(guān)鍵字就是利用了封裝對(duì)象鎖來(lái)實(shí)現(xiàn)對(duì)共享資源的互斥訪問(wèn)。下面這篇文章主要介紹了java線程與synchronized關(guān)鍵字的相關(guān)資料,需要的朋友可以參考下。2017-03-03
JavaWeb入門:ServletContext詳解和應(yīng)用
這篇文章主要介紹了Java ServletContext對(duì)象用法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-07-07
mybatis中BigDecimal中的0存為null的坑及解決
在使用MyBatis進(jìn)行數(shù)據(jù)庫(kù)操作時(shí),若Java中屬性類型為BigDecimal且值為0,插入數(shù)據(jù)庫(kù)時(shí)可能會(huì)變?yōu)閚ull,而不是0,這個(gè)問(wèn)題可能是由于MyBatis在處理BigDecimal類型時(shí)的弱類型判斷導(dǎo)致的,當(dāng)BigDecimal變量與空字符串進(jìn)行比較時(shí),MyBatis可能將其視為null2024-10-10
Java網(wǎng)絡(luò)編程之簡(jiǎn)單的服務(wù)端客戶端應(yīng)用實(shí)例
這篇文章主要介紹了Java網(wǎng)絡(luò)編程之簡(jiǎn)單的服務(wù)端客戶端應(yīng)用,以實(shí)例形式較為詳細(xì)的分析了java網(wǎng)絡(luò)編程的原理與服務(wù)器端客戶端的實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-04-04

