博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
c#开源消息队列中间件EQueue 教程
阅读量:5789 次
发布时间:2019-06-18

本文共 10161 字,大约阅读时间需要 33 分钟。

一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,具体可以参看作者的文章《》。项目开源地址:,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

using System;using System.Text;using ECommon.Autofac;using ECommon.Configurations;using ECommon.JsonNet;using ECommon.Log4Net;using EQueue.Broker;using EQueue.Configurations;using EQueue.Protocols;namespace QuickStart.BrokerServer{    class Program    {        static void Main(string[] args)        {            InitializeEQueue();            var setting = new BrokerSetting();            setting.NotifyWhenMessageArrived = false;            setting.DeleteMessageInterval = 1000;            new BrokerController(setting).Initialize().Start();            Console.ReadLine();        }        static void InitializeEQueue()        {            Configuration                .Create()                .UseAutofac()                .RegisterCommonComponents()                .UseLog4Net()                .UseJsonNet()                .RegisterEQueueComponents();        }    }}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

public static class ConfigurationExtensions    {        public static Configuration RegisterEQueueComponents(this Configuration configuration)        {            configuration.SetDefault
(); configuration.SetDefault
(); configuration.SetDefault
(); configuration.SetDefault
(); configuration.SetDefault
(); configuration.SetDefault
(); return configuration; } }

代码中涉及到6个组件:

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

public class BrokerSetting    {        public SocketSetting ProducerSocketSetting { get; set; }        public SocketSetting ConsumerSocketSetting { get; set; }        public bool NotifyWhenMessageArrived { get; set; }        public int DeleteMessageInterval { get; set; }        public BrokerSetting()        {            ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };            ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };            NotifyWhenMessageArrived = true;            DeleteMessageInterval = 1000 * 60 * 60;        }    }

运行项目,如果显示下面类似内容,说明Broker启动成功:

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

using System;using System.Collections.Generic;using System.Diagnostics;using System.Text;using System.Threading;using System.Threading.Tasks;using ECommon.Autofac;using ECommon.Configurations;using ECommon.IoC;using ECommon.JsonNet;using ECommon.Log4Net;using ECommon.Scheduling;using EQueue.Clients.Producers;using EQueue.Configurations;using EQueue.Protocols;namespace QuickStart.ProducerClient{    class Program    {        static void Main(string[] args)        {            InitializeEQueue();            var scheduleService = ObjectContainer.Resolve
(); var producer = new Producer().Start(); var total = 1000; var parallelCount = 10; var finished = 0; var messageIndex = 0; var watch = Stopwatch.StartNew(); var action = new Action(() => { for (var index = 1; index <= total; index++) { var message = "message" + Interlocked.Increment(ref messageIndex); producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask => { var finishedCount = Interlocked.Increment(ref finished); if (finishedCount % 1000 == 0) { Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds)); } }); } }); var actions = new List
(); for (var index = 0; index < parallelCount; index++) { actions.Add(action); } Parallel.Invoke(actions.ToArray()); Console.ReadLine(); } static void InitializeEQueue() { Configuration .Create() .UseAutofac() .RegisterCommonComponents() .UseLog4Net() .UseJsonNet() .RegisterEQueueComponents(); } }}

Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

public class ProducerSetting    {        public string BrokerAddress { get; set; }        public int BrokerPort { get; set; }        public int SendMessageTimeoutMilliseconds { get; set; }        public int UpdateTopicQueueCountInterval { get; set; }        public ProducerSetting()        {            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();            BrokerPort = 5000;            SendMessageTimeoutMilliseconds = 1000 * 10;            UpdateTopicQueueCountInterval = 1000 * 5;        }

2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;using System.Linq;using System.Text;using System.Threading;using ECommon.Autofac;using ECommon.Configurations;using ECommon.IoC;using ECommon.JsonNet;using ECommon.Log4Net;using ECommon.Scheduling;using EQueue.Broker;using EQueue.Clients.Consumers;using EQueue.Configurations;using EQueue.Protocols;namespace QuickStart.ConsumerClient{    class Program    {        static void Main(string[] args)        {            InitializeEQueue();            var messageHandler = new MessageHandler();            var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);            var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);            var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);            var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);            Console.WriteLine("Start consumer load balance, please wait for a moment.");            var scheduleService = ObjectContainer.Resolve
(); var waitHandle = new ManualResetEvent(false); var taskId = scheduleService.ScheduleTask(() => { var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId); var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId); var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId); var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId); if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1) { Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}", string.Join(",", c1AllocatedQueueIds), string.Join(",", c2AllocatedQueueIds), string.Join(",", c3AllocatedQueueIds), string.Join(",", c4AllocatedQueueIds))); waitHandle.Set(); } }, 1000, 1000); waitHandle.WaitOne(); scheduleService.ShutdownTask(taskId); Console.ReadLine(); } static void InitializeEQueue() { Configuration .Create() .UseAutofac() .RegisterCommonComponents() .UseLog4Net() .UseJsonNet() .RegisterEQueueComponents(); } } class MessageHandler : IMessageHandler { private int _handledCount; public void Handle(QueueMessage message, IMessageContext context) { var count = Interlocked.Increment(ref _handledCount); if (count % 1000 == 0) { Console.WriteLine("Total handled {0} messages.", count); } context.OnMessageHandled(message); } }}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

public class ConsumerSetting    {        public string BrokerAddress { get; set; }        public int BrokerPort { get; set; }        public int RebalanceInterval { get; set; }        public int UpdateTopicQueueCountInterval { get; set; }        public int HeartbeatBrokerInterval { get; set; }        public int PersistConsumerOffsetInterval { get; set; }        public PullRequestSetting PullRequestSetting { get; set; }        public MessageModel MessageModel { get; set; }        public MessageHandleMode MessageHandleMode { get; set; }        public ConsumerSetting()        {            BrokerAddress = SocketUtils.GetLocalIPV4().ToString();            BrokerPort = 5001;            RebalanceInterval = 1000 * 5;            HeartbeatBrokerInterval = 1000 * 5;            UpdateTopicQueueCountInterval = 1000 * 5;            PersistConsumerOffsetInterval = 1000 * 5;            PullRequestSetting = new PullRequestSetting();            MessageModel = MessageModel.Clustering;            MessageHandleMode = MessageHandleMode.Parallel;        }

本文转自 张善友 51CTO博客,原文链接:http://blog.51cto.com/shanyou/1381850,如需转载请自行联系原作者

你可能感兴趣的文章
iOS的主要框架介绍 (转载)
查看>>
react报错this.setState is not a function
查看>>
poj 1183
查看>>
从根本解决跨域(nginx部署解决方案)
查看>>
javascript实现的一个信息提示的小功能/
查看>>
Centos7.x:开机启动服务的配置和管理
查看>>
HTML5 浏览器返回按钮/手机返回按钮事件监听
查看>>
xss
查看>>
iOS:百度长语音识别具体的封装:识别、播放、进度刷新
查看>>
JS获取服务器时间并且计算距离当前指定时间差的函数
查看>>
华为硬件工程师笔试题
查看>>
jquery居中窗口-页面加载直接居中
查看>>
cd及目录快速切换
查看>>
Unity Shaders and Effects Cookbook (3-5) 金属软高光
查看>>
31-hadoop-hbase-mapreduce操作hbase
查看>>
C++ 代码风格准则:POD
查看>>
linux-友好显示文件大小
查看>>
【转】【WPF】WPF中MeasureOverride ArrangeOverride 的理解
查看>>
【转】二叉树的非递归遍历
查看>>
NYOJ283对称排序
查看>>