关键词搜索

源码搜索 ×
×

Java基于队列和数据库批量维护用户在线离线状态和位置信息

发布2020-08-29浏览801次

详情内容

目录

前言介绍

用户数据迁移

用户状态数据批量处理

队列数据批量消费


前言介绍

系统用户实时位置都保存在redis中,我们采用redis过期方式来监听用户位置离线,离线的用户key会放入对应的队列进行消费(这里的队列可以是Java队列也可以是RabbitMQ之类的消息中间件,我们采用的是ConcurrentLinkedQueue)。项目中对用户实时位置有两个要求(特殊行业对人员位置安全规范的要求):

  • a.用户最后一次经纬度实时位置更新到数据库
  • b.用户离线后更新用户在线状态

为了解决每次用户位置心跳更新导致数据库连接不足问题,我采用了批量更新的方式来解决大批量的用户上线和离线更新。解决了如下问题:

  • a.解决用户状态表统计数据不一致问题
  • b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
  • c.解决redis和database在线状态延迟过大问题

用户数据迁移

针对需要维护最后用户在线位置数据的这类用户,我们有单独用户角色加以限定。这种用户只需要简单将数据做个分表保存起来就可以了。首次初始化这类数据是根据已有用户数据直接采用SQL 支持的SELECT [A,B,....] INTO TABLE_SUB FROM TABLE_MAIN 

示例脚本如下:

  1. -- 注意:======依赖触发器更新用户信息=====
  2. -- 修改目的:
  3. -- a.解决用户状态表格统计数据不一致问题
  4. -- b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
  5. -- c.解决redis和database在线状态延迟过大问题
  6. -- 可反复执行如下表数据记录
  7. DROP TABLE if exists xh_yw.xh_user_online_tb ;
  8. SELECT
  9. -- 组织机构ID
  10. i_orgid,
  11. -- 组织机构编号
  12. c_orgbh,
  13. -- 组织机构名称
  14. c_orgname,
  15. -- 用户ID
  16. i_userid,
  17. -- 用户姓名
  18. c_userealname,
  19. -- 手机号码
  20. c_usertel,
  21. -- 时间戳
  22. make_timestamp(2020,1,1,0,0,0) AS lasttime,
  23. -- 最后位置:经度
  24. 0.0 AS longitude,
  25. -- 最后位置:纬度
  26. 0.0 AS latitude,
  27. -- 是否在线: 0 离线 1 在线
  28. 0 AS is_online ,
  29. -- 日期make_date(2020,1,1) AS last_date
  30. '2020-01-01' AS last_date
  31. INTO xh_yw.xh_user_online_tb
  32. FROM xh_ht.fs_yw_base_user where i_userid
  33. -- 角色过滤
  34. IN (select distinct i_userid from xh_ht.fs_yw_user_role where i_roleid = 5 )

用户状态数据批量处理

批量处理都依赖数据库支持的方式。

MySQL参考:https://www.cnblogs.com/mslagee/p/6509682.html

Postgresql参考:https://www.itranslater.com/qa/detailshttps://files.jxasp.com/image/2583251656280376320

