JAVASocket编程学习10--解决TCP粘包分包问题
前⾔:
根据我的第七篇⽂章blog.csdn/m0_37739193/article/details/78484577编写了NIO的Socket服务端代码后,接受UDP的数据正常,但是接收的TCP数据却出现了粘包分包/拆包/半包问题,查阅⽹上资料知道已经有开源的Netty提供了多种⽀持TCP粘
包/拆包的解码器,⽤来满⾜⽤户的不同诉求。
可是我已经根据Java原⽣的NIO写完了整个代码框架,要是换成Netty的话就整个框架代码就都得换了,由于嫌⿇烦,我就开动⾃⼰的⼤脑,能不能⾃⼰通过代码来实现呢?在⾃⼰的努⼒之下好像是可以的哈,基本能把收来的数据处理成⾃⼰想要的。
⾸先模拟⼀个TCP客户端:
1import java.io.BufferedReader;
2import java.io.IOException;
3import java.io.InputStreamReader;
4import java.io.PrintWriter;
5import java.InetSocketAddress;
6import java.Socket;
7import java.UnknownHostException;
8import urrent.ExecutorService;
9import urrent.Executors;
10
11public class HeavyTCPClient1 {
12 private static ExecutorService tp = wCachedThreadPool();
13 private static final int sleep_time = 1000*1000*1000;
14 public static class EchoClient implements Runnable{
15 public void run(){
16 Socket client = null;
17 PrintWriter writer = null;
18 BufferedReader reader = null;
19 try {
20 client = new Socket();
21 t(new InetSocketAddress("localhost", 7788));
22 writer = new OutputStream(), true);
23// writer.print("{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':")
24 writer.print("0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbo 25// writer.print("opt':0,'data':
{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbopt':0,'data' 26// writer.print("erver_ip\": \"74.125.204.113\", \"client_port\": 13698 }}\r\n{'probeid':0,'type':0,'dbopt':0,'data':{ \"client_ip\": \"192.168.102.29\", \"server_ip\": \ 27 writer.flush();
28
29 reader = new BufferedReader(new InputStream()));
30 System.out.println("from server: " + adLine());
31
32 reader.close();
33 writer.close();
34 client.close();
35 } catch (UnknownHostException e){
36 e.printStackTrace();
37 } catch (IOException e){
38 e.printStackTrace();
39 } finally {
40 try {
41 if (writer != null)
42 writer.close();
43 if (reader != null)
44 reader.close();
45 if (client != null)
46 client.close();
47 } catch (IOException e){
48 }
49 }
50 }
51 }
52
53 public static void main(String args[]) throws IOException {
54 EchoClient ec = new EchoClient();
55// for(int i=0;i<1000;i++)
56 for(int i=0;i<1;i++)
57 tp.execute(ec);
58 tp.shutdown();
59 }
60}
因为我是把服务端接收到的数据再发送到Kafka中,再把Kafka中的数据消费到MySQL中去的,所以在消费者端处理数据的粘包分包/拆包/半包问题:
1import java.util.Arrays;
2import java.util.Properties;
3import Matcher;
4import Pattern;
5
6import org.apache.sumer.ConsumerRecord;
7import org.apache.sumer.ConsumerRecords;
8import org.apache.sumer.KafkaConsumer;
9
10public class KafkaConsumerToMysql {
11 static String first = "";
12 private static String line = "";
13 private static int b = 0;
14
15 public static void main(String[] args) throws Exception {
16// long a = System.currentTimeMillis();
17 int a = 0;
18 Properties props = new Properties();
19 props.put("bootstrap.servers", "h153:9092");
20 props.put("group.id", "test");
21// props.put("enable.automit", "true");
22 props.put("enable.automit", "false");
23 props.put("automit.interval.ms", "1000");
24 props.put("session.timeout.ms", "30000");
25 props.put("key.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
26 props.put("value.deserializer", "org.apache.kafkamon.serialization.StringDeserializer");
27 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
28 consumer.subscribe(Arrays.asList("tcp"));
29 MysqlUtils mysql = new MysqlUtils();
30 String pattern = "\\{('\\D+':\\d+,){3}'data':\\{(\\s\\S+\\s\\S+,)+\\s\\S+:\\s\\d+\\s\\}\\}";
31 Pattern p = Patternpile(pattern);
32 while (true) {
33 ConsumerRecords<String, String> records = consumer.poll(100);
34 for (ConsumerRecord<String, String> record : records){
35 a++;
36 String[] data = record.value().split("\r\n");
37 for(int i=0;i<data.length;i++){
38 if(data.length <= 1){
39 line = data[0];
40 Matcher matcher = p.matcher(line);
41 if(!matcher.matches()){
42 first = line;
43 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+data[0]);
44 continue;
45 }else{
46 first = "";
47 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+data[0]);
48 mysql.put(data[0],a);
49 continue;
50 }
51 }else{
52 if(i == data.length-1){
53 line = data[data.length-1];
54 Matcher matcher = p.matcher(line);
55 if(!matcher.matches()){
56 first = line;
57 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+data[i]);
58 continue;
59 }else{
60 first = "";
61 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+data[i]);
62 mysql.put(data[i],a);
62 mysql.put(data[i],a);
63 continue;
64 }
65 }
66 if(i == 0){
67 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+first+data[i]);
68 Matcher matcher = p.matcher(first+data[i]);
69 if(!matcher.matches()){
70 System.out.println("该条数据舍弃");
71 }else{
72 mysql.put(first+data[i],a);
73 }
74 }else{
75 System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), b+"-->"+data[i]);
76 mysql.put(data[i],a);
77 }
78 }
79 }
80 }
81 }
82 }
83}
基本思路:让客户端发数据的时候在每条完整的json字符串后加结束标识符\r\n,我这⾥把收到的数据⽤split按\r\n分隔,在代码中定义⼀个全局变量first,⽤正则表达式匹配分隔好的每条数据,由于粘包分包/拆包/半包所造成的数据分隔后只有data[0]和data[data.length-1]的数据是不完整的,这⾥就分两种情况了,第⼀种是分隔后的数据只有⼀条,那么⽤正则匹配这条数据,如果能匹配则说明该条数据是我们想要的,则可插⼊mysql中,如果不是则说明这条数据是不完整的,将它赋给first(如果出现粘包分包/拆包/半包问题,则第⼀组的最后⼀条数据和第⼆组的第⼀条数据合起来才是⼀条完整的数据);第⼆种是分隔后的数据⼤于⼀条,那么就需要⽤正则来匹配第⼀条和最后⼀条数据了,如果第⼀条不匹配则加上上⼀组的最后⼀条数据first,如果最后⼀条不匹配的话则再把这不完整的数据赋给first,以此类推。这⾥需要注意的是当客户端第⼀次连接的时候发送的第⼀组的第⼀条的数据前半部分应该是完整的,就怕特殊状态下这第⼀次第⼀条的数据就是少头部的数据从半路来的数据,这样的话这组的第⼀条不完整的数据加上上⼀组的最后⼀条不完整的数据就构不成⼀条完整的我想要的数据了,所以这⾥我就把这条数据舍弃不插⼊MySQL中。
我们考虑⼀下这样的情况:我们编写了⼀个机器⼈控制程序,通过⼀个遥控器(客户端)向机器⼈(服务器)建⽴了⼀个长连接,并通过这个连接连续不断的从遥控器发送控制指令给机器⼈。由于是连续控制指令,所以指令与指令之间没有间隔(实际上您还可以想想很多类似场景,例如:开发的Online对战游戏)。
mysql下载 csdn我们使⽤JSON格式作为指令数据的承载格式。那么发送⽅和接收⽅的数据发送-接受过程可能如下图所⽰。
通过上图我们看到了接收⽅为了接受这两条连贯的指令,⼀共做了三次接受,第⼆次接收的时候,收到了⼀部分message1的内容和⼀部分message2的内容。这⾥要说明⼏个注意事项:
# MSS:MSS属性是TCP连接双⽅在三次握⼿时所确认的每⼀个TCP报⽂段中数据字段的最⼤长度。注意,⼀是连接双⽅协商出来的;⼆是只是数据段的最⼤长度,不包括IP协议头和TCP协议头的最⼤长度。
# 半包是指接收⽅应⽤程序在接收信息时,没有接收到⼀个完成的信息格式块;粘包是指,接收⽅应⽤程序在接受信息时,除了接收到发送⽅应⽤程序发送的某⼀个完整数据信息描述外,还接受到了⼀下发送⽅应⽤程序发送的下⼀个数据信息的⼀部分。
# 半包和粘包是针对应⽤程序来说的,这个问题只会发⽣在TCP⼀些进⾏连续发送数据时(TCP长连接)。UDP不会出现这个问题,因为UDP都是有边界的数据报;TCP短连接也不会出现,因为发送完⼀个指令信息后连接就断开了,不会发送第⼆个指令数据。
# 半包和粘包问题产⽣的根本是因为TCP本质上没有“数据块”的概念,⽽是⼀连串的数据流。在应⽤程序层⾯上我们所定义的“数据块”在TCP层⾯上并不被协议认可。
# 半包/粘包是⼀个应⽤层问题。要解决半包/粘包问题,就是在应⽤程序层⾯建⽴协商⼀致的信息还原
依据。常见的有两种⽅式:⼀是消息定长,即保证每⼀个完整的信息描述的长度都是⼀定的,这样⽆论TCP/IP协议如何进⾏分⽚,数据接收⽅都可以按照固定长度进⾏消息的还原。⼆是在完整的⼀块数据结束后增加协商⼀致的分隔符(例如增加⼀个回车符)。
在JAVA NIO技术框架中,半包和粘包问题我们需要⾃⼰解决,如果使⽤Netty框架,它其中提供了多种解码器的封装帮助我们解决半包和粘包问题。甚⾄针对不同的数据格式,Netty都提供了半包和粘包问题的现成解决⽅式,例如之前我们提到的
ProtobufVarint32FrameDecoder解码器,就是专门解决Protobuf数据格式在TCP长连接传输时的半包问题的。
下⾯我们会介绍FixedLengthFrameDecoder、DelimiterBasedFrameDecoder、LineBasedFrameDecoder来解决半包/粘包的问题。
1.使⽤FixedLengthFrameDecoder解决问题
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论