netty⾃定义解码器处理消息报⽂(已整理springboot)Netty是由JBOSS提供的⼀个开源的java⽹络编程框架,主要是对java的nio包进⾏了再次封装。
1: 启动类,已经整合了spring boot 当启动项⽬时候同时启动netty 的端⼝服务
(弊端就是启动了俩个端⼝: ⼀个是spring boot  的端⼝,⼀个是netty的端⼝
由于没有特殊要求所以没去处理这个问题,解决了或者有解决办法的⼩伙伴可以回复下
好让我吸取吸取意见)。
可以清楚的看到这些包在maven 中都能到,如果有没贴出来的class ,请回复!
package cn.unicom.iot.gather.levle.server;
import cn.unicom.iot.gather.levle.serverty.NettyServer;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import t.annotation.EnableAspectJAutoProxy;
@SpringBootApplication
@EnableAspectJAutoProxy
@EnableCircuitBreaker
public class Application implements CommandLineRunner {
@Autowired(required = false)
private NettyServer server;
private static final Logger logger = Logger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void args) throws Exception {
server.start();
}
}
2:接下来是 netty  服务类
简单解释下这个netty服务类:抽象decode⽅法 ,任何的数据协议,只要是称得上是协议,就会有固定的格式
在我们知道了格式后,就需要到了解码器 就是decode。
Handle 接受到了再decode 接受数据处理之后的集合,进⾏业务处理。
package cn.unicom.iot.gather.levle.serverty;
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.ChannelFuture;
import ioty.channel.ChannelInitializer;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.SocketChannel;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
/**
* 主要思路:
* 实现了⼀个解码器SunseaMessageDecoder和⼀个业务处理的SunseaMessageHandler
* 客户端发送报⽂协议
* 服务器端接收后解析数据,进⾏业务逻辑处理
* 在这过程中可以看到解码器的作⽤
*/
public class NettyServer {
@Autowired
private SunseaMessageHandler serverMessageHandler;
private static final Logger logger = Logger(NettyServer.class);
public void start() {
logger.info("netty Server 启动 !");
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();
//创建两个线程池
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置解码器
socketChannel.pipeline().addLast(new SunseaMessageDecoder());
//  设置接受数据业务处理
socketChannel.pipeline().addLast(serverMessageHandler);
}
});
// 绑定端⼝
ChannelFuture future = bootstrap.bind(52208).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
3: 解码器信息
下⾯是最简单解码器代码 ,在这⾥我们就需要简单的说下数据协议
可能你需要处理的固定协议是这样的: 当然下⾯只是举例说明
+----------+----------+----------------+----------------+
|  Length  | Header 1 |      data      |    checkcode  |
|  2byte  |  1byte  |    8byte      |      6byte    |
+----------+----------+----------------+----------------+
有了固定的格式 ,我们只需要把怎么从解码器中读取每⼀个数据作为我们关⼼的点
package cn.unicom.iot.gather.levle.serverty;
import ioty.buffer.ByteBuf;
import ioty.buffer.ByteBufUtil;
import ioty.channel.ChannelHandlerContext;
import dec.ByteToMessageDecoder;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
/**
* 解码器
*
* @author linsh
*/
@Component
public class SunseaMessageDecoder extends ByteToMessageDecoder {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
private static final Logger logger = Logger(SunseaMessageDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//简单的解释下我们传递的数据都是在ByteBuf 这个参数中
//  java 数据类型对应字节
//      byte    1字节
//      short    2字节
//      int      4字节
/
/      long    8字节
//      char    2字节(C语⾔中是1字节)可以存储⼀个汉字
//      float    4字节
//      double  8字节
//      boolean  false/true(理论上占⽤1bit,1/8字节,实际处理按1byte处理)
// 注意 ByteBuf  的read 操作是会改变索引如果读取和规定格式不⼀样是会读串数据的.
Map<String, Object> dataMap = new HashMap<String, Object>();
Map<String, Object> dataMap = new HashMap<String, Object>();
// 读取原始数据串不会改变ByteBuf 索引
String originalData = ByteBufUtil.hexDump(in);
dataMap.put("originalData",originalData);
/
/按照规定的数据协议读取数据
//  如果在规定的协议中,有关于⼤端字节和⼩端字节的问题
//  netty 已经帮我们做好了    in.readShort() 改为 readShortLE()
//  Byte  Long  读取⽅式同上改为带LE的⽅法即可
dataMap.put("Length", in.readShort());
dataMap.put("Header1", in.readByte());
dataMap.put("data", in.readLong());
// 当我们遇见 6个字节的规定可以每次读取⼀个字节拼接到StringBuffer 中
StringBuffer sb = new StringBuffer();
//  Stream 表达式代替for 循环
IntStream.range(0, 6).forEach(i -> {
sb.append(String.adByte()));
});
dataMap.put("checkcode", sb.toString());
out.add(dataMap);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        logger.info("level call exception .....");
cause.printStackTrace();
ctx.close();
}
}
我这⾥是把数据读取出来存到了map 中  ,然后传递到下⼀个业务处理的handler。
4: 业务处理
package cn.unicom.iot.gather.levle.serverty;
import cn.unicom.iot.gather.levle.server.kafka.KafkaProducer;
import com.alibaba.fastjson.JSONObject;
import ioty.buffer.ByteBuf;
import ioty.buffer.Unpooled;
import ioty.channel.ChannelHandler;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import up.ChannelGroup;
import up.DefaultChannelGroup;
import urrent.GlobalEventExecutor;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Created by Administrator on 2019/2/22.
*/
@Component
@ChannelHandler.Sharable
public class SunseaMessageHandler extends ChannelInboundHandlerAdapter {
private Logger logger = Logger(Name());
static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Map<String, Object> contentMap = (Map<String, Object>) msg;
JSONObject json =new JSONObject(contentMap);
logger.info("json:"+String());
// 需要响应就可以回复消息跟读取数据时候⼀样根据数据类型对应字节回复消息即可
ByteBuf headerBuf = Unpooled.buffer();
headerBuf.writeByte(1);
headerBuf.writeBoolean(true);
ctx.writeAndFlush(headerBuf);
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.log(Level.INFO, "AuthServerInitHandler channelReadComplete");springboot aop
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        logger.info("level-server  channelRead Exception ....");
cause.printStackTrace();
ctx.close();
}
}
最后贴⼀下pom的依赖  , 有很多项⽬⽤的依赖没有整理
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。