eventbus源码分析
EventBus
EventBus 是一个MQTT客户端
初始化
在初始化eventbus时获取mqtt模式 external/internal
启动
根据配置初始化Mqttclient,创建Internal Mqtt client或者external Mqtt client,设置qs,retain策略和队列的大小
external mqtt broker
InitSubClient
设置连接参数启动连接
func (mq *Client) InitSubClient() {
timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
right := len(timeStr)
if right > 10 {
right = 10
}
subID := fmt.Sprintf("hub-client-sub-%s", timeStr[0:right])
subOpts := util.HubClientInit(mq.MQTTUrl, subID, "", "")
subOpts.OnConnect = onSubConnect
subOpts.AutoReconnect = false
subOpts.OnConnectionLost = onSubConnectionLost
mq.SubCli = MQTT.NewClient(subOpts)
util.LoopConnect(subID, mq.SubCli)
klog.Info("finish hub-client sub")
}以下两个函数定义了当失联和连接时的处理逻辑
token用于确定连接状态 可以看到 它订阅了以下topic
当在这些topic中获得消息时,通过mqtt的Subscribe方法回调OnSubMessageReceived
OnSubMessageReceived
该函数判断topic,"$hw/events/device"和"$hw/events/node"开头发送给twingroup也就是devicetwin,其他信息发送给edgehub 然后通过SendToGroup发送到devicetwin
InitPubClient
InitPubClient只是创建了一个MQTTclient,然后每五秒钟连接一次mqtt server,当失败是通过,重新初始化
Internal mqtt broker
启动一个内置的qttserver
pubCloudMsgToEdge
在启动/连接完MQTTserver后,调用了pubCloudMsgToEdge方法
pubCloudMsgToEdge执行以下操作
从beehive获取消息
获取消息的动作和资源
当动作为 subscribe 时从MQTT订阅消息
当动作为 message 时,将消息的message发送给MQTT broker,消息类型是一个map,
当动作为 publish 时,将消息的message发送给MQTT broker, 消息为一个字符串,topic和resource一致
当动作为 get_result时,resource必须为auth_info,
然后发送消息到
fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", mqttBus.NodeID)topic
扫描关注我:

Last updated
Was this helpful?