欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

MongoDb+RabbitMQ搭建系统通知模块

时间:2023-04-21
MongoDb + RabbitMQ 搭建系统通知模块

使用springboot实现

一、选择MongoDB和RabbitMQ理由 对于公告消息,本设计是设计为 为每个用户创建一条公告信息(原因是:方便记录用户对于消息的已读和未读状态,这样设计会更符合用户需求),因此设计两个表,message(存储消息及其发送者消息) 和 message_ref(存储接收者,及已读和新接收状态)数据库选择为MongoDB,原因是MongoDB对于海量数据以及高并发情况下的读写数据很有优势,同时他的数据存储是以文档结构存储,简单来说就是用JSON格式,他会更加类似关系型数据库的存储,同时MongoDB从3.X开始支持集合的连接查询,就可以实现message和message_ref连接查询,查询出某个用户拥有的信息。由于存在海量数据,高并发情况下,MongoDB也支持不了瞬时写入百万数据,因此引入消息队列MQ来进行削峰填谷。选择RabbitMQ的原因是,它的可靠性和稳定性比较好,而且不仅支持消息的异步收发,还支持消息的同步收发 二、简单逻辑

1.发送系统通知时,先把消息数据插入message表中,然后把消息推送到相关的消息队列(以用户id作为routing-key)中去

3.消费者消费消息时,取消自动应答,每次从消息队列中取数据的时候,把数据插入message_ref(lastFlag为true【新信息】,readFlag为false【未读】)中,插入成功后再发送消息的Ack应答,让消息队列删除消息

