`

【Java TCP/IP Soket】— 消息边界的问题解决

阅读更多

转自:http://blog.csdn.net/dabing69221/article/details/17222595

【Java TCP/IP Soket】— 消息边界的问题解决

关于消息边界问题,在TCP套接字处理接收消息中尤为重要,所以大家一定要学会解决它!

场景:

     当接收者试图从套接字中读取比消息本身更多的字节时,将可能发生两种情况:

    1.如果套接字中没有其他消息,接收者将会阻塞等待,同时无法处理接收到的消息;如果发送者也在等待接收端的响应消息,则会形成死锁;

    2.如果套接字中有其他消息,接收者会将后面消息的一部分甚至全部读到第一条消息中去,这将产生一些协议错误;

应用:

    当我们使用TCP套接字时,处理“消息边界” 是一个重要的考虑因素;如果使用UDP套接字时,不存在这个问题,因为在DatagramPacket中存放的数据

    有一个确定的长度(DatagramPacket.getLength()方法),接收者能够精确的知道“消息边界”(或消息结束位置)

    现在介绍两个技术可以使接收者能够精确的找到“消息边界”(也就是消息的结束位置)

 

       1. 基于定界符

           消息的结束由一个唯一的标记指出,即发送者在传输消息后显示添加一个特殊标记。这个特殊标记不能在传输消息本身中出现。

           注意:

           (1)前提条件:

                使用“基于定界符”的方法来解决消息边界问题时,消息本身不能包含有“定界符”,否则接收者将提前认为消息已经结束;

           (2)特殊的实现:

                使用Socket.close( )或Socket.shutdownOutput( ) 来实现“基于定界符”的方法:在“基于定界符”中有一个特殊情况是,可以用在TCP连接上传输的最后一个消息

                上。 在发送完这个消息后,发送者就可以简单的关闭(使用socket.shutdownOutput()方法或socket.close()方法)发送端的TCP连接。接收者读取完这条消息

                的最后一个字节后,将接收到一个流结束标记(即InputStream.read() 返回 -1),该标记指出了已经读取到流的末尾;

           (2)应用场景:

               “基于定界符”的方法通常用在“以文本方式编码的消息”中,不能用在“以二进制方式编码的消息”中(例如图片、MP3),其中最大的一个原因就是:接收者需要遍历

                消息信息来查找“定界符”(定界符:其实就是使用一个特殊的字符或字符串来标识消息的结束)。假如这个消息信息是一个图片,你在图片(二进制文件)中去查找

                一个“字符”合适吗?肯定不合适,二进制肯定不能与字符来进行比较;

 

       2.显示长度

           在变长字段或消息前附加一个固定大小的字段,用来指示该字段或消息中包含了多少字节;

           注意:使用这种方式必须要知道消息的上限,但是,假如在无意间发送的消息超过了消息的上限,如果不处理妥当,将会发生消息丢失;

          

例子:

一.基于定界符的实现例子

1.使用“自定义定界符”,解决消息边界问题:

(1). 处理定界符的消息类

  1. public class DelimFramer  
  2. {  
  3.   
  4.     private static final int DELIMITER = '\n';  
  5.   
  6.     /** 
  7.      * 添加成帧信息并将信息写入到输出流 
  8.      *  
  9.      * @param message 
  10.      * @param out 
  11.      * @throws IOException 
  12.      */  
  13.     public void frameMsg(byte[] message, OutputStream out) throws IOException  
  14.     {  
  15.         for (byte b : message)  
  16.         {  
  17.             /* 
  18.              * 注意:发送的消息本身不能包含定界符。如果存在,则抛出异常 
  19.              */  
  20.             if (b == DELIMITER)  
  21.             {  
  22.                 throw new IOException("Message contains delimiter");  
  23.             }  
  24.         }  
  25.   
  26.         out.write(message);  
  27.         out.write(DELIMITER);  
  28.         out.flush();  
  29.     }  
  30.   
  31.     /** 
  32.      * 读入输入流,直到读取到了定界符,并返回定界符前面的所有字符 
  33.      *  
  34.      * 1.包含定界符的信息 2.不包含定界符的信息 
  35.      *  
  36.      * @return 
  37.      * @throws IOException 
  38.      */  
  39.     public byte[] nextMsg(InputStream in) throws IOException  
  40.     {  
  41.         ByteArrayOutputStream messageBuffer = new ByteArrayOutputStream();  
  42.         int nextByte;  
  43.   
  44.         /* 
  45.          * 情况一:判断消息中是否包含定界符; 如果输入流读取到了定界符,则返回定界前面的所有字符(不包括定界符) 
  46.          */  
  47.         while ((nextByte = in.read()) != DELIMITER)  
  48.         {  
  49.             /* 
  50.              * 情况二:判断消息中是否不包含定界符;如果输入流读取到了-1(说明该消息中不包括定界符) 
  51.              */  
  52.             if (nextByte == -1)  
  53.             {  
  54.                 /* 
  55.                  * 判断BytaArrayOutputStream的缓冲区中是否有数据: 
  56.                  * 1.如果没有数据:说明从该输入流中没有读取到消息,就到达输入流的末尾 ; 
  57.                  * 2.如果有数据:说明从该输入流中读取的消息是一个不带分界符的非空消息; 
  58.                  */  
  59.                 if (messageBuffer.size() == 0)  
  60.                 {  
  61.                     return null;  
  62.                 }  
  63.                 else  
  64.                 {  
  65.                     throw new EOFException(  
  66.                             "Non-empty message without delimiter");  
  67.                 }  
  68.             }  
  69.             messageBuffer.write(nextByte);  
  70.         }  
  71.         return messageBuffer.toByteArray();  
  72.     }  
  73. }  

 

(2). TCP客户端类

 

  1. public class TCPClient  
  2. {  
  3.   
  4.     public static void main(String[] args) throws UnknownHostException,  
  5.             IOException  
  6.     {  
  7.         Socket client = new Socket(InetAddress.getLocalHost(), 8888);  
  8.         OutputStream output = client.getOutputStream();  
  9.         InputStream input = client.getInputStream();  
  10.         DelimFramer delimFramer = new DelimFramer();  
  11.   
  12.         byte[] msg = new String("Hello").getBytes();  
  13.         // 发送消息  
  14.         delimFramer.frameMsg(msg, output);  
  15.   
  16.         // 接收消息  
  17.         byte[] receiveByte = delimFramer.nextMsg(input);  
  18.         String receiveMsg = new String(receiveByte);  
  19.   
  20.         System.out.println("Client receive msg:" + receiveMsg);  
  21.   
  22.         input.close();  
  23.         output.close();  
  24.         client.close();  
  25.   
  26.     }  
  27. }  

 

(3).TCP服务端类

 

 

  1. public class TCPServer  
  2. {  
  3.   
  4.     public static void main(String[] args) throws IOException  
  5.     {  
  6.         DelimFramer delimFramer = new DelimFramer();  
  7.   
  8.         ServerSocket server = new ServerSocket(8888);  
  9.         OutputStream output;  
  10.         InputStream input;  
  11.   
  12.         while (true)  
  13.         {  
  14.             Socket client = server.accept();  
  15.   
  16.             System.out.println("Handing client at "  
  17.                     + client.getRemoteSocketAddress());  
  18.   
  19.             output = client.getOutputStream();  
  20.             input = client.getInputStream();  
  21.   
  22.             byte[] msg = delimFramer.nextMsg(input);  
  23.   
  24.             System.out.println("Server receive msg:" + new String(msg));  
  25.   
  26.             delimFramer.frameMsg(msg, output);  
  27.         }  
  28.     }  
  29. }  

这个例子还有一个缺点,就是只考虑了“定界符”是单字节的情况,对于多字节的情况没有考虑。自己也没有找到什么好的办法,如果大家有知道的请回复一下。

 

2.使用定界符的“特殊的实现”(close( )或shutdownOutput( )方法), 解决消息边界问题:

(1)TCP客户端

 

  1. import java.io.ByteArrayOutputStream;  
  2. import java.io.IOException;  
  3. import java.io.InputStream;  
  4. import java.io.OutputStream;  
  5. import java.net.InetAddress;  
  6. import java.net.Socket;  
  7. import java.net.UnknownHostException;  
  8.   
  9. public class TestClient  
  10. {  
  11.     public static void main(String[] args) throws UnknownHostException,  
  12.             IOException  
  13.     {  
  14.         byte[] msg = new String("Hello Server!").getBytes();  
  15.   
  16.         Socket client = new Socket(InetAddress.getLocalHost(), 8888);  
  17.         OutputStream output = client.getOutputStream();  
  18.         InputStream input = client.getInputStream();  
  19.   
  20.         output.write(msg);  
  21.         output.flush();  
  22.   
  23.         client.shutdownOutput();  
  24.   
  25.         ByteArrayOutputStream byteArray = new ByteArrayOutputStream();  
  26.         int readSize = 0;  
  27.         byte[] temp = new byte[1024];  
  28.         while ((readSize = input.read(temp)) != -1)  
  29.         {  
  30.             byteArray.write(temp, 0, readSize);  
  31.         }  
  32.   
  33.         byte[] recvByte = byteArray.toByteArray();  
  34.   
  35.         System.out.println("Client receive message:" + new String(recvByte));  
  36.   
  37.         byteArray.close();  
  38.         input.close();  
  39.         output.close();  
  40.         client.close();  
  41.     }  
  42. }  


(2)TCP服务端

 

 

  1. import java.io.ByteArrayOutputStream;  
  2. import java.io.IOException;  
  3. import java.io.InputStream;  
  4. import java.io.OutputStream;  
  5. import java.net.ServerSocket;  
  6. import java.net.Socket;  
  7.   
  8. public class TestServer  
  9. {  
  10.     public static void main(String[] args) throws IOException  
  11.     {  
  12.         ServerSocket server = new ServerSocket(8888);  
  13.         byte[] msg = new String("Hello Client!").getBytes();  
  14.         while (true)  
  15.         {  
  16.   
  17.             Socket client = server.accept();  
  18.   
  19.             System.out.println("Handling clint at:"  
  20.                     + client.getRemoteSocketAddress());  
  21.   
  22.             InputStream input = client.getInputStream();  
  23.             OutputStream output = client.getOutputStream();  
  24.   
  25.             ByteArrayOutputStream byteArrayOut = new ByteArrayOutputStream();  
  26.             byte[] temp = new byte[1024];  
  27.             int readSize = 0;  
  28.             while ((readSize = input.read(temp)) != -1)  
  29.             {  
  30.                 byteArrayOut.write(temp, 0, readSize);  
  31.             }  
  32.             byte[] recvByte = byteArrayOut.toByteArray();  
  33.             System.out  
  34.                     .println("Server receive message:" + new String(recvByte));  
  35.   
  36.             output.write(recvByte);  
  37.             output.flush();  
  38.             client.shutdownOutput();  
  39.   
  40.             output.close();  
  41.             input.close();  
  42.         }  
  43.     }  
  44. }  

   注意:

   使用该方法 适用于 客户端与服务端的两次握手通信,一般能够瞒住大部分业务逻辑需求。两次握手通信为:客户端发送消息 服务端接收、服务端发送消息 客户端接收;

   如果要实现多次握手通信,请使用 “自定义定界符” 方式实现。

 

二. 显示长度的实现例子

   前面已经说过,使用 “显示长度” 的方式必须要知道 “消息长度的上限”,所以我们可以使用DataInputStream类来读取消息长度,它提供了两个方法,分别为:

   DataInputStream.readUnsignedByte( ): 读取此输入流的下一个字节并返回”无符号 8 位数“, 所以它的取值范围为:0 ~ 255 (2^8-1) , 所以, 消息长度上限为: 255;

   DataInputStream.readUnsignedShort():读取此输入流的下两个字节并返回” 一个无符号 16 位整数“ , 所以它的取值范围为:0 ~ 65535 (2^16-1), 所以, 消息长度上限为: 65535;

  (1). 处理定界符的消息类:

  

  1. public class LengthFramer implements Framer  
  2. {  
  3.   
  4.     public static final int MAXMESSAGELENGTH = 65535;  
  5.   
  6.     public static final int BYTEMASK = 255;  
  7.   
  8.     public static final int SHORTMASK = 65535;  
  9.   
  10.     public static final int BYTESHTFT = 8;  
  11.   
  12.     @Override  
  13.     public void frameMsg(byte[] message, OutputStream output)  
  14.             throws IOException  
  15.     {  
  16.         /** 
  17.          * 这里的接收端接收的消息长度上限为65535个byte,所以这里必须判断发送消息的长度上限。 如果超出消息长度上限,超出的部分会被忽略 
  18.          */  
  19.         if (message.length > MAXMESSAGELENGTH)  
  20.         {  
  21.             throw new IOException("message to long");  
  22.         }  
  23.         // 这里使用了Java中的移位运算与位运算,将发送的消息长度拆分为2个字节并发送(readUnsignedShort()方法:读取输入流的下两个字节,所以这里必须将消息长度拆分为2个字节发送)  
  24.         output.write((message.length >> BYTESHTFT) & BYTEMASK);  
  25.         output.write(message.length & BYTEMASK);  
  26.   
  27.         output.write(message);  
  28.         output.flush();  
  29.     }  
  30.   
  31.     @Override  
  32.     public byte[] nextMsg(InputStream input) throws IOException  
  33.     {  
  34.         int length;  
  35.         DataInputStream dataInput;  
  36.         try  
  37.         {  
  38.             /** 
  39.              * 使用readUnsignedShort()返回的最大值为65535,所以接收msg数组的长度最大为65535,所以, 
  40.              * 接收消息长度的上限为65535个字节 
  41.              */  
  42.             dataInput = new DataInputStream(input);  
  43.             length = dataInput.readUnsignedShort();  
  44.         }  
  45.         catch (EOFException e)  
  46.         {  
  47.             return null;  
  48.         }  
  49.         byte[] msg = new byte[length];  
  50.         dataInput.readFully(msg);  
  51.         return msg;  
  52.     }  
  53. }    

 

   注意:

   使用 “显示长度” 的方式 处理消息边界有一个弊端,就是必须要知道消息长度上限。但是,在实际应用中,我们发送的消息长度往往都在不经意间超出了消息长度,如果不处理妥当

 

   这时候就会造成消息的丢失,所以,这个方法也不实用,大概了解一下吧。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics