生活工程体验信仰哲学精神
投稿投诉
精神世界
探索历史
哲学文学
艺术价值
信仰创造
境界审美
体验技术
技能工具
工程信息
医学生产
生活运用
操作能力

物联网微消息队列MQTT介绍EMQX集群搭建以及与Sprin

7月23日 发如雪投稿
  先看我们最后实现的一个效果
  1。手机端向主题topic111发送消息,并接收。(手机测试工具名称:MQTT调试器)
  2。控制台打印
  MQTT基本简介
  MQTT是用于物联网(IoT)的OASIS标准消息传递协议。它被设计为一种极其轻量级的发布订阅消息传输,非常适合连接具有小代码足迹和最小网络带宽的远程设备。MQTT协议简介
  MQTT是客户端服务器发布订阅消息传输协议。它重量轻、开放、简单,并且易于实施。这些特性使其非常适合在许多情况下使用,包括受限制的环境,例如机器对机器(M2M)和物联网(IoT)环境中的通信,其中需要小代码足迹和或网络带宽非常宝贵。
  该协议通过TCPIP或其他提供有序、无损、双向连接的网络协议运行。其特点包括:
  使用发布订阅消息模式,提供一对多的消息分发和应用程序的解耦。
  与有效负载内容无关的消息传输。
  消息传递的三种服务质量:
  o最多一次,根据操作环境的最大努力传递消息。可能会发生消息丢失。例如,此级别可用于环境传感器数据,其中单个读数是否丢失并不重要,因为下一个读数将很快发布。
  o至少一次,保证消息到达但可能出现重复。
  oExactlyonce,保证消息只到达一次。例如,此级别可用于重复或丢失消息可能导致应用不正确费用的计费系统。
  最小化传输开销和协议交换以减少网络流量。
  发生异常断开时通知相关方的机制。EMQX简介
  通过开放标准物联网协议MQTT、CoAP和LwM2M连接任何设备。使用EMQXEnterprise集群轻松扩展到数千万并发MQTT连接。
  并且EMQX还是开源的,又支持集群,所以还是一个比较不错的选择EMQX集群搭建
  前期准备:
  1。两台服务器:我的两个服务器一台是腾讯云、一台是阿里云的(不要问为什么,薅羊毛得来的)咱们暂且叫他们mqttservicealiyun和
  mqttservicetxyun吧。
  2。一个域名:mqtt。zhouhong。icu安装开始1。分别在两台服务器上执行以下操作进行安装(如果是单机:只需要进行下面1、2操作就安装完成了)1。下载wgethttps:www。emqx。comzhdownloadsbroker4。4。4emqx4。4。4otp24。1。53el8amd64。rpm2。安装sudoyuminstallemqx4。4。4otp24。1。53el8amd64。rpm3。修改配置文件vimetcemqxemqx。conf4。修改以下内容注意node。name是当前这台服务器名称node。namemqttservicetxyunxxx。xx。xxx。xxcluster。static。seedsmqttservicetxyunxxx。xx。xxx。xx,mqttservicealiyunxxx。xx。xxx。xxcluster。discoverystaticcluster。namemymqttcluster2。分别启动两台服务器的EMQXsudoemqxstart
  3。到浏览器输入http:xxx。xx。xxx。xxx:18083查看(随便一台都可以,默认账号admin密码public),注意打开18083,1883安全组
  4。nginx负载均衡
  nginx搭建很简单略过,大家只需要修改以下nginx。conf里面的内容即可stream{upstreammqtt。zhouhong。icu{zonetcpservers64k;serverxxx。xx。xxx。xx:1883weight1maxfails3failtimeout30s;serverxxx。xx。xxx。xx:1883weight1maxfails3failtimeout30s;}server{listen8883proxypassmqtt。zhouhong。proxybuffersize4k;sslhandshaketimeout15s;sslcertificateetcnginx7967358www。mqtt。zhouhong。icu。sslcertificatekeyetcnginx7967358www。mqtt。zhouhong。icu。}}与SpringBoot集成并实现服务器端监控对应topic下的消息
  1。项目搭建引入MQTT相关jar包dependencygroupIdorg。springframework。integrationgroupIdspringintegrationstreamartifactIddependencydependencygroupIdorg。springframework。integrationgroupIdspringintegrationmqttartifactIddependencyyml配置文件(如果大家没搭建好的话,可以直接使用我搭建的这个)server:port:8080mqtt:单机版只需要把域名改为ip既可hostUrl:tcp:mqtt。zhouhong。icu:1883username:adminpassword:public服务端clientId(发送端自己定义)clientId:serviceclientidcleanSession:truereconnect:truetimeout:100keepAlive:100defaultTopic:topic111qos:0属性配置description:date:202261615:51author:zhouhongComponentConfigurationProperties(mqtt)DatapublicclassMqttProperties{用户名privateS密码privateS连接地址privateStringhostU客户端Id,同一台服务器下,不允许出现重复的客户端idprivateStringclientId;默认连接主题privateS超时时间设置会话心跳时间单位为秒服务器会每隔1。520秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制privateintkeepA设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接privateBooleancleanS是否断线重连privateB连接方式privateI}发送消息回调description:发生消息成功后的回调date:202261615:55author:zhouhongComponentLog4j2publicclassMqttSendCallBackimplementsMqttCallbackExtended{客户端断开后触发paramthrowableOverridepublicvoidconnectionLost(Throwablethrowable){log。info(发送消息回调:连接断开,可以做重连);}客户端收到消息触发paramtopic主题parammqttMessage消息OverridepublicvoidmessageArrived(Stringtopic,MqttMessagemqttMessage)throwsException{log。info(发送消息回调:接收消息主题:topic);log。info(发送消息回调:接收消息内容:newString(mqttMessage。getPayload()));}发布消息成功paramtokentokenOverridepublicvoiddeliveryComplete(IMqttDeliveryTokentoken){String〔〕topicstoken。getTopics();for(Stringtopic:topics){log。info(发送消息回调:向主题:topic发送消息成功!);}try{MqttMessagemessagetoken。getMessage();byte〔〕payloadmessage。getPayload();StringsnewString(payload,UTF8);log。info(发送消息回调:消息的内容是:s);}catch(MqttExceptione){e。printStackTrace();}catch(UnsupportedEncodingExceptione){e。printStackTrace();}}连接emq服务器后触发parambparamsOverridepublicvoidconnectComplete(booleanb,Strings){log。info(ClientId:MqttAcceptClient。client。getClientId()客户端连接成功!);}}接收消息回调description:接收消息后的回调date:202261615:52author:zhouhongComponentLog4j2publicclassMqttAcceptCallbackimplementsMqttCallbackExtended{ResourceprivateMqttAcceptClientmqttAcceptC客户端断开后触发paramthrowableOverridepublicvoidconnectionLost(Throwablethrowable){log。info(接收消息回调:连接断开,可以做重连);if(MqttAcceptClient。clientnull!MqttAcceptClient。client。isConnected()){log。info(接收消息回调:emqx重新连接。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。);mqttAcceptClient。reconnection();}}客户端收到消息触发paramtopic主题parammqttMessage消息OverridepublicvoidmessageArrived(Stringtopic,MqttMessagemqttMessage)throwsException{log。info(接收消息回调:接收消息主题:topic);log。info(接收消息回调:接收消息内容:newString(mqttMessage。getPayload()));}发布消息成功paramtokentokenOverridepublicvoiddeliveryComplete(IMqttDeliveryTokentoken){String〔〕topicstoken。getTopics();for(Stringtopic:topics){log。info(接收消息回调:向主题:topic发送消息成功!);}try{MqttMessagemessagetoken。getMessage();byte〔〕payloadmessage。getPayload();StringsnewString(payload,UTF8);log。info(接收消息回调:消息的内容是:s);}catch(MqttExceptione){e。printStackTrace();}catch(UnsupportedEncodingExceptione){e。printStackTrace();}}连接emq服务器后触发parambparamsOverridepublicvoidconnectComplete(booleanb,Strings){log。info(ClientId:MqttAcceptClient。client。getClientId()客户端连接成功!);以结尾表示订阅所有以test开头的主题订阅所有机构主题mqttAcceptClient。subscribe(topic111,0);}}发消息description:发送消息date:202261616:01author:zhouhongComponentpublicclassMqttSendClient{AutowiredprivateMqttSendCallBackmqttSendCallBAutowiredprivateMqttPropertiesmqttPpublicMqttClientconnect(){MqttCtry{StringuuidUUID。randomUUID()。toString()。replaceAll(,);clientnewMqttClient(mqttProperties。getHostUrl(),uuid,newMemoryPersistence());MqttConnectOptionsoptionsnewMqttConnectOptions();options。setUserName(mqttProperties。getUsername());options。setPassword(mqttProperties。getPassword()。toCharArray());options。setConnectionTimeout(mqttProperties。getTimeout());options。setKeepAliveInterval(mqttProperties。getKeepAlive());options。setCleanSession(true);options。setAutomaticReconnect(false);try{设置回调client。setCallback(mqttSendCallBack);client。connect(options);}catch(Exceptione){e。printStackTrace();}}catch(Exceptione){e。printStackTrace();}}发布消息主题格式:server:report:orgCode(参数实际使用机构代码)paramretained是否保留parampushMessage消息体publicvoidpublish(booleanretained,Stringtopic,StringpushMessage){MqttMessagemessagenewMqttMessage();message。setQos(mqttProperties。getQos());message。setRetained(retained);message。setPayload(pushMessage。getBytes());MqttClientmqttClientconnect();try{mqttClient。publish(topic,message);}catch(MqttExceptione){e。printStackTrace();}finally{disconnect(mqttClient);close(mqttClient);}}关闭连接parammqttClientpublicstaticvoiddisconnect(MqttClientmqttClient){try{if(mqttClient!null){mqttClient。disconnect();}}catch(MqttExceptione){e。printStackTrace();}}释放资源parammqttClientpublicstaticvoidclose(MqttClientmqttClient){try{if(mqttClient!null){mqttClient。close();}}catch(MqttExceptione){e。printStackTrace();}}}接收消息description:服务器段端连接订阅消息、监控topicdate:202261615:52author:zhouhongComponentLog4j2publicclassMqttAcceptClient{AutowiredLazyprivateMqttAcceptCallbackmqttAcceptCAutowiredprivateMqttPropertiesmqttPpublicstaticMqttCprivatestaticMqttClientgetClient(){}privatestaticvoidsetClient(MqttClientclient){MqttAcceptClient。}客户端连接publicvoidconnect(){MqttCtry{clientId使用服务器yml里面配置的clientIdclientnewMqttClient(mqttProperties。getHostUrl(),mqttProperties。getClientId(),newMemoryPersistence());MqttConnectOptionsoptionsnewMqttConnectOptions();options。setUserName(mqttProperties。getUsername());options。setPassword(mqttProperties。getPassword()。toCharArray());options。setConnectionTimeout(mqttProperties。getTimeout());options。setKeepAliveInterval(mqttProperties。getKeepAlive());options。setAutomaticReconnect(mqttProperties。getReconnect());options。setCleanSession(mqttProperties。getCleanSession());MqttAcceptClient。setClient(client);try{设置回调client。setCallback(mqttAcceptCallback);client。connect(options);}catch(Exceptione){e。printStackTrace();}}catch(Exceptione){e。printStackTrace();}}重新连接publicvoidreconnection(){try{client。connect();}catch(MqttExceptione){e。printStackTrace();}}订阅某个主题paramtopic主题paramqos连接方式publicvoidsubscribe(Stringtopic,intqos){log。info(开始订阅主题topic);try{client。subscribe(topic,qos);}catch(MqttExceptione){e。printStackTrace();}}取消订阅某个主题paramtopicpublicvoidunsubscribe(Stringtopic){log。info(开始取消订阅主题topic);try{client。unsubscribe(topic);}catch(MqttExceptione){e。printStackTrace();}}}服务端启动时连接订阅主题并监控description:启动后连接MQTT服务器,监听mqttmytopic这个topic发送的消息date:202261615:57author:zhouhongConfigurationpublicclassMqttConfig{ResourceprivateMqttAcceptClientmqttAcceptCBeanpublicMqttAcceptClientgetMqttPushClient(){mqttAcceptClient。connect();returnmqttAcceptC}}发消息控制类description:发消息控制类date:202261615:58author:zhouhongRestControllerpublicclassSendController{ResourceprivateMqttSendClientmqttSendCPostMapping(mqttsendmessage)publicvoidsendMessage(RequestBodySendParamsendParam){mqttSendClient。publish(false,sendParam。getTopic(),sendParam。getMessageContent());}}2。测试postman调用发消息接口
  控制台日志
  使用另外一个移动端MQTT调试工具测试手机端向主题topic111发送消息,并接收。
  2。控制台打印
