博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习之:(七)Fanout Exchange (转贴+我的评论)
阅读量:7267 次
发布时间:2019-06-29

本文共 5255 字,大约阅读时间需要 17 分钟。

From:http://lostechies.com/derekgreer/2012/05/16/rabbitmq-for-windows-fanout-exchanges/

 

PunCha: There is not too much to say about this topic....

 

 

RabbitMQ for Windows: Fanout Exchanges

This is the sixth installment to the series: RabbitMQ for Windows. In the , we walked through creating a direct exchange example and introduced the push API. In this installment, we’ll walk through a fanout exchange example.

As discussed earlier in the series, the fanout exchange type is useful for facilitating the  pattern. When we publish a message to a fanout exchange, the message is delivered indiscriminately to all bound queues. With the Direct, Topic, and Headers exchange types, a criteria is used by a routing algorithm taking the form of a routing key or a collection of message headers depending on the exchange type in question. A routing key or a collection of message headers may also be specified with the fanout exchange which will be delivered as part of the message’s metadata, but they will not be used as a filter in determining which queue receives a published message.

To demonstrate the fanout exchange, we’ll use a stock ticker example. In the previous example, logs were routed to queues based upon a matching routing key (an empty string in the logging example’s case). In this example, we’d like our messages to be delivered to all bound queues regardless of qualification.

Similar to the previous example, we’ll create a Producer console application which periodically publishes stock quote messages and a Consumer console application which displays the message to the console.

We’ll start our Producer app as before by establishing a connection using the default settings, creating the connection, and creating a channel:

namespace Producer{
class Program { static volatile bool _cancelling; static void Main(string[] args) { var connectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); } } }

Next, we need to declare an exchange of type “fanout”. We’ll name our new exchange “fanout-exchange-example”:

channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Fanout, false, true, null);

To publish the stock messages periodically, we’ll call a PublishQuotes() method with the provided channel and run it on a background thread:

var thread = new Thread(() => PublishQuotes(channel)); thread.Start();

Next, we’ll provide a way to exit the application by prompting the user to enter ‘x’ and use a simple Boolean to signal the background thread when to exit:

Console.WriteLine("Press 'x' to exit"); var input = (char) Console.Read(); _cancelling = true;

Lastly, we need to close the channel and connection:

channel.Close();connection.Close();

For our PublishQuotes() method, well iterate over a set of stock symbols, retrieve the stock information for each symbol, and publish a simple string-based message in the form [symbol]:[price]:

static void PublishQuotes(IModel channel) { while (true) { if (_cancelling) return; IEnumerable quotes = FetchStockQuotes(new[] { "GOOG", "HD", "MCD"}); foreach (string quote in quotes) { byte[] message = Encoding.UTF8.GetBytes(quote); channel.BasicPublish("direct-exchange-example", "", null, message); } Thread.Sleep(5000); } }

To implement the FetchStockQuotes() method, we’ll use the Yahoo Finance API which entails retrieving an XML-based list of stock quotes and parsing out the bit of information we’re interested in for our example:

static IEnumerable
FetchStockQuotes(string[] symbols) { var quotes = new List
(); string url = string.Format("http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quotes%20where%20symbol%20in%20({0})&env=store://datatables.org/alltableswithkeys", String.Join("%2C", symbols.Select(s => "%22" + s + "%22"))); var wc = new WebClient { Proxy = WebRequest.DefaultWebProxy}; var ms = new MemoryStream(wc.DownloadData(url)); var reader = new XmlTextReader(ms); XDocument doc = XDocument.Load(reader); XElement results = doc.Root.Element("results"); foreach (string symbol in symbols) { XElement q = results.Elements("quote").First(w => w.Attribute("symbol").Value == symbol); quotes.Add(symbol + ":" + q.Element("AskRealtime").Value); } return quotes; }

Here is the complete Producer listing:

using System;using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Text; using System.Threading; using System.Xml; using System.Xml.Linq; using RabbitMQ.Client; namespace Producer { class Program { static volatile bool _cancelling; static void Main(string[] args) { var connectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example", ExchangeType.Fanout, false, true, null); var thread = new Thread(() => PublishQuotes(channel)); thread.Start(); Console.WriteLine("Press 'x' to exit"); var input = (char) Console.Read(); _cancelling = true; channel.Close(); connection.Close(); } static void PublishQuotes(IModel channel) { while (true) { if (_cancelling) return; IEnumerable quotes =

转载地址:http://vlrdm.baihongyu.com/

你可能感兴趣的文章
估算之痛
查看>>
[20171206]rman与truncate2.txt
查看>>
谈消息总线的路由模型
查看>>
结构体中指针赋值问题的分析及C代码示例
查看>>
java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)...
查看>>
TortoiseGit配合msysGit在Git@OSC代码托管的傻瓜教程
查看>>
边缘计算“CROSS”欧洲新战场
查看>>
WeUI—微信官方UI库
查看>>
KNIMI数据挖掘建模与分析系列_004_利用KNIMI做客户流失预测
查看>>
Nginx中的root&alias文件路径及索引目录配置详解
查看>>
用Keras开发字符级神经网络语言模型
查看>>
Socket编程中的强制关闭与优雅关闭及相关socket选项
查看>>
1682亿!!阿里工程师如何喝着茶创造双11奇迹?
查看>>
《音乐达人秀:Adobe Audition实战200例》——1.3 数字录音记录生活越来越便捷
查看>>
东半球最先进的 debug 技巧
查看>>
《CCNP安全防火墙642-618认证考试指南》——第1章Cisco ASA自适应安全设备概述
查看>>
ToroDB —— 基于 PostgreSQL 的 JSON 数据库
查看>>
尊敬的开发世界,现出你的梦魇吧,我来了
查看>>
《Java多线程编程核心技术》——1.9节yield方法
查看>>
《WebGL入门指南》——第2章,第2.5节本章小结
查看>>