关键词搜索

源码搜索 ×
×

C#编写的Socket客户端通道发送队列

发布2018-01-08浏览2967次

详情内容

C#编写的Socket客户端通道发送队列:Socket、Channel、Queue这里就不多做介绍了,本篇旨在实现一个Socket管理通道的队列服务。

创建一个Channel

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Net.Sockets;
  6. using SQ.Base;
  7. using System.Net;
  8. namespace Network
  9. {
  10. public delegate void EventChannelReceive(object sender, ChannelReceiveArg arg);
  11. public delegate void EventChannelSend(object sender, ChannelSendArg arg);
  12. public delegate void EventChannelConnect(object sender, ChannelConnectArg arg);
  13. public delegate void EventChannelDispose(object sender, ChannelDisposeArg arg);
  14. public delegate void EventChannelError(object sender, ChannelErrorArg arg);
  15. /// <summary>
  16. /// 连接通道
  17. /// </summary>
  18. public abstract class Channel
  19. {
  20. #region 属性
  21. private ChannelQueue SendQ;
  22. /// <summary>
  23. /// 套接字
  24. /// </summary>
  25. public Socket Socket { get; protected set; }
  26. /// <summary>
  27. /// 远程端口
  28. /// </summary>
  29. public int RemotePort { get; set; }
  30. /// <summary>
  31. /// 远程IP地址
  32. /// </summary>
  33. public string RemoteHost { get; set; }
  34. /// <summary>
  35. /// 通道状态
  36. /// </summary>
  37. public ChannelState ChannelState { get; protected set; }
  38. /// <summary>
  39. /// 用户标识对象
  40. /// </summary>
  41. public object Tag { get; set; }
  42. /// <summary>
  43. /// 数据发送完成委托
  44. /// </summary>
  45. public EventChannelSend DataSend;
  46. /// <summary>
  47. /// 收到数据委托
  48. /// </summary>
  49. public EventChannelReceive DataReceive;
  50. /// <summary>
  51. /// 通道错误事件
  52. /// </summary>
  53. public event EventChannelError ChannelError;
  54. /// <summary>
  55. /// 通道断开事件
  56. /// </summary>
  57. public event EventChannelDispose ChannelDispose;
  58. /// <summary>
  59. /// 通道连接事件
  60. /// </summary>
  61. public event EventChannelConnect ChannelConnect;
  62. #endregion
  63. #region 公共方法
  64. /// <summary>
  65. /// 用已连接套接字构造通道
  66. /// </summary>
  67. /// <param name="socket">连接套接字</param>
  68. public Channel(Socket socket)
  69. {
  70. Socket = socket;
  71. IPEndPoint endPoint = Socket.RemoteEndPoint as IPEndPoint;
  72. RemoteHost = endPoint.Address.ToString();
  73. RemotePort = endPoint.Port;
  74. SendQ = Utils.GetSendQueue().AddChannelQueue(this);
  75. }
  76. /// <summary>
  77. /// 用IP地址和端口号构造通道
  78. /// </summary>
  79. /// <param name="remoteHost">远程IP地址</param>
  80. /// <param name="remotePort">端口号</param>
  81. public Channel(string remoteHost, int remotePort)
  82. {
  83. RemoteHost = remoteHost;
  84. RemotePort = remotePort;
  85. SendQ = Utils.GetSendQueue().AddChannelQueue(this);
  86. }
  87. /// <summary>
  88. /// 释放通道
  89. /// </summary>
  90. public virtual void Close()
  91. {
  92. SendQ.RemoveChannelQueue();
  93. }
  94. /// <summary>
  95. /// 连接
  96. /// </summary>
  97. public abstract void Connect();
  98. /// <summary>
  99. /// 抛送异步请求(Server类中创建通道之后调用,主动连接则不用调用)
  100. /// </summary>
  101. public abstract void StartReceiveAsync();
  102. /// <summary>
  103. /// 发送数据
  104. /// </summary>
  105. /// <param name="data">数据</param>
  106. /// <param name="token">令牌.数据发送完成后在发送数据参数中返回</param>
  107. internal void Send(byte[] data, object token = null)
  108. {
  109. Send(data, 0, data.Length, token);
  110. }
  111. /// <summary>
  112. /// 发送数据
  113. /// </summary>
  114. /// <param name="data">数据</param>
  115. /// <param name="offset">偏移量</param>
  116. /// <param name="count">长度</param>
  117. /// <param name="token">令牌.数据发送完成后在发送数据参数中返回</param>
  118. internal abstract void Send(byte[] data, int offset, int count, object token = null);
  119. public void SendAsync(byte[] data)
  120. {
  121. if (SendQ.DataQueue.Count > 500) SQ.Base.Log.WriteLog4("SendQ队列长度:" + SendQ.DataQueue.Count.ToString());
  122. SendQ.Enqueue(data);
  123. }
  124. #endregion
  125. #region 事件处理
  126. /// <summary>
  127. /// 通道连接事件处理
  128. /// </summary>
  129. /// <param name="sender"></param>
  130. /// <param name="arg"></param>
  131. protected virtual void OnConnect(object sender, ChannelConnectArg arg)
  132. {
  133. #if FUNCINLOG
  134. Log.WriteLog4("Channel.OnConnect(sender, arg) in.", LOGTYPE.DEBUG);
  135. #endif
  136. if (ChannelConnect != null)
  137. {
  138. try
  139. {
  140. ChannelConnect(sender, arg);
  141. }
  142. catch (Exception ex)
  143. {
  144. Log.WriteLog4Ex("Network Channel event error", ex);
  145. }
  146. }
  147. #if FUNCOUTLOG
  148. Log.WriteLog4("Channel.OnConnect(sender, arg) out.", LOGTYPE.DEBUG);
  149. #endif
  150. }
  151. /// <summary>
  152. /// 通道释放事件处理
  153. /// </summary>
  154. /// <param name="sender"></param>
  155. /// <param name="arg"></param>
  156. protected virtual void OnDispose(object sender, ChannelDisposeArg arg)
  157. {
  158. #if FUNCINLOG
  159. Log.WriteLog4("Channel.OnDispose(sender, arg) in.", LOGTYPE.DEBUG);
  160. #endif
  161. if (ChannelDispose != null)
  162. {
  163. try
  164. {
  165. ChannelDispose(sender, arg);
  166. }
  167. catch (Exception ex)
  168. {
  169. Log.WriteLog4Ex("Network Channel event error", ex);
  170. }
  171. }
  172. #if FUNCOUTLOG
  173. Log.WriteLog4("Channel.OnDispose(sender, arg) out.", LOGTYPE.DEBUG);
  174. #endif
  175. }
  176. /// <summary>
  177. /// 通道出错事件处理
  178. /// </summary>
  179. /// <param name="sender"></param>
  180. /// <param name="arg"></param>
  181. protected virtual void OnError(object sender, ChannelErrorArg arg)
  182. {
  183. #if FUNCINLOG
  184. Log.WriteLog4("Channel.OnError(sender, arg) in.", LOGTYPE.DEBUG);
  185. #endif
  186. if (ChannelError != null)
  187. {
  188. try
  189. {
  190. ChannelError(sender, arg);
  191. }
  192. catch (Exception ex)
  193. {
  194. Log.WriteLog4Ex("Network Channel event error", ex);
  195. }
  196. }
  197. #if FUNCOUTLOG
  198. Log.WriteLog4("Channel.OnError(sender, arg) out.", LOGTYPE.DEBUG);
  199. #endif
  200. }
  201. /// <summary>
  202. /// 通道收到数据事件处理
  203. /// </summary>
  204. /// <param name="sender"></param>
  205. /// <param name="arg"></param>
  206. protected virtual void OnReceive(object sender, ChannelReceiveArg arg)
  207. {
  208. #if FUNCINLOG
  209. Log.WriteLog4("Channel.OnReceive(sender, arg) in.", LOGTYPE.DEBUG);
  210. #endif
  211. if (DataReceive != null)
  212. {
  213. try
  214. {
  215. DataReceive?.Invoke(sender, arg);
  216. }
  217. catch (Exception ex)
  218. {
  219. Log.WriteLog4Ex("Network Channel event error", ex);
  220. }
  221. }
  222. #if FUNCOUTLOG
  223. Log.WriteLog4("Channel.OnReceive(sender, arg) out.", LOGTYPE.DEBUG);
  224. #endif
  225. }
  226. /// <summary>
  227. /// 通道数据发送完成事件处理
  228. /// </summary>
  229. /// <param name="sender"></param>
  230. /// <param name="arg"></param>
  231. protected virtual void OnSend(object sender, ChannelSendArg arg)
  232. {
  233. #if FUNCINLOG
  234. Log.WriteLog4("Channel.OnSend(sender, arg) in.", LOGTYPE.DEBUG);
  235. #endif
  236. if (DataSend != null)
  237. {
  238. try
  239. {
  240. DataSend(sender, arg);
  241. }
  242. catch (Exception ex)
  243. {
  244. Log.WriteLog4Ex("Network Channel event error", ex);
  245. }
  246. }
  247. #if FUNCOUTLOG
  248. Log.WriteLog4("Channel.OnSend(sender, arg) out.", LOGTYPE.DEBUG);
  249. #endif
  250. }
  251. #endregion
  252. }
  253. }

