Skip to content

Commit 651098f

Browse files
author
彭羿博
committed
替换部分fastJSON为自定义的协议(length + String)
1 parent a2b80eb commit 651098f

29 files changed

Lines changed: 357 additions & 194 deletions

src/main/java/CommandDispatcher/CommandDispatcher.java

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,50 +3,31 @@
33
import MessageRegister.MessageRegister;
44
import MessageInput.MessageInput;
55
import RedisCommand.RedisCommandHandler;
6+
import RedisDataBase.RedisString;
67
import io.netty.channel.ChannelHandlerContext;
78
import io.netty.channel.ChannelInboundHandlerAdapter;
8-
import io.netty.handler.codec.serialization.ClassResolvers;
99

10-
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
10+
import java.util.*;
1111
import java.util.concurrent.*;
12-
import java.util.concurrent.atomic.AtomicInteger;
1312

1413
/*
1514
* 这个类实际上是用来处理传入的Message的
1615
* 根据type, 可以得到对应的输入类型和handler
1716
* 这里可以写成单例模式,但是为了图简单就没有这样做
1817
* */
1918
public class CommandDispatcher extends ChannelInboundHandlerAdapter {
19+
static Set<RedisString> protocalSet = new HashSet<>() {
20+
{this.add(new RedisString("get"));
21+
this.add(new RedisString("set"));
22+
this.add(new RedisString("hget"));}
23+
};
24+
25+
2026
// 业务线程池,用来执行各种业务
2127
private static ThreadPoolExecutor executor;
22-
static
23-
{
24-
/****** 初始化整个executor 用来异步执行Redis的业务逻辑,目前来看没有必要
25-
26-
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
27-
28-
// 创造线程的工厂类,暂时没有进行任何调整
29-
ThreadFactory factory = new ThreadFactory() {
30-
AtomicInteger seq = new AtomicInteger();
31-
32-
@Override
33-
public Thread newThread(Runnable r) {
34-
Logger.debug("create thread");
35-
Thread t = new Thread(r);
36-
t.setName("rpc-" + seq.getAndIncrement());
37-
return t;
38-
}
39-
};
40-
41-
// 闲置时间超过30秒的线程自动销毁
42-
executor = new ThreadPoolExecutor(1,
43-
1,
44-
30,
45-
TimeUnit.SECONDS,
46-
queue,
47-
factory,
48-
new CallerRunsPolicy());
49-
*******/
28+
29+
public static boolean newProtocal(RedisString key){
30+
return protocalSet.contains(key);
5031
}
5132

5233
public static void closeGracefully() {
@@ -104,8 +85,13 @@ private void handleCommand(ChannelHandlerContext ctx, MessageInput input) {
10485
// todo 首先解决RedisStringPair的问题,+代表单行字符串\r\n代表结尾,$代表多行字符,\r\n代表换行
10586
// todo 所以针对get/set/expire命令,就直接计算 RedisString然后传入
10687
// 达到的目的是 FastJson序列化和自定义二进制协议可以同时保证
107-
Object o = input.getPayload(clazz);
108-
handler.handle(ctx, input.getRequestId(), o);
88+
89+
if(newProtocal(input.getType())){
90+
handler.handle(ctx, input.getRequestId(), input.getContent());
91+
}else{
92+
Object o = input.getPayload(clazz);
93+
handler.handle(ctx, input.getRequestId(), o);
94+
}
10995
}
11096

11197
@Override

src/main/java/Common/RedisInputStringPair.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package Common;
22

3+
import RedisDataBase.RedisString;
4+
35
// 用来传递两个String的结构
46
public class RedisInputStringPair{
57
String first;
@@ -11,6 +13,8 @@ public RedisInputStringPair(String first,String second){
1113
public String getFirst(){
1214
return first;
1315
}
16+
public RedisString getFirst2(){ return new RedisString(first);};
17+
public RedisString getSecond2(){ return new RedisString(second);};
1418

1519
public String getSecond(){
1620
return second;

src/main/java/Common/RedisUtil.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package Common;
22

3+
import RedisDataBase.RedisString;
4+
35
/* 辅助类 */
46
public class RedisUtil {
57

@@ -21,6 +23,45 @@ public static boolean isInteger(String s) {
2123
return true;
2224
}
2325

26+
public static boolean isInteger(RedisString s) {
27+
int len = s.size();
28+
if(len == 0) return false;
29+
if( s.getByte(0) == '-' && len == 1) {
30+
return false;
31+
}
32+
33+
for(int i = 0; i < len; i++) {
34+
byte c = s.getByte(i);
35+
if(c > '9' || c < '0'){
36+
return false;
37+
}
38+
}
39+
return true;
40+
}
41+
42+
public Integer parseInt(RedisString s){
43+
int len = s.size();
44+
int i = 0;
45+
int sign = 1;
46+
int sum = 0;
47+
if(s.getByte(0) == '-'){
48+
i++;
49+
sign = -1;
50+
}
51+
52+
for(; i < len; i++) {
53+
byte c = s.getByte(i);
54+
if(c > '9' || c < '0') {
55+
return null;
56+
}else{
57+
sum = sum * 10 + c - '0';
58+
}
59+
}
60+
61+
return sum * sign;
62+
}
63+
64+
2465
// 获取最接近二次幂
2566
public static int sizeForTable(int cap){
2667
int n = cap - 1;
@@ -34,4 +75,6 @@ public static int sizeForTable(int cap){
3475
}
3576

3677

78+
79+
3780
}

src/main/java/Demo/DemoServer.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
11
package Demo;
22

3-
import Common.Logger;
43
import RedisServer.RedisServer;
5-
import Common.RedisUtil;
6-
import io.netty.util.ResourceLeakDetector;
7-
import javafx.beans.property.Property;
8-
9-
import java.io.BufferedReader;
10-
import java.io.InputStream;
11-
import java.io.InputStreamReader;
124

135
// 使用两个线程来处理,一个处理网络IO,一个处理业务逻辑(因为Redis这种架构如果采取多线程,可能回导致数据竞争比较困难)
146
public class DemoServer {
Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package MessageInput;
22

3-
import Common.Logger;
3+
import RedisDataBase.RedisString;
44
import com.alibaba.fastjson.JSON;
55

66
/*
@@ -9,21 +9,21 @@
99
payload 一定是一个json格式的参数,利用fastJson直接转换成对应的类
1010
*/
1111
public class MessageInput{
12-
private String type;
13-
private String requestId;
14-
private String payload;
12+
private RedisString type;
13+
private RedisString requestId;
14+
private RedisString payload;
1515

16-
public MessageInput(String type, String requestId, String payload) {
16+
public MessageInput(RedisString type, RedisString requestId, RedisString payload) {
1717
this.type = type;
1818
this.requestId = requestId;
1919
this.payload = payload;
2020
}
2121

22-
public String getType() {
22+
public RedisString getType() {
2323
return type;
2424
}
2525

26-
public String getRequestId() {
26+
public RedisString getRequestId() {
2727
return requestId;
2828
}
2929

@@ -32,6 +32,11 @@ public <T> T getPayload(Class<T> clazz) {
3232
if (payload == null) {
3333
return null;
3434
}
35-
return JSON.parseObject(payload, clazz);
35+
36+
return JSON.parseObject(payload.toString(), clazz);
37+
}
38+
39+
public RedisString getContent(){
40+
return payload;
3641
}
3742
}

src/main/java/MessageOutput/MessageOutput.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package MessageOutput;
22

3+
import RedisDataBase.RedisString;
4+
35
public class MessageOutput {
4-
private String requestId;
5-
private String type;
6+
private RedisString requestId;
7+
private RedisString type;
68
private Object payload;
79

8-
public MessageOutput(String requestId, String type, Object payload) {
10+
public MessageOutput(RedisString requestId, RedisString type, Object payload) {
911
this.requestId = requestId;
1012
this.type = type;
1113
this.payload = payload;
1214
}
1315

14-
public String getType() {
16+
public RedisString getType() {
1517
return this.type;
1618
}
1719

18-
public String getRequestId() {
20+
public RedisString getRequestId() {
1921
return requestId;
2022
}
2123

src/main/java/MessageRegister/MessageRegister.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import Common.RedisInputStringPair;
44
import Common.RedisStringList;
55
import RedisCommand.*;
6+
import RedisDataBase.RedisString;
67

78
import java.util.ArrayList;
89
import java.util.HashMap;
@@ -13,16 +14,16 @@
1314
*/
1415

1516
public class MessageRegister {
16-
private static final Map<String, Class<?>> clazzMapping = new HashMap<>();
17-
private static final Map<String, RedisCommandHandler<?>> handlerMapping = new HashMap<>();
17+
private static final Map<RedisString, Class<?>> clazzMapping = new HashMap<>();
18+
private static final Map<RedisString, RedisCommandHandler<?>> handlerMapping = new HashMap<>();
1819
public static final DefaultHandler defaultHandler = new DefaultHandler();
1920

2021

21-
public static Class<?> getMessage(String type) {
22+
public static Class<?> getMessage(RedisString type) {
2223
return clazzMapping.get(type);
2324
}
2425

25-
public static RedisCommandHandler<?> getHandler(String type) {
26+
public static RedisCommandHandler<?> getHandler(RedisString type) {
2627
return handlerMapping.getOrDefault(type,defaultHandler);
2728
}
2829

@@ -44,8 +45,8 @@ public MessageRegister register(String type, Class<?> clazz, RedisCommandHandler
4445
if (clazz == null || handler == null) {
4546
throw new RuntimeException("params cannot be null");
4647
}
47-
clazzMapping.put(type, clazz);
48-
handlerMapping.put(type, handler);
48+
clazzMapping.put(new RedisString(type), clazz);
49+
handlerMapping.put(new RedisString(type), handler);
4950
return this;
5051
}
5152

src/main/java/RedisCommand/DefaultHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package RedisCommand;
22

3+
import RedisDataBase.RedisString;
34
import io.netty.channel.ChannelHandlerContext;
45
import MessageOutput.MessageOutput;
56

67
// 有问题的消息统一处理(消息未注册,参数错误等)
78
public class DefaultHandler implements RedisCommandHandler<String> {
9+
static final RedisString unknownConstant = new RedisString("unknown type");
10+
811
@Override
9-
public void handle(ChannelHandlerContext ctx, String requestId, String message) {
10-
ctx.writeAndFlush(new MessageOutput(requestId, "UNKNOWN TYPE", message));
12+
public void handle(ChannelHandlerContext ctx, RedisString requestId, String message) {
13+
ctx.writeAndFlush(new MessageOutput(requestId, unknownConstant, message));
1114
}
1215
}
1316

src/main/java/RedisCommand/ExpireHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,23 @@
55
import MessageOutput.MessageOutput;
66
import RedisDataBase.RedisDb;
77
import RedisDataBase.RedisObject;
8+
import RedisDataBase.RedisString;
89
import io.netty.channel.ChannelHandlerContext;
910

1011
/** Expire命令需要覆盖原来的过期时间 **/
1112
public class ExpireHandler implements RedisCommandHandler<RedisInputStringPair> {
13+
static private final RedisString expireConstant = new RedisString("expire");
14+
1215
@Override
13-
public void handle(ChannelHandlerContext ctx, String requestId, RedisInputStringPair pair){
16+
public void handle(ChannelHandlerContext ctx, RedisString requestId, RedisInputStringPair pair){
1417
// 执行 get key 的命令
1518
String key = pair.getFirst();
1619
String delay = pair.getSecond();
1720
//RedisObject ret = RedisDb.get(key);
18-
RedisDb.expire(key,Integer.parseInt(delay));
21+
RedisDb.expire(new RedisString(key),Integer.parseInt(delay));
1922
Logger.debug(requestId + " " + ctx.channel() + ":send expire ok");
2023

21-
ctx.writeAndFlush(new MessageOutput(requestId,"expire","ok"));
24+
ctx.writeAndFlush(new MessageOutput(requestId,expireConstant,"ok"));
2225
}
2326
}
2427

src/main/java/RedisCommand/GetCommandHandler.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,27 @@
44
import MessageOutput.MessageOutput;
55
import RedisDataBase.RedisDb;
66
import RedisDataBase.RedisObject;
7+
import RedisDataBase.RedisString;
78
import io.netty.buffer.Unpooled;
89
import io.netty.channel.ChannelHandlerContext;
910

1011
/***处理常见的命令
1112
* 如果key不存在,返回一个空字符串
1213
*
1314
* ***/
14-
public class GetCommandHandler implements RedisCommandHandler<String> {
15-
16-
// todo 这个key如果使用RedisString,则应该在这里释放
15+
public class GetCommandHandler implements RedisCommandHandler<RedisString> {
16+
static private final RedisString getConstant = new RedisString("get");
17+
// 这里是线程安全的,因为异步线程删除只会直接放入removedQueue,不会直接分配出去
1718
@Override
18-
public void handle(ChannelHandlerContext ctx, String requestId, String key){
19+
public void handle(ChannelHandlerContext ctx, RedisString requestId, RedisString key){
1920
// 执行 get key 的命令
2021
RedisObject ret = RedisDb.get(key);
22+
key.release();
2123
if(ret != null) {
2224
//Logger.debug(requestId + " " + ctx.channel() + ":get resp " + (String) ret.getData());
2325
}
24-
ctx.writeAndFlush(new MessageOutput(requestId, "get", ret == null ? "" : ret.getData()));
26+
ctx.writeAndFlush(new MessageOutput(requestId, getConstant, ret == null ? "" : ret.getData()));
27+
2528
}
2629
}
2730

0 commit comments

Comments
 (0)