目录
前言介绍
系统用户实时位置都保存在redis中,我们采用redis过期方式来监听用户位置离线,离线的用户key会放入对应的队列进行消费(这里的队列可以是Java队列也可以是RabbitMQ之类的消息中间件,我们采用的是ConcurrentLinkedQueue)。项目中对用户实时位置有两个要求(特殊行业对人员位置安全规范的要求):
- a.用户最后一次经纬度实时位置更新到数据库
- b.用户离线后更新用户在线状态
为了解决每次用户位置心跳更新导致数据库连接不足问题,我采用了批量更新的方式来解决大批量的用户上线和离线更新。解决了如下问题:
- a.解决用户状态表统计数据不一致问题
- b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
- c.解决redis和database在线状态延迟过大问题
用户数据迁移
针对需要维护最后用户在线位置数据的这类用户,我们有单独用户角色加以限定。这种用户只需要简单将数据做个分表保存起来就可以了。首次初始化这类数据是根据已有用户数据直接采用SQL 支持的SELECT [A,B,....] INTO TABLE_SUB FROM TABLE_MAIN
示例脚本如下:
- -- 注意:======依赖触发器更新用户信息=====
- -- 修改目的:
- -- a.解决用户状态表格统计数据不一致问题
- -- b.解决数据库更新占用数据库连接过多问题(可实现批量更新上线状态位置和离线状态,最后一次位置保存更新降低数据库压力)
- -- c.解决redis和database在线状态延迟过大问题
- -- 可反复执行如下表数据记录
- DROP TABLE if exists xh_yw.xh_user_online_tb ;
-
- SELECT
- -- 组织机构ID
- i_orgid,
- -- 组织机构编号
- c_orgbh,
- -- 组织机构名称
- c_orgname,
- -- 用户ID
- i_userid,
- -- 用户姓名
- c_userealname,
- -- 手机号码
- c_usertel,
- -- 时间戳
- make_timestamp(2020,1,1,0,0,0) AS lasttime,
- -- 最后位置:经度
- 0.0 AS longitude,
- -- 最后位置:纬度
- 0.0 AS latitude,
- -- 是否在线: 0 离线 1 在线
- 0 AS is_online ,
- -- 日期make_date(2020,1,1) AS last_date
- '2020-01-01' AS last_date
-
- INTO xh_yw.xh_user_online_tb
-
- FROM xh_ht.fs_yw_base_user where i_userid
- -- 角色过滤
- IN (select distinct i_userid from xh_ht.fs_yw_user_role where i_roleid = 5 )
用户状态数据批量处理
批量处理都依赖数据库支持的方式。
MySQL参考:https://www.cnblogs.com/mslagee/p/6509682.html
Postgresql参考:https://www.itranslater.com/qa/detailshttps://files.jxasp.com/image/2583251656280376320
Java代码示例:
- package com.patrol.position.service;
-
- import com.alibaba.fastjson.JSONArray;
- import com.forestar.platform.dao.DatabaseRepository;
- import com.patrol.beans.Constants;
- import com.patrol.beans.user.UserPosition;
- import com.patrol.beans.util.LogicUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.ObjectUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.stereotype.Service;
- import java.text.SimpleDateFormat;
- import java.util.*;
-
- /**
- * @Copyright: 2019-2021
- * @FileName: UserOnlineService.java
- * @Author: PJL
- * @Date: 2020/7/15 19:37
- * @Description: 用户在线中间数据表服务【通过队列方式批量更新】
- */
- @Slf4j
- @Service
- public class UserOnlineService {
-
- /**
- * Redis查询工具模板类
- */
- @Qualifier("redisTemplateByLettuce")
- @Autowired
- RedisTemplate redisTemplate;
-
- @Autowired
- JdbcTemplate jdbcTemplate;
-
- @Autowired
- DatabaseRepository databaseRepository;
-
- SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
-
- /**
- * 用户在线位置中间表
- */
- final String XH_USER_ONLINE_TB = "XH_USER_ONLINE_TB";
-
- /**
- * 解析临时表数据多列值
- *
- * @param list
- * @return
- */
- private List parseMultiParams(List<UserPosition> list) {
- List result = new ArrayList();
- int count = list.size();
- Integer[] userArray = new Integer[count];
- String[] lastTimeArray = new String[count];
- String[] lastDateArray = new String[count];
- Double[] longitudeArray = new Double[count];
- Double[] latitudeArray = new Double[count];
- UserPosition userPosition;
- Date date;
- for (int i = 0; i < count; i++) {
- userPosition = list.get(i);
- if (null != userPosition) {
- date = new Date(userPosition.getTimestamp());
- userArray[i] = Integer.valueOf(userPosition.getUserId());
- lastTimeArray[i] = new StringBuffer("date_trunc('second', TIMESTAMP '").append(sdfTime.format(date)).append("')").toString();
- String[] dateValues = sdfDate.format(date).split("-");
- lastDateArray[i] = new StringBuffer("make_date(").append(dateValues[0]).append(",").append(dateValues[1]).append(",").append(dateValues[2]).append(")").toString();
- longitudeArray[i] = userPosition.getPosition()[0];
- latitudeArray[i] = userPosition.getPosition()[1];
- }
- }
- result.add(userArray);
- result.add(lastTimeArray);
- result.add(lastDateArray);
- result.add(longitudeArray);
- result.add(latitudeArray);
- return result;
- }
-
-
- /**
- * 批量更新用户上线状态表
- *
- * @param list
- */
- public void batchOnline(List<UserPosition> list) {
- if (ObjectUtils.isNotEmpty(list)) {
- List paramList = this.parseMultiParams(list);
- String userIds = JSONArray.toJSONString(paramList.get(0));
- StringBuffer lastTimes = new StringBuffer();
- String[] timeList = (String[]) paramList.get(1);
- for (String s : timeList) {
- if (lastTimes.length() == 0) {
- lastTimes.append(s);
- } else {
- lastTimes.append(",").append(s);
- }
- }
- String[] dateList = (String[]) paramList.get(2);
- StringBuffer lastDates = new StringBuffer();
- for (String s : dateList) {
- if (lastDates.length() == 0) {
- lastDates.append(s);
- } else {
- lastDates.append(",").append(s);
- }
- }
- String longitudes = JSONArray.toJSONString(paramList.get(3));
- String latitudes = JSONArray.toJSONString(paramList.get(4));
- StringBuffer sb = new StringBuffer(" UPDATE ")
- .append(Constants.DB_YW_TABLE_SPACE).append(XH_USER_ONLINE_TB).append(" a ")
- .append(" SET ")
- .append(" LASTTIME = u.LASTTIME,")
- .append(" LAST_DATE = u.LAST_DATE,")
- .append(" LONGITUDE = u.LONGITUDE,")
- .append(" LATITUDE = u.LATITUDE,")
- .append(" IS_ONLINE = 1 ")
- .append(" FROM ( SELECT ")
- .append(" unnest(array").append(userIds).append(") ").append(" as I_USERID,")
- .append(" unnest(array[").append(lastTimes).append("]) ").append(" as LASTTIME,")
- .append(" unnest(array[").append(lastDates).append("]) ").append(" as LAST_DATE,")
- .append(" unnest(array").append(longitudes).append(") ").append(" as LONGITUDE,")
- .append(" unnest(array").append(latitudes).append(") ").append(" as LATITUDE")
- .append(" ) as u ")
- .append(" WHERE a.I_USERID = u.I_USERID ");
- jdbcTemplate.execute(sb.toString());
- }
-
- }
-
- /**
- * 用户离线状态修改(0:离线 1:在线)
- *
- * @param userIdList
- */
- public void updateUserOffline(List<String> userIdList) {
- String[] userIds = new String[userIdList.size()];
- userIds = userIdList.toArray(userIds);
- String filter = LogicUtil.getOrgFilterString(userIds);
- String sql = new StringBuffer(" UPDATE ").append(XH_USER_ONLINE_TB).append(" SET IS_ONLINE = 0 WHERE I_USERID IN (").append(filter).append(")").toString();
- databaseRepository.execute(XH_USER_ONLINE_TB, sql);
- }
- }
注意:Postgresql函数日期和时间戳需要特殊处理。
队列数据批量消费
用户在线离线只需要两个队列就可以区分开处理了。
- package com.patrol.position.queue;
-
- import com.patrol.beans.user.UserPosition;
- import com.patrol.config.condition.ServerCondition;
- import com.patrol.position.service.UserOnlineService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.ObjectUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ConcurrentLinkedQueue;
-
- /**
- * @Copyright: 2019-2021
- * @FileName: UpdateStatusOnlineQueue.java
- * @Author: PJL
- * @Date: 2020/8https://files.jxasp.com/image/20 11:03
- * @Description: 在线离线状态列表数据更新队列
- */
- @Slf4j
- @Component
- public class UpdateStatusQueue {
-
- @Autowired
- UserOnlineService userOnlineService;
-
- /**
- * 并发链表队列--在线位置队列
- */
- private static final ConcurrentLinkedQueue<UserPosition> onlineQueue = new ConcurrentLinkedQueue<>();
-
- /**
- * 并发链表队列--离线位置队列
- */
- private static final ConcurrentLinkedQueue<String> offlineQueue = new ConcurrentLinkedQueue<>();
-
- /**
- * 消费用户上线、离线下线任务
- */
- @PostConstruct
- private void consumeUserOnlineStatusQueue() {
- if (ServerCondition.isServer) {
- log.info(">>>>>>>>启动服务端消费线程....");
- /***********用户上线批量消费***********/
- this.userOnline();
-
- /***********用户离线状态批量消费***********/
- this.userOffline();
-
- log.info(">>>>>>>>启动服务端消费线程....完毕!");
- }
- }
-
-
- /**
- * 添加在线用户
- *
- * @param userPosition
- */
- public static void addToOnlineQueue(UserPosition userPosition) {
- if (ServerCondition.isServer) {
- onlineQueue.add(userPosition);
- }
- }
-
- /**
- * 添加离线用户
- *
- * @param userId
- */
- public static void addToOfflineQueue(String userId) {
- if (ServerCondition.isServer) {
- offlineQueue.add(userId);
- }
- }
-
- /**
- * 用户上线更新最后一次上线位置批量处理
- */
- private void userOnline() {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- int size = onlineQueue.size();
- if (size > 0) {
- List<UserPosition> list = new ArrayList<>();
- UserPosition userPosition;
- for (int i = 0; i < size; i++) {
- userPosition = onlineQueue.poll();
- if (null != userPosition) {
- list.add(userPosition);
- }
- }
- if (ObjectUtils.isNotEmpty(list)) {
- // 批量上线
- userOnlineService.batchOnline(list);
- }
- }
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }
-
- /**
- * 更新用户在线状态表为离线
- */
- private void userOffline() {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- int size = offlineQueue.size();
- if (size > 0) {
- List<String> list = new ArrayList<>();
- String userId;
- for (int i = 0; i < size; i++) {
- userId = offlineQueue.poll();
- if (null != userId && StringUtils.isNotEmpty(userId)) {
- list.add(userId);
- }
- }
- if (ObjectUtils.isNotEmpty(list)) {
- // 批量离线
- userOnlineService.updateUserOffline(list);
- }
- }
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }
- }
这里开了两个线程处理,根据用户规模可以改为多线程并发消费(但是请注意控制数据库连接)。