订阅模型
关于RabbitMQ订阅模型的知识,可以参考前置知识中的消息队列RabbitMQ之订阅模型Fanout博客,该篇博客已经讲解了RabbitMQ的订阅模型。
订阅模型Topic
订阅模型Topic的工作模式基本和Direct相似,不同之处在于,Topic的队列绑定交换机的路由Key可以使用通配符的写法,路由可以通过模糊匹配,发送到符合要求的队列中去。
Topic的路由Key通配符
【注意】通配符是在队列绑定交换机的路由Key上使用的,如果是在消息的路由Key上,只会被识别为一个普通单词。
【注意】路由Key可以由多个单词组成,单词与单词之间,使用.
号隔开。
JavaAPI实现订阅模型Topic
准备工作
使用JavaAPI实现消息队列模型,需要先做一些准备工作,这些准备工作包括搭建项目环境和准备工具类,这些工作在消息队列RabbitMQ之基本消息模型已经准备好了,就不重复准备了。
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class Send { private final static String EXCHANGE_NAME = "amq.topic";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,true); String insert_message = "插入数据"; channel.basicPublish(EXCHANGE_NAME, "insert.user.data", null, insert_message.getBytes()); String delete_message = "删除数据"; channel.basicPublish(EXCHANGE_NAME, "delete.user.data", null, delete_message.getBytes()); channel.close(); connection.close(); } }
|
创建消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class Recv1 { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "amq.topic";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert.#"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete.*"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(" [消费者1] received : " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
创建消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Recv2 { private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "amq.topic";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(" [消费者2] received : " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|