go开源网络库nano(6)-handler逻辑

go开源网络库nano(6)-handler逻辑 目录:前言;注册服务(源文件 handler.go、handler 定义);方法:1. 注册服务(本地服务、远程服务),2. 获取服务(本地、远程),3. 查找远程服务,4. 处理调用(包的处理、消息处理、本地调用处理、远程调用处理);下一章

前言

package cluster

注册服务

源文件 handler.go

handler 定义

type LocalHandler struct { 
    localServices map[string]*component.Service // all registered service localHandlers map[string]*component.Handler // all handler method mu sync.RWMutex remoteServices map[string][]*clusterpb.MemberInfo pipeline pipeline.Pipeline currentNode *Node } func NewHandler(currentNode *Node, pipeline pipeline.Pipeline) *LocalHandler { 
    h := &LocalHandler{ 
    localServices: make(map[string]*component.Service), localHandlers: make(map[string]*component.Handler), remoteServices: map[string][]*clusterpb.MemberInfo{ 
   }, pipeline: pipeline, currentNode: currentNode, } return h } 

// 握手和心跳消息

var ( // cached serialized data hrd []byte // handshake response data hbd []byte // heartbeat packet data ) type rpcHandler func(session *session.Session, msg *message.Message, noCopy bool) func cache() { 
    data, err := json.Marshal(map[string]interface{ 
   }{ 
    "code": 200, "sys": map[string]float64{ 
   "heartbeat": env.Heartbeat.Seconds()}, }) if err != nil { 
    panic(err) } hrd, err = codec.Encode(packet.Handshake, data) if err != nil { 
    panic(err) } hbd, err = codec.Encode(packet.Heartbeat, nil) if err != nil { 
    panic(err) } } 

方法

1. 注册服务

本地服务和处理逻辑的 handler

func (h *LocalHandler) register(comp component.Component, opts []component.Option) error { 
    //注册新的服务 s := component.NewService(comp, opts) //是否已经注册 if _, ok := h.localServices[s.Name]; ok { 
    return fmt.Errorf("handler: service already defined: %s", s.Name) } //解析符合条件的hander if err := s.ExtractHandler(); err != nil { 
    return err } // register all localHandlers h.localServices[s.Name] = s for name, handler := range s.Handlers { 
    n := fmt.Sprintf("%s.%s", s.Name, name) log.Println("Register local handler", n) h.localHandlers[n] = handler } return nil } 

远程服务

func (h *LocalHandler) initRemoteService(members []*clusterpb.MemberInfo) { 
    for _, m := range members { 
    h.addRemoteService(m) } } func (h *LocalHandler) addRemoteService(member *clusterpb.MemberInfo) { 
    h.mu.Lock() defer h.mu.Unlock() for _, s := range member.Services { 
    log.Println("Register remote service", s) h.remoteServices[s] = append(h.remoteServices[s], member) } } 

2. 获取服务

本地

func (h *LocalHandler) LocalService() []string { 
    var result []string for service := range h.localServices { 
    result = append(result, service) } sort.Strings(result) return result } 

远程

func (h *LocalHandler) RemoteService() []string { 
    h.mu.RLock() defer h.mu.RUnlock() var result []string for service := range h.remoteServices { 
    result = append(result, service) } sort.Strings(result) return result } 

3. 查找远程服务

func (h *LocalHandler) findMembers(service string) []*clusterpb.MemberInfo { 
    h.mu.RLock() defer h.mu.RUnlock() return h.remoteServices[service] } 

4. 处理调用

包的处理

