【RabbitMQ】三种Exchange模式——订阅、路由、通配符模式

https://blog.csdn.net/ww130929/article/details/72842234

https://blog.csdn.net/ww130929/article/details/72835517

https://blog.csdn.net/ww130929/article/details/72818303

https://blog.csdn.net/u013046597/article/details/72817959

 

标准的AMQP Exchange有4种: Direct, Topic, Headers, Fanout, 根据实际需要选择。
Direct: 路由模式,如果消息的routing key与bingding的routing key直接匹配的话, 消息将会路由到该队列上。
Topic: 通配符模式,如果消息的routing key与bingding的routing key符合通配符匹配的话, 消息将会路由到该队列上。(基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词)
Headers: 如果消息参数表中的头信息和值都与binding参数表中相匹配, 消息将会路由到该队列上。
Fanout: 订阅模式,不管消息的routing key和参数表的头信息/值是什么, 消息将会路由到该队列上。

 

   前两篇博客介绍了两种队列模式,这篇博客介绍订阅、路由和通配符模式,之所以放在一起介绍,是因为这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,经过队列绑定交换机到达队列。

 

一、订阅模式(Fanout Exchange):

   一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。

      

         

 

示例代码:

生产者:

 

public class Send {
  1. private final static String EXCHANGE_NAME = “test_exchange_fanout”;
  2. public static void main(String[] argv) throws Exception {
  3. // 获取到连接以及mq通道
  4. Connection connection = ConnectionUtil.getConnection();
  5. //从连接中创建通道
  6. Channel channel = connection.createChannel();
  7. // 声明exchange
  8. channel.exchangeDeclare(EXCHANGE_NAME, “fanout”);
  9. // 消息内容
  10. String message = “商品已经新增,id = 1000”;
  11. channel.basicPublish(EXCHANGE_NAME, “”, null, message.getBytes());
  12. System.out.println(” [x] Sent ‘” + message + “‘”);
  13. channel.close();
  14. connection.close();
  15. }
  16. }

 

消费者1:

 

public class Recv {
  1. private final static String QUEUE_NAME = “test_queue_fanout_1”;
  2. private final static String EXCHANGE_NAME = “test_exchange_fanout”;
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “”);
  11. // 同一时刻服务器只会发一条消息给消费者
  12. channel.basicQos(1);
  13. // 定义队列的消费者
  14. QueueingConsumer consumer = new QueueingConsumer(channel);
  15. // 监听队列,手动返回完成
  16. channel.basicConsume(QUEUE_NAME, true, consumer);
  17. // 获取消息
  18. while (true) {
  19. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  20. String message = new String(delivery.getBody());
  21. System.out.println(” 前台系统: ‘” + message + “‘”);
  22. Thread.sleep(10);
  23. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24. }
  25. }
  26. }

消费者2的代码和消费者1的代码大致相同,只是队列的名称不一样,这样两个消费者有自己的队列,都可以接收到生产者发送的消息


但是如果生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。所以使用这种订阅模式实现商品数据的同步并不合理。因此我们介绍下一种模式:路由模式。

 

二、路由模式(Direct Exchange)

  这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。

       


 

示例代码:

生产者:

 

public class Send {
  1. private final static String EXCHANGE_NAME = “test_exchange_direct”;
  2. public static void main(String[] argv) throws Exception {
  3. // 获取到连接以及mq通道
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. // 声明exchange
  7. channel.exchangeDeclare(EXCHANGE_NAME, “direct”);
  8. // 消息内容
  9. String message = “删除商品, id = 1001”;
  10. channel.basicPublish(EXCHANGE_NAME, “delete”, null, message.getBytes());
  11. System.out.println(” [x] Sent ‘” + message + “‘”);
  12. channel.close();
  13. connection.close();
  14. }
  15. }

 

消费者1:接收更新和删除消息

 

public class Recv {
  1. private final static String QUEUE_NAME = “test_queue_direct_1”;
  2. private final static String EXCHANGE_NAME = “test_exchange_direct”;
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “update”);
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);
  12. // 同一时刻服务器只会发一条消息给消费者
  13. channel.basicQos(1);
  14. // 定义队列的消费者
  15. QueueingConsumer consumer = new QueueingConsumer(channel);
  16. // 监听队列,手动返回完成
  17. channel.basicConsume(QUEUE_NAME, false, consumer);
  18. // 获取消息
  19. while (true) {
  20. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  21. String message = new String(delivery.getBody());
  22. System.out.println(” 前台系统: ‘” + message + “‘”);
  23. Thread.sleep(10);
  24. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  25. }
  26. }
  27. }

 