创建一个TCPChannel

TCPChannel继承自Channel.

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Net.Sockets;
  6. using System.Net;
  7. using System.Threading;
  8. using SQ.Base;
  9. namespace Network
  10. {
  11. /// <summary>
  12. /// TCP通道
  13. /// </summary>
  14. /// <remarks>内部方法注释请参照基类</remarks>
  15. public class TCPChannel : Channel
  16. {
  17. /// <summary>
  18. /// 已启动接收
  19. /// </summary>
  20. private bool IsStartReceive;
  21. #region 属性
  22. public bool Connected
  23. {
  24. get
  25. {
  26. if (Socket != null && Socket.Connected)
  27. return true;
  28. else
  29. return false;
  30. }
  31. }
  32. #endregion
  33. #region 公共方法
  34. /// <summary>
  35. /// 使用远程IP地址和端口号创建TCP通道
  36. /// </summary>
  37. /// <param name="remoteHost"></param>
  38. /// <param name="remotePort"></param>
  39. public TCPChannel(string remoteHost, int remotePort)
  40. : base(remoteHost, remotePort)
  41. {
  42. }
  43. /// <summary>
  44. /// 使用已连接套接字创建TCP通道
  45. /// </summary>
  46. /// <param name="socket">已连接套接字</param>
  47. public TCPChannel(Socket socket)
  48. : base(socket)
  49. {
  50. }
  51. public override void StartReceiveAsync()
  52. {
  53. if (IsStartReceive)
  54. {
  55. return;
  56. }
  57. IsStartReceive = true;
  58. SocketAsyncEventArgs e = Managers.SocketArgManager.Allocate(true);
  59. e.Completed += IO_Completed;
  60. DateTime dt = DateTime.Now;
  61. while (!Connected)
  62. {
  63. if (SQ.Base.DateTimeHelper.DiffNowSec(dt) > 10)
  64. {
  65. e.Completed -= IO_Completed;
  66. Managers.SocketArgManager.Free(e, true);
  67. e.SocketError = SocketError.ConnectionAborted;
  68. //如果10秒都未连接成功
  69. throw new Exception("等待连接超时!");
  70. }
  71. else
  72. {
  73. Thread.Sleep(100);
  74. }
  75. }
  76. while (!Socket.ReceiveAsync(e))//如果同步接收完成,直接执行,不用等待IO_Completed
  77. {
  78. if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
  79. {
  80. byte[] temp = new byte[e.BytesTransferred];
  81. Array.Copy(e.Buffer, e.Offset, temp, 0, e.BytesTransferred);
  82. OnReceive(this, new ChannelReceiveArg(temp));
  83. }
  84. else if (e.BytesTransferred == 0)//通道断开
  85. {
  86. e.Completed -= IO_Completed;
  87. Managers.SocketArgManager.Free(e, true);
  88. OnDispose(this, new ChannelDisposeArg(this, e.SocketError));
  89. break;
  90. }
  91. else//通道出错
  92. {
  93. e.Completed -= IO_Completed;
  94. Managers.SocketArgManager.Free(e, true);
  95. OnError(this, new ChannelErrorArg(this, e.SocketError));
  96. break;
  97. }
  98. }
  99. }
  100. public override void Connect()
  101. {
  102. #if FUNCINLOG
  103. Log.WriteLog4("TCPChannel.Connect() in.", LOGTYPE.DEBUG);
  104. #endif
  105. Socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  106. SocketAsyncEventArgs e = Managers.SocketArgManager.Allocate(false);
  107. e.RemoteEndPoint = Utils.GetEndPoint(RemoteHost, RemotePort);
  108. //new IPEndPoint(IPAddress.Parse(RemoteHost), RemotePort);
  109. e.Completed += IO_Completed;
  110. IsStartReceive = false;
  111. if (!Socket.ConnectAsync(e))
  112. {
  113. OnConnect(e);
  114. }
  115. #if FUNCOUTLOG
  116. Log.WriteLog4("TCPChannel.Connect() out.", LOGTYPE.DEBUG);
  117. #endif
  118. }
  119. //public override void Connect(int timeout) {
  120. public void Send(byte[] data)
  121. {
  122. Send(data, 0, data.Length);
  123. }
  124. //}
  125. /// <summary>
  126. /// 是否存在乱序问题?发送完成事件肯定不能保证顺序
  127. /// </summary>
  128. /// <param name="data"></param>
  129. /// <param name="offset"></param>
  130. /// <param name="count"></param>
  131. /// <param name="token"></param>
  132. internal override void Send(byte[] data, int offset, int count, object token = null)
  133. {
  134. if (data == null)
  135. {
  136. Log.WriteLog4("[Send]data=null", null, LOGTYPE.INFO);
  137. return;
  138. }
  139. if (data != null && data.Length == 0)
  140. {
  141. Log.WriteLog4("[Send]data.Length == 0", null, LOGTYPE.INFO);
  142. return;
  143. }
  144. DateTime dtTag = DateTime.Now;
  145. if (Connected)
  146. {
  147. SocketAsyncEventArgs e = Managers.SocketArgManager.Allocate(false);
  148. e.Completed += IO_Completed;
  149. e.SetBuffer(data, offset, count);
  150. e.UserToken = token;
  151. if (!Socket.SendAsync(e))
  152. {
  153. OnSend(e);
  154. }
  155. }
  156. int a = (DateTime.Now - dtTag).Milliseconds;
  157. if ((DateTime.Now.Millisecond % 1000) < 10 && a > 0) SQ.Base.Log.WriteLog4("处理[Send]耗时(Milliseconds):" + a.ToString() + " __ data.Length = " + data.Length.ToString(), LOGTYPE.INFO);
  158. }
  159. public override void Close()
  160. {
  161. if (Connected)
  162. {
  163. IsStartReceive = false;
  164. Socket.Close();
  165. Socket.Dispose();
  166. base.Close();
  167. }
  168. }
  169. #endregion
  170. #region 异步请求结果处理
  171. /// <summary>
  172. /// 发送数据异步请求结果处理
  173. /// </summary>
  174. /// <param name="e"></param>
  175. private void OnSend(SocketAsyncEventArgs e)
  176. {
  177. #if FUNCINLOG
  178. Log.WriteLog4("TCPChannel.OnSend(e) in.", LOGTYPE.DEBUG);
  179. #endif
  180. var arg = new ChannelSendArg(e.UserToken, e.Buffer, e.Offset, e.Count);
  181. e.SetBuffer(null, 0, 0);
  182. e.Completed -= IO_Completed;
  183. Managers.SocketArgManager.Free(e, false);
  184. OnSend(this, arg);
  185. #if FUNCOUTLOG
  186. Log.WriteLog4("TCPChannel.OnSend(e) out.", LOGTYPE.DEBUG);
  187. #endif
  188. }
  189. /// <summary>
  190. /// 接收数据异步请求结果处理
  191. /// </summary>
  192. /// <param name="e"></param>
  193. private void OnReceive(SocketAsyncEventArgs e)
  194. {
  195. #if FUNCINLOG
  196. Log.WriteLog4("TCPChannel.OnReceive(e) in.", LOGTYPE.DEBUG);
  197. #endif
  198. if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)//收到数据
  199. {
  200. byte[] temp = new byte[e.BytesTransferred];
  201. Array.Copy(e.Buffer, e.Offset, temp, 0, e.BytesTransferred);
  202. OnReceive(this, new ChannelReceiveArg(temp));
  203. bool flag = true;//防止回收两次造成e重用的问题
  204. while (Connected && !Socket.ReceiveAsync(e))//如果同步接收完成,直接执行,不用等待下一次IO_Completed
  205. {
  206. if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
  207. {
  208. temp = new byte[e.BytesTransferred];
  209. Array.Copy(e.Buffer, e.Offset, temp, 0, e.BytesTransferred);
  210. OnReceive(this, new ChannelReceiveArg(temp));
  211. }
  212. else if (e.BytesTransferred == 0)//通道断开
  213. {
  214. flag = false;
  215. e.Completed -= IO_Completed;
  216. Managers.SocketArgManager.Free(e, true);
  217. OnDispose(this, new ChannelDisposeArg(this, e.SocketError));
  218. //break;//无需break,下次循环Connected 为false
  219. }
  220. else//通道出错
  221. {
  222. flag = false;
  223. e.Completed -= IO_Completed;
  224. Managers.SocketArgManager.Free(e, true);
  225. OnError(this, new ChannelErrorArg(this, e.SocketError));
  226. //break;//无需break,下次循环Connected 为false
  227. }
  228. }
  229. if (!Connected && flag)
  230. {
  231. e.Completed -= IO_Completed;
  232. Managers.SocketArgManager.Free(e, true);
  233. OnDispose(this, new ChannelDisposeArg(this, e.SocketError));
  234. }
  235. }
  236. else if (e.BytesTransferred == 0)//通道断开
  237. {
  238. e.Completed -= IO_Completed;
  239. Managers.SocketArgManager.Free(e, true);
  240. OnDispose(this, new ChannelDisposeArg(this, e.SocketError));
  241. }
  242. else//通道出错
  243. {
  244. e.Completed -= IO_Completed;
  245. Managers.SocketArgManager.Free(e, true);
  246. OnError(this, new ChannelErrorArg(this, e.SocketError));
  247. }
  248. #if FUNCOUTLOG
  249. Log.WriteLog4("TCPChannel.OnReceive(e) out.", LOGTYPE.DEBUG);
  250. #endif
  251. }
  252. /// <summary>
  253. /// 通道连接异步请求结果处理(通过IP地址和端口号创建通道时,通道连接返回后触发)
  254. /// </summary>
  255. /// <param name="e"></param>
  256. private void OnConnect(SocketAsyncEventArgs e)
  257. {
  258. #if FUNCINLOG
  259. Log.WriteLog4("TCPChannel.OnConnect(e) in.", LOGTYPE.DEBUG);
  260. #endif
  261. if (e.SocketError == SocketError.Success)
  262. {
  263. StartReceiveAsync();
  264. }
  265. OnConnect(this, new ChannelConnectArg(this, e.SocketError));
  266. e.Completed -= IO_Completed;
  267. Managers.SocketArgManager.Free(e, false);
  268. #if FUNCOUTLOG
  269. Log.WriteLog4("TCPChannel.OnConnect(e) out.", LOGTYPE.DEBUG);
  270. #endif
  271. }
  272. #endregion
  273. #region 内部方法
  274. /// <summary>
  275. /// 异步请求回调处理方法
  276. /// </summary>
  277. /// <param name="sender"></param>
  278. /// <param name="e"></param>
  279. private void IO_Completed(object sender, SocketAsyncEventArgs e)
  280. {
  281. #if FUNCINLOG
  282. Log.WriteLog4("TCPChannel.IO_Completed(sender, e) in.", LOGTYPE.DEBUG);
  283. #endif
  284. try
  285. {
  286. switch (e.LastOperation)
  287. {
  288. case SocketAsyncOperation.Connect:
  289. OnConnect(e);
  290. break;
  291. case SocketAsyncOperation.Receive:
  292. OnReceive(e);
  293. break;
  294. case SocketAsyncOperation.Send:
  295. OnSend(e);
  296. break;
  297. default:
  298. e.SetBuffer(null, 0, 0);
  299. e.Completed -= IO_Completed;
  300. Managers.SocketArgManager.Free(e, false);
  301. break;
  302. }
  303. }
  304. catch (Exception ex)
  305. {
  306. Log.WriteLog4Ex("TCPChannel.IO_Completed(sender, e) error", ex);
  307. }
  308. #if FUNCOUTLOG
  309. Log.WriteLog4("TCPChannel.IO_Completed(sender, e) out.", LOGTYPE.DEBUG);
  310. #endif
  311. }
  312. #endregion
  313. }
  314. }

