Netty Crash Course
一个Netty程序一般开始于Bootstrap类,通过设置程序后,使用Handlers来处理特定的event和设置Netty中的事件,从而处理多个协议数据,比如实现ChannelInBoundHandler类;
多个Handler通过ChannelInitializer的ChannelPipeline组合在一起,再配合EventLoop(整个生命周期内绑定对应一条线程)和EventLoopGroup来处理各种业务逻辑。
Channels, Events and Input/Ouput
Netty中所有操作都是异步执行的,例如连接一个主机、发消息。可以注册监听器来通知,如Futures(注册一个监听,当操作成功或者失败时会通知)和ChannelFutures(封装的是一个操作的相关信息,操作被执行会理科返回ChannelFuture)。
Bootstrap
两种引导,用于客户端或者DatagramChannel的Bootstrap(一个EventGroup)和用于服务端的ServerBootstrap(两个EventGroup)。
EventLoopGroup可以包含很多个EventLoop,每个channel绑定一个EventLoop不会被改变,很多channel共享一个EventLoop。这就意味着如果一个Channel保持EventLoop繁忙,会禁止其他channel绑定到相同的EventLoop,我们可以理解EventLoop为一个事件循环线程,而EventLoop是一个事件循环集合。
Channel Handler and Data Flow
Handlers自身依赖ChannelPipeline来决定它们的执行顺序。
ChannelHandler定义了一个父接口,ChannelInboundHandler(入站,读)和ChannelOutboundHandler(出站,写)是其子接口,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter是桥接器类。
入站是从ChannelPipeline的头部开始传递到最后,而出站是从尾部开始。
Netty发消息有两种方法:1、直接写入通道导致消息从ChannelPipeline的尾部开始;2、写入ChannelHandlerContext对象导致处理消息从ChannelPipeline的下一个handler开始
编码器、解码器和业务逻辑
每个Handler负责转发时间到ChannelPipeline的下一个handler,但在*Adapter(和子类)中是自动完成的:
ChannelHandlerAdapter,ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter,
ByteToMessageDecoder,MessageToByteEncoder,ProtobufEncoder,ProtobufDecoder
所有的解码器都继承或者实现ChannelInboundHandlerAdapter或者ChannelInboundHandler,channelRead放大调用decode方法,并通过ChannelHandlerContext.fireChannelRead(Object msg)传递给下一个handler。编码器类似。
也许最常见的应用程序处理接收到消息后进行解码,然后供相应的业务处理模块使用。那么有用程序只需要拓展SimpleChannelInboundHandler<T>,SimpleChannelOutboundHandler<T>即可。主要方法是channelRead0(ChannelHadnlerContext ctx, T msg); 消费msg时需要注意,Netty中梳理IO一般有很多线程,不要阻塞IO线程,因为阻塞会降低程序的性能,解决方案是添加ChannelHandler到ChannelPipeline时指定一个EventExecutorGroup,EventExecutorGroup会获得一个EventExecutor,EventExecutor将会执行ChannelHandler的所有方法:
double coefficient = 0.8; //系数int numberOfCores = Runtime.getRuntime().availableProcessors();int poolSize = (int)(numberOfCores / (1 - coefficient));static final EventExecutorGroup group = new DefaultEventExecutorGroup(poolSize);ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new MyProtocolDecoder()); pipeline.addLast("encoder", new MyProtocolEncoder()); // Tell the pipeline to run MyBusinessLogicHandler's event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don't // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
如需要定时执行简单的任务,可以使用EventLoop:
ch.eventLoop().schedule(...);或者ScheduleFuture f = ch.eventLoop().scheduleAtFixedRate(...);f.cancel(true);
Netty使用reference-counting来安全释放Buf和其他资源:
ByteBuf
字节数据容器,单独维护读写两个索引,最大容量限制是Integer.MAX_VALUE,三种不同类型的ByteBuf:
1、Heap Buffer(堆缓冲区)
数据存储在JVM的堆空间中,通过数组实现,可以使用ByteBuf的array()来获取数据。访问非堆缓冲区的ByteBuf的数组会导致UNsupportedOperationException,可以使用hasArray()来检查
2、Direct Buffer(直接缓冲区)
堆之外直接分配内存,性能很好,分配和释放比较复杂,使用内存池来解决。不支持直接数组访问数据:
ByteBuf buf = Unpooled.directBuffer(16);if (!buf.hasArray()) { int len = buf.readableBytes(); byte[] arr = new byte[len]; buf.getBytes(arr); }
3、Composite Buffer(复合缓冲区)
提供一个视图组合多个buff,hasArray()总返回false。
CompositeByteBuf cbf = Unpooled.compositeBuffer();ByteBuf heap = Unpooled.buffer(8);ByteBuf direct = Unpooled.directBuffer(16);cbf.addComponents(heap, direct);cbf.removeComponent(0);Iteratoriter = cbf.iterator();while(iter.hasNext()) { System.out.println(iter.next().toString()); }if (!cbf.hasArray()) { int len = cbf.readableBytes(); byte[] arr = new byte[len]; cbf.getBytes(0, arr); }
相关方法:
isReadable()是否还可以继续读
writeableBytes()剩余可写空间
clear()重置读写的index为0
discardReadBytes()清空已读数据,会移动字节位置,设计到内存复制,注意性能问题
readerIndex(int idx)设置读索引位置
writerIndex(int idx)设置写索引位置
duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)这些会创建缓冲区视图但有独立的读写索引
copy()或者copy(int index, int length)创建全新的副本
ByteBufProcessor
搜索操作,如bytesBefore(byte value)
ByteBufHolder
帮助更快捷的访问ByteBuf中的数据,当缓冲区没用了之后,尅只用这个辅助类释放资源。
ByteBufAllocator
负责分配ByteBuf实例,可以从Channel或者ChannelHandlerContext的alloc()方法获得该对象,有两种实现,一为将分配和回收成本以及内存使用降到最低,另一种是每次使用都创建新的ByteBuf实例。
Netty默认使用PooledByteBufAllocator,可以通过ChannelConfig或者引导设置不同的实现来改变。
Unpooled
创建缓冲区的工具类。
ByteBufUtil
工具类,hexDump(ByteBuf buff)