关键词搜索

源码搜索 ×
×

JavaScript WebSocket实现长连接通信连接重连

发布2018-11-22浏览7454次

详情内容

JavaScript 借助 WebSocket 也可以像其他语言一样实现长连接通信,不过使用时需要注意检查并处理连接断开的情况。最基本的前提是必须保证服务端的 IP 和端口都可以访问。

目录

WebSocket代码及测试

调用初始化代码

通信示例代码

测试效果

WebSocket连接关闭重连处理

连接断开问题

代码实现重连

重连效果


WebSocket代码及测试

调用初始化代码

  // Open the socket for the configured host/port, create the loader, then pass
  // the socket to the loader's connect() so it can install its event handlers.
  1. CvNetVideo.Websocket = new WebSocket('ws://' + websocetHost + ":" + websocketPort);
  2. CvNetVideo.WebsocketCallbackLoader = new WebsocketCallbackLoader();
  3. CvNetVideo.WebsocketCallbackLoader.connect(CvNetVideo.Websocket);

通信示例代码

  1. /*
  2. * WebsocketCallback Loader
  3. */
  4. import JT1078 from '../JX/JTT1078';
  /**
   * Attaches to an existing WebSocket, splits incoming binary data into frames,
   * reassembles multi-packet messages and passes the decoded result to the
   * callback registered by the most recent sendMessage() call.
   */
  class WebsocketCallbackLoader {
    constructor() {
      this.websocket = undefined;
      // Callback supplied by the most recent sendMessage() call.
      this.onCallback = undefined;
      this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
      this.splitData = new JT1078.SplitData();
      this.checkHelper = new JT1078.CheckHelper();
      // Fragment cache for multi-packet messages, keyed by SIM.
      this.AllSimPackData = {};
    }

    /**
     * Adopts an already-created socket and installs this loader's handlers.
     * @param {WebSocket} websocket - socket created by the caller.
     */
    connect(websocket) {
      this.websocket = websocket;
      this.websocket.binaryType = 'arraybuffer';
      this.websocket.onopen = this.openSocket.bind(this);
      this.websocket.onclose = this.closeSocket.bind(this);
      this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    /** Logs that the socket was closed. */
    closeSocket(e) {
      console.log('Websocket808 Disconnected!');
    }

    /** Logs that the socket is open. */
    openSocket() {
      console.log('Websocket808 Open!');
    }

    /**
     * Splits one WebSocket message into frames and forwards every frame whose
     * trailing byte matches the XOR check value.
     * @param {MessageEvent} event - message whose data is an ArrayBuffer.
     */
    receiveSocketMessage(event) {
      const data = new Uint8Array(event.data);
      // NOTE(review): splitting starts at byte 18 — presumably a transport
      // prefix precedes the frames; confirm against the server.
      this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
        const last = bGps.length - 1;
        if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
          // Check passed.
          this.JX808DataPD(new Uint8Array(bGps));
        }
      });
    }

    /**
     * Parses a frame header. Frames without packet info are decoded directly;
     * fragments are cached per SIM and serial number until all have arrived.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
      const head = new JT1078.JTHeader();
      head.FillByBinary(bGps, 0);
      if (!head.PackInfo) {
        this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
        return;
      }

      const sid = JT1078.GetFirstSerialNumber(head);
      let simPackets = this.AllSimPackData[head.Sim];
      if (!simPackets) {
        simPackets = {};
        this.AllSimPackData[head.Sim] = simPackets;
      }

      const fragmentIndex = head.PackInfo.Index;
      let pd = simPackets[sid];
      if (pd) {
        // Count each fragment index once, even if it is retransmitted.
        if (!pd.PackData[fragmentIndex]) {
          pd.PackData.Count += 1;
        }
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.LastRTime = new Date();
      } else {
        pd = {};
        pd.PackData = {};
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.PackData.Count = 1;
        pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
        pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
        pd.StartTime = pd.LastRTime = new Date();
        simPackets[sid] = pd;
      }

      // Check after every fragment (including the first) so that a message
      // announced as a single packet is not cached forever.
      if (this.CheckPackByPD(pd)) {
        delete simPackets[sid];
      }
    }

    /**
     * Joins the cached fragments in ascending index order into one buffer.
     * Only all-digit keys are fragments; bookkeeping keys such as Count are skipped.
     * @param {Object} packData - fragment map of a cache entry.
     * @returns {Uint8Array} concatenated fragment bytes.
     */
    concatPackData(packData) {
      const parts = Object.keys(packData)
        .filter((key) => /^\d+$/.test(key))
        .sort((a, b) => Number(a) - Number(b))
        .map((key) => packData[key]);
      const totalLength = parts.reduce((sum, part) => sum + part.byteLength, 0);
      const joined = new Uint8Array(totalLength);
      let offset = 0;
      parts.forEach((part) => {
        joined.set(part, offset);
        offset += part.byteLength;
      });
      return joined;
    }

    /**
     * Decodes the message once the number of cached fragments equals the
     * announced total.
     * @param {Object} pd - cache entry with PackData and Head.
     * @returns {boolean} true when the entry is finished and can be discarded
     *   (also when decoding failed, so bad data is not parsed again).
     */
    CheckPackByPD(pd) {
      if (pd.PackData.Count !== pd.Head.PackInfo.Sum) {
        return false;
      }
      try {
        this.JX808Data(pd.Head, this.concatPackData(pd.PackData));
      } catch (ex) {
        console.error(ex);
      }
      return true;
    }

    /**
     * Decodes a complete message body by message id and notifies the callback.
     * @param {Object} head - message header (MsgId is read).
     * @param {Uint8Array} bts - complete message body.
     */
    JX808Data(head, bts) {
      switch (head.MsgId) {
        case 0x1205: { // Terminal uploads the audio/video resource list.
          const videolist = new JT1078.JTVideoListInfo();
          videolist.FillByBinary(bts, 0);
          if (typeof this.onCallback === 'function') {
            this.onCallback(videolist);
          }
          break;
        }
        case 0x1206: // Terminal reports that the FTP upload finished; not handled here.
          break;
        default:
      }
    }

    /**
     * Registers the response callback and sends data if the socket is open.
     * In any other state the data is not sent and the state is logged.
     * @param {*} data - payload passed to WebSocket.send.
     * @param {Function} callback - receives the decoded response.
     */
    sendMessage(data, callback) {
      this.onCallback = callback;
      if (!this.websocket) {
        console.log("Websocket808 not connected");
        return;
      }
      switch (this.websocket.readyState) {
        case 0:
          console.log("Websocket808 readyState=CONNECTING");
          break;
        case 1:
          this.websocket.send(data);
          break;
        case 2:
          console.log("Websocket808 readyState=CLOSING");
          break;
        case 3:
          console.log("Websocket808 readyState=CLOSED");
          break;
      }
    }
  }
  124. export default WebsocketCallbackLoader;

