I. Introduction
For a fanout exchange, the routing rule is very simple: every message sent to the exchange is routed to all queues bound to it. Suppose there is a chat room in which every client binds its own queue to the same fanout exchange; then every client receives the messages sent by every other client, because everyone has subscribed. All that remains is a simple filter: each client displays on its chat interface only the messages addressed to it. In this way, clients can communicate with one another.
II. Examples
2.1 Environment preparation
This example is implemented with EasyNetQ. Please install the EasyNetQ package from NuGet first.
2.2 entity
Create a new entity class MessageBody:
/// <summary>
/// Payload of a single chat message.
/// </summary>
public class MessageBody
{
    /// <summary>
    /// Identifier of the user who sent the message.
    /// </summary>
    public string FromUserId { get; set; }

    /// <summary>
    /// Text of the message.
    /// </summary>
    public string Message { get; set; }

    /// <summary>
    /// Identifier of the user the message is addressed to.
    /// </summary>
    public string ToUserId { get; set; }
}
2.3 main window
Create a new ChatMain form:
The code is as follows:
/// <summary>
/// Launcher form. Each click handler opens a chat window for one of the
/// predefined users (UserA–UserD).
/// NOTE(review): the handlers are presumably wired to buttons in the designer
/// partial of this class — confirm there.
/// </summary>
public partial class ChatMain : Form
{
    public ChatMain()
    {
        InitializeComponent();
    }

    /// <summary>
    /// Client A
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void Button1_Click(object sender, EventArgs e) => OpenChatWindow("UserA");

    /// <summary>
    /// Client B
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void Button2_Click(object sender, EventArgs e) => OpenChatWindow("UserB");

    /// <summary>
    /// Client C
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void Button3_Click(object sender, EventArgs e) => OpenChatWindow("UserC");

    /// <summary>
    /// Client D
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void Button4_Click(object sender, EventArgs e) => OpenChatWindow("UserD");

    /// <summary>
    /// Creates a chat window for the given user, centers it on screen and shows it
    /// modelessly. Shared by all four click handlers, which differ only in user id.
    /// </summary>
    /// <param name="userId">User id passed to the <see cref="ChatWith"/> constructor.</param>
    private static void OpenChatWindow(string userId)
    {
        ChatWith chatWith = new ChatWith(currentUserId: userId)
        {
            StartPosition = FormStartPosition.CenterScreen
        };
        chatWith.Show();
    }
}
2.4 client form
Create a new ChatWith form:
The code is as follows:
/// <summary>
/// Chat window for a single user. On load it binds a per-user queue to the shared
/// fanout exchange and consumes from it; only messages whose
/// <see cref="MessageBody.ToUserId"/> equals <see cref="CurrentUserId"/> are displayed.
/// Sending publishes to the same fanout exchange over the same bus.
/// </summary>
public partial class ChatWith : Form
{
    public delegate void ChatWithDelegate();
    public delegate void ChatWithDelegate<T1>(T1 obj1);
    public delegate void ChatWithDelegate<T1, T2>(T1 obj1, T2 obj2);

    /// <summary>
    /// Id of the user this window belongs to; set once in the constructor.
    /// </summary>
    public string CurrentUserId { get; }

    // One bus for the lifetime of the form: created in ChatWith_Load, used for both
    // consuming and publishing, disposed in ChatWith_FormClosed.
    private IBus bus;

    // NOTE(review): host addresses and credentials are embedded in source; consider
    // moving them to configuration. The value is left unchanged because this is a public constant.
    public const string ConnStringMQ = "host=192.168.2.242:5672,192.168.2.165:5672;virtualHost=/;username=hello;password=world";

    /// <summary>
    /// Name of the fanout exchange shared by all chat clients.
    /// </summary>
    public const string FanoutExchange = "fanoutEC";

    /// <summary>
    /// Creates a chat window for the given user.
    /// </summary>
    /// <param name="currentUserId">Id of the user this window represents.</param>
    public ChatWith(string currentUserId)
    {
        InitializeComponent();

        // The static Control.CheckForIllegalCrossThreadCalls switch is deliberately NOT
        // turned off: both ShowMessage overloads marshal to the UI thread through
        // InvokeRequired/BeginInvoke, so the check can stay enabled to catch real
        // cross-thread access.
        CurrentUserId = currentUserId;
    }

