Readers are welcome to support the author's new books, 《深入理解Kafka:核心设计与实践原理》 and 《RabbitMQ实战指南》, and to follow the author's WeChat official account: 朱小厮的博客.
Original article: https://honeypps.com/network/java-udp-comm/
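After the earlier articles on TCP and serial-port programming, the CommBuff pattern should be familiar by now. This article first implements the CommBuff interface over UDP, then uses the simple factory pattern to show how to hide the differences between the underlying communication methods.
The UdpImpl class is as follows: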
- package com.zzh.comm;
-
- import java.io.IOException;
- import java.net.DatagramPacket;
- import java.net.DatagramSocket;
- import java.net.InetAddress;
- import java.net.SocketException;
- import java.net.UnknownHostException;
- import java.util.Map;
-
- import org.apache.log4j.Logger;
-
- public class UdpImpl implements CommBuff
- {
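- // Name the logger after this class so log entries are attributed to UdpImpl rather than java.lang.Object.
- private Logger logger = Logger.getLogger(UdpImpl.class.getName());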
-
- private int local_port;
- private int dest_port;
- private String ip;
- private int time_out;
-
- DatagramSocket client = null;
-
- private String fileName = "/udp.properties";
- public UdpImpl()
- {
- Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
- try
- {
- local_port = Integer.parseInt(map.get("udp_local_port"));
- dest_port = Integer.parseInt(map.get("udp_dest_port"));
- time_out = Integer.parseInt(map.get("udp_timeout"));
- ip = map.get("udp_dest_ip");
- }
- catch (Exception e)
- {
- logger.error(e.getMessage());
- }
- }
-
- @Override
- public byte[] readBuff()
- {
- if(client == null)
- {
- throw new RuntimeException("client is null!");
- }
- byte[] recvBuf = new byte[1024];
- DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
- try
- {
- client.receive(recvPacket);
- }
- catch (IOException e)
- {
- logger.info(e.getMessage());
- return new byte[0];
- }
- byte[] ans = new byte[recvPacket.getLength()];
- System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
- logger.info("Network port received: "+CommUtil.bytesToHex(ans));
- return ans;
- }
-
- @Override
- public void writeBuff(byte[] message)
- {
- if(client == null)
- {
- throw new RuntimeException("client is null!");
- }
-
- try
- {
- InetAddress addr = InetAddress.getByName(ip);
- DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
- client.send(sendPacket);
- logger.info("Sent successfully: "+CommUtil.bytesToHex(message));
- }
- catch (UnknownHostException e)
- {
- logger.error(e.getMessage());
- }
- catch (IOException e)
- {
- logger.error(e.getMessage());
- }
-
- }
-
- @Override
- public void open() {
- try
- {
- client = new DatagramSocket(local_port);
- client.setSoTimeout(time_out);
- if(client != null)
- {
- logger.info("client open succeed!");
- }
- }
- catch (SocketException e)
- {
- logger.error(e.getMessage());
- }
- }
-
- @Override
- public void close()
- {
- if(client != null)
- {
- client.close();
- }
- }
-
- @Override
- public Object getInfo()
- {
- return null;
- }
-
- }
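UdpImpl implements every method of the CommBuff interface. A UDP socket communicates in discrete datagrams (packets) with no connection to establish, which is what distinguishes it from the stream-oriented TCP approach.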
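To make the datagram model concrete, here is a minimal sketch that is independent of UdpImpl. It assumes the loopback interface is available and lets the operating system pick a free port; the class name `UdpLoopbackDemo` and the method `roundTrip` are made up for illustration. Each send carries its own destination address, and each receive returns one whole datagram or times out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.util.Arrays;

// Hypothetical demo class, not part of the article's com.zzh.comm package.
class UdpLoopbackDemo {

    // Sends one datagram to a receiver bound on the loopback address and
    // returns the bytes that arrived, or an empty array on timeout.
    static byte[] roundTrip(byte[] payload, int timeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port, so the demo needs no configuration.
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(timeoutMillis);

            // No connect/accept step: the packet itself names its destination.
            sender.send(new DatagramPacket(payload, payload.length,
                    loopback, receiver.getLocalPort()));

            byte[] buf = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buf, buf.length);
            try {
                receiver.receive(packet);
            } catch (SocketTimeoutException e) {
                return new byte[0];
            }
            // getLength() is the size of this datagram, not of the buffer.
            return Arrays.copyOf(packet.getData(), packet.getLength());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] reply = roundTrip(new byte[] {0x01, 0x03, 0x00}, 2000);
        System.out.println("received " + reply.length + " bytes");
    }
}
```

Returning an empty array on timeout mirrors what UdpImpl.readBuff does when receive fails, so a caller can poll without handling exceptions.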
A simple factory, shown below, then makes it easy to switch between the underlying communication methods.
- package com.zzh.comm;
-
- public class CommFactory
- {
- public CommBuff getCommBuff(String properties) throws Exception
- {
- if(properties.equals("comm_serial"))
- {
- return new SerialImpl();
- }
- else if(properties.equals("comm_tcpServer"))
- {
- return new TcpServerImpl();
- }
- else if(properties.equals("comm_tcpClient"))
- {
- return new TcpClientImpl();
- }
- else if(properties.equals("comm_udp"))
- {
- return new UdpImpl();
- }
- else
- {
- throw new Exception("Communication parameter error: no available communication object instance found.");
- }
- }
- }
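The getCommBuff method above instantiates a different implementation of the communication interface depending on the properties argument. The upper-layer application therefore only needs to call the methods of the CommBuff interface and never touches the details of the underlying communication, which greatly reduces coupling in the program.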
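As a rough illustration of what the calling side looks like, the sketch below is self-contained and makes several assumptions: the `CommBuff` interface is reconstructed from the methods that UdpImpl overrides, `EchoImpl` and the key `comm_echo` are hypothetical stand-ins for a real transport, and the factory is a table-driven variation rather than the article's if/else chain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical wrapper class so the sketch fits in one file.
class CommFactorySketch {

    // Stand-in for the article's CommBuff interface (assumed shape).
    interface CommBuff {
        void open();
        void close();
        byte[] readBuff();
        void writeBuff(byte[] message);
        Object getInfo();
    }

    // Hypothetical in-memory implementation: whatever is written can be read back.
    static class EchoImpl implements CommBuff {
        private final Deque<byte[]> queue = new ArrayDeque<>();
        private boolean opened;

        @Override public void open() { opened = true; }
        @Override public void close() { opened = false; queue.clear(); }
        @Override public byte[] readBuff() {
            if (!opened) throw new IllegalStateException("not opened");
            byte[] next = queue.pollFirst();
            return next == null ? new byte[0] : next;
        }
        @Override public void writeBuff(byte[] message) {
            if (!opened) throw new IllegalStateException("not opened");
            queue.addLast(message.clone());
        }
        @Override public Object getInfo() { return "echo"; }
    }

    // Table-driven variant of the simple factory; not the article's CommFactory.
    static class RegistryCommFactory {
        private final Map<String, Supplier<CommBuff>> registry = new HashMap<>();

        RegistryCommFactory() {
            registry.put("comm_echo", EchoImpl::new); // hypothetical key
        }

        CommBuff getCommBuff(String key) {
            Supplier<CommBuff> supplier = registry.get(key);
            if (supplier == null) {
                throw new IllegalArgumentException("Unknown communication type: " + key);
            }
            return supplier.get();
        }
    }

    public static void main(String[] args) {
        // The caller only sees CommBuff; swapping the key swaps the transport.
        CommBuff comm = new RegistryCommFactory().getCommBuff("comm_echo");
        comm.open();
        comm.writeBuff(new byte[] {0x01, 0x03});
        System.out.println(comm.readBuff().length); // prints 2
        comm.close();
    }
}
```

With a registry, adding a transport means adding one map entry instead of another else-if branch; the trade-off is that every implementation needs a no-argument constructor, which the article's classes already have.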
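That concludes the brief discussion for this article. One additional program is attached below, which obtains its underlying communication instance by calling CommFactory. The program itself implements Modbus, a communication protocol used in the power industry. If you do not work on power-industry communication, you can skip the details and simply skim it to see how the factory is used.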
- package com.zzh.protocol;
-
- import java.util.Calendar;
- import java.util.Map;
- import java.util.concurrent.ConcurrentLinkedDeque;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.TimeUnit;
-
- import com.zzh.comm.CommBuff;
- import com.zzh.comm.CommFactory;
- import com.zzh.comm.CommUtil;
- import com.zzh.comm.ReadProperties;
- import com.zzh.dao.ModbusDao;
- import com.zzh.dao.ModbusDaoImpl;
- import com.zzh.dao.pojo.ModbusPojo;
-
- public class Modbus {
- private CommBuff comm;
- private int comm_timeout;
- private byte devAddr;
-
- private static final int RECV_SIZE = 35;
- private static final int RECV_INNER_SIZE = 30;
- private static final int MINUTE=60000;
- private volatile boolean refreshFlag = false;
-
- private ModbusPojo modbusPojo;
-
- private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
- private String fileName = "/modbus.properties";
-
- public Modbus()
- {
- Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
- String comm_way = map.get("modbus_comm_way");
- String comm_timeouts = map.get("comm_timeout");
- comm_timeout = Integer.parseInt(comm_timeouts);
- String devAddrs = map.get("devAddr");
- devAddr = Byte.parseByte(devAddrs);
- if(comm_way!=null)
- {
- modbusPojo = new ModbusPojo();
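- try
- {
- comm = new CommFactory().getCommBuff(comm_way);
- }
- catch (Exception e)
- {
- // Fail fast: without a CommBuff instance, comm.open() below would throw a NullPointerException.
- throw new RuntimeException("Failed to create CommBuff for: " + comm_way, e);
- }
- comm.open();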
-
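- // The pool runs tasks on its own worker threads, so the daemon flag must be set by the thread factory;
- // wrapping each task in a daemon Thread object before submitting it has no effect.
- ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
- Thread t = new Thread(r);
- t.setDaemon(true);
- return t;
- });
- pool.execute(new readThread());
- pool.execute(new dbThread());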
- }
- else
- {
- throw new RuntimeException("No communication method is configured (modbus_comm_way)");
- }
- }
-
- private class readThread implements Runnable
- {
- @Override
- public void run()
- {
- while(true)
- {
- byte[] recvBuff = comm.readBuff();
- if(recvBuff.length>0)
- {
- for(int i=0;i<recvBuff.length;i++)
- {
- deque.add(recvBuff[i]);
- }
- }
- try
- {
- TimeUnit.MILLISECONDS.sleep(1000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
- }
-
- private class dbThread implements Runnable
- {
- @Override
- public void run()
- {
- while(true)
- {
- if(refreshFlag == true)
- {
- Calendar now = Calendar.getInstance();
- if(now.get(Calendar.MINUTE)%5==0)
- // if(true)
- {
- synchronized (modbusPojo)
- {
- filterModbusPojo();
- modbusPojo.setNow(TimeUtil.getDateOfMM(now));
- // modbusPojo.setNow(new java.sql.Timestamp(new Date().getTime()));
- ModbusDao md = new ModbusDaoImpl();
- md.addModbus(modbusPojo);
- }
- }
- }
- try
- {
- TimeUnit.MILLISECONDS.sleep(MINUTE);
- // TimeUnit.MILLISECONDS.sleep(1000);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- }
-
- }
-
- public void filterModbusPojo()
- {
- modbusPojo.setQua(0);
- if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
- {
- modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
- System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
- {
- modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
- System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
- {
- modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
- System.out.println("getTemperature = "+modbusPojo.getTemperature());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
- {
- modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
- System.out.println("getTemperature = "+modbusPojo.getTemperature());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
- {
- modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
- System.out.println("getHumidity = "+modbusPojo.getHumidity());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
- {
- modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
- System.out.println("getHumidity = "+modbusPojo.getHumidity());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
- {
- modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
- System.out.println("getPressure = "+modbusPojo.getPressure());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
- {
- modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
- System.out.println("getPressure = "+modbusPojo.getPressure());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
- {
- modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
- System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
- {
- modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
- System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
- {
- modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
- System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
- {
- modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
- System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
- {
- modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
- System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
- {
- modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
- System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
- {
- modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
- System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
- {
- modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
- System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
- {
- modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
- System.out.println("getWindDir = "+modbusPojo.getWindDir());
- modbusPojo.setQua(1);
- }
- if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
- {
- modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
- System.out.println("getWindDir = "+modbusPojo.getWindDir());
- modbusPojo.setQua(1);
- }
- }
-
- public void process()
- {
- try
- {
- TimeUnit.MILLISECONDS.sleep(comm_timeout);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- recvProcess();
- sendProcess();
- }
-
- public void recvProcess()
- {
- refreshFlag = false;
- byte[] recvBuff = new byte[RECV_INNER_SIZE];
- while(deque.size()>=RECV_SIZE)
- {
- Byte first = deque.pollFirst();
- if(first == devAddr)
- {
- Byte second = deque.pollFirst();
- if(second == 0x03)
- {
- Byte third = deque.pollFirst();
- if(third == RECV_INNER_SIZE)
- {
- for(int i=0;i<RECV_INNER_SIZE;i++)
- {
- recvBuff[i] = deque.pollFirst();
- }
- deque.pollFirst();
- deque.pollFirst();
- dealRecvBuff(recvBuff);
- }
- }
- }
- }
- }
-
- public void dealRecvBuff(byte[] recvBuff)
- {
- System.out.println(CommUtil.bytesToHex(recvBuff));
- refreshFlag = true;
- getModbusPojo(recvBuff);
- // modbusPojo.print();
- }
-
- public void getModbusPojo(byte[] recvBuff)
- {
- int temp;
- synchronized (modbusPojo)
- {
- for(int i=0;i<recvBuff.length;)
- {
- switch(i)
- {
- case 0:
- temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
- double envTemperature = temp*0.1;
- modbusPojo.setEnvTemperature(envTemperature);
- break;
- case 2:
- temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
- double temperature = temp*0.1;
- modbusPojo.setTemperature(temperature);
- break;
- case 4:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
- double humidity = temp*0.1;
- modbusPojo.setHumidity(humidity);
- break;
- case 6:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
- double pressure = temp*0.1;
- modbusPojo.setPressure(pressure);
- break;
- case 8:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
- modbusPojo.setIrradiance(temp);
- break;
- case 10:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
- modbusPojo.setScaIrradiance(temp);
- break;
- case 12:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
- modbusPojo.setDirIrradiance(temp);
- break;
- case 14:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
- modbusPojo.setWindDir(temp);
- break;
- case 16:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
- double windSpeed = temp*0.1;
- modbusPojo.setWindSpeed(windSpeed);
- break;
- case 18:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
- double windSpeedTwo = temp*0.1;
- modbusPojo.setWindSpeedTwo(windSpeedTwo);
- break;
- case 20:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
- double windSpeedTen = temp*0.1;
- modbusPojo.setWindSpeedTen(windSpeedTen);
- break;
- case 22:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
- modbusPojo.setDailyExposure(temp);
- break;
- case 24:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
- double totalExposure = temp*0.001;
- modbusPojo.setTotalExposure(totalExposure);
- break;
- case 26:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
- double scaExposure = temp*0.001;
- modbusPojo.setScaExposure(scaExposure);
- break;
- case 28:
- temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
- double dirExposure = temp*0.001;
- modbusPojo.setDirExposure(dirExposure);
- break;
- }
- i=i+2;
- }
- }
- }
-
- public void sendProcess()
- {
- byte[] message = new byte[8];
- int sendLen = 0;
- message[sendLen++] = devAddr;
- message[sendLen++] = 0x03;
- message[sendLen++] = 0x00;
- message[sendLen++] = 0x00;
- message[sendLen++] = 0x00;
- message[sendLen++] = 0x0F;
- byte[] crc = CommUtil.CRC16(message,6);
- message[sendLen++] = crc[0];
- message[sendLen++] = crc[1];
- comm.writeBuff(message);
- }
-
- }
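The listing relies on CommUtil.CRC16 and ModbusUtil.getSignedAns/getUnsignedAns, none of which are shown. The sketch below is not the author's code; it illustrates the standard Modbus RTU conventions those helpers presumably follow: CRC-16 with initial value 0xFFFF and reflected polynomial 0xA001, the CRC low byte sent first, and 16-bit register values stored high byte first. The class and method names are hypothetical, and whether CommUtil.CRC16 returns its two bytes in this order is an assumption.

```java
// Hypothetical helper class; an independent sketch of standard Modbus RTU rules.
class ModbusRtuSketch {

    // CRC-16/MODBUS over the first `length` bytes of `data`.
    static int crc16(byte[] data, int length) {
        int crc = 0xFFFF;
        for (int i = 0; i < length; i++) {
            crc ^= data[i] & 0xFF;
            for (int bit = 0; bit < 8; bit++) {
                if ((crc & 1) != 0) {
                    crc = (crc >>> 1) ^ 0xA001;
                } else {
                    crc >>>= 1;
                }
            }
        }
        return crc & 0xFFFF;
    }

    // Builds a "read holding registers" (function 0x03) request frame.
    static byte[] readHoldingRegisters(int devAddr, int startRegister, int count) {
        byte[] frame = new byte[8];
        frame[0] = (byte) devAddr;
        frame[1] = 0x03;
        frame[2] = (byte) (startRegister >> 8);
        frame[3] = (byte) startRegister;
        frame[4] = (byte) (count >> 8);
        frame[5] = (byte) count;
        int crc = crc16(frame, 6);
        frame[6] = (byte) crc;        // CRC low byte goes first on the wire
        frame[7] = (byte) (crc >> 8); // then the CRC high byte
        return frame;
    }

    // Register values are big-endian: high byte first.
    static int unsignedRegister(byte[] data, int offset) {
        return ((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF);
    }

    // Same two bytes interpreted as a signed 16-bit value.
    static int signedRegister(byte[] data, int offset) {
        return (short) unsignedRegister(data, offset);
    }

    public static void main(String[] args) {
        // Device address 0x01 is an example value; the article reads it from configuration.
        byte[] frame = readHoldingRegisters(0x01, 0x0000, 0x000F);
        StringBuilder hex = new StringBuilder();
        for (byte b : frame) {
            hex.append(String.format("%02X ", b));
        }
        System.out.println(hex.toString().trim());

        // 0xFF9C is -100 as a signed 16-bit value, e.g. a temperature in tenths of a degree.
        byte[] sample = {(byte) 0xFF, (byte) 0x9C};
        System.out.println(signedRegister(sample, 0));
    }
}
```

Requesting 0x000F registers yields 30 data bytes in the reply, which matches RECV_INNER_SIZE in the listing; adding the address, function code, byte count and two CRC bytes gives the 35 bytes of RECV_SIZE.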