比尔云BierYun--阿里云最新优惠活动
阿里云优惠码丨阿里云代金券

Netty In Action中文版 – 第十四章:实现自定义的编码解码器

Netty In Action中文版 – 第十四章:实现自定义的编码解码器http://www.bieryun.com/2194.html

本章讲述Netty中如何轻松实现定制的编解码器,由于Netty架构的灵活性,这些编解码器易于重用和测试。为了更容易实现,使用Memcached作为协议例子是因为它更方便我们实现。

        Memcached是免费开源、高性能、分布式的内存对象缓存系统,其目的是加速动态Web应用程序的响应,减轻数据库负载;Memcache实际上是一个以key-value存储任意数据的内存小块。可能有人会问“为什么使用Memcached?”,因为Memcached协议非常简单,便于讲解。

14.1 编解码器的范围

        我们将只实现Memcached协议的一个子集,这足够我们进行添加、检索、删除对象;在Memcached中是通过执行SET,GET,DELETE命令来实现的。Memcached支持很多其他的命令,但我们只使用其中三个命令,简单的东西,我们才会理解的更清楚。
        Memcached有一个二进制和纯文本协议,它们都可以用来与Memcached服务器通信,使用什么类型的协议取决于服务器支持哪些协议。本章主要关注实现二进制协议,因为二进制在网络编程中最常用。

14.2 实现Memcached的编解码器

        当想要实现一个给定协议的编解码器,我们应该花一些事件来了解它的运作原理。通常情况下,协议本身都有一些详细的记录。在这里你会发现多少细节?幸运的是Memcached的二进制协议可以很好的扩展。
        在RFC中有相应的规范,并提供了Memcached二进制协议下载地址:http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped。我们不会执行Memcached的所有命令,只会执行三种操作:SET,GET和DELETE。这样做事为了让事情变得简单。

14.3 了解Memcached二进制协议

        可以在http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped上详细了解Memcached二进制协议结构。不过这个网站如果不翻墙的话好像访问不了。

14.4 Netty编码器和解码器

14.4.1 实现Memcached编码器

        先定义memcached操作码(Opcode)和响应状态码(Status):

[java] view plain copy

  1. package netty.in.action.mem;
  2. /**
  3.  * memcached operation codes
  4.  * @author c.king
  5.  *
  6.  */
  7. public class Opcode {
  8.     public static final byte GET = 0x00;
  9.     public static final byte SET = 0x01;
  10.     public static final byte DELETE = 0x04;
  11. }

[java] view plain copy

  1. package netty.in.action.mem;
  2. /**
  3.  * memcached response statuses
  4.  * @author c.king
  5.  *
  6.  */
  7. public class Status {
  8.     public static final short NO_ERROR = 0x0000;
  9.     public static final short KEY_NOT_FOUND = 0x0001;
  10.     public static final short KEY_EXISTS = 0x0002;
  11.     public static final short VALUE_TOO_LARGE = 0x0003;
  12.     public static final short INVALID_ARGUMENTS = 0x0004;
  13.     public static final short ITEM_NOT_STORED = 0x0005;
  14.     public static final short INC_DEC_NON_NUM_VAL = 0x0006;
  15. }

继续编写memcached请求消息体:

[java] view plain copy

  1. package netty.in.action.mem;
  2. import java.util.Random;
  3. /**
  4.  * memcached request message object
  5.  * @author c.king
  6.  *
  7.  */
  8. public class MemcachedRequest {
  9.     private static final Random rand = new Random();
  10.     private int magic = 0x80;// fixed so hard coded
  11.     private byte opCode; // the operation e.g. set or get
  12.     private String key; // the key to delete, get or set
  13.     private int flags = 0xdeadbeef// random
  14.     private int expires; // 0 = item never expires
  15.     private String body; // if opCode is set, the value
  16.     private int id = rand.nextInt(); // Opaque
  17.     private long cas; // data version check…not used
  18.     private boolean hasExtras; // not all ops have extras
  19.     public MemcachedRequest(byte opcode, String key, String value) {
  20.         this.opCode = opcode;
  21.         this.key = key;
  22.         this.body = value == null ? “” : value;
  23.         // only set command has extras in our example
  24.         hasExtras = opcode == Opcode.SET;
  25.     }
  26.     public MemcachedRequest(byte opCode, String key) {
  27.         this(opCode, key, null);
  28.     }
  29.     public int getMagic() {
  30.         return magic;
  31.     }
  32.     public byte getOpCode() {
  33.         return opCode;
  34.     }
  35.     public String getKey() {
  36.         return key;
  37.     }
  38.     public int getFlags() {
  39.         return flags;
  40.     }
  41.     public int getExpires() {
  42.         return expires;
  43.     }
  44.     public String getBody() {
  45.         return body;
  46.     }
  47.     public int getId() {
  48.         return id;
  49.     }
  50.     public long getCas() {
  51.         return cas;
  52.     }
  53.     public boolean isHasExtras() {
  54.         return hasExtras;
  55.     }
  56. }

