以下是源码中涉及到使用pipeline的地方
localProcess是nano中处理客户端请求消息的方法
func (h *LocalHandler) localProcess(... ...) {
// NOTE: msg是message.Message类型 if pipe := h.pipeline; pipe != nil {
err := pipe.Inbound().Process(session, msg) if err != nil {
log.Println("Pipeline process failed: " + err.Error()) return } } // ... 执行请求消息对应handler }
write是nano中处理服务端响应消息的方法,说白了就是读写分离的异步写消息的地方
func (a *agent) write(){
for {
select{
case: ... ... case data := <-a.chSend: // 业务消息序列化 ... // 包装成Message if pipe := a.pipeline; pipe != nil {
err := pipe.Outbound().Process(a.session, m) if err != nil {
log.Println("broken pipeline", err.Error()) break } // 发送 Message } } } }
Pipeline含有两个pipelineChannel(以下简称Channel), 一个称为outbound,一个称为inbound。每个Channel可以注册一系列Pipeline.Func, 在调用Process顺序执行。
type Func func(s *session.Session, msg *message.Message) error
源码进入=> lonng/nano/blob/master/pipeline/pipeline.go
也就是说,Pipeline相当于Message与Handler(业务逻辑)的中间人,如果我们需要在message和Handler的中间对message做一些处理可以在Pipeline中完成。比如,流量统计、加密、校验等行为。
今天的文章 pipeline是什么?——lonng/nano分享到此就结束了,感谢您的阅读。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ji-chu/92358.html