关键词搜索

源码搜索 ×
×

JAVA通信编程(四)——UDP通讯

发布2015-12-05浏览4333次

详情内容

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

欢迎跳转到本文的原文链接:https://honeypps.com/network/java-udp-comm/

 

经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。

UdpImpl类如下:

 

  1. package com.zzh.comm;
  2. import java.io.IOException;
  3. import java.net.DatagramPacket;
  4. import java.net.DatagramSocket;
  5. import java.net.InetAddress;
  6. import java.net.SocketException;
  7. import java.net.UnknownHostException;
  8. import java.util.Map;
  9. import org.apache.log4j.Logger;
  10. public class UdpImpl implements CommBuff
  11. {
  12. private Logger logger = Logger.getLogger(Object.class.getName());
  13. private int local_port;
  14. private int dest_port;
  15. private String ip;
  16. private int time_out;
  17. DatagramSocket client = null;
  18. private String fileName = "/udp.properties";
  19. public UdpImpl()
  20. {
  21. Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
  22. try
  23. {
  24. local_port = Integer.parseInt(map.get("udp_local_port"));
  25. dest_port = Integer.parseInt(map.get("udp_dest_port"));
  26. time_out = Integer.parseInt(map.get("udp_timeout"));
  27. ip = map.get("udp_dest_ip");
  28. }
  29. catch (Exception e)
  30. {
  31. logger.error(e.getMessage());
  32. }
  33. }
  34. @Override
  35. public byte[] readBuff()
  36. {
  37. if(client == null)
  38. {
  39. throw new RuntimeException("clinet is null!");
  40. }
  41. byte[] recvBuf = new byte[1024];
  42. DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
  43. try
  44. {
  45. client.receive(recvPacket);
  46. }
  47. catch (IOException e)
  48. {
  49. logger.info(e.getMessage());
  50. return new byte[0];
  51. }
  52. byte[] ans = new byte[recvPacket.getLength()];
  53. System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
  54. logger.info("网口接收:"+CommUtil.bytesToHex(ans));
  55. return ans;
  56. }
  57. @Override
  58. public void writeBuff(byte[] message)
  59. {
  60. if(client == null)
  61. {
  62. throw new RuntimeException("clinet is null!");
  63. }
  64. try
  65. {
  66. InetAddress addr = InetAddress.getByName(ip);
  67. DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
  68. client.send(sendPacket);
  69. logger.info("发送成功: "+CommUtil.bytesToHex(message));
  70. }
  71. catch (UnknownHostException e)
  72. {
  73. logger.error(e.getMessage());
  74. }
  75. catch (IOException e)
  76. {
  77. logger.error(e.getMessage());
  78. }
  79. }
  80. @Override
  81. public void open() {
  82. try
  83. {
  84. client = new DatagramSocket(local_port);
  85. client.setSoTimeout(time_out);
  86. if(client != null)
  87. {
  88. logger.info("client open succeed!");
  89. }
  90. }
  91. catch (SocketException e)
  92. {
  93. logger.error(e.getMessage());
  94. }
  95. }
  96. @Override
  97. public void close()
  98. {
  99. if(client != null)
  100. {
  101. client.close();
  102. }
  103. }
  104. @Override
  105. public Object getInfo()
  106. {
  107. return null;
  108. }
  109. }

UdpImpl实现了CommBuff接口的各个方法。UDP Socket采用数据报(DatagramPacket)的方式进行通讯,每次收发都是一个独立的数据包,这一点与TCP基于连接的流式通讯不同。

 

下面通过一个简单工厂模式,根据配置创建对应的底层通讯实现,从而方便地切换通讯方式。

 

  1. package com.zzh.comm;
  2. public class CommFactory
  3. {
  4. public CommBuff getCommBuff(String properties) throws Exception
  5. {
  6. if(properties.equals("comm_serial"))
  7. {
  8. return new SerialImpl();
  9. }
  10. else if(properties.equals("comm_tcpServer"))
  11. {
  12. return new TcpServerImpl();
  13. }
  14. else if(properties.equals("comm_tcpClient"))
  15. {
  16. return new TcpClientImpl();
  17. }
  18. else if(properties.equals("comm_udp"))
  19. {
  20. return new UdpImpl();
  21. }
  22. else
  23. {
  24. throw new Exception("Communication para error: no found avaliable communication Object instance.");
  25. }
  26. }
  27. }