3.前端设计定时器轮训:每次读取消息的时候,先从消息队列中接收消息(步骤3;然后把message_ref中的lastFlag改为false,更新的数量即为新消息的数量;查询readFlag为false的数据(查询未读数据),然后把这些数据传给前端

三、实现

有些地方引用了hutool工具,可自行引入依赖

cn.hutool hutool-all 5.4.0

3.1、MongoDB数据库表的设计 3.1.1.message集合

集合相当于MySQL中的数据表,但是没有固定的表结构。集合有什么字段,取决于保存在其中的数据。下面这张表格是Message集合中JSON数据的结构要求。

字段类型备注_idUUID自动生成的主键值uuidUUIDUUID值,并且设置有唯一性索引,防止消息被重复消费senderIdInteger发送者ID,就是用户ID。如果是系统自动发出,这个ID值是0senderPhotoString发送者的头像URL。在消息页面要显示发送人的头像senderNameString发送者名称,也就是用户姓名。在消息页面要显示发送人的名字msgString消息正文sendTimeDate发送时间

uuid : 防止重复消费

消息积压过多的情况下,如果第一次轮询还没结束,第二次轮询就开始了,那就可能出现把重复的数据写入数据库中,因此如果每条MQ消息都有唯一的UUID,第一个消费者把消息保存到数据库,那么第二个消费者就无法再把这条消息保存到数据库,解决了消息的重复消费问题。

3.1.2 message_ref集合

虽然message集合记录的是消息内容及其发送者,message_ref集合来记录接收人和已读状态。

字段类型备注_idUUID主键messageIdUUIDmessage记录的_idreceiverIdString接收人IDreadFlagBoolean是否已读lastFlagBoolean是否为新接收的消息3.1.3 连接查询

执行两个集合的联合查询,根据接收人来查询消息,并且按照消息发送时间降序排列,查询前50条记录

db.message.aggregate([ { $set: { "id": { $toString: "$_id" } } }, { $lookup:{ from:"message_ref", localField:"id", foreignField:"messageId", as:"ref" }, }, { $match:{"ref.receiverId": 1} }, { $sort: {sendTime : -1} }, { $skip: 0 }, { $limit: 50 }])

解析1.$set: { "id": { $toString: "$_id" } }添加一个字段,字段名为id,值为"_id"的值,格式为string类型3.$lookup:{ from:"message_ref", localField:"id", foreignField:"messageId", as:"ref" },连接另一个表from:连接哪个表localField :以自己的那个字段与另一个表连接foreignField:另一个表的连接字段as:给连接的那个表起别名3.$match:{"ref.receiverId": 1}查询条件(相当于where),该例子表示where ref.receiverId = 14.$sort: {sendTime : -1}排序,该例子为根据sendTime按降序排5.$skip: 0 从那条数据开始(数据是从0开始算的,即0为第一条数据),相当于分页查询的start ,即sql语句limit start,length中的start6.$limit: 50查询几条数据,相当于分页查询的length,即sql语句limit start,length中的length

3.2 程序中实现数据表实体设计

依赖导入

org.springframework.boot spring-boot-starter-data-mongodb

mongoDB配置

spring: #MongoDB data: mongodb: #ip host: localhost #端口 port: 37017 #数据库 database: pbms authentication-database: admin username: admin password: abc133456

3.2.1 pojo实体类

@Data@document(collection = "message")public class MessageEntity implements Serializable { @Id private String _id; @Indexed(unique = true) private String uuid; @Indexed private Integer senderId; private String senderPhoto=""; //默认系统图片 private String senderName; @Indexed private Date sendTime; private String msg;}

@document(collection = "message_ref")@Datapublic class MessageRefEntity implements Serializable { @Id private String _id; @Indexed private String messageId; @Indexed private Integer receiverId; @Indexed private Boolean readFlag; @Indexed private Boolean lastFlag;}

解析1.@document(collection = "message") 表示映射到Mongodb文档上的领域对象3.@Id 表示某个域为ID域3.@Indexed 表示某个字段为Mongodb的索引字段

3.2.2 DAO

messageDao

@Repositorypublic class MessageDao { @Autowired private MongoTemplate mongoTemplate; //插入消息数据 public String insert(MessageEntity entity) { //把北京时间转换成格林尼治时间 Date sendTime = entity.getSendTime(); sendTime = DateUtil.offset(sendTime, DateField.HOUR, -8); entity.setSendTime(sendTime); entity = mongoTemplate.save(entity); return entity.get_id(); } //分页查询某个人的信息 public List searchMessageByPage(int userId,long start,long length){ JSonObject json = new JSonObject(); json.set("$toString","$_id"); //类比上面所说的连接查询,以那个为逻辑 Aggregation aggregation = Aggregation.newAggregation( Aggregation.addFields().addField("id").withValue(json).build(), Aggregation.lookup("message_ref","id","messageId","ref"), Aggregation.match(Criteria.where("ref.receiverId").is(userId)), Aggregation.sort(Sort.by(Sort.Direction.DESC,"sendTime")), Aggregation.skip(start), Aggregation.limit(length) ); AggregationResults message = mongoTemplate.aggregate(aggregation, "message", HashMap.class); List list = message.getMappedResults(); list.forEach(one -> { List refList = (List) one.get("ref"); MessageRefEntity entity = refList.get(0); Boolean readFlag = entity.getReadFlag(); String id = entity.get_id(); one.remove("ref"); one.put("readFlag",readFlag); one.put("refId",id); one.remove("_id"); //把格林尼治时间转换成北京时间 Date sendTime = (Date) one.get("sendTime"); sendTime = DateUtil.offset(sendTime, DateField.HOUR, 8); String today = DateUtil.today(); //如果是今天的消息,只显示发送时间,不需要显示日期 if (today.equals(DateUtil.date(sendTime).toDateStr())) { one.put("sendTime", DateUtil.format(sendTime, "HH:mm")); } //如果是以往的消息,只显示日期,不显示发送时间 else { one.put("sendTime", DateUtil.format(sendTime, "yyyy/MM/dd")); } }); return list; } //根据id查询某条消息详细内容 public HashMap searchMessageById(String id){ HashMap message = mongoTemplate.findById(id, HashMap.class, "message"); Date sendTime = (Date) message.get("sendTime"); //把格林尼治时间转换成北京时间 sendTime = DateUtil.date(sendTime).offset(DateField.HOUR, 8); message.replace("sendTime", DateUtil.format(sendTime, "yyyy-MM-dd HH:mm")); return message; }}

MessageRefDao

@Repositorypublic class MessageRefDao { @Autowired private MongoTemplate mongoTemplate; public String insert(MessageRefEntity entity){ entity = mongoTemplate.save(entity); return entity.get_id(); } public long searchUnreadCount(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("readFlag").is(false).and("receiverId").is(userId)); long count = mongoTemplate.count(query, MessageRefEntity.class); return count; } public long searchLastCount(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("lastFlag").is(true).and("receiverId").is(userId)); Update update = new Update(); update.set("lastFlag",false); UpdateResult result = mongoTemplate.updateMulti(query, update, "message_ref"); long count = result.getModifiedCount(); return count; } public long updateUnreadMessage(String id){ Query query = new Query(); query.addCriteria(Criteria.where("_id").is(id)); Update update = new Update(); update.set("readFlag",true); UpdateResult result = mongoTemplate.updateFirst(query, update, "message_ref"); long count = result.getModifiedCount(); return count; } public long deleteMessageRefById(String id){ Query query = new Query(); query.addCriteria(Criteria.where("_id").is(id)); DeleteResult messageRef = mongoTemplate.remove(query, "message_ref"); long deletedCount = messageRef.getDeletedCount(); return deletedCount; } public long deleteUserMessageRef(int userId){ Query query = new Query(); query.addCriteria(Criteria.where("receiverId").is(userId)); DeleteResult ref = mongoTemplate.remove(query, "message_ref"); long deletedCount = ref.getDeletedCount(); return deletedCount; }}

3.3 业务层设计

基本上是引用DAO层

public interface MessageService { public String insertMessage(MessageEntity entity); public String insertRef(MessageRefEntity entity); public long searchUnreadCount(int userId); public long searchLastCount(int userId); public List searchMessageByPage(int userId, long start, int length) ; public HashMap searchMessageById(String id); public long updateUnreadMessage(String id) ; public long deleteMessageRefById(String id); public long deleteUserMessageRef(int userId);}

@Servicepublic class MessageServiceImpl implements MessageService { @Autowired private MessageDao messageDao; @Autowired private MessageRefDao messageRefDao; @Override public String insertMessage(MessageEntity entity) { String id = messageDao.insert(entity); return id; } @Override public String insertRef(MessageRefEntity entity) { String id = messageRefDao.insert(entity); return id; } @Override public long searchUnreadCount(int userId) { long count = messageRefDao.searchUnreadCount(userId); return count; } @Override public long searchLastCount(int userId) { long count = messageRefDao.searchLastCount(userId); return count; } @Override public List searchMessageByPage(int userId, long start, int length) { List list = messageDao.searchMessageByPage(userId, start, length); return list; } @Override public HashMap searchMessageById(String id) { HashMap map = messageDao.searchMessageById(id); return map; } @Override public long updateUnreadMessage(String id) { long count = messageRefDao.updateUnreadMessage(id); return count; } @Override public long deleteMessageRefById(String id) { long count = messageRefDao.deleteMessageRefById(id); return count; } @Override public long deleteUserMessageRef(int userId) { long count = messageRefDao.deleteUserMessageRef(userId); return count; }}

3.4 web层 1.获取分页消息列表

@ApiModel@Datapublic class SearchMessageByPageForm { @NotNull @Min(1) private Integer page; @NotNull @Range(min = 1,max = 40) private Integer length;}@RestController@RequestMapping("/message")@Api("消息模块网络接口")public class MessageController { @Autowired private JwtUtil jwtUtil; @Autowired private MessageService messageService; @PostMapping("/searchMessageByPage") @ApiOperation("获取分页消息列表") public R searchMessageByPage(@Valid @RequestBody SearchMessageByPageForm form, @RequestHeader("token") String token) { int userId = jwtUtil.getUserId(token); int page = form.getPage(); int length = form.getLength(); long start = (page - 1) * length; List list = messageService.searchMessageByPage(userId, start, length); return R.ok().put("result", list); }}

3.根据ID查询消息

@ApiModel@Datapublic class SearchMessageByIdForm { @NotBlank private String id;}public class MessageController { …… @PostMapping("/searchMessageById") @ApiOperation("根据ID查询消息") public R searchMessageById(@Valid @RequestBody SearchMessageByIdForm form) { HashMap map = messageService.searchMessageById(form.getId()); return R.ok().put("result", map); }}

3.把未读消息更新成已读消息

@ApiModel@Datapublic class UpdateUnreadMessageForm { @NotBlank private String id;}public class MessageController { …… @PostMapping("/updateUnreadMessage") @ApiOperation("未读消息更新成已读消息") public R updateUnreadMessage(@Valid @RequestBody UpdateUnreadMessageForm form) { long rows = messageService.updateUnreadMessage(form.getId()); return R.ok().put("result", rows == 1 ? true : false); }}

4.删除消息

@Data@ApiModelpublic class DeleteMessageRefByIdForm { @NotBlank private String id;}public class MessageController { …… @PostMapping("/deleteMessageRefById") @ApiOperation("删除消息") public R deleteMessageRefById(@Valid @RequestBody DeleteMessageRefByIdForm form){ long rows=messageService.deleteMessageRefById(form.getId()); return R.ok().put("result", rows == 1 ? true : false); }}

5.轮询接收系统消息

public class MessageController { …… @GetMapping("/refreshMessage") @ApiOperation("刷新用户的消息") public R refreshMessage(@RequestHeader("token") String token) { int userId = jwtUtil.getUserId(token); //异步接收消息 messageTask.receiveAysnc(userId + ""); //查询接收了多少条消息 long lastRows=messageService.searchLastCount(userId); //查询未读数据 long unreadRows = messageService.searchUnreadCount(userId); return R.ok().put("lastRows", lastRows).put("unreadRows", unreadRows); }}

3.5 使用RabbitMQ实现削峰填谷 3.5.1 导入依赖

com.rabbitmq amqp-client 5.9.0 org.springframework.boot spring-boot-starter-amqp

3.5.2 配置类

注意这里可能存在rabbitMQ连接不上的问题:ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN、For details see the broker logfile.

原因是不支持使用默认用户进行非本地连接,应新建用户

如何新建用户

@Configurationpublic class RabbitMQConfig { @Bean public ConnectionFactory getFactory(){ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("ip"); //rabbitMQ所处服务器ip factory.setPort(5672); //端口 factory.setUsername("admin"); //用户名 factory.setPassword("admin"); //密码 return factory; }}

3.5.3 收发消息

这里只是使用的rabbitMQ的简单模式,即一个消费者对应一个队列

如有其他需求可自行学习rabbitMQ的其他模式

rabbitMQ相关笔记

@Slf4j@Componentpublic class MessageTask { @Autowired private ConnectionFactory factory; @Autowired private MessageService messageService; public void send(String topic, MessageEntity entity) { String id = messageService.insertMessage(entity); //向MongoDB保存消息数据,返回消息ID //向RabbitMQ发送消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //连接到某个Topic channel.queueDeclare(topic, true, false, false, null); HashMap header = new HashMap(); //存放属性数据 header.put("messageId", id); //创建AMQP协议参数对象,添加附加属性 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build(); channel.basicPublish("", topic, properties, entity.getMsg().getBytes()); log.debug("消息发送成功"); } catch (Exception e) { log.error("执行异常", e); throw new PBMSException("向MQ发送消息失败"); } } @Async public void sendAsync(String topic, MessageEntity entity) { send(topic, entity); } public int receive(String topic) { int i = 0; try (//接收消息数据 Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 从队列中获取消息,不自动确认 channel.queueDeclare(topic, true, false, false, null); //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环 while (true) { //创建响应接收数据,禁止自动发送Ack应答 GetResponse response = channel.basicGet(topic, false); if (response != null) { AMQP.BasicProperties properties = response.getProps(); Map header = properties.getHeaders(); //获取附加属性对象 String messageId = header.get("messageId").toString(); byte[] body = response.getBody();//获取消息正文 String message = new String(body); log.debug("从RabbitMQ接收的消息:" + message); MessageRefEntity entity = new MessageRefEntity(); entity.setMessageId(messageId); entity.setReceiverId(Integer.parseInt(topic)); entity.setReadFlag(false); entity.setLastFlag(true); messageService.insertRef(entity); //把消息存储在MongoDB中 //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息 long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); i++; } else { break; //接收不到消息,则退出死循环 } } } catch (Exception e) { log.error("执行异常", e); } return i; } @Async public int receiveAysnc(String topic) { return receive(topic); } public void deleteQueue(String topic) { try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDelete(topic); log.debug("消息队列成功删除"); } catch (Exception e) { log.error("删除队列失败", e); throw new PBMSException("删除队列失败"); } } @Async public void deleteQueueAsync(String topic) { deleteQueue(topic); }}

3.6 使用

1.发送消息
在要发送信息的地方:创建MessageEntity,调用messageService.insertMessage,插入message表,然后调用messageTask.send发送消息到消息队列

2.接收消息
调用messageController.refreshMessage()接收消息,同时可获取新消息数量和未读消息数量

附录

测试数据

@Test void contextLoads() { for (int i = 1; i <= 100; i++) { MessageEntity message = new MessageEntity(); message.setUuid(IdUtil.simpleUUID()); message.setSenderId(0); message.setSenderName("系统消息"); message.setMsg("这是第" + i + "条测试消息"); message.setSendTime(new Date()); String id=messageService.insertMessage(message); MessageRefEntity ref=new MessageRefEntity(); ref.setMessageId(id); ref.setReceiverId(11); //注意:这是接收人ID ref.setLastFlag(true); ref.setReadFlag(false); messageService.insertRef(ref); } }

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。