JavaSocket通信技术收发线程互斥的解决方法

Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。

创新互联专注于企业成都营销网站建设、网站重做改版、富锦网站定制设计、自适应品牌网站建设、H5页面制作商城网站定制开发、集团公司官网建设、外贸网站制作、高端网站制作、响应式网页设计等建站业务,价格优惠性价比高,为富锦等各大城市提供网站开发制作服务。

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。

 
 
 
  1. package com.bill99.svr; 
  2. import java.io.IOException; 
  3. import java.io.InputStream; 
  4. import java.io.OutputStream; 
  5. import java.net.InetSocketAddress; 
  6. import java.net.Socket; 
  7. import java.net.SocketException; 
  8. import java.net.SocketTimeoutException; 
  9. import java.text.SimpleDateFormat; 
  10. import java.util.Date; 
  11. import java.util.Properties; 
  12. import java.util.Timer; 
  13. import java.util.TimerTask; 
  14. import java.util.concurrent.ConcurrentHashMap; 
  15. import java.util.concurrent.TimeUnit; 
  16. import java.util.concurrent.locks.Condition; 
  17. import java.util.concurrent.locks.ReentrantLock; 
  18. import org.apache.log4j.Logger; 
  19. /** 
  20. *

    title: socket通信包装类

     
  21. *

    Description: 

     
  22. *

    CopyRight: CopyRight (c) 2009

     
  23. *

    Company: 99bill.com

     
  24. *

    Create date: 2009-10-14

     
  25. *author sunnylocus 
  26.  * v0.10 2009-10-14 初类 
  27. * v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,
    处理命令速度由原来8~16个/秒提高到25~32个/秒 
  28. */ public class SocketConnection { 
  29. private volatile Socket socket; 
  30. private int timeout = 1000*10; //超时时间,初始值10秒 
  31. private boolean isLaunchHeartcheck = false;//是否已启动心跳检测 
  32. private boolean isNetworkConnect = false; //网络是否已连接 
  33. private static String host = ""; 
  34. private static int port; 
  35. static InputStream inStream = null; 
  36. static OutputStream outStream = null; 
  37. private static Logger log =Logger.getLogger
    (SocketConnection.class); 
  38. private static SocketConnection socketConnection = null; 
  39. private static java.util.Timer heartTimer=null;   
  40. //private final Map recMsgMap= Collections.
    synchronizedMap(new HashMap()); 
  41. private final ConcurrentHashMap recMsgMap 
    = new ConcurrentHashMap(); 
  42. private static Thread receiveThread = null; 
  43. private final ReentrantLock lock = new ReentrantLock(); 
  44. private SocketConnection(){ 
  45. Properties conf = new Properties(); 
  46. try { 
  47. conf.load(SocketConnection.class.getResourceAsStream
    ("test.conf")); 
  48. this.timeout = Integer.valueOf(conf.getProperty("timeout")); 
  49. init(conf.getProperty("ip"),Integer.valueOf
    (conf.getProperty("port"))); 
  50. } catch(IOException e) { 
  51. log.fatal("socket初始化异常!",e); 
  52. throw new RuntimeException("socket初始化异常,请检查配置参数"); 
  53. /** 
  54. * 单态模式 
  55. */ 
  56. public static SocketConnection getInstance() { 
  57. if(socketConnection==null) { 
  58. synchronized(SocketConnection.class) { 
  59. if(socketConnection==null) { 
  60. socketConnection = new SocketConnection(); 
  61. return socketConnection; 
  62. return socketConnection; 
  63. private void init(String host,int port) throws IOException { 
  64. InetSocketAddress addr = new InetSocketAddress(host,port); 
  65. socket = new Socket(); 
  66. synchronized (this) { 
  67. log.info("【准备与"+addr+"建立连接】"); 
  68. socket.connect(addr, timeout); 
  69. log.info("【与"+addr+"连接已建立】"); 
  70. inStream = socket.getInputStream(); 
  71. outStream = socket.getOutputStream(); 
  72. socket.setTcpNoDelay(true);//数据不作缓冲,立即发送 
  73. socket.setSoLinger(true, 0);//socket关闭时,立即释放资源 
  74. socket.setKeepAlive(true); 
  75. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输 
  76. isNetworkConnect=true; 
  77. receiveThread = new Thread(new ReceiveWorker()); 
  78. receiveThread.start(); 
  79. SocketConnection.host=host; 
  80. SocketConnection.port=port; 
  81. if(!isLaunchHeartcheck) 
  82. launchHeartcheck(); 
  83. /** 
  84. * 心跳包检测 
  85. */ 
  86. private void launchHeartcheck() { 
  87. if(socket == null) 
  88. throw new IllegalStateException("socket is not 
    established!"); 
  89. heartTimer = new Timer(); 
  90. isLaunchHeartcheck = true; 
  91. heartTimer.schedule(new TimerTask() { 
  92. public void run() { 
  93. String msgStreamNo = StreamNoGenerator.getStreamNo("kq"); 
  94. int mstType =9999;//999-心跳包请求 
  95. SimpleDateFormat dateformate = new SimpleDateFormat
    ("yyyyMMddHHmmss"); 
  96. String msgDateTime = dateformate.format(new Date()); 
  97. int msgLength =38;//消息头长度 
  98. String commandstr = "00" +msgLength + mstType + msgStreamNo; 
  99. log.info("心跳检测包 -> IVR "+commandstr); 
  100. int reconnCounter = 1; 
  101. while(true) { 
  102. String responseMsg =null; 
  103. try { 
  104. responseMsg = readReqMsg(commandstr); 
  105. } catch (IOException e) { 
  106. log.error("IO流异常",e); 
  107. reconnCounter ++; 
  108. if(responseMsg!=null) { 
  109. log.info("心跳响应包 <- IVR "+responseMsg); 
  110. reconnCounter = 1; 
  111. break; 
  112. } else { 
  113. reconnCounter ++; 
  114. if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,
    重新建立连接。连接未被建立时不释放锁 
  115. reConnectToCTCC(); break; 
  116. },1000 * 60*1,1000*60*2); 
  117. /** 
  118. * 重连与目标IP建立重连 
  119. */ 
  120. private void reConnectToCTCC() { 
  121. new Thread(new Runnable(){ 
  122. public void run(){ 
  123. log.info("重新建立与"+host+":"+port+"的连接"); 
  124. //清理工作,中断计时器,中断接收线程,恢复初始变量 
  125. heartTimer.cancel(); 
  126. isLaunchHeartcheck=false; 
  127. isNetworkConnect = false; 
  128. receiveThread.interrupt(); 
  129. try { 
  130. socket.close(); 
  131. } catch (IOException e1) {log.error("重连时,关闭socket连
    接发生IO流异常",e1);} 
  132. //---------------- 
  133. synchronized(this){ 
  134. for(; ;){ 
  135. try { 
  136. Thread.currentThread(); 
  137. Thread.sleep(1000 * 1); 
  138. init(host,port); 
  139. this.notifyAll(); 
  140. break ; 
  141. } catch (IOException e) { 
  142. log.error("重新建立连接未成功",e); 
  143. } catch (InterruptedException e){ 
  144. log.error("重连线程中断",e); 
  145. }).start(); 
  146. /** 
  147. * 发送命令并接受响应 
  148. * @param requestMsg 
  149. * @return 
  150. * @throws SocketTimeoutException 
  151. * @throws IOException 
  152. */ 
  153. public String readReqMsg(String requestMsg) throws IOException { 
  154. if(requestMsg ==null) { 
  155. return null; 
  156. if(!isNetworkConnect) { 
  157. synchronized(this){ 
  158. try { 
  159. this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常 
  160. if(!isNetworkConnect) { 
  161. throw new IOException("网络连接中断!"); 
  162. } catch (InterruptedException e) { 
  163. log.error("发送线程中断",e); 
  164. String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号 
  165. outStream = socket.getOutputStream(); 
  166. outStream.write(requestMsg.getBytes()); 
  167. outStream.flush(); 
  168. Condition msglock = lock.newCondition(); //消息锁 
  169. //注册等待接收消息 
  170. recMsgMap.put(msgNo, msglock); 
  171. try { 
  172. lock.lock(); 
  173. msglock.await(timeout,TimeUnit.MILLISECONDS); 
  174. } catch (InterruptedException e) { 
  175. log.error("发送线程中断",e); 
  176. } finally { 
  177. lock.unlock(); 
  178. Object respMsg = recMsgMap.remove(msgNo); //响应信息 
  179. if(respMsg!=null &&(respMsg != msglock)) { 
  180. //已经接收到消息,注销等待,成功返回消息 
  181. return (String) respMsg; 
  182. } else { 
  183. log.error(msgNo+" 超时,未收到响应消息"); 
  184. throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息"); 
  185. public void finalize() { 
  186. if (socket != null) { 
  187. try { 
  188. socket.close(); 
  189. } catch (IOException e) { 
  190. e.printStackTrace(); 
  191. //消息接收线程 
  192. private class ReceiveWorker implements Runnable { 
  193. String intStr= null; 
  194. public void run() { 
  195. while(!Thread.interrupted()){ 
  196. try { 
  197. byte[] headBytes = new byte[4]; 
  198. if(inStream.read(headBytes)==-1){ 
  199. log.warn("读到流未尾,对方已关闭流!"); 
  200. reConnectToCTCC();//读到流未尾,对方已关闭流 
  201. return; 
  202. byte[] tmp =new byte[4]; 
  203. tmp = headBytes; 
  204. String tempStr = new String(tmp).trim(); 
  205. if(tempStr==null || tempStr.equals("")) { 
  206. log.error("received message is null"); 
  207. continue; 
  208. intStr = new String(tmp); 
  209. int totalLength =Integer.parseInt(intStr); 
  210. //---------------- 
  211. byte[] msgBytes = new byte[totalLength-4]; 
  212. inStream.read(msgBytes); 
  213. String resultMsg = new String(headBytes)+ new 
    String(msgBytes); 
  214. //抽出消息ID 
  215. String msgNo = resultMsg.substring(8, 8 + 24); 
  216. Condition msglock =(Condition) recMsgMap.get(msgNo); 
  217. if(msglock ==null) { 
  218. log.warn(msgNo+"序号可能已被注销!响应消息丢弃"); 
  219. recMsgMap.remove(msgNo); 
  220. continue; 
  221. recMsgMap.put(msgNo, resultMsg); 
  222. try{ 
  223. lock.lock(); 
  224. msglock.signalAll(); 
  225. }finally { 
  226. lock.unlock(); 
  227. }catch(SocketException e){ 
  228. log.error("服务端关闭socket",e); 
  229. reConnectToCTCC(); 
  230. } catch(IOException e) { 
  231. log.error("接收线程读取响应数据时发生IO流异常",e); 
  232. } catch(NumberFormatException e){ 
  233. log.error("收到没良心包,String转int异常,异常字符:"+intStr); 
  234. }

以上就是对Java Socket通信技术中收发线程互斥的详细解决方法。希望大家有所领悟。

当前名称:JavaSocket通信技术收发线程互斥的解决方法
分享网址:http://www.hantingmc.com/qtweb/news44/52344.html

网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联