每一秒钟的时间都值得铭记

0%

消息队列RabbitMQ之基本消息模型

基本消息模型

RabbitMQ的基本消息模型,是RabbitMQ五种消息模型中最为基础的一种,该模型简单直接,容易理解。

在这里插入图片描述

  • P为消息生产者,P本质上就是一个发送消息的应用程序,P每生产一条消息,就会将消息放入消息队列中。
  • 红色部分就是消息队列,顾名思义,它本质上就是一个队列,只不过这个队列是用来存取消息的,它同样遵循队列的基本特点,先进先出,后进后出
  • C为消息消费者,C本质上同样是一个接收消息的应用程序,C会不断监听队列中的消息,每当队列中出现新的消息,C就会开始消费队列中的消息。

JavaAPI实现基本消息模型

消息队列RabbitMQ之初学者中已经描述过了,RabbitMQ是一款基于AMQP协议的消息队列产品,它不受客户端和中间件使用不同语言开发条件的限制,同时在官方教程中也表明了,RabbitMQ可以接受Java,Python等语言的客户端API。

我们这里就使用JavaAPI来实现RabbitMQ基本消息模型。

搭建项目环境

SpringBoot中有一个amqp的启动器,是SpringBoot为我们封装好的amqp消息队列的操作API。当然,我们不会直接使用SpringBoot的API来实现基本数据模型,而是使用被封装的RabbitMQ Client来实现基本数据模型。

1
2
3
4
5
6
7
8
9
10
11
12
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

准备工具类

因为我们使用JavaAPI来操作RabbitMQ,自然需要通过IP和端口号,以及RabbitMQ的账号密码,来获取Java程序与RabbitMQ之间的连接,这是一繁琐的操作,我们直接将这些操作封装成一个独立的工具类,方便我们使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ConnectionUtils {

public static Connection getConnection() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置连接IP地址
factory.setHost("192.168.177.128");
//设置连接端口号
factory.setPort(5672);
//设置RabbitMQ连接的虚拟主机
factory.setVirtualHost("/mymq");
//设置RabbitMQ的用户名
factory.setUsername("admin");
//设置RabbitMQ的密码
factory.setPassword("admin");
//使用连接工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}

创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Send {
//RabbitMQ的队列名称
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//使用通道声明一个队列,参数分别为:队列名称,是否持久化,是否为排他队列,是否自动删除,其他属性
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//需要发送的消息
String message = "Hello World!";
//使用通道发送消息,参数分别为:交换机名称,路由Key,其他属性,消息正文
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Recv {
//RabbitMQ的队列名称
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//使用通道声明一个队列,参数分别为:队列名称,是否持久化,是否为排他队列,是否自动删除,其他属性
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//覆写该方法,该方法类似于监听器,当队列中有消息的时候,就会被调用执行
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//byte[] body参数即为消息体,当该消息体被使用,即认为该消息已被消费
System.out.println(new String(body));
}
};
//监听队列,参数分别为:队列名称,是否自动确认消息已被消费,队列消费者
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

消息确认机制

在这里插入图片描述

消息自动确认(Ack)机制的问题

由于消费者中的消息体是byte类型的数组结构,但是我们在实际使用的使用的时候,可能会先对byte类型的数组进行一些字符串的处理,然后才会真正消费该数据。
消息被自动确认之后,消息就会从消息队列中移除,但是如果又因为其他意外原因,导致消息未能被真正消费,那么这条消息就直接丢失了。

所以消息自动确认机制可能会导致消息的丢失。
在这里插入图片描述

消息手动确认(Ack)机制

  • 消息自动Ack:消息一旦被接收,消费者立刻自动发送Ack。
  • 消息手动Ack:消息接收后,不会自动发送Ack,需要程序手动Ack,确认消息已被接收。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Recv {
//RabbitMQ的队列名称
private final static String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws IOException, TimeoutException {
//使用工具类,获取RabbitMQ的连接
Connection connection = ConnectionUtils.getConnection();
//使用连接创建通道
Channel channel = connection.createChannel();
//使用通道声明一个队列,参数分别为:队列名称,是否持久化,是否为排他队列,是否自动删除,其他属性
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
//覆写该方法,该方法类似于监听器,当队列中有消息的时候,就会被调用执行
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//byte[] body参数即为消息体,当该消息体被使用,即认为该消息已被消费
System.out.println(new String(body));
//手动Ack,确认消息已被消费
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列,参数分别为:队列名称,是否自动确认消息已被消费,队列消费者
//第二个参数为false,即不自动Ack,而是改用手动Ack
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}

自动Ack和手动Ack的选择

自动Ack和手动Ack各有优势,那么我们应该什么时候使用自动Ack,什么时候使用手动Ack呢?

我们应该根据消息的重要性来进行选择。

  • 如果一个消息不是很重要,即使丢失也不会对系统造成影响,那么我们使用自动Ack会比较方便。
  • 如果一个消息非常重要,不允许该消息发生丢失的情况,那么我们就应该使用手动Ack来进行消息确认,防止消息丢失。
坚持原创技术分享,您的支持将鼓励我继续创作!
-------------这是我的底线^_^-------------