每一秒钟的时间都值得铭记

0%

消息队列RabbitMQ之订阅模型Topic

订阅模型

关于RabbitMQ订阅模型的知识,可以参考前置知识中的消息队列RabbitMQ之订阅模型Fanout博客,该篇博客已经讲解了RabbitMQ的订阅模型。

订阅模型Topic

订阅模型Topic的工作模式基本和Direct相似,不同之处在于,Topic的队列绑定交换机的路由Key可以使用通配符的写法,路由可以通过模糊匹配,发送到符合要求的队列中去。

Topic的路由Key通配符

  • #代码一个或多个单词。
  • *代表一个单词。

【注意】通配符是在队列绑定交换机的路由Key上使用的,如果是在消息的路由Key上,只会被识别为一个普通单词。
【注意】路由Key可以由多个单词组成,单词与单词之间,使用.号隔开。

JavaAPI实现订阅模型Topic

准备工作

使用JavaAPI实现消息队列模型,需要先做一些准备工作,这些准备工作包括搭建项目环境和准备工具类,这些工作在消息队列RabbitMQ之基本消息模型已经准备好了,就不重复准备了。

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Send {
//RabbitMQ的交换机名称
private final static String EXCHANGE_NAME = "amq.topic";

public static void main(String[] argv) throws Exception {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//声明交换机exchange,指定类型为fanout,BuiltinExchangeType是一个枚举类,最后一个参数为指定交换机的持久化属性
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true);
//需要发送的消息
String insert_message = "插入数据";
//使用通道发送消息到交换机,参数分别为:交换机名称,路由Key,其他属性,消息正文
channel.basicPublish(EXCHANGE_NAME, "insert.user.data", null, insert_message.getBytes());
//需要发送的消息
String delete_message = "删除数据";
//使用通道发送消息到交换机,参数分别为:交换机名称,路由Key,其他属性,消息正文
channel.basicPublish(EXCHANGE_NAME, "delete.user.data", null, delete_message.getBytes());
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}

创建消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Recv1 {
//消费者的队列名称
private final static String QUEUE_NAME = "topic_exchange_queue_1";
//RabbitMQ的交换机名称
private final static String EXCHANGE_NAME = "amq.topic";

public static void main(String[] argv) throws Exception {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//使用通道声明一个队列,参数分别为:队列名称,是否持久化,是否为排他队列,是否自动删除,其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机,路由key为insert和delete,接受路由Key为insert和delete的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert.#");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete.*");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//覆写该方法,该方法类似于监听器,当队列中有消息的时候,就会被调用执行
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//byte[] body参数即为消息体,当该消息体被使用,即认为该消息已被消费
System.out.println(" [消费者1] received : " + new String(body));
}
};
//监听队列,参数分别为:队列名称,是否自动确认消息已被消费,队列消费者
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

创建消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Recv2 {
//消费者的队列名称
private final static String QUEUE_NAME = "topic_exchange_queue_2";
//RabbitMQ的交换机名称
private final static String EXCHANGE_NAME = "amq.topic";

public static void main(String[] argv) throws Exception {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//使用通道声明一个队列,参数分别为:队列名称,是否持久化,是否为排他队列,是否自动删除,其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机,路由key为insert,只接收路由Key为delete的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete.#");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//覆写该方法,该方法类似于监听器,当队列中有消息的时候,就会被调用执行
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//byte[] body参数即为消息体,当该消息体被使用,即认为该消息已被消费
System.out.println(" [消费者2] received : " + new String(body));
}
};
//监听队列,参数分别为:队列名称,是否自动确认消息已被消费,队列消费者
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!
-------------这是我的底线^_^-------------