最后编写memcached请求编码器:

[java] view plain copy

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.MessageToByteEncoder;
  5. import io.netty.util.CharsetUtil;
  6. /**
  7.  * memcached request encoder
  8.  * @author c.king
  9.  *
  10.  */
  11. public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> {
  12.     @Override
  13.     protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out)
  14.             throws Exception {
  15.         // convert key and body to bytes array
  16.         byte[] key = msg.getKey().getBytes(CharsetUtil.UTF_8);
  17.         byte[] body = msg.getBody().getBytes(CharsetUtil.UTF_8);
  18.         // total size of body = key size + body size + extras size
  19.         int bodySize = key.length + body.length + (msg.isHasExtras() ? 8 : 0);
  20.         // write magic int
  21.         out.writeInt(msg.getMagic());
  22.         // write opcode byte
  23.         out.writeByte(msg.getOpCode());
  24.         // write key length (2 byte) i.e a Java short
  25.         out.writeShort(key.length);
  26.         // write extras length (1 byte)
  27.         int extraSize = msg.isHasExtras() ? 0x08 : 0x0;
  28.         out.writeByte(extraSize);
  29.         // byte is the data type, not currently implemented in Memcached
  30.         // but required
  31.         out.writeByte(0);
  32.         // next two bytes are reserved, not currently implemented
  33.         // but are required
  34.         out.writeShort(0);
  35.         // write total body length ( 4 bytes – 32 bit int)
  36.         out.writeInt(bodySize);
  37.         // write opaque ( 4 bytes) – a 32 bit int that is returned
  38.         // in the response
  39.         out.writeInt(msg.getId());
  40.         // write CAS ( 8 bytes)
  41.         // 24 byte header finishes with the CAS
  42.         out.writeLong(msg.getCas());
  43.         if(msg.isHasExtras()){
  44.             // write extras
  45.             // (flags and expiry, 4 bytes each), 8 bytes total
  46.             out.writeInt(msg.getFlags());
  47.             out.writeInt(msg.getExpires());
  48.         }
  49.         //write key
  50.         out.writeBytes(key);
  51.         //write value
  52.         out.writeBytes(body);
  53.     }
  54. }

14.4.2 实现Memcached解码器

编写memcached响应消息体:

[java] view plain copy

  1. package netty.in.action.mem;
  2. /**
  3.  * memcached response message object
  4.  * @author c.king
  5.  *
  6.  */
  7. public class MemcachedResponse {
  8.     private byte magic;
  9.     private byte opCode;
  10.     private byte dataType;
  11.     private short status;
  12.     private int id;
  13.     private long cas;
  14.     private int flags;
  15.     private int expires;
  16.     private String key;
  17.     private String data;
  18.     public MemcachedResponse(byte magic, byte opCode, byte dataType, short status,
  19.             int id, long cas, int flags, int expires, String key, String data) {
  20.         this.magic = magic;
  21.         this.opCode = opCode;
  22.         this.dataType = dataType;
  23.         this.status = status;
  24.         this.id = id;
  25.         this.cas = cas;
  26.         this.flags = flags;
  27.         this.expires = expires;
  28.         this.key = key;
  29.         this.data = data;
  30.     }
  31.     public byte getMagic() {
  32.         return magic;
  33.     }
  34.     public byte getOpCode() {
  35.         return opCode;
  36.     }
  37.     public byte getDataType() {
  38.         return dataType;
  39.     }
  40.     public short getStatus() {
  41.         return status;
  42.     }
  43.     public int getId() {
  44.         return id;
  45.     }
  46.     public long getCas() {
  47.         return cas;
  48.     }
  49.     public int getFlags() {
  50.         return flags;
  51.     }
  52.     public int getExpires() {
  53.         return expires;
  54.     }
  55.     public String getKey() {
  56.         return key;
  57.     }
  58.     public String getData() {
  59.         return data;
  60.     }
  61. }