测试效果

WebSocket连接关闭重连处理

连接断开问题

代码实现重连

改造一下调用代码和发送消息代码:

  /**
   * Creates the shared loader for the given host and port and starts its
   * connection. The loader builds the WebSocket itself from host and port.
   */
  1. static initWebsocket(websocetHost, websocketPort) {
  2. CvNetVideo.WebsocketCallbackLoader = new WebsocketCallbackLoader(websocetHost, websocketPort);
  3. CvNetVideo.WebsocketCallbackLoader.connect();
  4. }
  1. /*
  2. * WebsocketCallback Loader
  3. */
  4. import JT1078 from '../JX/JTT1078';
  5. import { setTimeout } from 'timers';
  /**
   * Owns a WebSocket built from host and port, splits incoming binary data into
   * frames, reassembles multi-packet messages and passes the decoded result to
   * the callback registered by the most recent sendMessage() call. When the
   * socket is not open, sendMessage() (re)connects and retries once after a delay.
   */
  class WebsocketCallbackLoader {
    /**
     * @param {string} host - server host used to build the ws:// URL.
     * @param {number|string} port - server port used to build the ws:// URL.
     */
    constructor(host, port) {
      this.host = host;
      this.port = port;
      this.websocket = undefined;
      // Callback supplied by the most recent sendMessage() call.
      this.onCallback = undefined;
      this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
      this.splitData = new JT1078.SplitData();
      this.checkHelper = new JT1078.CheckHelper();
      // Fragment cache for multi-packet messages, keyed by SIM.
      this.AllSimPackData = {};
    }

    /** Creates a new WebSocket for host:port and installs this loader's handlers. */
    connect() {
      const url = "ws://" + this.host + ":" + this.port;
      console.log("Websocket808 WebSocket connect ..." + url);
      this.websocket = new WebSocket(url);
      this.websocket.binaryType = 'arraybuffer';
      this.websocket.onopen = this.openSocket.bind(this);
      this.websocket.onclose = this.closeSocket.bind(this);
      this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    /** Logs that the socket was closed. */
    closeSocket(e) {
      console.log('Websocket808 Disconnected!');
    }

    /** Logs that the socket is open. */
    openSocket() {
      console.log('Websocket808 Open!');
    }

    /**
     * Splits one WebSocket message into frames and forwards every frame whose
     * trailing byte matches the XOR check value.
     * @param {MessageEvent} event - message whose data is an ArrayBuffer.
     */
    receiveSocketMessage(event) {
      const data = new Uint8Array(event.data);
      // NOTE(review): splitting starts at byte 18 — presumably a transport
      // prefix precedes the frames; confirm against the server.
      this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
        const last = bGps.length - 1;
        if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
          // Check passed.
          this.JX808DataPD(new Uint8Array(bGps));
        }
      });
    }

    /**
     * Parses a frame header. Frames without packet info are decoded directly;
     * fragments are cached per SIM and serial number until all have arrived.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
      const head = new JT1078.JTHeader();
      head.FillByBinary(bGps, 0);
      if (!head.PackInfo) {
        this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
        return;
      }

      const sid = JT1078.GetFirstSerialNumber(head);
      let simPackets = this.AllSimPackData[head.Sim];
      if (!simPackets) {
        simPackets = {};
        this.AllSimPackData[head.Sim] = simPackets;
      }

      const fragmentIndex = head.PackInfo.Index;
      let pd = simPackets[sid];
      if (pd) {
        // Count each fragment index once, even if it is retransmitted.
        if (!pd.PackData[fragmentIndex]) {
          pd.PackData.Count += 1;
        }
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.LastRTime = new Date();
      } else {
        pd = {};
        pd.PackData = {};
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.PackData.Count = 1;
        pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
        pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
        pd.StartTime = pd.LastRTime = new Date();
        simPackets[sid] = pd;
      }

      // Check after every fragment (including the first) so that a message
      // announced as a single packet is not cached forever.
      if (this.CheckPackByPD(pd)) {
        delete simPackets[sid];
      }
    }

    /**
     * Joins the cached fragments in ascending index order into one buffer.
     * Only all-digit keys are fragments; bookkeeping keys such as Count are skipped.
     * @param {Object} packData - fragment map of a cache entry.
     * @returns {Uint8Array} concatenated fragment bytes.
     */
    concatPackData(packData) {
      const parts = Object.keys(packData)
        .filter((key) => /^\d+$/.test(key))
        .sort((a, b) => Number(a) - Number(b))
        .map((key) => packData[key]);
      const totalLength = parts.reduce((sum, part) => sum + part.byteLength, 0);
      const joined = new Uint8Array(totalLength);
      let offset = 0;
      parts.forEach((part) => {
        joined.set(part, offset);
        offset += part.byteLength;
      });
      return joined;
    }

    /**
     * Decodes the message once the number of cached fragments equals the
     * announced total.
     * @param {Object} pd - cache entry with PackData and Head.
     * @returns {boolean} true when the entry is finished and can be discarded
     *   (also when decoding failed, so bad data is not parsed again).
     */
    CheckPackByPD(pd) {
      if (pd.PackData.Count !== pd.Head.PackInfo.Sum) {
        return false;
      }
      try {
        this.JX808Data(pd.Head, this.concatPackData(pd.PackData));
      } catch (ex) {
        console.error(ex);
      }
      return true;
    }

    /**
     * Decodes a complete message body by message id and notifies the callback.
     * @param {Object} head - message header (MsgId is read).
     * @param {Uint8Array} bts - complete message body.
     */
    JX808Data(head, bts) {
      switch (head.MsgId) {
        case 0x1205: { // Terminal uploads the audio/video resource list.
          const videolist = new JT1078.JTVideoListInfo();
          videolist.FillByBinary(bts, 0);
          if (typeof this.onCallback === 'function') {
            this.onCallback(videolist);
          }
          break;
        }
        case 0x1206: // Terminal reports that the FTP upload finished; not handled here.
          break;
        default:
      }
    }

    /** Logs the current readyState of the socket, if one exists. */
    printState() {
      if (!this.websocket) {
        return;
      }
      switch (this.websocket.readyState) {
        case 0:
          console.log("Websocket808 readyState=CONNECTING");
          break;
        case 1:
          console.log("Websocket808 readyState=OPEN");
          break;
        case 2:
          console.log("Websocket808 readyState=CLOSING");
          break;
        case 3:
          console.log("Websocket808 readyState=CLOSED");
          break;
      }
    }

    /**
     * Sends data on the current socket without checking its state.
     * @param {*} data - payload passed to WebSocket.send.
     */
    sendData(data) {
      this.websocket.send(data);
    }

    /**
     * Registers the response callback and sends data. If the socket is open the
     * data goes out immediately. Otherwise the socket is (re)created when it is
     * missing, closing or closed, and one delayed attempt is made.
     * @param {*} data - payload passed to WebSocket.send.
     * @param {Function} callback - receives the decoded response.
     */
    sendMessage(data, callback) {
      this.onCallback = callback;
      if (!this.websocket) {
        // connect() was never called; open the socket before inspecting it.
        this.connect();
      }
      this.printState();

      const state = this.websocket.readyState;
      if (state === 1) {
        this.sendData(data);
        return;
      }
      if (state === 2 || state === 3) {
        this.connect();
      }

      // Creating the connection takes time, so retry once after a delay.
      const retryDelayMs = 3000;
      setTimeout(() => {
        this.printState();
        // send() throws unless the socket is OPEN, so re-check before sending.
        if (this.websocket.readyState === 1) {
          this.sendData(data);
        } else {
          console.log("Websocket808 message dropped: socket not open after waiting");
        }
      }, retryDelayMs);
    }
  }
  149. export default WebsocketCallbackLoader;

