Netty编解码器 编码和解码,或者数据从一种特定协议的格式到另一种格式的转换。这些任务将由通常称为编解码器的组件来处理。每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。
例如WebSocket 服务,WebSocket服务器收到是字节流,我们收到字节流后要将其解码成WebSocket消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+
编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。编码器操作出站数据,而解码器处理入站数据。
解码器 因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以Netty的解码器都实现了ChannelInboundHandler。
Netty提供下面两个不同用处的解码器:
将字节转解码为消息 – ByteToMessageDecoder 和 ReplayingDecoder
ByteToMessageDecoder是一个抽象类,我们需要实现其decode方法。
由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到有足够的数据准备好处理。
1 2 3 4 5 6 public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4 ) out.add(in.readInt()); } }
上面的例子中,我们要手动判断输入的ByteBuf是否具有足够的数据有点繁琐。下面我们使用ReplayingDecode,它可以让我们不必重复做这个事情。
1 2 3 4 5 6 public class ToIntegerDecoder extends ReplayingDecoder <Integer> { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { out.add(in.readInt()); } }
使用ReplayingDecoder时会稍慢于ByteToMessageDecoder。
将一种消息解码为另一种消息 – MessageToMessageDecoder
下面例子中讲Integer类型的消息转换成String类型,然后传递给下一个handler。
1 2 3 4 5 6 public class IntegerToStringDecoder extends MessageToMessageDecoder <Integer> { @Override protected void decode (ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception { out.add(String.valueOf(msg)); } }
注意:每次调用decode方法后即没新message产生缓冲区也无变化,缓存区数据将在下一轮channelRead事件触发后处理。原因也很好理解,如果一次decoder后没新message产生也没缓存消耗,再执行一次decoder将还是同样的结果(所有条件都没变化),处理不好这会造成死循环。代码32行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 protected void callDecode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); if (outSize > 0 ) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break ; } outSize = 0 ; } int oldInputLength = in.readableBytes(); decodeRemovalReentryProtection(ctx, in, out); if (ctx.isRemoved()) { break ; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break ; } else { continue ; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException ( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message." ); } if (isSingleDecode()) { break ; } } } catch (DecoderException e) { throw e; } catch (Exception cause) { throw new DecoderException (cause); } }
我们需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty提供了TooLongFrameException类,如果缓冲区超出指定大小限制时(手动)抛出。如果你正在使用一个可变帧大小的协议,那么这种保护措施将是尤为重要的。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class SafeByteToMessageDecoder extends ByteToMessageDecoder { private static final int MAX_FRAME_SIZE = 1024 ; @Override public void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { int readable = in.readableBytes(); if (readable > MAX_FRAME_SIZE) { in.skipBytes(readable); throw new TooLongFrameException ("Frame too big!" ); } ... } }
WebSocketFrameDecoder 解析WebSocket消息协议示例
WebSocketFrameDecoder.java view raw 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public class WebSocketFrameDecoder extends ByteToMessageDecoder { private static final short MAX_FRAME_SIZE = 1024 * 10 ; private static final byte READ_HEADER = 1 ; private static final byte READ_EXT_PAYLOAD_LEN_16 = 2 ; private static final byte READ_EXT_PAYLOAD_LEN_64 = 4 ; private static final byte READ_MASK_KEY = 8 ; private static final byte READ_PAYLOAD = 16 ; private byte STATE = READ_HEADER; private byte fin; private byte opcode; private byte mask; private long payloadLen; private short [] maskKey = new short [4 ]; private byte [] payload; private void reset () { STATE = READ_HEADER; } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (STATE == READ_HEADER) { if (in.readableBytes() < 2 ) return ; int header = in.readUnsignedShort(); fin = (byte ) (header >> 15 ); opcode = (byte ) (header & 0b0000111100000000 >> 8 ); mask = (byte ) ((header & 0b0000000010000000 ) >> 7 ); payloadLen = header & 0b0000000001111111 ; if (payloadLen == 127 ) STATE = READ_EXT_PAYLOAD_LEN_64; else if (payloadLen == 126 ) STATE = READ_EXT_PAYLOAD_LEN_16; else STATE = READ_MASK_KEY; } else if (STATE == READ_EXT_PAYLOAD_LEN_16) { if (in.readableBytes() < 2 ) return ; payloadLen = in.readUnsignedShort(); STATE = READ_MASK_KEY; } else if (STATE == READ_EXT_PAYLOAD_LEN_64) { if (in.readableBytes() < 8 ) return ; payloadLen = in.readLong(); STATE = READ_MASK_KEY; } else if (STATE == READ_MASK_KEY) { if (mask == 1 ) { if (in.readableBytes() < 4 ) return ; maskKey[0 ] = in.readUnsignedByte(); maskKey[1 ] = in.readUnsignedByte(); maskKey[2 ] = in.readUnsignedByte(); maskKey[3 ] = in.readUnsignedByte(); } STATE = READ_PAYLOAD; } else if (STATE == READ_PAYLOAD) { if (payloadLen > MAX_FRAME_SIZE) { in.skipBytes(in.readableBytes()); throw new TooLongFrameException ("Frame too big!" ); } if (in.readableBytes() < payloadLen) return ; payload = new byte [(int ) payloadLen]; int pos = 0 ; while (in.isReadable() && payloadLen > 0 ) { payload[pos] = (byte ) (in.readByte() ^ maskKey[pos % 4 ]); pos++; payloadLen--; } out.add(new WebSocketFrame (fin, opcode, mask, payload.length, maskKey, payload)); reset(); } } }
编码器
ByteToMessageCodec 和 MessageToMessageCodec ByteToMessageCodec将字节编解码为消息,它结合了ByteToMessageDecoder以及它的逆向——MessageToByteEncoder。
1 2 3 4 5 6 7 8 9 10 11 public class IntegerStringCodec extends ByteToMessageCodec <Integer> { @Override protected void encode (ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4 ) out.add(in.readInt()); } }
CombinedChannelDuplexHandler ByteToMessageCodec 和 MessageToMessageCodec 将编解码逻辑在一个类中实现,有时候我们编解码可能在不同的类中,能不能提供一个组合的将编解码组合到一个类中呢?CombinedChannelDuplexHandler提供了这个解决方案。
1 2 3 4 5 public class IntegerByteCodec extends CombinedChannelDuplexHandler <ToIntegerDecoder, IntegerToByteEncoder> { public IntegerByteCodec () { super (new ToIntegerDecoder (), new IntegerToByteEncoder ()); } }
参考