编写memcached响应解码器:

[java] view plain copy

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import io.netty.util.CharsetUtil;
  6. import java.util.List;
  7. public class MemcachedResponseDecoder extends ByteToMessageDecoder {
  8.     private enum State {
  9.         Header, Body
  10.     }
  11.     private State state = State.Header;
  12.     private int totalBodySize;
  13.     private byte magic;
  14.     private byte opCode;
  15.     private short keyLength;
  16.     private byte extraLength;
  17.     private byte dataType;
  18.     private short status;
  19.     private int id;
  20.     private long cas;
  21.     @Override
  22.     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
  23.             throws Exception {
  24.         switch (state) {
  25.         case Header:
  26.             // response header is 24 bytes
  27.             if (in.readableBytes() < 24) {
  28.                 return;
  29.             }
  30.             // read header
  31.             magic = in.readByte();
  32.             opCode = in.readByte();
  33.             keyLength = in.readShort();
  34.             extraLength = in.readByte();
  35.             dataType = in.readByte();
  36.             status = in.readShort();
  37.             totalBodySize = in.readInt();
  38.             id = in.readInt();
  39.             cas = in.readLong();
  40.             state = State.Body;
  41.             break;
  42.         case Body:
  43.             if (in.readableBytes() < totalBodySize) {
  44.                 return;
  45.             }
  46.             int flags = 0;
  47.             int expires = 0;
  48.             int actualBodySize = totalBodySize;
  49.             if (extraLength > 0) {
  50.                 flags = in.readInt();
  51.                 actualBodySize -= 4;
  52.             }
  53.             if (extraLength > 4) {
  54.                 expires = in.readInt();
  55.                 actualBodySize -= 4;
  56.             }
  57.             String key = “”;
  58.             if (keyLength > 0) {
  59.                 ByteBuf keyBytes = in.readBytes(keyLength);
  60.                 key = keyBytes.toString(CharsetUtil.UTF_8);
  61.                 actualBodySize -= keyLength;
  62.             }
  63.             ByteBuf body = in.readBytes(actualBodySize);
  64.             String data = body.toString(CharsetUtil.UTF_8);
  65.             out.add(new MemcachedResponse(magic, opCode, dataType, status,
  66.                     id, cas, flags, expires, key, data));
  67.             state = State.Header;
  68.             break;
  69.         default:
  70.             break;
  71.         }
  72.     }
  73. }

14.5 测试编解码器

基于netty的编解码器都写完了,下面我们来写一个测试它的类:

