SpringBoot集成netty在项⽬中的应⽤
最近做的这个项⽬,需要和服务端进⾏tcp通信。⼤概的需求是这样,服务端是物理硬件,由c++代码控制,后台和机器通过tcp进⾏通信。当前台输⼊数据给后台之后,后台作为客户端将前台的数据转换成byte数组给服务端发送数据,返回给后台,后台通过websocket将数据不停的发送到前台。下⾯记录⼀下后台作为客户端发送给机器的部分。
netty的客户端
package;
import Bootstrap;
import Channel;
import ChannelFuture;
import ChannelInitializer;
import EventLoopGroup;
import NioEventLoopGroup;
import SocketChannel;
import NioSocketChannel;
import IdleStateHandler;
import InitializingBean;
import Value;
import Component;
import PostConstruct;
/**
* @author zxw
* @version 1.0
* @description 连接的客户端
* @data: 2020/3/6 21:33
*/
@Component
public class SayOnClient implements InitializingBean {
private EventLoopGroup eventLoopGroup =new NioEventLoopGroup();
@Value("${netty.host}")
private String host;
@Value("${netty.port}")
private int port;
@Value("${adTime}")
private int readTime;
@Value("${netty.writeTime}")
private int writeTime;
@Value("${netty.allTime}")
private int allTime;
public static Channel channel =null;
@Override
public void afterPropertiesSet(){
createBootStrap(new Bootstrap(), eventLoopGroup);
}
void createBootStrap(Bootstrap bootstrap, EventLoopGroup eventLoopGroup){
try{
if(bootstrap !=null){
//定义handler 因为在内部类⾥⾯⽆法传⼊this对象定义到外⾯传递过去
final SayOnHandler sayOnHandler =new SayOnHandler(this);
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel socketChannel)throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(readTime, writeTime, allTime))
socketChannel.pipeline().addLast(new IdleStateHandler(readTime, writeTime, allTime)) //添加⾃定义handler
.addLast(sayOnHandler)
//添加解码器
.addLast("sayOnDecoder",new SayOnDecoder());
}
});
ChannelFuture channelFuture = t(host, port);
channelFuture.addListener(new SayOnListener(this));
//获得channel对象赋值给静态变量
channel = channelFuture.channel();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
⾃定义的handler
package;
import Bootstrap;
import*;
import IdleState;
import IdleStateEvent;
import Component;
import SocketChannel;
import TimeUnit;
/**
* @author zxw
* @version 1.0
* @description ⾃定义handler 的区别就是SimpleChannelInboundHandler
* 在接收到数据后会⾃动release掉数据占⽤的Bytebuffer资源(⾃动调⽤lease())。
* ⽽为何服务器端不能⽤呢,因为我们想让服务器把客户端请求的数据发送回去,
* ⽽服务器端有可能在channelRead⽅法返回前还没有写完数据,因此不能让它⾃动release
* @data: 2020/3/7 20:21
*/
@Component
@ChannelHandler.Sharable
public class SayOnHandler extends SimpleChannelInboundHandler<SocketChannel>{
private SayOnClient sayOnClient;
public SayOnHandler(SayOnClient sayOnClient){
this.sayOnClient = sayOnClient;
}
/**
* 服务端断开连接会触发,断开后客户端会尝试重连操作
*
* @param ctx
*/
bootstrap项目@Override
public void channelInactive(ChannelHandlerContext ctx){
try{
final EventLoop eventLoop = ctx.channel().eventLoop();
//获得eventLoop对象后进⾏任务调度,执⾏客户端再次连接⽅法
eventLoop.schedule(()->{
},10L, TimeUnit.SECONDS);
}catch(Exception e){
e.printStackTrace();
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
super.channelRead(ctx, msg);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SocketChannel socketChannel)throws Exception { }
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent =(IdleStateEvent) evt;
switch(idleStateEvent.state()){
case WRITER_IDLE:
System.out.println("发送⼼跳----写数据");
ctx.channel().writeAndFlush("111");
break;
case READER_IDLE:
System.out.println("读超时");
// ctx.channel().close();
break;
default:
ctx.channel().close();
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
System.out.println("发⽣异常:");
cause.printStackTrace();
ctx.channel().close();
}
}
⾃定义的listener
package;
import Bootstrap;
import ChannelFuture;
import ChannelFutureListener;
import EventLoop;
import TimeUnit;
/**
* @author zxw
* @version 1.0
* @description
* @data: 2020/3/7 20:33
*/
public class SayOnListener implements ChannelFutureListener {
private SayOnClient sayOnClient;
public SayOnListener(SayOnClient sayOnClient){
this.sayOnClient = sayOnClient;
}
@Override
public void operationComplete(ChannelFuture channelFuture)throws Exception { if(!channelFuture.isSuccess()){
final EventLoop eventLoop = channelFuture.channel().eventLoop();
//如果连接失败,进⾏重连的任务调度
eventLoop.schedule(()->{
},10L, TimeUnit.SECONDS);
System.out.println("开始重连!!");
}else{
System.out.println("连接成功!!");
}
}
}
Java基本类型转byte数组
package;
/**
* @author zxw
* @version 1.0
* @description 数据类型转换⼯具类
* @data: 2020/3/26 11:17
*/
public class DataConverterUtil {
/**
* char到字节数组的转换.
*/
public static byte[]charToByte(char c){
byte[] b =new byte[2];
b[0]=(byte)((c &0xFF00)>>8);
b[1]=(byte)(c &0xFF);
return b;
}
/**
* 字节数组到char的转换
* @param b 字节数组
* @param index 从第⼏位开始
*/
public static char byteToChar(byte[] b,int index){
return(char)(((b[index]&0xFF)<<8)|(b[index +1]&0xFF));
return(char)(((b[index]&0xFF)<<8)|(b[index +1]&0xFF)); }
/**
* short到字节数组的转换.
*/
public static byte[]shortToByte(short number){
int temp = number;
byte[] b =new byte[2];
for(int i =0; i < b.length; i++){
// 将最低位保存在最低位
b[i]=new Integer(temp &0xff).byteValue();
// 向右移8位
temp = temp >>8;
}
return b;
}
/**
* 字节数组到short的转换.
* @param b 字节数组
* @param index 从数组的第⼏位开始取
*/
public static short byteToShort(byte[] b,int index){
short s =0;
// 最低位
short s0 =(short)(b[index]&0xff);
short s1 =(short)(b[index +1]&0xff);
s1 <<=8;
s =(short)(s0 | s1);
return s;
}
/**
* int到字节数组的转换.
*/
public static byte[]intToByte(int number){
int temp = number;
byte[] b =new byte[4];
for(int i =0; i < b.length; i++){
// 将最低位保存在最低位
b[i]=new Integer(temp &0xff).byteValue();
// 向右移8位
temp = temp >>8;
}
return b;
}
/**
* 字节数组到int的转换.
* @param b 字节数组
* @param index 从数组的第⼏位开始取
*/
public static int byteToInt(byte[] b,int index){
int s =0;
// 最低位
int s0 = b[index]&0xff;
int s1 = b[index +1]&0xff;
int s2 = b[index +2]&0xff;
int s3 = b[index +3]&0xff;
s3 <<=24;
s2 <<=16;
s1 <<=8;
s = s0 | s1 | s2 | s3;
return s;
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论