问答
直播中

刘 刚

10年用户 42经验值
擅长:6933
私信 关注

【MYS-6ULX-IOT试用体验】公用洗衣机无线控制(结项贴)

本帖最后由 sandman78 于 2017-8-7 17:14 编辑

本项目实现公用洗衣机的移动端预订、付费、排队。洗衣机通过WIFI联网,采用MQTT协议与服务器通讯,执行服务器下发的命令。具体实现为接收mqtt服务器发布的命令,根据需要发到串口控制洗衣机工作。mqtt应用层采用json数据格式,串口采用自定义二进制格式。关于串口编程之前有过介绍,Go的标准库支持json处理,这里只介绍开发板的mqtt应用部分、不涉及具体业务。

mqtt数据处理

mqtt简介

mqtt是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。详细介绍可参见官网的介绍。

go语言对MQTT的支持

eclipse提供了paho.mqtt.golang客户端包

首先下载

  1. $ go get github.com/eclipse/paho.mqtt.golang



使用之前导入
  1. import "github.com/eclipse/paho.mqtt.golang"


设置MQTT的参数
  1. opts := MQTT.NewClientOptions().AddBroker(addr).SetClientID("go_pc").SetCleanSession(true).SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
  2.     choke <- [2]string{msg.Topic(), string(msg.Payload())}
  3. })

AddBroker用于设置MQTT服务器的地址
SetClientID设置client的ID,在同一网络中,客户端ID是唯一的.
SetCleanSession, 如果设置为false,Client断开连接后,Server应该保存Client的订阅信息。如果为true,表示Server应该立刻丢弃任何会话状态信息。
SetDefaultPublishHandler,设置收到消息后的默认处理函数,这里把收到的主题和消息内容通过choke发送给处理函数。

连接MQTT服务器
  1. client := MQTT.NewClient(opts)
  2. if token := client.Connect(); token.Wait() && token.Error() != nil {
  3.     panic(token.Error())
  4. }

订阅关注的消息
  1. for _, topic := range subTopics {
  2.     if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
  3.         fmt.Println(token.Error())
  4.         os.Exit(1)
  5.     }
  6. }

subTopics为字符串数组保存着待订阅的主题。

运行消息发布goroutine,通过<-message接收发布的消息和主题,message类型为chan [2]string分别对应主题和消息
  1. go func() {
  2.     for {
  3.         msg := <-message
  4.         token := client.Publish(msg[0], 0, false, msg[1])
  5.         token.Wait()
  6.     }
  7. }()

主循环
  1. for {
  2.         select {
  3.         case data := <-choke:
  4.         ... // 这里处理接收到的订阅消息
  5.         message <- [2]string{"topic1/topic2", string(buf)} // 发布消息

  6.         case <-time.After(time.Second * 1):
  7.         ... // 这里可处理定时任务
  8.         }
  9.     }
  10. }

参考完整代码如下

  1. func qmqtt(addr string, posid []string) {
  2.     c := &serial.Config{Name: PORT, Baud: 115200}
  3.     s, err := serial.OpenPort(c)
  4.     defer s.Close()
  5.     if err != nil {
  6.         log.Fatal(err)
  7.     }

  8.     var subTopics = []string{"washer/c2d/raw/" + POSIDS}
  9.     choke := make(chan [2]string)
  10.     message := make(chan [2]string)

  11.     opts := MQTT.NewClientOptions().AddBroker(addr).SetClientID("go_pc").SetCleanSession(true).SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
  12.         choke <- [2]string{msg.Topic(), string(msg.Payload())}
  13.     })

  14.     client := MQTT.NewClient(opts)

  15.     if token := client.Connect(); token.Wait() && token.Error() != nil {
  16.         panic(token.Error())
  17.     }

  18.     for _, topic := range subTopics {
  19.         if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
  20.             fmt.Println(token.Error())
  21.             os.Exit(1)
  22.         }
  23.     }

  24.     go func() {
  25.         for {
  26.             msg := <-message
  27.             token := client.Publish(msg[0], 0, false, msg[1])
  28.             token.Wait()
  29.         }
  30.     }()

  31.     p := packet.NewPacket(s, 0x55, []int{6, 0, 1})
  32.     x := washer.Encode(POSID, 0x11, []byte{})
  33.     for {
  34.         select {
  35.         case data := <-choke:
  36.             buf, err := p.Exchange([]byte(data[1]))
  37.             if err != nil {
  38.                 fmt.Println(err)
  39.                 continue
  40.             }
  41.             if _, err := washer.Decode(POSID, 0x11, buf); err != nil {
  42.                 fmt.Println(err)
  43.                 continue
  44.             }
  45.             message <- [2]string{"washer/d2c/" + POSIDS, string(buf)}
  46.         }
  47.     }
  48. }


回帖(1)

jyaxz

2018-5-10 13:54:23
mqtt这个挺好的东西,谢谢楼主的研究
举报

更多回帖

发帖
×
20
完善资料,
赚取积分