|
3 | 3 | import MessageRegister.MessageRegister; |
4 | 4 | import MessageInput.MessageInput; |
5 | 5 | import RedisCommand.RedisCommandHandler; |
| 6 | +import RedisDataBase.RedisString; |
6 | 7 | import io.netty.channel.ChannelHandlerContext; |
7 | 8 | import io.netty.channel.ChannelInboundHandlerAdapter; |
8 | | -import io.netty.handler.codec.serialization.ClassResolvers; |
9 | 9 |
|
10 | | -import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; |
| 10 | +import java.util.*; |
11 | 11 | import java.util.concurrent.*; |
12 | | -import java.util.concurrent.atomic.AtomicInteger; |
13 | 12 |
|
14 | 13 | /* |
15 | 14 | * 这个类实际上是用来处理传入的Message的 |
16 | 15 | * 根据type, 可以得到对应的输入类型和handler |
17 | 16 | * 这里可以写成单例模式,但是为了图简单就没有这样做 |
18 | 17 | * */ |
19 | 18 | public class CommandDispatcher extends ChannelInboundHandlerAdapter { |
| 19 | + static Set<RedisString> protocalSet = new HashSet<>() { |
| 20 | + {this.add(new RedisString("get")); |
| 21 | + this.add(new RedisString("set")); |
| 22 | + this.add(new RedisString("hget"));} |
| 23 | + }; |
| 24 | + |
| 25 | + |
20 | 26 | // 业务线程池,用来执行各种业务 |
21 | 27 | private static ThreadPoolExecutor executor; |
22 | | - static |
23 | | - { |
24 | | - /****** 初始化整个executor 用来异步执行Redis的业务逻辑,目前来看没有必要 |
25 | | -
|
26 | | - BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000); |
27 | | -
|
28 | | - // 创造线程的工厂类,暂时没有进行任何调整 |
29 | | - ThreadFactory factory = new ThreadFactory() { |
30 | | - AtomicInteger seq = new AtomicInteger(); |
31 | | -
|
32 | | - @Override |
33 | | - public Thread newThread(Runnable r) { |
34 | | - Logger.debug("create thread"); |
35 | | - Thread t = new Thread(r); |
36 | | - t.setName("rpc-" + seq.getAndIncrement()); |
37 | | - return t; |
38 | | - } |
39 | | - }; |
40 | | -
|
41 | | - // 闲置时间超过30秒的线程自动销毁 |
42 | | - executor = new ThreadPoolExecutor(1, |
43 | | - 1, |
44 | | - 30, |
45 | | - TimeUnit.SECONDS, |
46 | | - queue, |
47 | | - factory, |
48 | | - new CallerRunsPolicy()); |
49 | | - *******/ |
| 28 | + |
| 29 | + public static boolean newProtocal(RedisString key){ |
| 30 | + return protocalSet.contains(key); |
50 | 31 | } |
51 | 32 |
|
52 | 33 | public static void closeGracefully() { |
@@ -104,8 +85,13 @@ private void handleCommand(ChannelHandlerContext ctx, MessageInput input) { |
104 | 85 | // todo 首先解决RedisStringPair的问题,+代表单行字符串\r\n代表结尾,$代表多行字符,\r\n代表换行 |
105 | 86 | // todo 所以针对get/set/expire命令,就直接计算 RedisString然后传入 |
106 | 87 | // 达到的目的是 FastJson序列化和自定义二进制协议可以同时保证 |
107 | | - Object o = input.getPayload(clazz); |
108 | | - handler.handle(ctx, input.getRequestId(), o); |
| 88 | + |
| 89 | + if(newProtocal(input.getType())){ |
| 90 | + handler.handle(ctx, input.getRequestId(), input.getContent()); |
| 91 | + }else{ |
| 92 | + Object o = input.getPayload(clazz); |
| 93 | + handler.handle(ctx, input.getRequestId(), o); |
| 94 | + } |
109 | 95 | } |
110 | 96 |
|
111 | 97 | @Override |
|
0 commit comments