Java代码示例:

  1. package com.patrol.position.service;
  2. import com.alibaba.fastjson.JSONArray;
  3. import com.forestar.platform.dao.DatabaseRepository;
  4. import com.patrol.beans.Constants;
  5. import com.patrol.beans.user.UserPosition;
  6. import com.patrol.beans.util.LogicUtil;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.commons.lang3.ObjectUtils;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.beans.factory.annotation.Qualifier;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.jdbc.core.JdbcTemplate;
  13. import org.springframework.stereotype.Service;
  14. import java.text.SimpleDateFormat;
  15. import java.util.*;
  16. /**
  17. * @Copyright: 2019-2021
  18. * @FileName: UserOnlineService.java
  19. * @Author: PJL
  20. * @Date: 2020/7/15 19:37
  21. * @Description: 用户在线中间数据表服务【通过队列方式批量更新】
  22. */
  23. @Slf4j
  24. @Service
  25. public class UserOnlineService {
  26. /**
  27. * Redis查询工具模板类
  28. */
  29. @Qualifier("redisTemplateByLettuce")
  30. @Autowired
  31. RedisTemplate redisTemplate;
  32. @Autowired
  33. JdbcTemplate jdbcTemplate;
  34. @Autowired
  35. DatabaseRepository databaseRepository;
  36. SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  37. SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
  38. /**
  39. * 用户在线位置中间表
  40. */
  41. final String XH_USER_ONLINE_TB = "XH_USER_ONLINE_TB";
  42. /**
  43. * 解析临时表数据多列值
  44. *
  45. * @param list
  46. * @return
  47. */
  48. private List parseMultiParams(List<UserPosition> list) {
  49. List result = new ArrayList();
  50. int count = list.size();
  51. Integer[] userArray = new Integer[count];
  52. String[] lastTimeArray = new String[count];
  53. String[] lastDateArray = new String[count];
  54. Double[] longitudeArray = new Double[count];
  55. Double[] latitudeArray = new Double[count];
  56. UserPosition userPosition;
  57. Date date;
  58. for (int i = 0; i < count; i++) {
  59. userPosition = list.get(i);
  60. if (null != userPosition) {
  61. date = new Date(userPosition.getTimestamp());
  62. userArray[i] = Integer.valueOf(userPosition.getUserId());
  63. lastTimeArray[i] = new StringBuffer("date_trunc('second', TIMESTAMP '").append(sdfTime.format(date)).append("')").toString();
  64. String[] dateValues = sdfDate.format(date).split("-");
  65. lastDateArray[i] = new StringBuffer("make_date(").append(dateValues[0]).append(",").append(dateValues[1]).append(",").append(dateValues[2]).append(")").toString();
  66. longitudeArray[i] = userPosition.getPosition()[0];
  67. latitudeArray[i] = userPosition.getPosition()[1];
  68. }
  69. }
  70. result.add(userArray);
  71. result.add(lastTimeArray);
  72. result.add(lastDateArray);
  73. result.add(longitudeArray);
  74. result.add(latitudeArray);
  75. return result;
  76. }
  77. /**
  78. * 批量更新用户上线状态表
  79. *
  80. * @param list
  81. */
  82. public void batchOnline(List<UserPosition> list) {
  83. if (ObjectUtils.isNotEmpty(list)) {
  84. List paramList = this.parseMultiParams(list);
  85. String userIds = JSONArray.toJSONString(paramList.get(0));
  86. StringBuffer lastTimes = new StringBuffer();
  87. String[] timeList = (String[]) paramList.get(1);
  88. for (String s : timeList) {
  89. if (lastTimes.length() == 0) {
  90. lastTimes.append(s);
  91. } else {
  92. lastTimes.append(",").append(s);
  93. }
  94. }
  95. String[] dateList = (String[]) paramList.get(2);
  96. StringBuffer lastDates = new StringBuffer();
  97. for (String s : dateList) {
  98. if (lastDates.length() == 0) {
  99. lastDates.append(s);
  100. } else {
  101. lastDates.append(",").append(s);
  102. }
  103. }
  104. String longitudes = JSONArray.toJSONString(paramList.get(3));
  105. String latitudes = JSONArray.toJSONString(paramList.get(4));
  106. StringBuffer sb = new StringBuffer(" UPDATE ")
  107. .append(Constants.DB_YW_TABLE_SPACE).append(XH_USER_ONLINE_TB).append(" a ")
  108. .append(" SET ")
  109. .append(" LASTTIME = u.LASTTIME,")
  110. .append(" LAST_DATE = u.LAST_DATE,")
  111. .append(" LONGITUDE = u.LONGITUDE,")
  112. .append(" LATITUDE = u.LATITUDE,")
  113. .append(" IS_ONLINE = 1 ")
  114. .append(" FROM ( SELECT ")
  115. .append(" unnest(array").append(userIds).append(") ").append(" as I_USERID,")
  116. .append(" unnest(array[").append(lastTimes).append("]) ").append(" as LASTTIME,")
  117. .append(" unnest(array[").append(lastDates).append("]) ").append(" as LAST_DATE,")
  118. .append(" unnest(array").append(longitudes).append(") ").append(" as LONGITUDE,")
  119. .append(" unnest(array").append(latitudes).append(") ").append(" as LATITUDE")
  120. .append(" ) as u ")
  121. .append(" WHERE a.I_USERID = u.I_USERID ");
  122. jdbcTemplate.execute(sb.toString());
  123. }
  124. }
  125. /**
  126. * 用户离线状态修改(0:离线 1:在线)
  127. *
  128. * @param userIdList
  129. */
  130. public void updateUserOffline(List<String> userIdList) {
  131. String[] userIds = new String[userIdList.size()];
  132. userIds = userIdList.toArray(userIds);
  133. String filter = LogicUtil.getOrgFilterString(userIds);
  134. String sql = new StringBuffer(" UPDATE ").append(XH_USER_ONLINE_TB).append(" SET IS_ONLINE = 0 WHERE I_USERID IN (").append(filter).append(")").toString();
  135. databaseRepository.execute(XH_USER_ONLINE_TB, sql);
  136. }
  137. }

