edged源码分析
service bus
ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求, 与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
代码逻辑
servicebus的功能比较简单
func (sb *servicebus) Start(c *beehiveContext.Context) {
// no need to call TopicInit now, we have fixed topic
var ctx context.Context
sb.context = c
ctx, sb.cancel = context.WithCancel(context.Background())
var htc = new(http.Client)
htc.Timeout = time.Second * 10
var uc = new(util.URLClient)
uc.Client = htc
//Get message from channel
for {
select {
case <-ctx.Done():
klog.Warning("ServiceBus stop")
return
default:
}
msg, err := sb.context.Receive("servicebus")
if err != nil {
klog.Warningf("servicebus receive msg error %v", err)
continue
}
go func() {
klog.Infof("ServiceBus receive msg")
source := msg.GetSource()
if source != sourceType {
return
}
resource := msg.GetResource()
r := strings.Split(resource, ":")
if len(r) != 2 {
m := "the format of resource " + resource + " is incorrect"
klog.Warningf(m)
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
content, err := json.Marshal(msg.GetContent())
if err != nil {
klog.Errorf("marshall message content failed %v", err)
m := "error to marshal request msg content"
code := http.StatusBadRequest
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
var httpRequest util.HTTPRequest
if err := json.Unmarshal(content, &httpRequest); err != nil {
m := "error to parse http request"
code := http.StatusBadRequest
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
operation := msg.GetOperation()
targetURL := "http://127.0.0.1:" + r[0] + "/" + r[1]
resp, err := uc.HTTPDo(operation, targetURL, httpRequest.Header, httpRequest.Body)
if err != nil {
m := "error to call service"
code := http.StatusNotFound
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
resp.Body = http.MaxBytesReader(nil, resp.Body, maxBodySize)
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
if err.Error() == "http: request body too large" {
err = fmt.Errorf("response body too large")
}
m := "error to receive response, err: " + err.Error()
code := http.StatusInternalServerError
klog.Errorf(m, err)
if response, err := buildErrorResponse(msg.GetID(), m, code); err == nil {
sb.context.SendToGroup(modules.HubGroup, response)
}
return
}
response := util.HTTPResponse{Header: resp.Header, StatusCode: resp.StatusCode, Body: resBody}
responseMsg := model.NewMessage(msg.GetID())
responseMsg.Content = response
responseMsg.SetRoute("servicebus", modules.UserGroup)
sb.context.SendToGroup(modules.HubGroup, *responseMsg)
}()
}
}
根据代码分为以下步骤:
拿到回传的beehiveContext,初始化http连接,设置超时十秒
初始化URLClient
从beehiveContext 里面接收接收servicebus的消息,获取消息来源,来源必须是router_rest
然后获取消息包含的resource,根据代码可以看出resource是一个以冒号分割的字符串,根据后续代码可以知道,实际上就是 port:url
获取对应的消息内容,该内容是一个json,最终被反序列化为
type HTTPRequest struct {
Header http.Header `json:"header"`
Body []byte `json:"body"`
}
获取动作 - 即http的 get/post/put...
发出请求接收返回,判断返回数据量大小,最大为 5*1e6个字节,超过就报错
以接收到的消息ID作为父ID通过返回数据给edgehub
扫描关注我:
Last updated