edgehub源码分析
edgehub 源码分析
edgehub是Edge上的通信接口模块,用于云边消息同步
结构定义及初始化
edgehub的结构定义
type EdgeHub struct {
context *beehiveContext.Context
chClient clients.Adapter
config *config.ControllerConfig
reconnectChan chan struct{}
cancel context.CancelFunc
syncKeeper map[string]chan model.Message
keeperLock sync.RWMutex
}在注册edgehub模块的时间 对edgehub进行了初始化
func Register() {
core.Register(&EdgeHub{
config: &config.GetConfig().CtrConfig,
reconnectChan: make(chan struct{}),
syncKeeper: make(map[string]chan model.Message),
})
}在模块启动时先将拿到beehiveContext,然后获取EdgehubConfig
在启动时间加载ControllerConfig 根据使用的ControllerConfig中的protocol加载对应的websocket config或者quicconfig
然后初始化EdgeHub.Controller.chClient 配置,然后初始化对应连接用于和cloudcore通信
给其他组件同步连接成功状态
然后通过pubConnectInfo给其他的组(resource,twin,func,user)发消息,告诉他们 云端连接成功了
message结构定义如下
type Message struct { Header MessageHeader json:"header" Router MessageRoute json:"route,omitempty" Content interface{} json:"content" }
header包含了
消息的ID
消息的父ID
时间戳
是否被同步
router 定义了以下对象
来源
广播到哪个组
动作
操作的资源
发消息
我们可以看到model.NewMessage("").BuildRouter接收四个参数,分别为:
来源
发给哪个组
资源类型
动作
这里的NewMessage parentID参数为空 证明这是消息的发起者
接下来启动了三个协程
routeToEdge
routeToCloud
keepalive
routeToEdge
routeToEdge接收信息 然后发送信息到对应的group, 判断group是否存在,判断是否是已有同步响应,如果没有发送给对应组。这里就使用到了beehive的messageContext. 接下来根据parentid将此条消息发送到syncKeeper channel里
routeToCloud
将在channel收到的消息发送到云端,同时将消息保存在syncKeeper,这里创建了一个定时器,过期的话会自动删除
keepalive
根据心跳时间向云端发送心跳
扫描关注我:

Last updated
Was this helpful?