发送队列SendQueue


  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. namespace Network
  7. {
  8. public class SendQueue
  9. {
  10. public Dictionary<Channel, ChannelQueue> dit = new Dictionary<Channel, ChannelQueue>();
  11. public void SendAll()
  12. {
  13. var arr = dit.Values.ToArray();
  14. foreach (var item in arr)
  15. {
  16. item.SendAll();
  17. }
  18. }
  19. public ChannelQueue AddChannelQueue(Channel cl)
  20. {
  21. var cq = new ChannelQueue()
  22. {
  23. Cl = cl,
  24. SQU = this
  25. };
  26. dit[cl] = cq;
  27. return cq;
  28. }
  29. public void RemoveChannelQueue(Channel cl)
  30. {
  31. if (dit.ContainsKey(cl))
  32. {
  33. dit.Remove(cl);
  34. }
  35. }
  36. public void RunSend()
  37. {
  38. try
  39. {
  40. while (true)
  41. {
  42. SendAll();
  43. Thread.Sleep(100);
  44. }
  45. }
  46. catch (Exception ex)
  47. {
  48. }
  49. }
  50. }
  51. public class ChannelQueue
  52. {
  53. public SendQueue SQU;
  54. public Channel Cl;
  55. public Queue<byte[]> DataQueue = new Queue<byte[]>();
  56. public void Enqueue(byte[] bts)
  57. {
  58. lock (this)
  59. {
  60. DataQueue.Enqueue(bts);
  61. }
  62. }
  63. public void SendAll()
  64. {
  65. lock (this)
  66. {
  67. List<byte> bts = new List<byte>();
  68. while (DataQueue.Count > 0)
  69. {
  70. bts.AddRange(DataQueue.Dequeue());
  71. }
  72. if (bts.Count>0) Cl.Send(bts.ToArray());
  73. }
  74. }
  75. public void RemoveChannelQueue()
  76. {
  77. SQU.RemoveChannelQueue(Cl);
  78. }
  79. }
  80. }