上面的getCommBuff方法通过参数properties可以初始化不同的通讯接口实现类,这样上层应用只需调用CommBuff接口的方法,而无需关心底层通讯的细节,极大地降低了程序间的耦合性。

 

本篇就简单地阐述到这里。下面附加一个程序,它通过调用CommFactory的方法生成底层通讯的实例,主要内容是电力行业中某个通讯规约(Modbus)的实现。如果不从事电力行业的通讯开发,可以不必了解程序中的细节,大概看一下CommBuff是怎么使用的即可。

 

  1. package com.zzh.protocol;
  2. import java.util.Calendar;
  3. import java.util.Map;
  4. import java.util.concurrent.ConcurrentLinkedDeque;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. import java.util.concurrent.TimeUnit;
  8. import com.zzh.comm.CommBuff;
  9. import com.zzh.comm.CommFactory;
  10. import com.zzh.comm.CommUtil;
  11. import com.zzh.comm.ReadProperties;
  12. import com.zzh.dao.ModbusDao;
  13. import com.zzh.dao.ModbusDaoImpl;
  14. import com.zzh.dao.pojo.ModbusPojo;
  15. public class Modbus {
  16. private CommBuff comm;
  17. private int comm_timeout;
  18. private byte devAddr;
  19. private static int RECV_SIZE = 35;
  20. private static int RECV_INNER_SIZE = 30;
  21. private static int MINUTE=60000;
  22. private volatile boolean refreshFlag = false;
  23. private ModbusPojo modbusPojo;
  24. private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
  25. private String fileName = "/modbus.properties";
  26. public Modbus()
  27. {
  28. Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
  29. String comm_way = map.get("modbus_comm_way");
  30. String comm_timeouts = map.get("comm_timeout");
  31. comm_timeout = Integer.parseInt(comm_timeouts);
  32. String devAddrs = map.get("devAddr");
  33. devAddr = Byte.parseByte(devAddrs);
  34. if(comm_way!=null)
  35. {
  36. modbusPojo = new ModbusPojo();
  37. try
  38. {
  39. comm = new CommFactory().getCommBuff(comm_way);
  40. }
  41. catch (Exception e)
  42. {
  43. e.printStackTrace();
  44. }
  45. comm.open();
  46. ExecutorService pool = Executors.newFixedThreadPool(2);
  47. Thread thread1 = new Thread(new readThread());
  48. thread1.setDaemon(true);
  49. Thread thread2 = new Thread(new dbThread());
  50. thread2.setDaemon(true);
  51. pool.execute(thread1);
  52. pool.execute(thread2);
  53. }
  54. else
  55. {
  56. throw new RuntimeException("没有配置好合适的串口参数");
  57. }
  58. }
  59. private class readThread implements Runnable
  60. {
  61. @Override
  62. public void run()
  63. {
  64. while(true)
  65. {
  66. byte[] recvBuff = comm.readBuff();
  67. if(recvBuff.length>0)
  68. {
  69. for(int i=0;i<recvBuff.length;i++)
  70. {
  71. deque.add(recvBuff[i]);
  72. }
  73. }
  74. try
  75. {
  76. TimeUnit.MILLISECONDS.sleep(1000);
  77. }
  78. catch (InterruptedException e)
  79. {
  80. e.printStackTrace();
  81. }
  82. }
  83. }
  84. }
  85. private class dbThread implements Runnable
  86. {
  87. @Override
  88. public void run()
  89. {
  90. while(true)
  91. {
  92. if(refreshFlag == true)
  93. {
  94. Calendar now = Calendar.getInstance();
  95. if(now.get(Calendar.MINUTE)%5==0)
  96. // if(true)
  97. {
  98. synchronized (modbusPojo)
  99. {
  100. filterModbusPojo();
  101. modbusPojo.setNow(TimeUtil.getDateOfMM(now));
  102. // modbusPojo.setNow(new java.sql.Timestamp(new Date().getTime()));
  103. ModbusDao md = new ModbusDaoImpl();
  104. md.addModbus(modbusPojo);
  105. }
  106. }
  107. }
  108. try
  109. {
  110. TimeUnit.MILLISECONDS.sleep(MINUTE);
  111. // TimeUnit.MILLISECONDS.sleep(1000);
  112. }
  113. catch (InterruptedException e)
  114. {
  115. e.printStackTrace();
  116. }
  117. }
  118. }
  119. }
  120. public void filterModbusPojo()
  121. {
  122. modbusPojo.setQua(0);
  123. if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
  124. {
  125. modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
  126. System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
  127. modbusPojo.setQua(1);
  128. }
  129. if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
  130. {
  131. modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
  132. System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
  133. modbusPojo.setQua(1);
  134. }
  135. if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
  136. {
  137. modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
  138. System.out.println("getTemperature = "+modbusPojo.getTemperature());
  139. modbusPojo.setQua(1);
  140. }
  141. if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
  142. {
  143. modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
  144. System.out.println("getTemperature = "+modbusPojo.getTemperature());
  145. modbusPojo.setQua(1);
  146. }
  147. if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
  148. {
  149. modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
  150. System.out.println("getHumidity = "+modbusPojo.getHumidity());
  151. modbusPojo.setQua(1);
  152. }
  153. if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
  154. {
  155. modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
  156. System.out.println("getHumidity = "+modbusPojo.getHumidity());
  157. modbusPojo.setQua(1);
  158. }
  159. if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
  160. {
  161. modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
  162. System.out.println("getPressure = "+modbusPojo.getPressure());
  163. modbusPojo.setQua(1);
  164. }
  165. if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
  166. {
  167. modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
  168. System.out.println("getPressure = "+modbusPojo.getPressure());
  169. modbusPojo.setQua(1);
  170. }
  171. if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
  172. {
  173. modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
  174. System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
  175. modbusPojo.setQua(1);
  176. }
  177. if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
  178. {
  179. modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
  180. System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
  181. modbusPojo.setQua(1);
  182. }
  183. if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
  184. {
  185. modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
  186. System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
  187. modbusPojo.setQua(1);
  188. }
  189. if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
  190. {
  191. modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
  192. System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
  193. modbusPojo.setQua(1);
  194. }
  195. if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
  196. {
  197. modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
  198. System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
  199. modbusPojo.setQua(1);
  200. }
  201. if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
  202. {
  203. modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
  204. System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
  205. modbusPojo.setQua(1);
  206. }
  207. if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
  208. {
  209. modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
  210. System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
  211. modbusPojo.setQua(1);
  212. }
  213. if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
  214. {
  215. modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
  216. System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
  217. modbusPojo.setQua(1);
  218. }
  219. if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
  220. {
  221. modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
  222. System.out.println("getWindDir = "+modbusPojo.getWindDir());
  223. modbusPojo.setQua(1);
  224. }
  225. if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
  226. {
  227. modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
  228. System.out.println("getWindDir = "+modbusPojo.getWindDir());
  229. modbusPojo.setQua(1);
  230. }
  231. }
  232. public void process()
  233. {
  234. try
  235. {
  236. TimeUnit.MILLISECONDS.sleep(comm_timeout);
  237. }
  238. catch (InterruptedException e)
  239. {
  240. e.printStackTrace();
  241. }
  242. recvProcess();
  243. sendProcess();
  244. }
  245. public void recvProcess()
  246. {
  247. refreshFlag = false;
  248. byte[] recvBuff = new byte[RECV_INNER_SIZE];
  249. while(deque.size()>=RECV_SIZE)
  250. {
  251. Byte first = deque.pollFirst();
  252. if(first == devAddr)
  253. {
  254. Byte second = deque.pollFirst();
  255. if(second == 0x03)
  256. {
  257. Byte third = deque.pollFirst();
  258. if(third == RECV_INNER_SIZE)
  259. {
  260. for(int i=0;i<RECV_INNER_SIZE;i++)
  261. {
  262. recvBuff[i] = deque.pollFirst();
  263. }
  264. deque.pollFirst();
  265. deque.pollFirst();
  266. dealRecvBuff(recvBuff);
  267. }
  268. }
  269. }
  270. }
  271. }
  272. public void dealRecvBuff(byte[] recvBuff)
  273. {
  274. System.out.println(CommUtil.bytesToHex(recvBuff));
  275. refreshFlag = true;
  276. getModbusPojo(recvBuff);
  277. // modbusPojo.print();
  278. }
  279. public void getModbusPojo(byte[] recvBuff)
  280. {
  281. int temp;
  282. synchronized (modbusPojo)
  283. {
  284. for(int i=0;i<recvBuff.length;)
  285. {
  286. switch(i)
  287. {
  288. case 0:
  289. temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
  290. double envTemperature = temp*0.1;
  291. modbusPojo.setEnvTemperature(envTemperature);
  292. break;
  293. case 2:
  294. temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
  295. double temperature = temp*0.1;
  296. modbusPojo.setTemperature(temperature);
  297. break;
  298. case 4:
  299. temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
  300. double humidity = temp*0.1;
  301. modbusPojo.setHumidity(humidity);
  302. break;
  303. case 6:
  304. temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
  305. double pressure = temp*0.1;
  306. modbusPojo.setPressure(pressure);
  307. break;
  308. case 8:
  309. temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
  310. modbusPojo.setIrradiance(temp);
  311. break;
  312. case 10:
  313. temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
  314. modbusPojo.setScaIrradiance(temp);
  315. break;
  316. case 12:
  317. temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
  318. modbusPojo.setDirIrradiance(temp);
  319. break;
  320. case 14:
  321. temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
  322. modbusPojo.setWindDir(temp);
  323. break;
  324. case 16:
  325. temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
  326. double windSpeed = temp*0.1;
  327. modbusPojo.setWindSpeed(windSpeed);
  328. break;
  329. case 18:
  330. temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
  331. double windSpeedTwo = temp*0.1;
  332. modbusPojo.setWindSpeedTwo(windSpeedTwo);
  333. break;
  334. case 20:
  335. temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
  336. double windSpeedTen = temp*0.1;
  337. modbusPojo.setWindSpeedTen(windSpeedTen);
  338. break;
  339. case 22:
  340. temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
  341. modbusPojo.setDailyExposure(temp);
  342. break;
  343. case 24:
  344. temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
  345. double totalExposure = temp*0.001;
  346. modbusPojo.setTotalExposure(totalExposure);
  347. break;
  348. case 26:
  349. temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
  350. double scaExposure = temp*0.001;
  351. modbusPojo.setScaExposure(scaExposure);
  352. break;
  353. case 28:
  354. temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
  355. double dirExposure = temp*0.001;
  356. modbusPojo.setDirExposure(dirExposure);
  357. break;
  358. }
  359. i=i+2;
  360. }
  361. }
  362. }
  363. public void sendProcess()
  364. {
  365. byte[] message = new byte[8];
  366. int sendLen = 0;
  367. message[sendLen++] = devAddr;
  368. message[sendLen++] = 0x03;
  369. message[sendLen++] = 0x00;
  370. message[sendLen++] = 0x00;
  371. message[sendLen++] = 0x00;
  372. message[sendLen++] = 0x0F;
  373. byte[] crc = CommUtil.CRC16(message,6);
  374. message[sendLen++] = crc[0];
  375. message[sendLen++] = crc[1];
  376. comm.writeBuff(message);
  377. }
  378. }

 

欢迎跳转到本文的原文链接:https://honeypps.com/network/java-udp-comm/

欢迎支持笔者新作:《深入理解Kafka:核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客。

 

 

相关技术文章

点击QQ咨询
开通会员
返回顶部
×
微信扫码支付
微信扫码支付
确定支付下载
请使用微信扫描二维码支付
×

提示信息

×

选择支付方式

  • 微信支付
  • 支付宝付款
确定支付下载