    /// <summary>
    /// Inserts a timestamped status line at the top of the message list.
    /// Re-dispatches itself via BeginInvoke when InvokeRequired is true.
    /// </summary>
    /// <param name="msg">Text to display.</param>
    private void ShowMessage(string msg)
    {
        if (InvokeRequired)
        {
            BeginInvoke(new ChatWithDelegate<string>(ShowMessage), msg);
            return;
        }

        ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg });
        lvwReceiveMsg.Items.Insert(0, item);
    }

    /// <summary>
    /// Inserts a timestamped chat line (user id + text) at the top of the message list.
    /// Re-dispatches itself via BeginInvoke when InvokeRequired is true.
    /// </summary>
    /// <param name="toUserId">User id shown in the second column (the receive handler passes the sender's id).</param>
    /// <param name="msg">Message text.</param>
    private void ShowMessage(string toUserId, string msg)
    {
        if (InvokeRequired)
        {
            BeginInvoke(new ChatWithDelegate<string, string>(ShowMessage), toUserId, msg);
            return;
        }

        ListViewItem item = new ListViewItem(new string[] { DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), toUserId, msg });
        lvwReceiveMsg.Items.Insert(0, item);
    }

    /// <summary>
    /// Bind queue and subscribe.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void ChatWith_Load(object sender, EventArgs e)
    {
        cmbOnLine.SelectedIndex = 0;
        Text += $"[{CurrentUserId}]";

        try
        {
            // No `using` here: the bus must outlive this method, otherwise the
            // subscription would be released immediately.
            bus = RabbitHutch.CreateBus(ConnStringMQ);
            if (!bus.IsConnected)
            {
                ShowMessage("Server connection failed.");
                return;
            }

            var exchange = bus.Advanced.ExchangeDeclare(name: FanoutExchange, type: ExchangeType.Fanout);
            var queue = bus.Advanced.QueueDeclare(name: $"{FanoutExchange}_queue_{CurrentUserId}");
            bus.Advanced.Bind(exchange: exchange, queue: queue, routingKey: "");
            bus.Advanced.Consume(queue, registration =>
            {
                registration.Add<MessageBody>((message, info) =>
                {
                    // Every client's queue receives every message published to the
                    // fanout exchange; display only those addressed to this user.
                    if (message.Body.ToUserId == CurrentUserId)
                    {
                        ShowMessage(message.Body.FromUserId, message.Body.Message);
                    }
                });
            });
        }
        catch (Exception ex)
        {
            // Report setup failures the same way btnSend_Click reports send failures.
            ShowMessage(ex.Message);
        }
    }

    /// <summary>
    /// Sends the typed text to the selected user, or to every other listed user
    /// when "*" is selected.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void btnSend_Click(object sender, EventArgs e)
    {
        try
        {
            // Reuse the form's bus instead of opening and closing a connection per click.
            if (bus == null || !bus.IsConnected)
            {
                ShowMessage("Failed to send message.");
                return;
            }

            string target = cmbOnLine.Text;
            string text = txtSendMsg.Text;

            // "*" means group sending: one message per listed user except the
            // wildcard entry itself and the current user.
            string[] recipients = target == "*"
                ? cmbOnLine.Items.Cast<string>().Where(s => s != "*" && s != CurrentUserId).ToArray()
                : new[] { target };

            // Declared once per send rather than once per recipient.
            var exchange = bus.Advanced.ExchangeDeclare(name: FanoutExchange, type: ExchangeType.Fanout);
            foreach (var toUserId in recipients)
            {
                var messageBody = new MessageBody
                {
                    FromUserId = CurrentUserId,
                    Message = text,
                    ToUserId = toUserId
                };
                bus.Advanced.Publish(exchange: exchange, routingKey: "", mandatory: false, message: new Message<MessageBody>(messageBody));
            }
        }
        catch (Exception ex)
        {
            ShowMessage(ex.Message);
        }
    }

    /// <summary>
    /// Close
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void btnClose_Click(object sender, EventArgs e)
    {
        Close();
    }

    /// <summary>
    /// Form close event: disposes the bus if one was created.
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void ChatWith_FormClosed(object sender, FormClosedEventArgs e)
    {
        bus?.Dispose();
    }
}