重连效果

 下面是连接断开后重连效果:

注意:重连是异步进行的,这里使用 setTimeout(func, ms) 延迟发送数据。因为创建连接需要时间,重连后如果立即发送,连接很可能仍处于"正在连接"(CONNECTING)状态,此时发送会失败。

使用队列处理连接失败数据

  1. /*
  2. * WebsocketCallback Loader
  3. */
  4. import JT1078 from '../JX/JTT1078';
  5. import Queue from '../utils/queue';
  /**
   * Owns a WebSocket built from host and port, splits incoming binary data into
   * frames, reassembles multi-packet messages and dispatches decoded results to
   * callbacks registered per id. Data that cannot be sent yet is queued and
   * flushed when the socket opens.
   */
  class WebsocketCallbackLoader {
    /**
     * @param {string} host - server host used to build the ws:// URL.
     * @param {number|string} port - server port used to build the ws:// URL.
     */
    constructor(host, port) {
      this.host = host;
      this.port = port;
      this.websocket = undefined;
      this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
      this.splitData = new JT1078.SplitData();
      this.checkHelper = new JT1078.CheckHelper();
      // Fragment cache for multi-packet messages, keyed by SIM.
      this.AllSimPackData = {};
      // Payloads waiting for the socket to open.
      this.sendCache = new Queue();
      // Response callbacks keyed by the id passed to sendMessage().
      this.onCallback = {};
    }

    /** Creates a new WebSocket for host:port and installs this loader's handlers. */
    connect() {
      const url = "ws://" + this.host + ":" + this.port;
      this.websocket = new WebSocket(url);
      this.websocket.binaryType = 'arraybuffer';
      this.websocket.onopen = this.openSocket.bind(this);
      this.websocket.onclose = this.closeSocket.bind(this);
      this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    /** Logs that the socket was closed. */
    closeSocket(e) {
      console.log('Websocket808 Disconnected!');
    }

    /** Logs that the socket is open and sends everything that was queued. */
    openSocket() {
      console.log('Websocket808 Open!');
      while (this.sendCache.size() > 0) {
        this.sendData(this.sendCache.dequeue());
      }
    }

    /**
     * Splits one WebSocket message into frames and forwards every frame whose
     * trailing byte matches the XOR check value.
     * @param {MessageEvent} event - message whose data is an ArrayBuffer.
     */
    receiveSocketMessage(event) {
      const data = new Uint8Array(event.data);
      // NOTE(review): splitting starts at byte 18 — presumably a transport
      // prefix precedes the frames; confirm against the server.
      this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
        const last = bGps.length - 1;
        if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
          // Check passed.
          this.JX808DataPD(new Uint8Array(bGps));
        }
      });
    }

    /**
     * Parses a frame header. Frames without packet info are decoded directly;
     * fragments are cached per SIM and serial number until all have arrived.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
      const head = new JT1078.JTHeader();
      head.FillByBinary(bGps, 0);
      if (!head.PackInfo) {
        this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
        return;
      }

      const sid = JT1078.GetFirstSerialNumber(head);
      let simPackets = this.AllSimPackData[head.Sim];
      if (!simPackets) {
        simPackets = {};
        this.AllSimPackData[head.Sim] = simPackets;
      }

      const fragmentIndex = head.PackInfo.Index;
      let pd = simPackets[sid];
      if (pd) {
        // Count each fragment index once, even if it is retransmitted.
        if (!pd.PackData[fragmentIndex]) {
          pd.PackData.Count += 1;
          pd.PackData.totalLength += head.MsgLen;
        }
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.LastRTime = new Date();
      } else {
        pd = {};
        pd.PackData = {};
        pd.PackData[fragmentIndex] = JT1078.GetMsbBody(head, bGps);
        pd.PackData.Count = 1;
        pd.PackData.totalLength = head.MsgLen;
        pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
        pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
        pd.StartTime = pd.LastRTime = new Date();
        simPackets[sid] = pd;
      }

      // Check after every fragment (including the first) so that a message
      // announced as a single packet is not cached forever. Remove the entry
      // instead of nulling it so finished serial numbers do not accumulate.
      if (this.CheckPackByPD(pd)) {
        delete simPackets[sid];
      }
    }

    /**
     * Joins the cached fragments in ascending index order into one buffer sized
     * from the fragments themselves. Only all-digit keys are fragments;
     * bookkeeping keys such as Count and totalLength are skipped.
     * @param {Object} packData - fragment map of a cache entry.
     * @returns {Uint8Array} concatenated fragment bytes.
     */
    concatPackData(packData) {
      const parts = Object.keys(packData)
        .filter((key) => /^\d+$/.test(key))
        .sort((a, b) => Number(a) - Number(b))
        .map((key) => packData[key]);
      const byteTotal = parts.reduce((sum, part) => sum + part.byteLength, 0);
      const joined = new Uint8Array(byteTotal);
      let offset = 0;
      parts.forEach((part) => {
        joined.set(part, offset);
        offset += part.byteLength;
      });
      return joined;
    }

    /**
     * Decodes the message once the number of cached fragments equals the
     * announced total.
     * @param {Object} pd - cache entry with PackData and Head.
     * @returns {boolean} true when the entry is finished and can be discarded
     *   (also when decoding failed, so bad data is not parsed again).
     */
    CheckPackByPD(pd) {
      if (pd.PackData.Count !== pd.Head.PackInfo.Sum) {
        return false;
      }
      try {
        this.JX808Data(pd.Head, this.concatPackData(pd.PackData));
      } catch (ex) {
        console.error(ex);
      }
      return true;
    }

    /**
     * Invokes the callback registered under msgId, if there is one.
     * @param {number} msgId - key into onCallback.
     * @param {Object} payload - decoded message passed to the callback.
     */
    notify(msgId, payload) {
      if (typeof this.onCallback[msgId] === 'function') {
        this.onCallback[msgId](payload);
      }
    }

    /**
     * Decodes a complete message body by message id and notifies the callback
     * registered for that id.
     * @param {Object} head - message header (MsgId is read).
     * @param {Uint8Array} bts - complete message body.
     */
    JX808Data(head, bts) {
      switch (head.MsgId) {
        case 0x1205: { // Terminal uploads the audio/video resource list.
          const videolist = new JT1078.JTVideoListInfo();
          videolist.FillByBinary(bts, 0);
          this.notify(0x1205, videolist);
          break;
        }
        case 0x1206: { // Terminal reports that the FTP upload finished.
          const fileUploadEndInform = new JT1078.JTVideoFileUploadEndInform();
          fileUploadEndInform.FillByBinary(bts, 0);
          this.notify(0x1206, fileUploadEndInform);
          break;
        }
        default:
      }
    }

    /** Logs the current readyState of the socket, if one exists. */
    printState() {
      if (!this.websocket) {
        return;
      }
      switch (this.websocket.readyState) {
        case 0:
          console.log("Websocket808 readyState=CONNECTING");
          break;
        case 1:
          console.log("Websocket808 readyState=OPEN");
          break;
        case 2:
          console.log("Websocket808 readyState=CLOSING");
          break;
        case 3:
          console.log("Websocket808 readyState=CLOSED");
          break;
      }
    }

    /**
     * Sends data on the current socket without checking its state.
     * @param {*} data - payload passed to WebSocket.send.
     */
    sendData(data) {
      this.websocket.send(data);
    }

    /**
     * Registers the callback under id and sends data. When the socket is not
     * open the data is queued; a missing, closing or closed socket is also
     * replaced by a new connection, and the queue is flushed once it opens.
     * @param {*} data - payload passed to WebSocket.send.
     * @param {Function} callback - receives the decoded response.
     * @param {number} id - message id the callback is registered under.
     */
    sendMessage(data, callback, id) {
      this.onCallback[id] = callback;
      // A loader that never connected is treated like a closed socket.
      const state = this.websocket ? this.websocket.readyState : 3;
      if (state === 1) {
        this.sendData(data);
        return;
      }
      this.sendCache.enqueue(data);
      if (state === 2 || state === 3) {
        this.printState();
        this.connect();
      }
    }
  }
  150. export default WebsocketCallbackLoader;

 

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载