本帖最后由 sandman78 于 2017-8-7 17:14 编辑
本项目实现公用洗衣机的移动端预订、付费、排队。洗衣机通过WIFI联网,采用MQTT协议与服务器通讯,执行服务器下发的命令。具体实现为接收mqtt服务器发布的命令,根据需要发到串口控制洗衣机工作。mqtt应用层采用json数据格式,串口采用自定义二进制格式。关于串口编程之前有过介绍,Go的标准库支持json处理,这里只介绍
开发板的mqtt应用部分、不涉及具体业务。
mqtt数据处理
mqtt简介
mqtt是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。MQTT最大的优点在于可以以极少的代码和有限的带宽,为远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,MQTT在物联网、小型设备、移动应用等方面有广泛的应用。详细介绍可参见官网的介绍。
go语言对MQTT的支持
eclipse提供了paho.mqtt.golang客户端包
首先下载
- $ go get github.com/eclipse/paho.mqtt.golang
使用之前导入
- import "github.com/eclipse/paho.mqtt.golang"
设置MQTT的参数
- opts := MQTT.NewClientOptions().AddBroker(addr).SetClientID("go_pc").SetCleanSession(true).SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
- choke <- [2]string{msg.Topic(), string(msg.Payload())}
- })
AddBroker用于设置MQTT服务器的地址
SetClientID设置client的ID,在同一网络中,客户端ID是唯一的.
SetCleanSession, 如果设置为false,Client断开连接后,Server应该保存Client的订阅信息。如果为true,表示Server应该立刻丢弃任何会话状态信息。
SetDefaultPublishHandler,设置收到消息后的默认处理函数,这里把收到的主题和消息内容通过choke发送给处理函数。
连接MQTT服务器
- client := MQTT.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
订阅关注的消息
- for _, topic := range subTopics {
- if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
- fmt.Println(token.Error())
- os.Exit(1)
- }
- }
subTopics为字符串数组保存着待订阅的主题。
运行消息发布goroutine,通过<-message接收发布的消息和主题,message类型为chan [2]string分别对应主题和消息
- go func() {
- for {
- msg := <-message
- token := client.Publish(msg[0], 0, false, msg[1])
- token.Wait()
- }
- }()
主循环
- for {
- select {
- case data := <-choke:
- ... // 这里处理接收到的订阅消息
- message <- [2]string{"topic1/topic2", string(buf)} // 发布消息
- case <-time.After(time.Second * 1):
- ... // 这里可处理定时任务
- }
- }
- }
参考完整代码如下
- func qmqtt(addr string, posid []string) {
- c := &serial.Config{Name: PORT, Baud: 115200}
- s, err := serial.OpenPort(c)
- defer s.Close()
- if err != nil {
- log.Fatal(err)
- }
- var subTopics = []string{"washer/c2d/raw/" + POSIDS}
- choke := make(chan [2]string)
- message := make(chan [2]string)
- opts := MQTT.NewClientOptions().AddBroker(addr).SetClientID("go_pc").SetCleanSession(true).SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
- choke <- [2]string{msg.Topic(), string(msg.Payload())}
- })
- client := MQTT.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
- for _, topic := range subTopics {
- if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
- fmt.Println(token.Error())
- os.Exit(1)
- }
- }
- go func() {
- for {
- msg := <-message
- token := client.Publish(msg[0], 0, false, msg[1])
- token.Wait()
- }
- }()
- p := packet.NewPacket(s, 0x55, []int{6, 0, 1})
- x := washer.Encode(POSID, 0x11, []byte{})
- for {
- select {
- case data := <-choke:
- buf, err := p.Exchange([]byte(data[1]))
- if err != nil {
- fmt.Println(err)
- continue
- }
- if _, err := washer.Decode(POSID, 0x11, buf); err != nil {
- fmt.Println(err)
- continue
- }
- message <- [2]string{"washer/d2c/" + POSIDS, string(buf)}
- }
- }
- }