注意:Postgresql函数日期和时间戳需要特殊处理。

队列数据批量消费

用户在线离线只需要两个队列就可以区分开处理了。

  1. package com.patrol.position.queue;
  2. import com.patrol.beans.user.UserPosition;
  3. import com.patrol.config.condition.ServerCondition;
  4. import com.patrol.position.service.UserOnlineService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.lang3.ObjectUtils;
  7. import org.apache.commons.lang3.StringUtils;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.stereotype.Component;
  10. import javax.annotation.PostConstruct;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.concurrent.ConcurrentLinkedQueue;
  14. /**
  15. * @Copyright: 2019-2021
  16. * @FileName: UpdateStatusOnlineQueue.java
  17. * @Author: PJL
  18. * @Date: 2020/8https://files.jxasp.com/image/20 11:03
  19. * @Description: 在线离线状态列表数据更新队列
  20. */
  21. @Slf4j
  22. @Component
  23. public class UpdateStatusQueue {
  24. @Autowired
  25. UserOnlineService userOnlineService;
  26. /**
  27. * 并发链表队列--在线位置队列
  28. */
  29. private static final ConcurrentLinkedQueue<UserPosition> onlineQueue = new ConcurrentLinkedQueue<>();
  30. /**
  31. * 并发链表队列--离线位置队列
  32. */
  33. private static final ConcurrentLinkedQueue<String> offlineQueue = new ConcurrentLinkedQueue<>();
  34. /**
  35. * 消费用户上线、离线下线任务
  36. */
  37. @PostConstruct
  38. private void consumeUserOnlineStatusQueue() {
  39. if (ServerCondition.isServer) {
  40. log.info(">>>>>>>>启动服务端消费线程....");
  41. /***********用户上线批量消费***********/
  42. this.userOnline();
  43. /***********用户离线状态批量消费***********/
  44. this.userOffline();
  45. log.info(">>>>>>>>启动服务端消费线程....完毕!");
  46. }
  47. }
  48. /**
  49. * 添加在线用户
  50. *
  51. * @param userPosition
  52. */
  53. public static void addToOnlineQueue(UserPosition userPosition) {
  54. if (ServerCondition.isServer) {
  55. onlineQueue.add(userPosition);
  56. }
  57. }
  58. /**
  59. * 添加离线用户
  60. *
  61. * @param userId
  62. */
  63. public static void addToOfflineQueue(String userId) {
  64. if (ServerCondition.isServer) {
  65. offlineQueue.add(userId);
  66. }
  67. }
  68. /**
  69. * 用户上线更新最后一次上线位置批量处理
  70. */
  71. private void userOnline() {
  72. new Thread(new Runnable() {
  73. @Override
  74. public void run() {
  75. while (true) {
  76. int size = onlineQueue.size();
  77. if (size > 0) {
  78. List<UserPosition> list = new ArrayList<>();
  79. UserPosition userPosition;
  80. for (int i = 0; i < size; i++) {
  81. userPosition = onlineQueue.poll();
  82. if (null != userPosition) {
  83. list.add(userPosition);
  84. }
  85. }
  86. if (ObjectUtils.isNotEmpty(list)) {
  87. // 批量上线
  88. userOnlineService.batchOnline(list);
  89. }
  90. }
  91. try {
  92. Thread.sleep(50);
  93. } catch (InterruptedException e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. }
  98. }).start();
  99. }
  100. /**
  101. * 更新用户在线状态表为离线
  102. */
  103. private void userOffline() {
  104. new Thread(new Runnable() {
  105. @Override
  106. public void run() {
  107. while (true) {
  108. int size = offlineQueue.size();
  109. if (size > 0) {
  110. List<String> list = new ArrayList<>();
  111. String userId;
  112. for (int i = 0; i < size; i++) {
  113. userId = offlineQueue.poll();
  114. if (null != userId && StringUtils.isNotEmpty(userId)) {
  115. list.add(userId);
  116. }
  117. }
  118. if (ObjectUtils.isNotEmpty(list)) {
  119. // 批量离线
  120. userOnlineService.updateUserOffline(list);
  121. }
  122. }
  123. try {
  124. Thread.sleep(50);
  125. } catch (InterruptedException e) {
  126. e.printStackTrace();
  127. }
  128. }
  129. }
  130. }).start();
  131. }
  132. }

这里开了两个线程处理,根据用户规模可以改为多线程并发消费(但是请注意控制数据库连接)。

 

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载