如何使用发送队列

通过工具类Utils来初始化队列和依次获取队列。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Text;
  6. using System.Threading;
  7. namespace Network
  8. {
  9. public class Utils
  10. {
  11. static int p;
  12. static SendQueue[] SQS;
  13. static bool Setuped = false;
  14. public static void Initialize()
  15. {
  16. if (SQS == null || !Setuped)
  17. {
  18. Setup();
  19. }
  20. }
  21. public static void Setup(int sendThread = 10)
  22. {
  23. SQS = new Network.SendQueue[sendThread];
  24. for (int i = 0; i < sendThread; i++)
  25. {
  26. SQS[i] = new SendQueue();
  27. Thread th = new Thread(SQS[i].RunSend);
  28. th.IsBackground = true;
  29. th.Name = "RunSend" + i;
  30. th.Start();
  31. }
  32. Setuped = true;
  33. }
  34. public static SendQueue GetSendQueue()
  35. {
  36. lock (SQS)
  37. {
  38. p++;
  39. if (p >= SQS.Length)
  40. {
  41. p = 0;
  42. }
  43. return SQS[p];
  44. }
  45. }
  46. static System.Text.RegularExpressions.Regex reg = new System.Text.RegularExpressions.Regex(@"\d+\.\d+\.\d+\.\d+");
  47. public static EndPoint GetEndPoint(string Host, int Port)
  48. {
  49. if (reg.IsMatch(Host))
  50. {
  51. return new IPEndPoint(IPAddress.Parse(Host), Port);
  52. }
  53. else
  54. {
  55. return new DnsEndPoint(Host, Port);
  56. }
  57. }
  58. }
  59. }
