玖富娱乐平台全网唯一指定1956注册开户网站

RabbitMQ与.net core(二)Producer与Exchange_玖富娱乐主管

日期:2019-01-08 浏览:
玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

Producer:音讯的生产者,也就是建立音讯的工具

Exchange:音讯的接收者,也就是用来吸收音讯的工具,Exchange吸收到音讯后将音讯依照划定规矩发送到与他绑定的Queue中。下面我们来界说一个Producer与Exchange。

1.新建.netcore console项目,并引入RabbitMQ.Client的Nuget包

2.建立Exchange

using RabbitMQ.Client;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type:"direct", durable: true, autoDelete: false);   //建立Exchange
                    
                }
            }
        }
    }
}

能够看到Echange的参数有:

type:可选项为,fanout,direct,topic,headers。区分以下:

    fanout:发送到一切与以后Exchange绑定的Queue中

    direct:发送到与音讯的routeKey雷同的Rueue中

    topic:fanout的隐约版本

    headers:发送到与音讯的header属性雷同的Queue中

durable:耐久化

autoDelete:当末了一个绑定(行列或许exchange)被unbind以后,该exchange自动被删除。

 运转顺序,能够在可视化界面看到change2

接下来我们能够建立与change2绑定的queue

3.建立Queue

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #建立queue2
                    channel.QueueBind(queue, exchange, route);  #将queue2绑定到exchange2
                }

能够看到Echange的参数有:

durable:耐久化

exclusive:若是为true,则queue只在channel存在时存在,channel封闭则queue消逝

autoDelete:当末了一个绑定(行列或许exchange)被unbind以后,该exchange自动被删除。

去可视化界面看Queue

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。-

4.发送音讯

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #耐久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

5.消耗音讯

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);
                while (true)
                {
                    var message = channel.BasicGet(queue, true);  #第二个参数申明自动开释音讯,如为false需手动开释音讯
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***吸收时候:{0},音讯内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

运转检察效果

检察可视化界面

6.手动开释音讯

                while (true)
                {
                    var message = channel.BasicGet(queue, false);#设置为手动开释
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***吸收时候:{0},音讯内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手动开释
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

我们再发一条音讯,然后最先消耗,加个断点调试一下

检察一下Queue中音讯状况

然后直接作废调试,不让顺序走到开释的那一步,再检察一下音讯状况

这么说来只要不走到 channel.BasicAck(message.DeliveryTag, false);这一行,音讯就不会被开释掉,我们让顺序直接走到这一行代码,检察一下音讯的状况

如图已被开释了

7.让失利的音讯回到行列中

                while (true)
                {
                    var message = channel.BasicGet(queue, false);
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***吸收时候:{0},音讯内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #以后音讯被处置惩罚的次序数
                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }
                    
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

从新发送4条音讯

最先消耗

我们能够看到音讯一向没有没消耗,由于音讯被处置惩罚以后又放到了队尾

8.监听音讯

 using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接收10条音讯,不然rabbit会把一切的音讯一次性推到client,会增大client的负荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received  = (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

 

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。


平台新闻

联系方式丨CONTACT

  • 全国热线:7711177
  • 传真热线:010-88888888
  • Q Q咨询:7711177
  • 企业邮箱:
首页
电话
短信