func (h *LocalHandler) processPacket(agent *agent, p *packet.Packet) error { 
    switch p.Type { 
    case packet.Handshake: if _, err := agent.conn.Write(hrd); err != nil { 
    return err } agent.setStatus(statusHandshake) if env.Debug { 
    log.Println(fmt.Sprintf("Session handshake Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr())) } case packet.HandshakeAck: agent.setStatus(statusWorking) if env.Debug { 
    log.Println(fmt.Sprintf("Receive handshake ACK Id=%d, Remote=%s", agent.session.ID(), agent.conn.RemoteAddr())) } case packet.Data: if agent.status() < statusWorking { 
    return fmt.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", agent.conn.RemoteAddr().String()) } msg, err := message.Decode(p.Data) if err != nil { 
    return err } h.processMessage(agent, msg) case packet.Heartbeat: // expected } agent.lastAt = time.Now().Unix() return nil } 

消息处理

// 先按消息类型取得消息 ID,再根据路由是否已在本地注册,决定走本地调用还是远程调用

func (h *LocalHandler) processMessage(agent *agent, msg *message.Message) { 
    var lastMid uint64 switch msg.Type { 
    case message.Request: lastMid = msg.ID case message.Notify: lastMid = 0 default: log.Println("Invalid message type: " + msg.Type.String()) return } handler, found := h.localHandlers[msg.Route] if !found { 
    h.remoteProcess(agent.session, msg, false) } else { 
    h.localProcess(handler, lastMid, agent.session, msg) } } 

本地调用处理

func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64, session *session.Session, msg *message.Message) { 
    if pipe := h.pipeline; pipe != nil { 
    err := pipe.Inbound().Process(session, msg) if err != nil { 
    log.Println("Pipeline process failed: " + err.Error()) return } } var payload = msg.Data var data interface{ 
   } if handler.IsRawArg { 
    data = payload } else { 
    data = reflect.New(handler.Type.Elem()).Interface() err := env.Serializer.Unmarshal(payload, data) if err != nil { 
    log.Println(fmt.Sprintf("Deserialize to %T failed: %+v (%v)", data, err, payload)) return } } if env.Debug { 
    log.Println(fmt.Sprintf("UID=%d, Message={%s}, Data=%+v", session.UID(), msg.String(), data)) } args := []reflect.Value{ 
   handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)} task := func() { 
    switch v := session.NetworkEntity().(type) { 
    case *agent: v.lastMid = lastMid case *acceptor: v.lastMid = lastMid } result := handler.Method.Func.Call(args) if len(result) > 0 { 
    if err := result[0].Interface(); err != nil { 
    log.Println(fmt.Sprintf("Service %s error: %+v", msg.Route, err)) } } } index := strings.LastIndex(msg.Route, ".") if index < 0 { 
    log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route)) return } // A message can be dispatch to global thread or a user customized thread service := msg.Route[:index] if s, found := h.localServices[service]; found && s.SchedName != "" { 
    sched := session.Value(s.SchedName) if sched == nil { 
    log.Println(fmt.Sprintf("nanl/handler: cannot found `schedular.LocalScheduler` by %s", s.SchedName)) return } local, ok := sched.(scheduler.LocalScheduler) if !ok { 
    log.Println(fmt.Sprintf("nanl/handler: Type %T does not implement the `schedular.LocalScheduler` interface", sched)) return } local.Schedule(task) } else { 
    scheduler.PushTask(task) } } 

远程调用处理

func (h *LocalHandler) remoteProcess(session *session.Session, msg *message.Message, noCopy bool) { 
    index := strings.LastIndex(msg.Route, ".") if index < 0 { 
    log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route)) return } service := msg.Route[:index] members := h.findMembers(service) if len(members) == 0 { 
    log.Println(fmt.Sprintf("nano/handler: %s not found(forgot registered?)", msg.Route)) return } // Select a remote service address // 1. Use the service address directly if the router contains binding item // 2. Select a remote service address randomly and bind to router var remoteAddr string if addr, found := session.Router().Find(service); found { 
    remoteAddr = addr } else { 
    remoteAddr = members[rand.Intn(len(members))].ServiceAddr session.Router().Bind(service, remoteAddr) } pool, err := h.currentNode.rpcClient.getConnPool(remoteAddr) if err != nil { 
    log.Println(err) return } var data = msg.Data if !noCopy && len(msg.Data) > 0 { 
    data = make([]byte, len(msg.Data)) copy(data, msg.Data) } // Retrieve gate address and session id gateAddr := h.currentNode.ServiceAddr sessionId := session.ID() switch v := session.NetworkEntity().(type) { 
    case *acceptor: gateAddr = v.gateAddr sessionId = v.sid } client := clusterpb.NewMemberClient(pool.Get()) switch msg.Type { 
    case message.Request: request := &clusterpb.RequestMessage{ 
    GateAddr: gateAddr, SessionId: sessionId, Id: msg.ID, Route: msg.Route, Data: data, } _, err = client.HandleRequest(context.Background(), request) case message.Notify: request := &clusterpb.NotifyMessage{ 
    GateAddr: gateAddr, SessionId: sessionId, Route: msg.Route, Data: data, } _, err = client.HandleNotify(context.Background(), request) } if err != nil { 
    log.Println(fmt.Sprintf("Process remote message (%d:%s) error: %+v", msg.ID, msg.Route, err)) } } 

下一章

下一章看 message 消息。

今天的文章 go开源网络库nano(6)-handler逻辑分享到此就结束了,感谢您的阅读。
编程小号
上一篇 2024-12-21 19:33
下一篇 2024-12-21 19:30

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/92344.html