消费者2:接收insert,update,delete的消息

 

public class Recv2 {
  1. private final static String QUEUE_NAME = “test_queue_direct_2”;
  2. private final static String EXCHANGE_NAME = “test_exchange_direct”;
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “insert”);
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “update”);
  12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “delete”);
  13. // 同一时刻服务器只会发一条消息给消费者
  14. channel.basicQos(1);
  15. // 定义队列的消费者
  16. QueueingConsumer consumer = new QueueingConsumer(channel);
  17. // 监听队列,手动返回完成
  18. channel.basicConsume(QUEUE_NAME, false, consumer);
  19. // 获取消息
  20. while (true) {
  21. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  22. String message = new String(delivery.getBody());
  23. System.out.println(” 搜索系统: ‘” + message + “‘”);
  24. Thread.sleep(10);
  25. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  26. }
  27. }
  28. }

如果生产者发布了insert消息,那么消费者2可以收到,消费者 1收不到,如果发布了update或者delete消息,两个消费者都可以收到。如果发布ABC消息两个消费者都收不到,因为没有绑定这个键值。这种模式基本满足了我们的需求,但是还不够灵活,下面介绍另外一个模式。

 

三、通配符模式(Topic Exchange)

   基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词

       

示例代码:

生产者:

 

public class Send {
  1. private final static String EXCHANGE_NAME = “test_exchange_topic”;
  2. public static void main(String[] argv) throws Exception {
  3. // 获取到连接以及mq通道
  4. Connection connection = ConnectionUtil.getConnection();
  5. Channel channel = connection.createChannel();
  6. // 声明exchange
  7. channel.exchangeDeclare(EXCHANGE_NAME, “topic”);
  8. // 消息内容
  9. String message = “删除商品,id = 1001”;
  10. channel.basicPublish(EXCHANGE_NAME, “item.delete”, null, message.getBytes());
  11. System.out.println(” [x] Sent ‘” + message + “‘”);
  12. channel.close();
  13. connection.close();
  14. }
  15. }

消费者1:

 

public class Recv {
  1. private final static String QUEUE_NAME = “test_queue_topic_1”;
  2. private final static String EXCHANGE_NAME = “test_exchange_topic”;
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “item.update”);
  11. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “item.delete”);
  12. // 同一时刻服务器只会发一条消息给消费者
  13. channel.basicQos(1);
  14. // 定义队列的消费者
  15. QueueingConsumer consumer = new QueueingConsumer(channel);
  16. // 监听队列,手动返回完成
  17. channel.basicConsume(QUEUE_NAME, false, consumer);
  18. // 获取消息
  19. while (true) {
  20. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  21. String message = new String(delivery.getBody());
  22. System.out.println(” 前台系统: ‘” + message + “‘”);
  23. Thread.sleep(10);
  24. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  25. }
  26. }
  27. }

 

消费者2:

 

public class Recv2 {
  1. private final static String QUEUE_NAME = “test_queue_topic_2”;
  2. private final static String EXCHANGE_NAME = “test_exchange_topic”;
  3. public static void main(String[] argv) throws Exception {
  4. // 获取到连接以及mq通道
  5. Connection connection = ConnectionUtil.getConnection();
  6. Channel channel = connection.createChannel();
  7. // 声明队列
  8. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  9. // 绑定队列到交换机
  10. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, “item.#”);
  11. // 同一时刻服务器只会发一条消息给消费者
  12. channel.basicQos(1);
  13. // 定义队列的消费者
  14. QueueingConsumer consumer = new QueueingConsumer(channel);
  15. // 监听队列,手动返回完成
  16. channel.basicConsume(QUEUE_NAME, false, consumer);
  17. // 获取消息
  18. while (true) {
  19. QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  20. String message = new String(delivery.getBody());
  21. System.out.println(” 搜索系统: ‘” + message + “‘”);
  22. Thread.sleep(10);
  23. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  24. }
  25. }
  26. }

消费者1是按需索取,并没有使用通配符模式,而是用的完全匹配,消费者2使用通配符模式,这样以item.开头的消息都会全部接收。

 

小结:

  1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,可以实现一个消息被多个消费者 获取。

  2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到所有的消息,其余两种增加了路由键,并且第三种增加通配符,更加便利。

 

Leave a Reply

Your email address will not be published. Required fields are marked *