Work队列消息模型
Work队列消息模型,也被称作工作队列消息模型,或者竞争消费模型。

Work队列消息模型与基本消息队列模型的区别就在于,Work队列消息模型让多个消费者绑定一个队列,这样就可以快速处理消息队列中的消息,从而避免了队列中消息堆积的问题。
JavaAPI实现Work队列消息模型
准备工作
使用JavaAPI实现消息队列模型,需要先做一些准备工作,这些准备工作包括搭建项目环境和准备工具类,这些工作在消息队列RabbitMQ之基本消息模型已经准备好了,就不重复准备了。
创建生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | public class Send {          private final static String QUEUE_NAME = "work_queue";
      public static void main(String[] args) throws IOException, TimeoutException {                  Connection connection = ConnectionUtils.getConnection();                  Channel channel = connection.createChannel();                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);                  for(int i = 0; i < 50; i++){                          String message = "Hello World!" + i;                          channel.basicPublish("",QUEUE_NAME,null,message.getBytes());         }                  channel.close();                  connection.close();     } }
  | 
 
创建消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
   | public class Recv1 {          private final static String QUEUE_NAME = "work_queue";
      public static void main(String[] args) throws IOException, TimeoutException {                  Connection connection = ConnectionUtils.getConnection();                  Channel channel = connection.createChannel();                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);                  channel.basicQos(1);                  DefaultConsumer consumer = new DefaultConsumer(channel){                          @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                                  System.out.println("[消费者1]"+new String(body));                                  channel.basicAck(envelope.getDeliveryTag(),false);             }         };                  channel.basicConsume(QUEUE_NAME,false,consumer);     } }
  | 
 
创建消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
   | public class Recv2 { 	     private final static String QUEUE_NAME = "work_queue";
      public static void main(String[] args) throws IOException, TimeoutException {                  Connection connection = ConnectionUtils.getConnection();                  Channel channel = connection.createChannel();                  channel.queueDeclare(QUEUE_NAME,false,false,false,null);                  channel.basicQos(1);                  DefaultConsumer consumer = new DefaultConsumer(channel){                          @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                                  System.out.println("[消费者2]"+new String(body));                                  try {                     Thread.sleep(1000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 channel.basicAck(envelope.getDeliveryTag(),false);             }         };                  channel.basicConsume(QUEUE_NAME,false,consumer);     } }
  | 
 
Work队列消息模型代码实现的注意点
与基本消息模型的实现相比,Work队列消息模型在代码实现上多了这么一行代码:
这是将消费者设置为每次只消费一条数据,如果不设置这个值,那么在一开始,每个消费者都是默认平均消费的,即每个消费者各消费25条消息。
但是在消费者2中,消息的处理速度非常慢(每处理一条消息,线程休眠一秒钟),所以为了能够最大化利用资源,所以设置每个消费者每次只消费一条消息,这样消费消息更快的消费者就可以消费更多的消息。
【注意】在Work队列消息模型中,应该采用手动Ack的方式,因为如果是自动Ack,那么每次已使用body的消息体,消费者就自动Ack告诉RabbitMQ消息已被消费,那么RabbitMQ就会立刻将下一条消息发送给该消费者,这样消费者的线程休眠问题就体现不出来。