public class TimeServer {

    public void bind(int port) {
        try {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            System.out.println("initializing");
                            arg0.pipeline().addLast(new TimeHandler());
                        }
                    });
            ChannelFuture future = b.bind(port).sync();
            System.out.println("reached here");
            future.channel().closeFuture().sync();
            System.out.println("reached here");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new TimeServer().bind(10000);
    }
}

public class TimeHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        ByteBuf buf = (ByteBuf) msg;
        // byte[] butfs = buf.array(); // throws an error
        System.out.println(buf.readableBytes());
        byte[] butfs = new byte[buf.readableBytes()];
        buf.readBytes(butfs);
        System.out.println(new String(butfs, "UTF-8"));
        System.out.println(msg);
    }
}
The client uses the BIO (blocking I/O) model:
public static void main(String[] args) throws Exception {
    final int port = 10000;
    // NioServer server = new NioServer(port);
    // server.init();
    // ========================================================
    // Next, simulate 3 clients accessing the server concurrently
    int poolsize = 1;
    ExecutorService pool = Executors.newFixedThreadPool(poolsize);
    Collection tasks = new ArrayList(10);
    final String clientname = "clientThread";
    for (int i = 0; i < poolsize; i++) {
        final int n = i;
        // Each client keeps using BIO to send data to the server and to read data back.
        tasks.add(new Callable() {
            @Override
            public Object call() throws Exception {
                Socket socket = new Socket("127.0.0.1", port);
                final InputStream input = socket.getInputStream();
                final OutputStream out = socket.getOutputStream();
                final String clientname_n = clientname + "_" + n;
                // BIO read thread
                new Thread(clientname_n + "_read") {
                    @Override
                    public void run() {
                        byte[] bs = new byte[1024];
                        while (true) {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            int len = 0;
                            try {
                                while ((len = input.read(bs)) != -1) {
                                    System.out.println("Client thread " + Thread.currentThread().getName() + " read: " + new String(bs, 0, len));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
                // BIO write thread
                new Thread(clientname_n + "_write") {
                    @Override
                    public void run() {
                        int a = 0;
                        while (true) {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            String str = Thread.currentThread().getName() + " hello, " + a;
                            try {
                                out.write(str.getBytes());
                                out.flush();
                                a++;
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
                return null;
            }
        });
    }
    pool.invokeAll((Collection extends Callable
Running it produced the following error:
13, 2017 5:52:56 PM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858)
    at test.netty.TimeHandler.channelRead(TimeHandler.java:17)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Why does this happen?
In Netty 4, the life cycle of certain objects is controlled by a reference counter, and ByteBuf is one of them. A newly created object starts with a reference count of 1. Each call to release() decrements the counter by 1. Accessing an object whose counter is 0 throws IllegalReferenceCountException; this is the check that ensureAccessible() performs. For a more detailed explanation, please refer to the official documentation.
The check lives in AbstractByteBuf.java, in the ensureAccessible() frame at the top of the stack trace above.
Note the line super.channelRead(ctx, msg); in the original poster's TimeHandler class. The adapter's channelRead forwards the message to the next handler, and since TimeHandler is the only handler in the pipeline, the message reaches the tail of the pipeline, which releases unhandled messages. Tracing the call path, the code that finally runs is: ReferenceCountUtil.release(msg)
In other words, every call to super.channelRead(ctx, msg); ends with release() being called on the ByteBuf, which takes the counter from 1 to 0. The later call buf.readBytes(butfs); runs ensureAccessible(), finds the counter at 0, concludes that the ByteBuf has already been released, and throws the exception.
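The ordering problem can be reproduced without Netty. The following is a toy model in plain Java, not Netty's implementation: CountedBuffer, ReleaseOrderDemo, and tail are hypothetical names invented for this sketch, and IllegalStateException stands in for IllegalReferenceCountException.

```java
import java.nio.charset.StandardCharsets;

/** Toy stand-in for a reference-counted buffer (hypothetical, not a Netty class). */
final class CountedBuffer {
    private final byte[] data;
    private int refCnt = 1; // a new object starts with a count of 1

    CountedBuffer(byte[] data) {
        this.data = data.clone();
    }

    /** Decrements the counter; returns true when it reaches 0. */
    boolean release() {
        ensureAccessible();
        refCnt--;
        return refCnt == 0;
    }

    byte[] read() {
        ensureAccessible();
        return data.clone();
    }

    /** Rejects any access once the counter has dropped to 0. */
    private void ensureAccessible() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }
}

final class ReleaseOrderDemo {
    /** Stands in for the pipeline tail, which releases whatever reaches it. */
    static void tail(CountedBuffer msg) {
        msg.release();
    }

    /** Mirrors the failing handler: forward to the tail first, then read. */
    static String forwardThenRead(CountedBuffer msg) {
        tail(msg);
        try {
            return new String(msg.read(), StandardCharsets.UTF_8);
        } catch (IllegalStateException e) {
            return "error: " + e.getMessage();
        }
    }

    /** Reads first and releases afterwards. */
    static String readThenRelease(CountedBuffer msg) {
        try {
            return new String(msg.read(), StandardCharsets.UTF_8);
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(forwardThenRead(new CountedBuffer(bytes))); // error: refCnt: 0
        System.out.println(readThenRelease(new CountedBuffer(bytes))); // hello
    }
}
```

The only difference between the two methods is when the release happens relative to the read, which is the same difference as having or not having super.channelRead(ctx, msg); at the top of channelRead.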
Solution:
Remove the line super.channelRead(ctx, msg); from TimeHandler. Whoever processes the ByteBuf is responsible for releasing it, so release it in TimeHandler once it has been read.
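A minimal sketch of that fix, assuming Netty 4.x is on the classpath. It drops the super.channelRead call, reads the bytes, and releases the message in a finally block with ReferenceCountUtil.release. The variable name bytes and the use of StandardCharsets are choices made for this sketch, not part of the original code.

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.nio.charset.StandardCharsets;

class TimeHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // No super.channelRead(ctx, msg) here: forwarding the message would let
        // the tail of the pipeline release it before it is read.
        ByteBuf buf = (ByteBuf) msg;
        try {
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
        } finally {
            // This handler consumes the message, so it releases it.
            ReferenceCountUtil.release(msg);
        }
    }
}
```

If a later handler did need the message, the alternative would be to read it here without releasing and then pass it on with ctx.fireChannelRead(msg) as the last step, leaving the release to whoever consumes it.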
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
This warning means exceptionCaught() is missing. Add an exceptionCaught() override to the last handler on the server side.
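One way to do that, sketched under the assumption of Netty 4.x: a small handler placed last in the pipeline that overrides exceptionCaught. ExceptionLoggingHandler is a hypothetical name, and closing the channel after logging is a policy chosen for this sketch rather than something the warning requires.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

class ExceptionLoggingHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Handle the exception here so it does not reach the tail of the pipeline.
        cause.printStackTrace();
        // Assumption for this sketch: a failed connection is simply closed.
        ctx.close();
    }
}
```

It would be registered after the business handler, for example arg0.pipeline().addLast(new TimeHandler(), new ExceptionLoggingHandler()); the same override could equally be written directly in TimeHandler.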