订阅模型
订阅模型是RabbitMQ的消息模型之一,其中订阅模型又分为三种,分别是Fanout、Direct、Topic。
订阅模型区分于基本消息模型和Work队列消息模型,在于订阅模型的生产者并不直接将消息发送到队列中,而是将消息发送到交换机中,然后由交换机选择合适的策略发送到合适的队列中,最后再由消费者接收消息。
订阅模型的特点
- 订阅模型是一个生产者,多个消费者。
- 订阅模型的每一个消费者都有自己的一个消息队列。
- 订阅模型生产者并不将消息发送给消息队列,而是发送给交换机。
- 订阅模型的每个消息队列,都要绑定到交换机上。
- 订阅模型的消息,从生产者生产,发送非交换机,然后由交换机发送给消息队列,最后被消费者接收消费。
订阅模型交换机类型
订阅模型之所以被分为三种,其实就是因为交换机的类型不同而划分的,交换机的类型不同,消息通过交换机发送给消息队列的策略也就不同。
交换机的类型主要分为三种,也就是订阅模型的三种类型。
- Fanout:扇出,消息进入交换机后,交换机以广播的形式发送给所有队列。
- Direct:定向发送,消息进入交换机后,交换机通过消息指定的RoutingKey(路由Key),发送到指定的队列中,在这种消息模型下,每个队列在绑定到交换机的时候,都需要指定RoutingKey(路由Key)。
- Topic:通配符模糊发送,消息的发送策略基本和Direct相似,但是不同的是,Topic交换机可以通过通配符的方式,模糊选择RoutingKey(路由Key),从而将消息发送到消息队列中。
订阅模型Fanout
订阅模型Fanout,Fanout(扇出)是交换机类型,也是交换机发送消息给消息队列的策略。
Fanout即交换机以广播形式,将消息发送给所有的消息队列,每个和消息队列绑定的消费者都可以接收并消费该消息队列。
JavaAPI实现订阅模型Fanout
准备工作
使用JavaAPI实现消息队列模型,需要先做一些准备工作,这些准备工作包括搭建项目环境和准备工具类,这些工作在消息队列RabbitMQ之基本消息模型已经准备好了,就不重复准备了。
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class Send { private final static String EXCHANGE_NAME = "amq.fanout";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true); String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); channel.close(); connection.close(); } }
|
创建消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Recv1 { private final static String QUEUE_NAME = "fanout_exchange_queue_1"; private final static String EXCHANGE_NAME = "amq.fanout";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(" [消费者1] received : " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
创建消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Recv2 { private final static String QUEUE_NAME = "fanout_exchange_queue_2"; private final static String EXCHANGE_NAME = "amq.fanout";
public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(" [消费者2] received : " + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|