文章目录
ZMQ 通信协议小结 🐝
前言 🔍
zmq的三种模型 💦
1、Request_Reply模式(请求——应答): REP、 REQ ☎️
伪代码
应用场景
2、Publish-Subscribe模式(发布——订阅): PUB、SUB 🎙
伪代码
应用场景
3、Parallel Pipeline模式(push——pull): PUSH、PULL 🔗
伪代码
应用场景
ZMQ 通信协议小结 🐝
最近有时间了把这个坑填一填!!!
前言 🔍
项目中涉及到 zmq通信协议相关内容,所以将学习、使用过程同步分享
通篇以代码分享为主,且本文对底层socket不做过多叙述,以实际应用为准,希望能帮到各位!
Talk is cheap, Show me the code
zmq的三种模型 💦
1、Request_Reply模式(请求——应答): REP、 REQ ☎️
一发一收 无缓存 断开连接数据丢失;
生产中也可以一个server对应多个client;
双向消息,REP(server)端必须recv到REQ(client)的消息之后,调用send返回,否则通道堵塞; 相同的 REQ(client)端负责send消息到REP(server),然后调用recv获取REP(server)的返回;
伪代码
server.py
# 1、Request_Reply模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:5556')
while True:
message = socket.recv()
print(message)
socket.send('server response')
Jetbrains全家桶1年46,售后保障稳定
client.py
# client import zmqimport syscontext = zmq.Context()socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5556')while True: data = raw_input('input your data:') if data == 'q': sys.exit() socket.send(data) response = socket.recv() print(response)
应用场景
场景说明:
我们定义一个非阻塞 的消息通道, 用作发送特定的Python结构体数据,包含三个文件如下:
Code:
server.py
import time
import zmq
from data import zmqStruct
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5656")
while True:
try:
message = socket.recv_pyobj(zmq.NOBLOCK)
print(message)
#time.sleep(1)
socket.send_pyobj('123123123')
except zmq.Again as e:
if e.errno!=zmq.EAGAIN:
print(repr(e))
time.sleep(1)
client.py
from data import zmqStruct
def zmqREQ():
import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://{}:5656".format('192.168.24.107'))
return socket
sendStruct = zmqStruct()
zmqClient = zmqREQ()
zmqClient.send_pyobj(sendStruct)
print zmqClient.recv_pyobj()
data.py
class zmqStruct(onject): # 消息结构体
def __init__(self, cmd=0, data=None, desc=''):
self.cmd = cmd
self.data = data
self.desc = desc
----
2、Publish-Subscribe模式(发布——订阅): PUB、SUB 🎙
广播所有client,无缓存,断开连接数据丢失。(当然所有的问题都可以通过增加中间层的方式解决);
发布端发布主题topic,订阅端只会收到已订阅的主题topic;
PUB端发送消息,SUB端接受消息;
SUB可以注册多个PUB;
如果PUB没有任何SUB,那么消息将会被丢弃;
SUB端消费过慢,消息则堆积到PUB端
单工-单向数据传输
伪代码
server.py
# 2、Publish-Subscribe模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5005")
while True:
msg = input('input your data:').encode('utf-8')
socket.send(msg)
client.py
# client
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:5005')
# 使用socket.setsockopt()进行过滤
socket.setsockopt(zmq.SUBSCRIBE,b'')
while True:
print(socket.recv_string())
应用场景
场景说明:
我们假定 有一个任务调度器 , 结构为 1个 master 对应 10个 slave, master接受任务,将任务投递给 slave.
Code:
master.py
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5000")
tasks = [i for i in range(100)]
def pub():
# 这个延时 是为了服务端绑定 socket 后会等待200毫秒避免消息丢失; 也是为了保证服客户端环境完备的折中之举
time.sleep(1)
for i in tasks:
socket.send(str(i))
if __name__ == '__main__':
pub()
slave.py
import time
import threading
from concurrent.futures import ThreadPoolExecutor
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5000")
socket.setsockopt(zmq.SUBSCRIBE, '')
threadpool = ThreadPoolExecutor(10)
def submsg():
""" socket 接受消息使用 `zmq.NOBLOCK` 非阻塞模式来进行,可以保证保证循环体内的其他功能正常使用 :return: """
while 1:
try:
msg = socket.recv(flags=zmq.NOBLOCK)
except zmq.Again as e:
if e.errno != zmq.EAGAIN:
print(repr(e))
else:
print '接收到广播消息,线程池投递任务 msg={}'.format(msg)
threadpool.submit(work, msg)
def work(msg):
print '开始工作 参数{}'.format(msg)
time.sleep(2) # 模拟功能执行时间
print '结束工作'
if __name__ == '__main__':
submsg()
----
3、Parallel Pipeline模式(push——pull): PUSH、PULL 🔗
管道模式(单工) – 单向通道;
可以由三部分组成:push推送数据,work缓存数据,pull竞争数据,断开连接数据不丢失,重连继续发送。work中间件可以去掉;
伪代码
server.py
# 3、Parallel Pipeline模式
# server
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5566')
while True:
data = socket.recv()
print(data)
work.py
# work 无work push 会阻塞掉
import zmq
context = zmq.Context()
recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5565')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5566')
while True:
data = recive.recv()
sender.send(data)
client.py
# client
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind('tcp://*:5565')
while True:
data = raw_input('input your data:')
socket.send(data)
应用场景
场景说明:
Code:
Error:
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/hz/110533.html