每一秒钟的时间都值得铭记

0%

SpringBoot使用Redis实现消息订阅

Redis不仅仅是一个优秀的非关系型缓存数据库,更是内置了一套消息机制。

关联知识

搭建项目环境

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!--SpringBoot为父项目-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.4.RELEASE</version>
</parent>

<dependencies>
<!--测试功能的启动器,用于整合junit测试功能-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--Spring-Data-Redis的启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

创建监听者类

监听者1

1
2
3
4
5
6
7
8
9
@Component
public class MyListener1 implements MessageListener {

@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("MyListener1收到消息:"+message);
System.out.println("Mylistener1:"+new String(pattern));
}
}

监听者2

1
2
3
4
5
6
7
8
9
@Component
public class MyListener2 implements MessageListener {

@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println("MyListener2收到消息:"+message);
System.out.println("Mylistener2:"+new String(pattern));
}
}

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Configuration
@EnableCaching
public class CacheConfig extends CachingConfigurationSelector {

@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter MyListener1,
MessageListenerAdapter MyListener2) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅了一个叫chanel01的通道,这个container可以添加多个messageListener
container.addMessageListener(MyListener1, new PatternTopic("Mylistener1"));
//订阅了一个叫chanel02的通道
container.addMessageListener(MyListener2, new PatternTopic("Mylistener2"));
return container;
}

@Bean
MessageListenerAdapter MyListener1() {
return new MessageListenerAdapter(new MyListener1());
}

@Bean
MessageListenerAdapter MyListener2() {
return new MessageListenerAdapter(new MyListener2());
}

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new StringRedisTemplate(factory);
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);

template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}

测试类发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
@RunWith(SpringRunner.class)
public class PubSubTest {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Test
public void test01(){
stringRedisTemplate.convertAndSend("Mylistener1","Redis发布的第一条消息");
stringRedisTemplate.convertAndSend("Mylistener2","Redis发布的第二条消息");
}
}

测试结果

1
2
3
4
MyListener2收到消息:Redis发布的第二条消息
Mylistener2:Mylistener2
MyListener1收到消息:Redis发布的第一条消息
Mylistener1:Mylistener1
坚持原创技术分享,您的支持将鼓励我继续创作!
-------------这是我的底线^_^-------------