[java] view plain copy

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.embedded.EmbeddedChannel;
  4. import io.netty.util.CharsetUtil;
  5. import org.junit.Assert;
  6. import org.junit.Test;
  7. /**
  8.  * test memcached encoder
  9.  * @author c.king
  10.  *
  11.  */
  12. public class MemcachedRequestEncoderTest {
  13.     @Test
  14.     public void testMemcachedRequestEncoder() {
  15.         MemcachedRequest request = new MemcachedRequest(Opcode.SET, “k1”“v1”);
  16.         EmbeddedChannel channel = new EmbeddedChannel(
  17.                 new MemcachedRequestEncoder());
  18.         Assert.assertTrue(channel.writeOutbound(request));
  19.         ByteBuf encoded = (ByteBuf) channel.readOutbound();
  20.         Assert.assertNotNull(encoded);
  21.         Assert.assertEquals(request.getMagic(), encoded.readInt());
  22.         Assert.assertEquals(request.getOpCode(), encoded.readByte());
  23.         Assert.assertEquals(2, encoded.readShort());
  24.         Assert.assertEquals((byte0x08, encoded.readByte());
  25.         Assert.assertEquals((byte0, encoded.readByte());
  26.         Assert.assertEquals(0, encoded.readShort());
  27.         Assert.assertEquals(2 + 2 + 8, encoded.readInt());
  28.         Assert.assertEquals(request.getId(), encoded.readInt());
  29.         Assert.assertEquals(request.getCas(), encoded.readLong());
  30.         Assert.assertEquals(request.getFlags(), encoded.readInt());
  31.         Assert.assertEquals(request.getExpires(), encoded.readInt());
  32.         byte[] data = new byte[encoded.readableBytes()];
  33.         encoded.readBytes(data);
  34.         Assert.assertArrayEquals((request.getKey() + request.getBody())
  35.                 .getBytes(CharsetUtil.UTF_8), data);
  36.         Assert.assertFalse(encoded.isReadable());
  37.         Assert.assertFalse(channel.finish());
  38.         Assert.assertNull(channel.readInbound());
  39.     }
  40. }

[java] view plain copy

  1. package netty.in.action.mem;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.embedded.EmbeddedChannel;
  5. import io.netty.util.CharsetUtil;
  6. import org.junit.Assert;
  7. import org.junit.Test;
  8. /**
  9.  * test memcached decoder
  10.  * 
  11.  * @author c.king
  12.  * 
  13.  */
  14. public class MemcachedResponseDecoderTest {
  15.     @Test
  16.     public void testMemcachedResponseDecoder() {
  17.         EmbeddedChannel channel = new EmbeddedChannel(
  18.                 new MemcachedResponseDecoder());
  19.         byte magic = 1;
  20.         byte opCode = Opcode.SET;
  21.         byte dataType = 0;
  22.         byte[] key = “Key1”.getBytes(CharsetUtil.UTF_8);
  23.         byte[] body = “Value”.getBytes(CharsetUtil.UTF_8);
  24.         int id = (int) System.currentTimeMillis();
  25.         long cas = System.currentTimeMillis();
  26.         ByteBuf buffer = Unpooled.buffer();
  27.         buffer.writeByte(magic);
  28.         buffer.writeByte(opCode);
  29.         buffer.writeShort(key.length);
  30.         buffer.writeByte(0);
  31.         buffer.writeByte(dataType);
  32.         buffer.writeShort(Status.KEY_EXISTS);
  33.         buffer.writeInt(body.length + key.length);
  34.         buffer.writeInt(id);
  35.         buffer.writeLong(cas);
  36.         buffer.writeBytes(key);
  37.         buffer.writeBytes(body);
  38.         Assert.assertTrue(channel.writeInbound(buffer));
  39.         MemcachedResponse response = (MemcachedResponse) channel.readInbound();
  40.         assertResponse(response, magic, opCode, dataType, Status.KEY_EXISTS, 0,
  41.                 0, id, cas, key, body);
  42.     }
  43.     private static void assertResponse(MemcachedResponse response, byte magic,
  44.             byte opCode, byte dataType, short status, int expires, int flags,
  45.             int id, long cas, byte[] key, byte[] body) {
  46.         Assert.assertEquals(magic, response.getMagic());
  47.         Assert.assertArrayEquals(key,
  48.                 response.getKey().getBytes(CharsetUtil.UTF_8));
  49.         Assert.assertEquals(opCode, response.getOpCode());
  50.         Assert.assertEquals(dataType, response.getDataType());
  51.         Assert.assertEquals(status, response.getStatus());
  52.         Assert.assertEquals(cas, response.getCas());
  53.         Assert.assertEquals(expires, response.getExpires());
  54.         Assert.assertEquals(flags, response.getFlags());
  55.         Assert.assertArrayEquals(body,
  56.                 response.getData().getBytes(CharsetUtil.UTF_8));
  57.         Assert.assertEquals(id, response.getId());
  58.     }
  59. }

14.6 Summary

        本章主要是使用netty写了个模拟memcached二进制协议的处理。至于memcached二进制协议具体是个啥玩意,可以单独了解,这里也没有详细说明。

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

强烈推荐

高性能SSD云服务器ECS抗攻击,高可用云数据库RDS