初始化使用队列:

 Network.Utils.Setup(2);

创建Socket通道连接:

  1. channel = new Network.TCPChannel(txtServer.Text.Trim(), Convert.ToInt32(txtPort.Text.Trim()));
  2. channel.DataReceive = Receive;
  3. channel.DataSend = DataSend;
  4. channel.ChannelConnect += new EventChannelConnect(channel_ChannelConnect);
  5. channel.Connect();

Socket设置新增通道队列的代码(Channel初始化调用的是父类的构造,即Channel类构造):

  1. /// <summary>
  2. /// 用已连接套接字构造通道
  3. /// </summary>
  4. /// <param name="socket">连接套接字</param>
  5. public Channel(Socket socket)
  6. {
  7. Socket = socket;
  8. IPEndPoint endPoint = Socket.RemoteEndPoint as IPEndPoint;
  9. RemoteHost = endPoint.Address.ToString();
  10. RemotePort = endPoint.Port;
  11. SendQ = Utils.GetSendQueue().AddChannelQueue(this);
  12. }
此时已经将当前的Channel对象加入到指定大小的队列之中。

使用通道来构造客户端

Client类

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using SQ.Base;
  6. using System.Threading;
  7. namespace Network
  8. {
  9. #region 委托定义
  10. public delegate void EventClientConnect(object sender, ClientConnectArg arg);
  11. public delegate void EventClientReceive(object sender, ClientReceiveArg arg);
  12. public delegate void EventClientSend(object sender, ClientSendArg arg);
  13. public delegate void EventClientlDispose(object sender, ClientDisposeArg arg);
  14. public delegate void EventClientError(object sender, ClientErrorArg arg);
  15. #endregion
  16. public abstract class Client
  17. {
  18. private AutoResetEvent connWaiter = new AutoResetEvent(false);
  19. #region 属性
  20. /// <summary>
  21. /// 客户端连接通道
  22. /// </summary>
  23. public Channel Channel { get; protected set; }
  24. /// <summary>
  25. /// 用户令牌
  26. /// </summary>
  27. public object Tag { get; set; }
  28. /// <summary>
  29. /// 日志打印标志
  30. /// </summary>
  31. public bool LogFocus = false;
  32. /// <summary>
  33. /// Client计数器
  34. /// </summary>
  35. public ClientCounter ClientCounter = new ClientCounter();
  36. /// <summary>
  37. /// 客户端连接事件
  38. /// </summary>
  39. public event EventClientConnect ClientConnect;
  40. /// <summary>
  41. /// 客户端释放事件
  42. /// </summary>
  43. public event EventClientlDispose ClientDispose;
  44. /// <summary>
  45. /// 客户端异常事件
  46. /// </summary>
  47. public event EventClientError ClientError;
  48. /// <summary>
  49. /// 客户端收到数据事件
  50. /// </summary>
  51. public EventClientReceive ObjectReceive;
  52. /// <summary>
  53. /// 客户端数据发送完成事件
  54. /// </summary>
  55. public EventClientSend ObjectSend;
  56. #endregion
  57. #region 事件处理
  58. /// <summary>
  59. /// 客户端连接事件处理
  60. /// </summary>
  61. /// <param name="sender"></param>
  62. /// <param name="arg"></param>
  63. protected virtual void OnConnect(object sender, ChannelConnectArg arg)
  64. {
  65. connWaiter.Set();
  66. if (ClientConnect != null)
  67. {
  68. try
  69. {
  70. ClientConnect(this, new ClientConnectArg(this));
  71. }
  72. catch (Exception ex)
  73. {
  74. Log.WriteLog4Ex("Network Client event error", ex);
  75. }
  76. }
  77. }
  78. /// <summary>
  79. /// 客户端释放事件处理
  80. /// </summary>
  81. /// <param name="sender"></param>
  82. /// <param name="arg"></param>
  83. protected virtual void OnDispose(object sender, ChannelDisposeArg arg)
  84. {
  85. if (ClientDispose != null)
  86. {
  87. try
  88. {
  89. ClientDispose(this, new ClientDisposeArg(this));
  90. }
  91. catch (Exception ex)
  92. {
  93. Log.WriteLog4Ex("Network Client event error", ex);
  94. }
  95. }
  96. }
  97. /// <summary>
  98. /// 客户端错误事件处理
  99. /// </summary>
  100. /// <param name="sender"></param>
  101. /// <param name="arg"></param>
  102. protected virtual void OnError(object sender, ChannelErrorArg arg)
  103. {
  104. if (ClientError != null)
  105. {
  106. try
  107. {
  108. ClientError(this, new ClientErrorArg(this));
  109. }
  110. catch (Exception ex)
  111. {
  112. Log.WriteLog4Ex("Network Client event error", ex);
  113. }
  114. }
  115. }
  116. /// <summary>
  117. /// 客户端收到数据事件处理
  118. /// </summary>
  119. /// <param name="sender"></param>
  120. /// <param name="arg"></param>
  121. protected virtual void OnReceive(object sender, ChannelReceiveArg arg)
  122. {
  123. ClientCounter.RefreshReceiveCount();
  124. if (ClientCounter.ReceiveCountSum < 1 || (LogFocus && arg.Data != null)) Log.WriteLog4("[OnReceive]source=" + Channel.RemoteHost + ":" + Channel.RemotePort.ToString() + ",arg.Data=", arg.Data, LOGTYPE.INFO);
  125. if (ObjectReceive != null)
  126. {
  127. try
  128. {
  129. ObjectReceive(this, new ClientReceiveArg(arg.Data));
  130. }
  131. catch (Exception ex)
  132. {
  133. Log.WriteLog4Ex("Network Client event error", ex);
  134. }
  135. }
  136. }
  137. /// <summary>
  138. /// 客户端数据发送完成事件处理
  139. /// </summary>
  140. /// <param name="sender"></param>
  141. /// <param name="arg"></param>
  142. protected virtual void OnSend(object sender, ChannelSendArg arg)
  143. {
  144. bool boolExcuteObjectSend = false;
  145. if (ObjectSend != null)
  146. {
  147. try
  148. {
  149. ObjectSend(this, new ClientSendArg(arg.Token));
  150. boolExcuteObjectSend = true;
  151. }
  152. catch (Exception ex)
  153. {
  154. Log.WriteLog4Ex("Network Client event error", ex);
  155. }
  156. }
  157. if (LogFocus && arg.Buffer != null) Log.WriteLog4("[OnSend]ExcuteObjectSend=" + boolExcuteObjectSend.ToString() + " __ 发往" + (sender as Channel).RemoteHost + ":" + (sender as Channel).RemotePort.ToString() + "的数据:", arg.Buffer, LOGTYPE.INFO);
  158. ClientCounter.RefreshSentCount();
  159. //if ((sender as Channel).RemoteHost == "https://cdn.jxasp.com:9143/image/222.134.129.230" || (sender as Channel).RemoteHost == "192.168.121.1" || (sender as Channel).RemoteHost == "192.168.122.1" || (sender as Channel).RemoteHost == "10.10.11.177")
  160. }
  161. #endregion
  162. #region 公共方法
  163. /// <summary>
  164. /// 通道通道构造客户端
  165. /// </summary>
  166. /// <param name="channel">通道</param>
  167. public Client(Channel channel)
  168. {
  169. Channel = channel;
  170. Channel.ChannelConnect += OnConnect;
  171. Channel.ChannelDispose += OnDispose;
  172. Channel.ChannelError += OnError;
  173. Channel.DataReceive += OnReceive;
  174. Channel.DataSend += OnSend;
  175. }
  176. /// <summary>
  177. /// 发送数据
  178. /// </summary>
  179. /// <param name="obj"></param>
  180. /// <param name="token"></param>
  181. public virtual void Send(byte[] data)
  182. {
  183. if (data != null)
  184. {
  185. //Channel.SendAsync(data);
  186. Channel.Send(data);
  187. ClientCounter.RefreshSendCount();
  188. if (ClientCounter.SendCountSum < 1 || (LogFocus && data != null)) Log.WriteLog4("[Send]target=" + Channel.RemoteHost + ":" + Channel.RemotePort.ToString() + ",data=", data, LOGTYPE.INFO);
  189. }
  190. }
  191. /// <summary>
  192. /// 关闭客户端
  193. /// </summary>
  194. public virtual void Close()
  195. {
  196. Channel.Close();
  197. }
  198. public bool Connect(int _timeout)
  199. {
  200. try
  201. {
  202. bool result = false;
  203. Channel.Connect();
  204. if (connWaiter.WaitOne(_timeout))
  205. {
  206. result = true;
  207. }
  208. else
  209. {
  210. result = false;
  211. }
  212. return result;
  213. }
  214. catch (Exception e)
  215. {
  216. return false;
  217. }
  218. }
  219. #endregion
  220. }
  221. }
其构造方法就可以通过Channel来实现。


相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载