投诉 评论

水果打成果汁营养素会流失吗水果打成果汁营养素会流失吗?许多人以为喝果汁等于吃水果,习惯以果汁取代新鲜水果,不过,营养师表示,喝果汁不但无法完全摄取到天然水果的营养素与膳食纤维,若每天都以果汁取代水……女孩子短发的发型风格不一可盐可甜在很多人的印象中,女生就是长发飘飘的。随着流行趋势的发展,不少女生都剪了短发,而且有着长发女生所缺乏的俏皮可爱、清爽干练。适合女生的短发发型也不断增多,样式丰富。根据风格……物联网微消息队列MQTT介绍EMQX集群搭建以及与Sprin先看我们最后实现的一个效果1。手机端向主题topic111发送消息,并接收。(手机测试工具名称:MQTT调试器)2。控制台打印MQTT基本简介MQTT是……吸毒男星复出拿影帝,缉毒警察的命谁来还那些吸过毒的明星,后来都怎么样了?至少柯震东似乎并没有因吸毒断送前途,反而在台湾的娱乐圈混得风生水起7月9日,柯震东在台北电影节上拿下了影帝。吸过毒的劣迹艺人……中小学教师任现职以来个人工作总结任现职以来,拥护中国共产党的领导,模范遵守中小学教师职业道德,认真贯彻党和国家的教育方针,积极实施素质教育,热情投身到学校教育教学工作中去。现做如下总结:一、思想政治与职……英语爱情经典句子Theworstwaytomisssomeoneistobesittingrightbesidethemknowingyoucan‘thavethem。失去某人,最糟糕的……科普不同的问题肌肤怎么选择适合的护肤品?皮肤干裂、痘痘、过敏、红血丝、缺水、起皮、细纹、斑点、出油。。。姐妹们经常为了各种肌肤问题担忧,看过众多博主的方法,但是还是无法从根源解决问题,有很多由于尝试各种的护肤品,把自……361跑鞋矩阵品牌介绍:361集团是一家集品牌、研发、设计、生产、经销为一体的综合性体育用品公司,其产品包括运动鞋、服装及相关配件、童装、时尚休闲等多品类构成,集团成立于2003年,在……种植三七如何育苗三七种子什么时候采收好?三七种子播种前怎么样消毒?三七育苗需要多少种子?以下小编就作简单介绍,供网友们参考。一、三七种子的采收与贮存三七果实一般称为红籽,于1112……如果这礼拜过得很惨来看看这些比你更惨的壹国庆前夕过得怎样?如果这礼拜过得很惨,来看看这些比你更惨的www。用力过猛导致上班迟到的人。吃早餐时被酸奶虐哭的人。买了甜甜圈却因此更加不幸福的人。开……坐在艾烟里的母亲作者:牧徐徐来源:《今日文摘》入冬,气温陡降。外出采访正好路过老家,虽然已是晚上8时多了,我还是想来个突然袭击,回家看一眼母亲。一起出来的同事把我送到村口,然……个小技巧让你越忙时间越多你们会发现小六是一个在职人员,但是同时又经常在网上写文章,做分享,做咨询,周末还沪宁线各个城市到处跑,同时他还是一个速读爱好者,一天一本书的节奏他哪里来的那么多时间呢?……
踏青游览尽享美丽春光3月10日A股利好利空消息拖垮一个人的,不是没钱,不是病痛,而是这3种弱者思维生个宝宝有多难时隔4年再次更新!华为Mate50X超大屏旗舰将至,搭载骁龙詹姆斯的关键球统计很好,为什么总是有人质疑他的攻坚能力不行?太罕见了!五十万分之一概率的胎中胎CBA三热点刘志轩创生涯新高,第二阶段继续赛会制,多名外援下2028年要量产7纳米光刻机,俄罗斯能行吗?2022好玩的塔防策略战斗游戏推荐塔防策略今日欧美明星时尚街拍图集(2022年11月17日)如何引导孩子正确对待生死?只需找到恰当的打开方式夏天宝宝需要补钙吗?宝宝补钙的原则想兄弟了的说说顺产坐月子饮食禁忌宋朝花钱买和平,到底做的对不对?史记季布栾布列传以项羽之气原文翻译及鉴赏有刺造句用有刺造句大全不加乙醇的汽油好吗带勤字的男孩名字父爱代际差别与个性制约什么情况下商标已经侵权怎样提高工作能力的特征如何有效掌握工作能力怎么描

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找