springboot中使用redis实现异步消息通知
项目需求:在管理员点击督办之后;对app作出一个消息提醒。
在管理员点击督办之后;对app作出一个消息提醒。 推的比较常见,就是例如这个网站的这个问题,维护着一张关注者的列表,每当触发这个问题推送的条件时(例如有人回答
问题),就把这个通知发送给每个关注者。
拉的相对麻烦一点,就是推的反向,例如每个用户都有一张关注问题的列表,每当用户上线的时候,对每个问题进行轮询,
当问题的事件列表出现了比我原本时间戳大的信息就进行拉取。
技术选型:
4.编写实体类消息 实体Message.java
package com.lpl.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* 发送的消息实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private String id; //消息id
private String personNo; //发送人的学工号(可指定多个,以逗号隔开,不超过1000个)
private String title; //消息标题
private String content; //消息内容
private String type; //消息类型,system(系统消息)、sms(短信消息)
private Date createTime; //创建时间
private Date updateTime; //更新时间
private String statusCode; //消息发送结果状态码(4000表示成功,4001表示失败)
}
结果常量类ConstantResult.java
公共返回结果类CommonResult.java
package com.lpl.common;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 返回的指定公共结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CommonResult<T> {
private String code; //返回状态码
private String msg; //返回提示信息
private T data; //返回数据
public CommonResult(String code, String msg){
this.code = code;
this.msg = msg;
}
}
我们操作redis需要用到RedisTemplate,编写redis配置类RedisConfig.java。这里配置了多个消息监听适配器以通过不同的方法去监听、订阅不同的redis channel消息。
package com.lpl.config;
import com.lpl.listener.Receiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* redis自定义配置类
*/
@Configuration
public class RedisConfig {
/**
* 返回一个RedisTemplate Bean
* @param redisConnectionFactory 如果配置了集群版则使用集群版,否则使用单机版
* @return
*/
@Bean(name = "redisTemplate")
public RedisTemplate<?, ?> getRedisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<?, ?> template = new RedisTemplate<>();
//设置key和value序列化机制
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new Jackson2JsonRedisSerializer<Object>(Object.class));
template.setConnectionFactory(redisConnectionFactory); //设置单机或集群版连接工厂
return template;
}
/**
* 系统消息适配器
* @param receiver
* @return
*/
@Bean(name = "systemAdapter")
public MessageListenerAdapter systemAdapter(Receiver receiver){
//指定类中回调接收消息的方法
MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "systemMessage");
//adapter.afterPropertiesSet();
return adapter;
}
/**
* 短信消息适配器
* @param receiver
* @return
*/
@Bean(name = "smsAdapter")
public MessageListenerAdapter smsAdapter(Receiver receiver){
//指定类中回调接收消息的方法
MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");
//adapter.afterPropertiesSet();
return adapter;
}
/**
* 构建redis消息监听器容器
* @param connectionFactory
* @param systemAdapter
* @param smsAdapter
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter systemAdapter, MessageListenerAdapter smsAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//指定不同的方法监听不同的频道
container.addMessageListener(systemAdapter, new PatternTopic("system"));
container.addMessageListener(smsAdapter, new PatternTopic("sms"));
return container;
}
}
若要使用redis集群版,需要增加如下配置类:
读取集群配置属性类RedisClusterProperty.java
package com.lpl.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import java.util.List;
/**
* redis集群配置属性,在redis实例工厂提供参数配置
*/
@Component
@Validated
@Data
@ConfigurationProperties(value = "spring.redis.cluster")
public class RedisClusterProperty {
private List<String> nodes; //集群各节点ip和port
}
6.编写Service层
消息发布业务层接口PublisherService.java
4.编写消息处理类
消息处理类(多线程处理)SendAndStorageProcess.java,类中发消息的方法使线程休眠了两秒,模拟发消息的相对耗时操作,用于验证多线程。
package com.lpl.service;
import com.alibaba.fastjson.JSONObject;
import com.lpl.bean.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
/**
* 发送和存储消息任务
*/
@Component
public class SendAndStorageProcess{
@Autowired
private ThreadPoolTaskExecutor threadTaskExecutor; //注入线程池
/**
* 多线程调用发送消息
* @param message
* @return
*/
public JSONObject sendAndStorageMsg(Message message) {
Future<JSONObject> future = threadTaskExecutor.submit(new Callable<JSONObject>() { //采用带返回值的方式
@Override
public JSONObject call() throws Exception {
//1.调用相对比较耗时的发消息方法
String code = sendMessage(message);
message.setUpdateTime(new Date());
if ("200".equals(code)){ //发送成功
message.setStatusCode("4000");
}else{ //发送失败
message.setStatusCode("4001");
}
//2.存储消息
storageMessage(message);
JSONObject result = new JSONObject();
result.put("code", "200");
result.put("msg", "发送消息成功!");
return result;
}
});
JSONObject jsonResult = new JSONObject(); //返回结果
try{
if (future.isDone()){ //线程调度结束时,才获取结果
jsonResult = future.get();
}
}catch (Exception e){
e.printStackTrace();
}
return jsonResult; //消息发送与存储结果
}
/**
* 调用接口发送消息
* @param message
* @return
*/
private String sendMessage(Message message) {
try{
//TODO 这里写一些发消息的业务逻辑
Thread.sleep(2000); //增加耗时操作,查看多线程效果
System.out.println(Thread.currentThread().getName() + "线程发送消息成功,消息内容:" + message.getContent());
return "200"; //发送消息结果状态码
}catch (Exception e){
System.out.println(Thread.currentThread().getName() + "线程发送消息失败,消息内容:" + message.getContent());
e.printStackTrace();
}
return "500"; //发送消息结果状态码
}
/**
* 存消息到数据库
* @param message
* @return
*/
private void storageMessage(Message message) {
try{
//TODO 这里执行插入消息到数据操作
System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库成功,消息内容:" + message.getContent());
}catch (Exception e){
System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库失败,消息内容:" + message.getContent());
e.printStackTrace();
}
}
}
5.消息消费测试
https://www.codenong.com/cs106834803/
https://www.zhihu.com/question/20380990
https://blog.csdn.net/cdy1996/article/details/83933366