> 文章列表 > Netty中的HttpServerCodec和HttpObjectAggregator

Netty中的HttpServerCodec和HttpObjectAggregator

Netty中的HttpServerCodec和HttpObjectAggregator

首先使用Netty搭建一个HttpServer,代码如下:

public class App {public static boolean useEpoll = false;static {String os = System.getProperty("os.name");if (Objects.nonNull(os) && os.equalsIgnoreCase("linux") &&Epoll.isAvailable()) {useEpoll = true;}}public static void main(String[] args) throws InterruptedException {System.out.println("starting...");nettyMain();}public static void nettyMain() throws InterruptedException {EventLoopGroup boss = buildBossEventLoopGroup();EventLoopGroup worker = buildWorkerEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(boss, worker).channel(useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)).addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)).addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5*1024))// 自定义业务handler.addLast(new TestChannelHandler());}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture sync = serverBootstrap.bind(8081).sync();System.out.println("started!");sync.channel().closeFuture().sync();} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}public static EventLoopGroup buildBossEventLoopGroup() {return useEpoll ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);}public static EventLoopGroup buildWorkerEventLoopGroup() {return useEpoll ? new EpollEventLoopGroup() : new NioEventLoopGroup();}}

其中业务处理的handler如下:

@Sharable
public class TestChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {public static Gson gson = new Gson();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {String requestData = msg.content().toString(CharsetUtil.UTF_8);System.out.println("received data: " + requestData);ResponseData res = new ResponseData();res.setIntValue(200);res.setData("received data: " + requestData);// returnFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,Unpooled.copiedBuffer(gson.toJson(res), CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json;charset=UTF-8");// *重要,必须加// 否则客户端会一直转圈圈response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());ctx.writeAndFlush(response).addListener(channelFuture -> System.out.println("writeAndFlush succeed"));}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state().equals(IdleState.ALL_IDLE)) {ctx.close();}}}
}

作为HttpServer,需要加入来自client的Request的Http协议的Decoder和server回复Response的Http协议的Encoder,而这两个部分集成在了Netty提供的HttpServerCodec中,来看类图结构:
Netty中的HttpServerCodec和HttpObjectAggregator
HttpServerCodec肩负Inbound和Outbound的功能,而HttpServerCodec本身不实现两个功能,而是委托给了两个内部类完成:
Netty中的HttpServerCodec和HttpObjectAggregator
以HttpServerRequestDecoder为例,该类类图结构如下:
Netty中的HttpServerCodec和HttpObjectAggregator
可以看出,这两个内部编码解码类同样是ChannelHandler. 同时该类继承自HttpObjectDecoder extends ByteToMessageDecoder,而ByteToMessageDecoder是一个通用的decoder,可以自定义decoder将bytes转化为自己的数据类型,而一个数据类型一般由多个bytes组合而成,那么如何划分界限呢?常用的DelimiterBasedFrameDecoder, FixedLengthFrameDecoder都继承自该类,而这里HttpObjectDecoder 提供了解码生成HttpMessageHttpContent的方法。先来看ByteToMessageDecoder的注释:

decodes bytes in a stream-like fashion from one ByteBuf to an other Message type.
Be aware that sub-classes of ByteToMessageDecoder MUST NOT annotated with @Sharable. Some methods such as ByteBuf.readBytes(int) will cause a memory leak if the returned buffer is not released or added to the out List. Use derived buffers like ByteBuf.readSlice(int) to avoid leaking memory.

来看HttpObjectDecoder的注释:

Decodes ByteBufs into HttpMessages and HttpContents.
prevents excessive memory consumption;
control parsing behavior;
Chunked Content.
If you prefer not to handle HttpContents by yourself for your convenience, insert HttpObjectAggregator after this decoder in the ChannelPipeline. However, please note that your server might not be as memory efficient as without the aggregator.

HttpObjectDecoder会将一个Http请求分为多个数据类型,如请求头、请求体、数据过大时分片(有限状态机实现),后续的ChannelHandler可以根据不同的类型来处理,如果想直接处理完整的Http请求,可在pipeline后面加上HttpObjectAggregator(只处理入站请求),该类会聚合一个请求的所有东西生成一个FullHttpRequest供后续InboundChannelHandler使用。