eventbus源码分析

EventBus

EventBus 是一个MQTT客户端

初始化

在初始化eventbus时获取mqtt模式 external/internal

启动

根据配置初始化Mqttclient,创建Internal Mqtt client或者external Mqtt client,设置qs,retain策略和队列的大小

external mqtt broker

InitSubClient

设置连接参数启动连接

func (mq *Client) InitSubClient() {
    timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
    right := len(timeStr)
    if right > 10 {
        right = 10
    }
    subID := fmt.Sprintf("hub-client-sub-%s", timeStr[0:right])
    subOpts := util.HubClientInit(mq.MQTTUrl, subID, "", "")
    subOpts.OnConnect = onSubConnect
    subOpts.AutoReconnect = false
    subOpts.OnConnectionLost = onSubConnectionLost
    mq.SubCli = MQTT.NewClient(subOpts)
    util.LoopConnect(subID, mq.SubCli)
    klog.Info("finish hub-client sub")
}

以下两个函数定义了当失联和连接时的处理逻辑

token用于确定连接状态 可以看到 它订阅了以下topic

当在这些topic中获得消息时,通过mqtt的Subscribe方法回调OnSubMessageReceived

OnSubMessageReceived

该函数判断topic,"$hw/events/device"和"$hw/events/node"开头发送给twingroup也就是devicetwin,其他信息发送给edgehub 然后通过SendToGroup发送到devicetwin

InitPubClient

InitPubClient只是创建了一个MQTTclient,然后每五秒钟连接一次mqtt server,当失败是通过,重新初始化

Internal mqtt broker

启动一个内置的qttserver

pubCloudMsgToEdge

在启动/连接完MQTTserver后,调用了pubCloudMsgToEdge方法

pubCloudMsgToEdge执行以下操作

  • 从beehive获取消息

  • 获取消息的动作和资源

  • 当动作为 subscribe 时从MQTT订阅消息

  • 当动作为 message 时,将消息的message发送给MQTT broker,消息类型是一个map,

  • 当动作为 publish 时,将消息的message发送给MQTT broker, 消息为一个字符串,topic和resource一致

  • 当动作为 get_result时,resource必须为auth_info,

    然后发送消息到fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", mqttBus.NodeID)topic

扫描关注我:

微信

Last updated

Was this helpful?