我也沒想到 springboot + rabbitmq 做智能家居,會(huì)這么簡單
前一段有幸參與到一個(gè)智能家居項(xiàng)目的開發(fā),由于之前都沒有過這方面的開發(fā)經(jīng)驗(yàn),所以對智能硬件的開發(fā)模式和技術(shù)棧都頗為好奇。
產(chǎn)品是一款可燃?xì)怏w報(bào)警器,如果家中燃?xì)庑孤稘舛鹊竭_(dá)一定閾值,報(bào)警器檢測到并上傳氣體濃度值給后臺(tái),后臺(tái)以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。
用戶還可能向報(bào)警器發(fā)一些關(guān)閉報(bào)警、調(diào)整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:
但當(dāng)我真正的參與其中開發(fā)時(shí),其實(shí)有一點(diǎn)小小的失望,因?yàn)樵谡麄€(gè)研發(fā)過程中,并沒用到什么新的技術(shù),還是常規(guī)的幾種中間件,只不過換個(gè)用法而已。
技術(shù)選型用rabbitmq
來做核心的組件,主要考慮到運(yùn)維成本低,組內(nèi)成員使用的熟練度比較高。
下面和小伙伴分享一下如何用 springboot
+ rabbitmq
搭建物聯(lián)網(wǎng)(IOT
)平臺(tái),其實(shí)智能硬件也沒想象的那么高不可攀!
很多小伙伴可能有點(diǎn)懵?rabbitmq
不是消息隊(duì)列嗎?怎么又能做智能硬件了?
其實(shí)rabbitmq
有兩種協(xié)議,我們平時(shí)接觸的消息隊(duì)列是用的AMQP
協(xié)議,而用在智能硬件中的是MQTT
協(xié)議。
一、什么是 MQTT協(xié)議?
MQTT
全稱(Message Queue Telemetry Transport):一種基于發(fā)布/訂閱(publish
/subscribe
)模式的輕量級
通訊協(xié)議,通過訂閱相應(yīng)的主題來獲取消息,是物聯(lián)網(wǎng)(Internet of Thing
)中的一個(gè)標(biāo)準(zhǔn)傳輸協(xié)議。
該協(xié)議將消息的發(fā)布者(publisher
)與訂閱者(subscriber
)進(jìn)行分離,因此可以在不可靠的網(wǎng)絡(luò)環(huán)境中,為遠(yuǎn)程連接的設(shè)備提供可靠的消息服務(wù),使用方式與傳統(tǒng)的MQ有點(diǎn)類似。
TCP
協(xié)議位于傳輸層,MQTT
協(xié)議位于應(yīng)用層,MQTT
協(xié)議構(gòu)建于TCP/IP
協(xié)議上,也就是說只要支持TCP/IP
協(xié)議棧的地方,都可以使用MQTT
協(xié)議。
二、為什么要用 MQTT協(xié)議?
MQTT
協(xié)議為什么在物聯(lián)網(wǎng)(IOT)中如此受偏愛?而不是其它協(xié)議,比如我們更為熟悉的 HTTP
協(xié)議呢?
-
首先
HTTP
協(xié)議它是一種同步協(xié)議,客戶端請求后需要等待服務(wù)器的響應(yīng)。而在物聯(lián)網(wǎng)(IOT)環(huán)境中,設(shè)備會(huì)很受制于環(huán)境的影響,比如帶寬低、網(wǎng)絡(luò)延遲高、網(wǎng)絡(luò)通信不穩(wěn)定等,顯然異步消息協(xié)議更為適合IOT
應(yīng)用程序。 -
HTTP
是單向的,如果要獲取消息客戶端必須發(fā)起連接,而在物聯(lián)網(wǎng)(IOT)應(yīng)用程序中,設(shè)備或傳感器往往都是客戶端,這意味著它們無法被動(dòng)地接收來自網(wǎng)絡(luò)的命令。 -
通常需要將一條命令或者消息,發(fā)送到網(wǎng)絡(luò)上的所有設(shè)備上。
HTTP
要實(shí)現(xiàn)這樣的功能不但很困難,而且成本極高。
三、MQTT協(xié)議介紹
前邊說過MQTT
是一種輕量級的協(xié)議,它只專注于發(fā)消息, 所以此協(xié)議的結(jié)構(gòu)也非常簡單。
MQTT數(shù)據(jù)包
在MQTT
協(xié)議中,一個(gè)MQTT
數(shù)據(jù)包由:固定頭
(Fixed header)、 可變頭
(Variable header)、 消息體
(payload)三部分構(gòu)成。
-
固定頭(Fixed header),所有數(shù)據(jù)包中都有固定頭,包含數(shù)據(jù)包類型及數(shù)據(jù)包的分組標(biāo)識。 -
可變頭(Variable header),部分?jǐn)?shù)據(jù)包類型中有可變頭。 -
內(nèi)容消息體(Payload),存在于部分?jǐn)?shù)據(jù)包類,是客戶端收到的具體消息內(nèi)容。
1、固定頭
固定頭部,使用兩個(gè)字節(jié),共16位:(4-7)位表示消息類型,使用4位二進(jìn)制表示,可代表如下的16種消息類型,不過 0 和 15位置屬于保留待用,所以共14種消息事件類型。
DUP Flag(重試標(biāo)識)
DUP Flag:保證消息可靠傳輸,消息是否已送達(dá)的標(biāo)識。默認(rèn)為0,只占用一個(gè)字節(jié),表示第一次發(fā)送,當(dāng)值為1時(shí),表示當(dāng)前消息先前已經(jīng)被傳送過。
QoS Level(消息質(zhì)量等級)
QoS Level:消息的質(zhì)量等級,后邊會(huì)詳細(xì)介紹
RETAIN(持久化)
-
值為
1
:表示發(fā)送的消息需要一直持久保存,而且不受服務(wù)器重啟影響,不但要發(fā)送給當(dāng)前的訂閱者,且以后新加入的客戶端訂閱了此Topic
,訂閱者也會(huì)馬上得到推送。注意:新加入的訂閱者,只會(huì)取出最新的一個(gè)RETAIN flag = 1
的消息推送。 -
值為
0
:僅為當(dāng)前訂閱者推送此消息。
Remaining Length(剩余長度)
在當(dāng)前消息中剩余的byte
(字節(jié))數(shù),包含可變頭部和消息體payload。
2、可變頭
固定頭部僅定義了消息類型和一些標(biāo)志位,一些消息的元數(shù)據(jù)需要放入可變頭部中。可變頭部內(nèi)容字節(jié)長度 + 消息體payload = 剩余長度。
可變頭部居于固定頭部和payload中間,包含了協(xié)議名稱,版本號,連接標(biāo)志,用戶授權(quán),心跳時(shí)間等內(nèi)容。
可變頭存在于這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
3、消息體payload
消息體payload只存在于CONNECT
、PUBLISH
、SUBSCRIBE
、SUBACK
、UNSUBSCRIBE
這幾種類型的消息:
-
CONNECT
:包含客戶端的ClientId
、訂閱的Topic
、Message
以及用戶名
和密碼
。 -
PUBLISH
:向?qū)?yīng)主題發(fā)送消息。 -
SUBSCRIBE
:要訂閱的主題以及QoS
。 -
SUBACK
:服務(wù)器對于SUBSCRIBE
所申請的主題及QoS
進(jìn)行確認(rèn)和回復(fù)。 -
UNSUBSCRIBE
:取消要訂閱的主題。
消息質(zhì)量(QoS )
消息質(zhì)量
(Quality of Service),即消息的發(fā)送質(zhì)量,發(fā)布者(publisher
)和訂閱者(subscriber
)都可以指定qos
等級,有QoS 0
、QoS 1
、QoS 2
三個(gè)等級。
下邊分別說明一下這三個(gè)等級的區(qū)別。
1、Qos 0
Qos 0:At most once
(至多一次)只發(fā)送一次消息,不保證消息是否成功送達(dá),沒有確認(rèn)機(jī)制,消息可能會(huì)丟失或重復(fù)。
2、Qos 1
Qos 1:At least once
(至少一次),相對于QoS 0
而言Qos 1
增加了ack
確認(rèn)機(jī)制,發(fā)送者(publisher
)推送消息到MQTT代理(broker
)時(shí),兩者自身都會(huì)先持久化消息,只有當(dāng)publisher
或者 Broker
分別收到 PUBACK
確認(rèn)時(shí),才會(huì)刪除自身持久化的消息,否則就會(huì)重發(fā)。
但有個(gè)問題,盡管我們可以通過確認(rèn)來保證一定收到客戶端 或 服務(wù)器的message
,可我們卻不能保證僅收到一次message
,也就是當(dāng)客戶端publisher
沒收到Broker
的puback
或者 Broker
沒有收到subscriber
的puback
,那么就會(huì)一直重發(fā)。
publisher -> broker 大致流程:
-
publisher store msg -> publish ->broker (傳遞message) -
broker -> puback -> publisher delete msg (確認(rèn)傳遞成功)
3、Qos 2
Qos 2:Exactly once
(只有一次),相對于QoS 1
,QoS 2
升級實(shí)現(xiàn)了僅接受一次message
,publisher
和 broker
同樣對消息進(jìn)行持久化,其中 publisher
緩存了message
和 對應(yīng)的msgID
,而 broker
緩存了 msgID
,可以保證消息不重復(fù),由于又增加了一個(gè)confirm
機(jī)制,整個(gè)流程變得復(fù)雜很多。
publisher -> broker 大致流程:
-
publisher store msg -> publish ->broker -> broker store -
msgID(傳遞message) broker -> puberc (確認(rèn)傳遞成功) -
publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID) -
broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
LWT(最后遺囑)
LWT
全稱為 Last Will and Testament
,其實(shí)遺囑是一個(gè)由客戶端預(yù)先定義好的主題和對應(yīng)消息,附加在CONNECT
的數(shù)據(jù)包中,包括遺愿主題
、遺愿 QoS
、遺愿消息
等。
當(dāng)MQTT代理 Broker
檢測到有客戶端client
非正常斷開連接時(shí),再由服務(wù)器主動(dòng)發(fā)布此消息,然后相關(guān)的訂閱者會(huì)收到消息。
舉個(gè)栗子:聊天室中所有人都訂閱一個(gè)叫talk
的主題 ,但小富由于網(wǎng)絡(luò)抖動(dòng)突然斷開了鏈接,這時(shí)聊天室中所有訂閱主題 talk
的客戶端都會(huì)收到一個(gè) “小富離開聊天室
” 的遺愿消息。
遺囑的相關(guān)參數(shù):
-
Will Flag
:是否使用 LWT,1 開啟 -
Will Topic
:遺愿主題名,不可使用通配符 -
Will Qos
:發(fā)布遺愿消息時(shí)使用的 QoS -
Will Retain
:遺愿消息的 Retain 標(biāo)識 -
Will Message
:遺愿消息內(nèi)容
那客戶端Client
有哪些場景是非正常斷開連接呢?
-
Broker
檢測到底層的 I/O 異常; -
客戶端 未能在心跳 Keep Alive
的間隔內(nèi)和Broker
進(jìn)行消息交互; -
客戶端 在關(guān)閉底層 TCP
連接前沒有發(fā)送DISCONNECT
數(shù)據(jù)包; -
客戶端 發(fā)送錯(cuò)誤格式的數(shù)據(jù)包到 Broker
,導(dǎo)致關(guān)閉和客戶端的連接等。
注意:當(dāng)客戶端通過發(fā)布 DISCONNECT
數(shù)據(jù)包斷開連接時(shí),屬于正常斷開連接,并不會(huì)觸發(fā) LWT
的機(jī)制,與此同時(shí)Broker
還會(huì)丟棄掉當(dāng)前客戶端在連接時(shí)指定的相關(guān) LWT
參數(shù)。
四、MQTT協(xié)議應(yīng)用場景
MQTT
協(xié)議廣泛應(yīng)用于物聯(lián)網(wǎng)、移動(dòng)互聯(lián)網(wǎng)、智能硬件、車聯(lián)網(wǎng)、電力能源等領(lǐng)域。使用的場景也是非常非常多,下邊列舉一些:
-
物聯(lián)網(wǎng)M2M通信,物聯(lián)網(wǎng)大數(shù)據(jù)采集 -
Android消息推送,WEB消息推送 -
移動(dòng)即時(shí)消息,例如Facebook Messenger -
智能硬件、智能家具、智能電器 -
車聯(lián)網(wǎng)通信,電動(dòng)車站樁采集 -
智慧城市、遠(yuǎn)程醫(yī)療、遠(yuǎn)程教育 -
電力、石油與能源等行業(yè)市場
五、代碼實(shí)現(xiàn)
具體 rabbitmq
的環(huán)境搭建就不贅述了,網(wǎng)上教程比較多,有條件的用服務(wù)器,沒條件的像我搞個(gè)Windows
版的也很快樂嘛。
1、啟用 rabbitmq的mqtt協(xié)議
我們先開啟 rabbitmq
的 mqtt
協(xié)議,因?yàn)槟J(rèn)安裝下是關(guān)閉的,命令如下:
rabbitmq-plugins enable rabbitmq_mqtt
2、mqtt 客戶端依賴包
上一步中安裝rabbitmq
環(huán)境并開啟 mqtt
協(xié)議后,實(shí)際上mqtt
消息代理服務(wù)就搭建好了,接下來要做的就是實(shí)現(xiàn)客戶端消息的推送和訂閱。
這里使用spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
兩個(gè)工具包實(shí)現(xiàn)。
<!--mqtt依賴包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
3、消息發(fā)送者
消息的發(fā)送比較簡單,主要是應(yīng)用到@ServiceActivator
注解,需要注意messageHandler.setAsync
屬性,如果設(shè)置成false
,關(guān)閉異步模式發(fā)送消息時(shí)可能會(huì)阻塞。
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
MQTT
對外提供發(fā)送消息的API
時(shí),需要使用@MessagingGateway
注解,去提供一個(gè)消息網(wǎng)關(guān)代理,參數(shù)defaultRequestChannel
指定發(fā)送消息綁定的channel
。
可以實(shí)現(xiàn)三種API
接口,payload
為發(fā)送的消息,topic
發(fā)送消息的主題,qos
消息質(zhì)量。
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {
// 向默認(rèn)的 topic 發(fā)送消息
void sendMessage2Mqtt(String payload);
// 向指定的 topic 發(fā)送消息
void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
// 向指定的 topic 發(fā)送消息,并指定服務(wù)質(zhì)量參數(shù)
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
4、消息訂閱
消息訂閱和我們平時(shí)用的MQ消息監(jiān)聽實(shí)現(xiàn)思路基本相似,@ServiceActivator
注解表明當(dāng)前方法用于處理MQTT
消息,inputChannel
參數(shù)指定了用于接收消息的channel
。
/**
* @Author: xiaofu
* @Description: 消息訂閱配置
* @date 2020/6/8 18:24
*/
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel iotMqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(iotMqttInputChannel());
return adapter;
}
/**
* @author xiaofu
* @description 消息訂閱
* @date 2020/6/8 18:20
*/
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler handlerTest() {
return message -> {
try {
String string = message.getPayload().toString();
System.out.println("接收到消息:" + string);
} catch (MessagingException ex) {
//logger.info(ex.getMessage());
}
};
}
}
六、測試消息
額~ 由于本渣渣對硬件一竅不通,為了模擬硬件的發(fā)送消息,只能借助一下工具,其實(shí)硬件端實(shí)現(xiàn)MQTT
協(xié)議,跟我們前邊的基本沒什么區(qū)別,只不過換種語言嵌入到硬件中而已。
這里選的測試工具為mqttbox
,下載地址:http://workswithweb.com/mqttbox.html
1、測試消息發(fā)送
我們用先用mqttbox
模擬向主題mqtt_test_topic
發(fā)送消息,看后臺(tái)是否能成功接收到。
看到后臺(tái)成功拿到了向主題mqtt_test_topic
發(fā)送的消息。
2、測試消息訂閱
用mqttbox
模擬訂閱主題mqtt_test_topic
,在后臺(tái)向主題mqtt_test_topic
發(fā)送一條消息,這里我簡單的寫了個(gè)controller
調(diào)用API發(fā)送消息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是后臺(tái)向主題 mqtt_test_topic 發(fā)送的消息我們看mqttbox
的訂閱消息,已經(jīng)成功的接收到了后臺(tái)的消息,到此我們的MQTT
通信環(huán)境就算搭建成功了。如果把mqttbox
工具換成具體硬件設(shè)備,整個(gè)流程就是我們常說的智能家居了,其實(shí)真的沒那么難。
七、應(yīng)用注意事項(xiàng)
在我們實(shí)際的生產(chǎn)環(huán)境中遇到過的問題,這里分享一下讓大家少踩坑。
clientId 要唯一
在客戶端connect
連接的時(shí),會(huì)有一個(gè)clientId
參數(shù),需要每個(gè)客戶端都保持唯一的。但我們在開發(fā)測試階段clientId
直接在代碼中寫死了,而且服務(wù)都是單實(shí)例部署,并沒有暴露出什么問題。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
然而在生產(chǎn)環(huán)境內(nèi)側(cè)的時(shí)候,由于服務(wù)是多實(shí)例集群部署,結(jié)果出現(xiàn)了下邊的奇怪問題。同一時(shí)間內(nèi)只能有一個(gè)客戶端能拿到消息,其他客戶端不但不能消費(fèi)消息,而且還在不斷的掉線重連:Lost connection: 已斷開連接; retrying...
。
這就是由于clientId
相同導(dǎo)致客戶端間相互競爭消費(fèi),最后將clientId
獲取方式換成從發(fā)號器中拿,問題就好了,所以這個(gè)地方是需要特別注意的。
平時(shí)程序在開發(fā)環(huán)境沒問題,可偏偏到了生產(chǎn)環(huán)境就一大堆問題,很多都是因?yàn)榉?wù)部署方式不同導(dǎo)致的。所以多學(xué)習(xí)分布式還是很有必要的。
八、其他中間件
MQTT
它只是一種協(xié)議,支持MQTT
協(xié)議的消息中間件產(chǎn)品非常多,下邊的也只是其中的一部分
-
Mosquitto -
Eclipse Paho -
RabbitMQ -
Apache ActiveMQ -
HiveMQ -
JoramMQ -
ThingMQ -
VerneMQ -
Apache Apollo -
emqttd Xively -
IBM Websphere .....
總結(jié)
我也是第一次做和硬件相關(guān)的項(xiàng)目,之前聽到智能家居都會(huì)覺得好高大上,但實(shí)際上手開發(fā)后發(fā)現(xiàn),技術(shù)嘛萬變不離其宗,也只是換種用法而已。
雙手奉上項(xiàng)目 demo 的github
地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git,感興趣的小伙伴可以下載跑一跑,實(shí)現(xiàn)起來非常的簡單。
特別推薦一個(gè)分享架構(gòu)+算法的優(yōu)質(zhì)內(nèi)容,還沒關(guān)注的小伙伴,可以長按關(guān)注一下:
長按訂閱更多精彩▼
如有收獲,點(diǎn)個(gè)在看,誠摯感謝
免責(zé)聲明:本文內(nèi)容由21ic獲得授權(quán)后發(fā)布,版權(quán)歸原作者所有,本平臺(tái)僅提供信息存儲(chǔ)服務(wù)。文章僅代表作者個(gè)人觀點(diǎn),不代表本平臺(tái)立場,如有問題,請聯(lián)系我們,謝謝!