ZeroMQ(Ø)是一个轻量级的消息内核,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
ZeroMQ是基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。
ZeroMQ几乎所有的I/O操作都是异步的,主线程不会被阻塞。ZeroMQ会根据用户调用zmq_init函数时传入的接口参数,创建对应数量的I/O Thread。
ZeroMQ不是单独的服务或者程序,仅仅是一套组件,其封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
ZeroMQ将消息通信分成4种模型,分别是
一对一结对模型(Exclusive-Pair):一个TCP Connection,但是TCP Server只能接受一个连接,数据可以双向流动
请求回应模型(Request-Reply):跟一对一结对模型的区别在于请求端可以是1~N个,该模型主要用于远程调用及任务分配等
发布订阅模型(Publish-Subscribe):发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。
推拉模型(Push-Pull):Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
请求响应模型
发布订阅模型
推拉模型
异步请求响应
实例1-
namespace HelloWorldDemoSeparateThreads
输出结果
视频演示
#主程序 public class Program { public static void Main(string[] args) { Server server = new Server();//服务器 server.Run();//运行服务器 foreach (Client client in Enumerable.Range(0, 5).Select( x => new Client(string.Format("client {0}", x)))) //创建5个客户端线程 { client.Run();//运行每个客户端 } Console.ReadLine(); } }
#客户端类 sealed class Client { private readonly string clientName;//私有只读字段 public Client(string clientName)//客户端构造函数:客户端名 { this.clientName = clientName; } public void Run() { Task.Run(() => { using (NetMQContext ctx = NetMQContext.Create())//零消息队列上下文 { using (var client = ctx.CreateRequestSocket())//创建请求套接字 客户端 { client.Connect("tcp://127.0.0.1:5556");//客户端连接服务器IP和端口 while (true) { client.Send(string.Format("Hello from client {0}", clientName));//客户端发送:客户端名 string fromServerMessage = client.ReceiveString();//客户端接收服务器发来的字符串:阻塞线程 Console.WriteLine("From Server: {0} running on ThreadId : {1}", fromServerMessage, Thread.CurrentThread.ManagedThreadId);// 一个整数,表示此托管线程的唯一标识符。 Thread.Sleep(5000);//等5秒 } } } }); } }
#服务器类 sealed class Server //服务器类 { public void Run() { Task.Run(() => { using (NetMQContext ctx = NetMQContext.Create())//零消息队列上下文 { using (var server = ctx.CreateResponseSocket())//创建响应套接字 { server.Bind("tcp://127.0.0.1:5556");//绑定IP 端口 while (true) { string fromClientMessage = server.ReceiveString();//从客户端接收字符串,阻塞线程 Console.WriteLine("From Client: {0} running on ThreadId : {1}", fromClientMessage, Thread.CurrentThread.ManagedThreadId);//显示接收内容和当前线程的主线程ID server.Send("Hi Back");//发送给客户端 } } } }); } }
实例2-SendMore 测试
[TestFixture]//测试装置表示执行一项或多项测试所需的准备工作,以及任何相关的清理操作。 例如,这可能涉及创建临时或代理数据库、目录或启动服务器进程。 public class SendMoreTests { [Test] public void SendMoreTest() { using (NetMQContext ctx = NetMQContext.Create()) { using (var server = ctx.CreateResponseSocket())//服务器:响应套接字 { server.Bind("tcp://127.0.0.1:5556"); using (var client = ctx.CreateRequestSocket())//客户端 :请求套接字 { client.Connect("tcp://127.0.0.1:5556"); //客户端发消息 client.SendMore("A");//字符串+bool变量dontWait默认false client.Send("Hello"); //server receive 1st part服务器端接收第一部分 bool more; string m = server.ReceiveString(out more); Assert.AreEqual("A", m);//判断接收的第一部分是否为“A” Assert.IsTrue(more);//判断是否为sendmore方法 //server receive 2nd part接收第二部分 string m2 = server.ReceiveString(out more); Assert.AreEqual("Hello", m2);//判断接收数据是否为Hello Assert.False(more);//send方法 //server send message, this time use NetMqMessage //which will be sent as frames if the client calls //ReceieveMessage() //服务器发送消息,这次使用 NetMqMessage,如果客户端调用 ReceieveMessage(),它将作为NetMQframes发送 var m3 = new NetMQMessage();//消息 m3.Append("From"); m3.Append("Server"); server.SendMessage(m3);//发送消息 //client receive客户端接收消息 var m4 = client.ReceiveMessage(); Assert.AreEqual(2, m4.frameCount);//判断帧数据量 Assert.AreEqual("From", m4[0].ConvertToString());// 第一个NetMQframe 是否为Form Assert.AreEqual("Server", m4[1].ConvertToString());// 第一个NetMQframe 是否为Server } } } } }
实例3-ZeroMqIdentity 标识符
视频演示
using System;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;using Microsoft.SqlServer.Server;using NetMQ;using NetMQ.Sockets;namespace ZeroMqIdentity{ public class Program : IDisposable//一个标准的释放非托管资源的类应该去实现IDisposable接口 { private List
实例4-Poller 轮询器测试
[TestFixture] public class PollerTests //轮询器 测试 { [Test] public void SingleSocketPollTest() //单套接字轮训测试 { using (NetMQContext contex = NetMQContext.Create())//零消息队列上下文 { using (var rep = contex.CreateResponseSocket())//创建服务器端 响应套接字 { rep.Bind("tcp://127.0.0.1:5002");//绑定地址端口 using (var req = contex.CreateRequestSocket())//创建客户端 请求套接字 using (Poller poller = new Poller()) //初始化轮训器 { req.Connect("tcp://127.0.0.1:5002");//客户端连接服务器 //ReceiveReady 事件 由轮询器引发 rep.ReceiveReady += (s, a) => //服务器的接收就绪事件:参数 s a:事件参数 {//Response类型的socket也是同步的,与Request的意思差不多,不过顺序是先recv再send bool more; string m = a.Socket.ReceiveString(out more);//接收客户端 发来的请求 Assert.False(more);//断言more 为false Assert.AreEqual("Hello", m);//断言接收的字符串为Hello a.Socket.Send("World");//服务端响应请求 }; //rep 接收客户端发来消息Hello,响应World poller.AddSocket(rep);//将服务器套接字 添加到 轮训器。 Task pollerTask = Task.Factory.StartNew(poller.Start);//启动轮训器 开始轮询,引发服务器rep接收就绪事件 //对于Request类型的socket,它是同步的,它一个时刻只能对一个连接进行操作,在一个连接上发送了数据之后, //必须接着在这个连接上执行recv,也就是send与recv必须同时匹配出现 req.Send("Hello");//客户端发送Hello给服务器端 bool more2; string m1 = req.ReceiveString(out more2);//接收服务器端响应的字符串, Assert.IsFalse(more2);//断言more2为false Assert.AreEqual("World", m1);//断言 服务器响应数据位World poller.Stop();//停止轮训 Thread.Sleep(100); Assert.IsTrue(pollerTask.IsCompleted);//断言轮训器任务完毕 } } } } //https://blog.csdn.net/zhanghe_zht/article/details/88312442?spm=1001.2014.3001.5502 //在REQ和REP中间加入代理:ROUTER和DEALER [Test] //https://blog.csdn.net/weixin_42784553/article/details/105049576 public void AddSocketDuringWorkTest()//测试 工作时添加套接字 { using (NetMQContext contex = NetMQContext.Create())//零消息队列上下文 { // we are using three responses to make sure we actually //move the correct socket and other sockets still work //我们正在使用三个响应来确保我们确实移动了正确的套接字 并且其他套接字仍然有效 using (var router = contex.CreateRouterSocket())//创建路由器套接字1 using (var router2 = contex.CreateRouterSocket())//创建路由器套接字2 { //Router类型的socket是异步的,他可以在随时执行recv与send, //而不必在同一时刻必须要强制在某个连接上进行操作,它会根据标志帧来具体的区分应该在哪一个链接上进行操作 router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); //Dealer类型的socket,这个更简单的,异步。。。它基本上就没做啥工作。 using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (Poller poller = new Poller())//轮训器 { dealer.Connect("tcp://127.0.0.1:5002");//router1---dealer1 dealer2.Connect("tcp://127.0.0.1:5003");//router2---dealer2 bool router1arrived = false; //路由1标志位 消息抵达路由1 bool router2arrived = false; bool more; //ReceiveReady 事件由轮询器引发 router2.ReceiveReady += (s, a) =>//路由2接收就绪 { router2.Receive(out more); router2.Receive(out more); router2arrived = true; }; //ReceiveReady 事件由轮询器引发 router.ReceiveReady += (s, a) =>//路由1接收就绪 { router1arrived = true; router.Receive(out more);//路由1接收消息 router.Receive(out more); poller.AddSocket(router2);//路由1接收完毕消息,再引发路由器2的接收就绪事件 }; poller.AddSocket(router);//引发路由1的接收就绪事件 Task task = Task.Factory.StartNew(poller.Start);//开始轮询路由器套接字 dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); poller.Stop(true); task.Wait();//等待task完成 Assert.IsTrue(router1arrived);// 消息抵达路由1 Assert.IsTrue(router2arrived); } } } } [Test] public void AddSocketAfterRemovingTest() // { using (NetMQContext contex = NetMQContext.Create()) //创建零消息队列上下文 { // we are using three responses to make sure we actually //move the correct socket and other sockets still work //我们正在使用三个响应来确保我们确实移动了正确的套接字并且其他套接字仍然有效 using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); bool router1arrived = false; bool router2arrived = false; bool router3arrived = false; bool more; //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { router1arrived = true; router.Receive(out more); router.Receive(out more); poller.RemoveSocket(router);//轮训器引发路由器接收就绪事件后,从轮训器移除 }; poller.AddSocket(router);//添加路由器套接字到轮训器 //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { router3.Receive(out more); router3.Receive(out more); router3arrived = true; }; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { router2arrived = true; router2.Receive(out more); router2.Receive(out more); poller.AddSocket(router3); }; poller.AddSocket(router2); Task task = Task.Factory.StartNew(poller.Start); dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); dealer3.Send("3"); Thread.Sleep(300); poller.Stop(true); task.Wait(); Assert.IsTrue(router1arrived); Assert.IsTrue(router2arrived); Assert.IsTrue(router3arrived); } } } } [Test] public void AddTwoSocketAfterRemovingTest() { using (NetMQContext contex = NetMQContext.Create()) { // we are using three responses to make sure we actually //move the correct socket and other sockets still work using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) using (var router4 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); router4.Bind("tcp://127.0.0.1:5005"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (var dealer4 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); dealer4.Connect("tcp://127.0.0.1:5005"); int router1arrived = 0; int router2arrived = 0; bool router3arrived = false; bool router4arrived = false; bool more; //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { router1arrived++; //运行路由器套接字的次数 router.Receive(out more); router.Receive(out more); poller.RemoveSocket(router);//运行后移除 }; poller.AddSocket(router); //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { router3.Receive(out more); router3.Receive(out more); router3arrived = true; }; //The ReceiveReady event is raised by the Poller router4.ReceiveReady += (s, a) => { router4.Receive(out more); router4.Receive(out more); router4arrived = true; }; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { router2arrived++; router2.Receive(out more); router2.Receive(out more); if (router2arrived == 1) { poller.AddSocket(router3); poller.AddSocket(router4); } }; poller.AddSocket(router2); Task task = Task.Factory.StartNew(poller.Start); dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); dealer3.Send("3"); dealer4.Send("4"); dealer2.Send("2"); dealer.Send("1"); Thread.Sleep(300);//停止轮训前 确保轮询任务执行完毕 poller.Stop(true);//停止轮询 task.Wait(); router.Receive(true, out more);// Assert.IsTrue(more); router.Receive(true, out more);// Assert.IsFalse(more); Assert.AreEqual(1, router1arrived); Assert.AreEqual(2, router2arrived);//路由器2接收事件执行了两次 Assert.IsTrue(router3arrived); Assert.IsTrue(router4arrived); } } } } [Test] public void CancelSocketTest() //取消套接字测试 { using (NetMQContext contex = NetMQContext.Create()) { // 我们正在使用三个响应来确保我们确实移动了正确的套接字并且其他套接字仍然有效 using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); bool first = true; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { bool more; // identity 标识 byte[] identity = a.Socket.Receive(out more);//路由器2 套接字接收字节数组 // message消息 a.Socket.Receive(out more);//接收 a.Socket.SendMore(identity);//发送标识 a.Socket.Send("2");//发送数据 }; poller.AddSocket(router2); //ReceiveReady 事件由轮询器引发 router.ReceiveReady += (s, a) => { if (!first) { Assert.Fail("This should happen because we cancelled the socket"); } first = false; bool more; // identity 标识 a.Socket.Receive(out more); string m = a.Socket.ReceiveString(out more);//数据 Assert.False(more);//断言为 false Assert.AreEqual("Hello", m); //断言接收的数据位Hello //取消套接字 cancelling the socket poller.RemoveSocket(a.Socket);//移除router 套接字。取消套接字:之后不会再触发该路由器套接字的准备就绪事件 }; poller.AddSocket(router); //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { bool more; // identity byte[] identity = a.Socket.Receive(out more); // message a.Socket.Receive(out more); a.Socket.SendMore(identity).Send("3"); }; poller.AddSocket(router3); Task pollerTask = Task.Factory.StartNew(poller.Start);//启动轮询路由器任务 dealer.Send("Hello"); // sending this should not arrive on the poller, //therefore response for this will never arrive dealer.Send("Hello2"); Thread.Sleep(100); // sending this should not arrive on the poller, //therefore response for this will never arrive dealer.Send("Hello3");//发送这个不应该到达轮询器,因此对此的响应永远不会到达 Thread.Sleep(500); bool more2; // 确保在取消之前 定义的套接字仍然有效 dealer2.Send("1"); string msg = dealer2.ReceiveString(out more2); Assert.AreEqual("2", msg); // making sure the socket defined after the one cancelled still works dealer3.Send("1"); msg = dealer3.ReceiveString(out more2); Assert.AreEqual("3", msg); // we have to give this some time if we want to make sure //it's really not happening and it not only because of time //如果我们想确保它真的没有发生,我们必须给它一些时间,这不仅仅是因为时间 Thread.Sleep(300); poller.Stop();//停止轮训器任务 Thread.Sleep(100); Assert.IsTrue(pollerTask.IsCompleted);//断言轮训器poller 任务执行完毕 } } } } }
实例5-ConcurrentQueueDemo 并发队列演示
输出结果
public class Program { public void Run() { //NOTES //1、Use many threads each writing to ConcurrentQueue //2、Extra thread to read from ConcurrentQueue, and this is the one that // will deal with writing to the server //1、使用多个线程,每个线程都写入 ConcurrentQueue并发队列 //2。从 ConcurrentQueue 读取的额外线程,这是处理写入服务器的线程 ConcurrentQueue
参考:
https://netmq.readthedocs.io/en/latest/ NetMQ官方文档
https://github.com/zeromq/netmq 官方源码
https://zguide.zeromq.org/
https://www.cnblogs.com/peterYong/p/11157298.html
https://www.cnblogs.com/rainbowzc/p/3357594.html
https://ssup2.github.io/theory_analysis/ZeroMQ/
https://blog.csdn.net/u011285477/article/details/100688003?spm=1001.2014.3001.5502
https://blog.csdn.net/tjcwt2011/article/details/81542944
电子书下载:
「ZeroMQ」https://www.aliyundrive.com/s/RGPoWUWrjn6 点击链接保存,或者复制本段内容,打开「阿里云盘」APP ,无需下载极速在线查看,视频原画倍速播放。
The End