先看我们最后实现的一个效果 1。手机端向主题topic111发送消息,并接收。(手机测试工具名称:MQTT调试器) 2。控制台打印 MQTT基本简介 MQTT是用于物联网(IoT)的OASIS标准消息传递协议。它被设计为一种极其轻量级的发布订阅消息传输,非常适合连接具有小代码足迹和最小网络带宽的远程设备。MQTT协议简介 MQTT是客户端服务器发布订阅消息传输协议。它重量轻、开放、简单,并且易于实施。这些特性使其非常适合在许多情况下使用,包括受限制的环境,例如机器对机器(M2M)和物联网(IoT)环境中的通信,其中需要小代码足迹和或网络带宽非常宝贵。 该协议通过TCPIP或其他提供有序、无损、双向连接的网络协议运行。其特点包括: 使用发布订阅消息模式,提供一对多的消息分发和应用程序的解耦。 与有效负载内容无关的消息传输。 消息传递的三种服务质量: o最多一次,根据操作环境的最大努力传递消息。可能会发生消息丢失。例如,此级别可用于环境传感器数据,其中单个读数是否丢失并不重要,因为下一个读数将很快发布。 o至少一次,保证消息到达但可能出现重复。 oExactlyonce,保证消息只到达一次。例如,此级别可用于重复或丢失消息可能导致应用不正确费用的计费系统。 最小化传输开销和协议交换以减少网络流量。 发生异常断开时通知相关方的机制。EMQX简介 通过开放标准物联网协议MQTT、CoAP和LwM2M连接任何设备。使用EMQXEnterprise集群轻松扩展到数千万并发MQTT连接。 并且EMQX还是开源的,又支持集群,所以还是一个比较不错的选择EMQX集群搭建 前期准备: 1。两台服务器:我的两个服务器一台是腾讯云、一台是阿里云的(不要问为什么,薅羊毛得来的)咱们暂且叫他们mqttservicealiyun和 mqttservicetxyun吧。 2。一个域名:mqtt。zhouhong。icu安装开始1。分别在两台服务器上执行以下操作进行安装(如果是单机:只需要进行下面1、2操作就安装完成了)1。下载wgethttps:www。emqx。comzhdownloadsbroker4。4。4emqx4。4。4otp24。1。53el8amd64。rpm2。安装sudoyuminstallemqx4。4。4otp24。1。53el8amd64。rpm3。修改配置文件vimetcemqxemqx。conf4。修改以下内容注意node。name是当前这台服务器名称node。namemqttservicetxyunxxx。xx。xxx。xxcluster。static。seedsmqttservicetxyunxxx。xx。xxx。xx,mqttservicealiyunxxx。xx。xxx。xxcluster。discoverystaticcluster。namemymqttcluster2。分别启动两台服务器的EMQXsudoemqxstart 3。到浏览器输入http:xxx。xx。xxx。xxx:18083查看(随便一台都可以,默认账号admin密码public),注意打开18083,1883安全组 4。nginx负载均衡 nginx搭建很简单略过,大家只需要修改以下nginx。conf里面的内容即可stream{upstreammqtt。zhouhong。icu{zonetcpservers64k;serverxxx。xx。xxx。xx:1883weight1maxfails3failtimeout30s;serverxxx。xx。xxx。xx:1883weight1maxfails3failtimeout30s;}server{listen8883proxypassmqtt。zhouhong。proxybuffersize4k;sslhandshaketimeout15s;sslcertificateetcnginx7967358www。mqtt。zhouhong。icu。sslcertificatekeyetcnginx7967358www。mqtt。zhouhong。icu。}}与SpringBoot集成并实现服务器端监控对应topic下的消息 1。项目搭建引入MQTT相关jar包dependencygroupIdorg。springframework。integrationgroupIdspringintegrationstreamartifactIddependencydependencygroupIdorg。springframework。integrationgroupIdspringintegrationmqttartifactIddependencyyml配置文件(如果大家没搭建好的话,可以直接使用我搭建的这个)server:port:8080mqtt:单机版只需要把域名改为ip既可hostUrl:tcp:mqtt。zhouhong。icu:1883username:adminpassword:public服务端clientId(发送端自己定义)clientId:serviceclientidcleanSession:truereconnect:truetimeout:100keepAlive:100defaultTopic:topic111qos:0属性配置description:date:202261615:51author:zhouhongComponentConfigurationProperties(mqtt)DatapublicclassMqttProperties{用户名privateS密码privateS连接地址privateStringhostU客户端Id,同一台服务器下,不允许出现重复的客户端idprivateStringclientId;默认连接主题privateS超时时间设置会话心跳时间单位为秒服务器会每隔1。520秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制privateintkeepA设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接privateBooleancleanS是否断线重连privateB连接方式privateI}发送消息回调description:发生消息成功后的回调date:202261615:55author:zhouhongComponentLog4j2publicclassMqttSendCallBackimplementsMqttCallbackExtended{客户端断开后触发paramthrowableOverridepublicvoidconnectionLost(Throwablethrowable){log。info(发送消息回调:连接断开,可以做重连);}客户端收到消息触发paramtopic主题parammqttMessage消息OverridepublicvoidmessageArrived(Stringtopic,MqttMessagemqttMessage)throwsException{log。info(发送消息回调:接收消息主题:topic);log。info(发送消息回调:接收消息内容:newString(mqttMessage。getPayload()));}发布消息成功paramtokentokenOverridepublicvoiddeliveryComplete(IMqttDeliveryTokentoken){String〔〕topicstoken。getTopics();for(Stringtopic:topics){log。info(发送消息回调:向主题:topic发送消息成功!);}try{MqttMessagemessagetoken。getMessage();byte〔〕payloadmessage。getPayload();StringsnewString(payload,UTF8);log。info(发送消息回调:消息的内容是:s);}catch(MqttExceptione){e。printStackTrace();}catch(UnsupportedEncodingExceptione){e。printStackTrace();}}连接emq服务器后触发parambparamsOverridepublicvoidconnectComplete(booleanb,Strings){log。info(ClientId:MqttAcceptClient。client。getClientId()客户端连接成功!);}}接收消息回调description:接收消息后的回调date:202261615:52author:zhouhongComponentLog4j2publicclassMqttAcceptCallbackimplementsMqttCallbackExtended{ResourceprivateMqttAcceptClientmqttAcceptC客户端断开后触发paramthrowableOverridepublicvoidconnectionLost(Throwablethrowable){log。info(接收消息回调:连接断开,可以做重连);if(MqttAcceptClient。clientnull!MqttAcceptClient。client。isConnected()){log。info(接收消息回调:emqx重新连接。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。);mqttAcceptClient。reconnection();}}客户端收到消息触发paramtopic主题parammqttMessage消息OverridepublicvoidmessageArrived(Stringtopic,MqttMessagemqttMessage)throwsException{log。info(接收消息回调:接收消息主题:topic);log。info(接收消息回调:接收消息内容:newString(mqttMessage。getPayload()));}发布消息成功paramtokentokenOverridepublicvoiddeliveryComplete(IMqttDeliveryTokentoken){String〔〕topicstoken。getTopics();for(Stringtopic:topics){log。info(接收消息回调:向主题:topic发送消息成功!);}try{MqttMessagemessagetoken。getMessage();byte〔〕payloadmessage。getPayload();StringsnewString(payload,UTF8);log。info(接收消息回调:消息的内容是:s);}catch(MqttExceptione){e。printStackTrace();}catch(UnsupportedEncodingExceptione){e。printStackTrace();}}连接emq服务器后触发parambparamsOverridepublicvoidconnectComplete(booleanb,Strings){log。info(ClientId:MqttAcceptClient。client。getClientId()客户端连接成功!);以结尾表示订阅所有以test开头的主题订阅所有机构主题mqttAcceptClient。subscribe(topic111,0);}}发消息description:发送消息date:202261616:01author:zhouhongComponentpublicclassMqttSendClient{AutowiredprivateMqttSendCallBackmqttSendCallBAutowiredprivateMqttPropertiesmqttPpublicMqttClientconnect(){MqttCtry{StringuuidUUID。randomUUID()。toString()。replaceAll(,);clientnewMqttClient(mqttProperties。getHostUrl(),uuid,newMemoryPersistence());MqttConnectOptionsoptionsnewMqttConnectOptions();options。setUserName(mqttProperties。getUsername());options。setPassword(mqttProperties。getPassword()。toCharArray());options。setConnectionTimeout(mqttProperties。getTimeout());options。setKeepAliveInterval(mqttProperties。getKeepAlive());options。setCleanSession(true);options。setAutomaticReconnect(false);try{设置回调client。setCallback(mqttSendCallBack);client。connect(options);}catch(Exceptione){e。printStackTrace();}}catch(Exceptione){e。printStackTrace();}}发布消息主题格式:server:report:orgCode(参数实际使用机构代码)paramretained是否保留parampushMessage消息体publicvoidpublish(booleanretained,Stringtopic,StringpushMessage){MqttMessagemessagenewMqttMessage();message。setQos(mqttProperties。getQos());message。setRetained(retained);message。setPayload(pushMessage。getBytes());MqttClientmqttClientconnect();try{mqttClient。publish(topic,message);}catch(MqttExceptione){e。printStackTrace();}finally{disconnect(mqttClient);close(mqttClient);}}关闭连接parammqttClientpublicstaticvoiddisconnect(MqttClientmqttClient){try{if(mqttClient!null){mqttClient。disconnect();}}catch(MqttExceptione){e。printStackTrace();}}释放资源parammqttClientpublicstaticvoidclose(MqttClientmqttClient){try{if(mqttClient!null){mqttClient。close();}}catch(MqttExceptione){e。printStackTrace();}}}接收消息description:服务器段端连接订阅消息、监控topicdate:202261615:52author:zhouhongComponentLog4j2publicclassMqttAcceptClient{AutowiredLazyprivateMqttAcceptCallbackmqttAcceptCAutowiredprivateMqttPropertiesmqttPpublicstaticMqttCprivatestaticMqttClientgetClient(){}privatestaticvoidsetClient(MqttClientclient){MqttAcceptClient。}客户端连接publicvoidconnect(){MqttCtry{clientId使用服务器yml里面配置的clientIdclientnewMqttClient(mqttProperties。getHostUrl(),mqttProperties。getClientId(),newMemoryPersistence());MqttConnectOptionsoptionsnewMqttConnectOptions();options。setUserName(mqttProperties。getUsername());options。setPassword(mqttProperties。getPassword()。toCharArray());options。setConnectionTimeout(mqttProperties。getTimeout());options。setKeepAliveInterval(mqttProperties。getKeepAlive());options。setAutomaticReconnect(mqttProperties。getReconnect());options。setCleanSession(mqttProperties。getCleanSession());MqttAcceptClient。setClient(client);try{设置回调client。setCallback(mqttAcceptCallback);client。connect(options);}catch(Exceptione){e。printStackTrace();}}catch(Exceptione){e。printStackTrace();}}重新连接publicvoidreconnection(){try{client。connect();}catch(MqttExceptione){e。printStackTrace();}}订阅某个主题paramtopic主题paramqos连接方式publicvoidsubscribe(Stringtopic,intqos){log。info(开始订阅主题topic);try{client。subscribe(topic,qos);}catch(MqttExceptione){e。printStackTrace();}}取消订阅某个主题paramtopicpublicvoidunsubscribe(Stringtopic){log。info(开始取消订阅主题topic);try{client。unsubscribe(topic);}catch(MqttExceptione){e。printStackTrace();}}}服务端启动时连接订阅主题并监控description:启动后连接MQTT服务器,监听mqttmytopic这个topic发送的消息date:202261615:57author:zhouhongConfigurationpublicclassMqttConfig{ResourceprivateMqttAcceptClientmqttAcceptCBeanpublicMqttAcceptClientgetMqttPushClient(){mqttAcceptClient。connect();returnmqttAcceptC}}发消息控制类description:发消息控制类date:202261615:58author:zhouhongRestControllerpublicclassSendController{ResourceprivateMqttSendClientmqttSendCPostMapping(mqttsendmessage)publicvoidsendMessage(RequestBodySendParamsendParam){mqttSendClient。publish(false,sendParam。getTopic(),sendParam。getMessageContent());}}2。测试postman调用发消息接口 控制台日志 使用另外一个移动端MQTT调试工具测试手机端向主题topic111发送消息,并接收。 2。控制台打印