edgehub源码分析

edgehub 源码分析

edgehub是Edge上的通信接口模块,用于云边消息同步

结构定义及初始化

edgehub的结构定义

type EdgeHub struct {
    context       *beehiveContext.Context
    chClient      clients.Adapter
    config        *config.ControllerConfig
    reconnectChan chan struct{}
    cancel        context.CancelFunc
    syncKeeper    map[string]chan model.Message
    keeperLock    sync.RWMutex
}

在注册edgehub模块的时间 对edgehub进行了初始化

func Register() {
    core.Register(&EdgeHub{
        config:        &config.GetConfig().CtrConfig,
        reconnectChan: make(chan struct{}),
        syncKeeper:    make(map[string]chan model.Message),
    })
}

在模块启动时先将拿到beehiveContext,然后获取EdgehubConfig

在启动时间加载ControllerConfig 根据使用的ControllerConfig中的protocol加载对应的websocket config或者quicconfig

然后初始化EdgeHub.Controller.chClient 配置,然后初始化对应连接用于和cloudcore通信

给其他组件同步连接成功状态

然后通过pubConnectInfo给其他的组(resource,twin,func,user)发消息,告诉他们 云端连接成功了

message结构定义如下

type Message struct { Header MessageHeader json:"header" Router MessageRoute json:"route,omitempty" Content interface{} json:"content" }

header包含了

  • 消息的ID

  • 消息的父ID

  • 时间戳

  • 是否被同步

router 定义了以下对象

  • 来源

  • 广播到哪个组

  • 动作

  • 操作的资源

发消息

我们可以看到model.NewMessage("").BuildRouter接收四个参数,分别为:

  • 来源

  • 发给哪个组

  • 资源类型

  • 动作

这里的NewMessage parentID参数为空 证明这是消息的发起者

接下来启动了三个协程

  • routeToEdge

  • routeToCloud

  • keepalive

routeToEdge

routeToEdge接收信息 然后发送信息到对应的group, 判断group是否存在,判断是否是已有同步响应,如果没有发送给对应组。这里就使用到了beehive的messageContext. 接下来根据parentid将此条消息发送到syncKeeper channel里

routeToCloud

将在channel收到的消息发送到云端,同时将消息保存在syncKeeper,这里创建了一个定时器,过期的话会自动删除

keepalive

根据心跳时间向云端发送心跳

扫描关注我:

微信

Last updated

Was this helpful?