项目需求:在管理员点击督办之后;对app作出一个消息提醒。

在管理员点击督办之后;对app作出一个消息提醒。 推的比较常见,就是例如这个网站的这个问题,维护着一张关注者的列表,每当触发这个问题推送的条件时(例如有人回答
问题),就把这个通知发送给每个关注者。

   拉的相对麻烦一点,就是推的反向,例如每个用户都有一张关注问题的列表,每当用户上线的时候,对每个问题进行轮询,

当问题的事件列表出现了比我原本时间戳大的信息就进行拉取。

技术选型:

4.编写实体类消息 实体Message.java

package com.lpl.bean;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
 * 发送的消息实体
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {

    private String id;          //消息id
    private String personNo;    //发送人的学工号(可指定多个,以逗号隔开,不超过1000个)
    private String title;       //消息标题
    private String content;     //消息内容
    private String type;        //消息类型,system(系统消息)、sms(短信消息)
    private Date createTime;    //创建时间
    private Date updateTime;    //更新时间
    private String statusCode;  //消息发送结果状态码(4000表示成功,4001表示失败)
   
}
 

结果常量类ConstantResult.java

公共返回结果类CommonResult.java

package com.lpl.common;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 返回的指定公共结果
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CommonResult<T> {

    private String code;    //返回状态码
    private String msg;     //返回提示信息
    private T data;         //返回数据

    public CommonResult(String code, String msg){
        this.code = code;
        this.msg = msg;
    }

}

我们操作redis需要用到RedisTemplate,编写redis配置类RedisConfig.java。这里配置了多个消息监听适配器以通过不同的方法去监听、订阅不同的redis channel消息。

package com.lpl.config;

import com.lpl.listener.Receiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * redis自定义配置类
 */
@Configuration
public class RedisConfig {

    /**
     * 返回一个RedisTemplate Bean
     * @param redisConnectionFactory    如果配置了集群版则使用集群版,否则使用单机版
     * @return
     */
    @Bean(name = "redisTemplate")
    public RedisTemplate<?, ?> getRedisTemplate(RedisConnectionFactory redisConnectionFactory){

        RedisTemplate<?, ?> template = new RedisTemplate<>();
        //设置key和value序列化机制
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<Object>(Object.class));
        template.setConnectionFactory(redisConnectionFactory);  //设置单机或集群版连接工厂

        return template;
    }

    /**
     * 系统消息适配器
     * @param receiver
     * @return
     */
    @Bean(name = "systemAdapter")
    public MessageListenerAdapter systemAdapter(Receiver receiver){
        //指定类中回调接收消息的方法
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "systemMessage");
        //adapter.afterPropertiesSet();
        return adapter;
    }

    /**
     * 短信消息适配器
     * @param receiver
     * @return
     */
    @Bean(name = "smsAdapter")
    public MessageListenerAdapter smsAdapter(Receiver receiver){
        //指定类中回调接收消息的方法
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");
        //adapter.afterPropertiesSet();
        return adapter;
    }

    /**
     * 构建redis消息监听器容器
     * @param connectionFactory
     * @param systemAdapter
     * @param smsAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter systemAdapter, MessageListenerAdapter smsAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //指定不同的方法监听不同的频道
        container.addMessageListener(systemAdapter, new PatternTopic("system"));
        container.addMessageListener(smsAdapter, new PatternTopic("sms"));
        return container;
    }
}

若要使用redis集群版,需要增加如下配置类:
读取集群配置属性类RedisClusterProperty.java

package com.lpl.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import java.util.List;

/**
 * redis集群配置属性,在redis实例工厂提供参数配置
 */
@Component
@Validated
@Data
@ConfigurationProperties(value = "spring.redis.cluster")
public class RedisClusterProperty {

    private List<String> nodes;     //集群各节点ip和port

}

6.编写Service层

消息发布业务层接口PublisherService.java

4.编写消息处理类

消息处理类(多线程处理)SendAndStorageProcess.java,类中发消息的方法使线程休眠了两秒,模拟发消息的相对耗时操作,用于验证多线程。

package com.lpl.service;
import com.alibaba.fastjson.JSONObject;
import com.lpl.bean.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
 *  发送和存储消息任务
 */
@Component
public class SendAndStorageProcess{

    @Autowired
    private ThreadPoolTaskExecutor threadTaskExecutor;  //注入线程池

    /**
     * 多线程调用发送消息
     * @param message
     * @return
     */
    public JSONObject sendAndStorageMsg(Message message) {

        Future<JSONObject> future = threadTaskExecutor.submit(new Callable<JSONObject>() {  //采用带返回值的方式
            @Override
            public JSONObject call() throws Exception {

                //1.调用相对比较耗时的发消息方法
                String code = sendMessage(message);
                message.setUpdateTime(new Date());
                if ("200".equals(code)){    //发送成功
                    message.setStatusCode("4000");
                }else{  //发送失败
                    message.setStatusCode("4001");
                }

                //2.存储消息
                storageMessage(message);

                JSONObject result = new JSONObject();
                result.put("code", "200");
                result.put("msg", "发送消息成功!");
                return result;
            }
        });

        JSONObject jsonResult = new JSONObject();   //返回结果
        try{
            if (future.isDone()){   //线程调度结束时,才获取结果
                jsonResult = future.get();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return jsonResult;      //消息发送与存储结果
    }

    /**
     * 调用接口发送消息
     * @param message
     * @return
     */
    private String sendMessage(Message message) {
        try{
            //TODO 这里写一些发消息的业务逻辑

            Thread.sleep(2000);     //增加耗时操作,查看多线程效果
            System.out.println(Thread.currentThread().getName() + "线程发送消息成功,消息内容:" + message.getContent());
            return "200";   //发送消息结果状态码
        }catch (Exception e){
            System.out.println(Thread.currentThread().getName() + "线程发送消息失败,消息内容:" + message.getContent());
            e.printStackTrace();
        }
        return "500";   //发送消息结果状态码
    }

    /**
     * 存消息到数据库
     * @param message
     * @return
     */
    private void storageMessage(Message message) {
        try{
            //TODO 这里执行插入消息到数据操作
            System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库成功,消息内容:" + message.getContent());
        }catch (Exception e){
            System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库失败,消息内容:" + message.getContent());
            e.printStackTrace();
        }
    }
}

5.消息消费测试

end.png

https://www.codenong.com/cs106834803/
https://www.zhihu.com/question/20380990
https://blog.csdn.net/cdy1996/article/details/83933366

标签: none

相关阅读

  • 测试信息
  • 开发商:阿里巴巴
  • 版本号:1.0
  • 配色:(企业家)” 推选活动结果
  • 测试信息
  • 测试信息
  • 测试信息