Skip to content

Commit 373ce28

Browse files
committed
提交netty-gateway的代码
1 parent 01d9794 commit 373ce28

8 files changed

Lines changed: 215 additions & 80 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.ChannelInboundHandlerAdapter;
5+
import io.netty.handler.codec.http.FullHttpRequest;
6+
7+
public class HttpHeadersRequestFilter extends ChannelInboundHandlerAdapter implements HttpRequestFilter {
8+
9+
@Override
10+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
11+
if (msg instanceof FullHttpRequest) {
12+
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
13+
filter(fullHttpRequest, ctx);
14+
} else {
15+
ctx.fireChannelRead(msg);
16+
}
17+
}
18+
19+
@Override
20+
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
21+
fullRequest.headers().add("nio", "linboxuan");
22+
ctx.fireChannelRead(fullRequest);
23+
}
24+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.kimmking.gateway.inbound;
22

3-
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
3+
import io.github.kimmking.gateway.outbound.netty4.HttpOutboundHandler;
44
import io.netty.channel.ChannelHandlerContext;
55
import io.netty.channel.ChannelInboundHandlerAdapter;
66
import io.netty.handler.codec.http.FullHttpRequest;
@@ -11,12 +11,10 @@
1111
public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
1212

1313
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
14-
private final String proxyServer;
1514
private HttpOutboundHandler handler;
1615

1716
public HttpInboundHandler(String proxyServer) {
18-
this.proxyServer = proxyServer;
19-
handler = new HttpOutboundHandler(this.proxyServer);
17+
handler = new HttpOutboundHandler(proxyServer);
2018
}
2119

2220
@Override
@@ -29,14 +27,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
2927
try {
3028
//logger.info("channelRead流量接口请求开始,时间为{}", startTime);
3129
FullHttpRequest fullRequest = (FullHttpRequest) msg;
32-
// String uri = fullRequest.uri();
33-
// //logger.info("接收到的请求url为{}", uri);
30+
String uri = fullRequest.uri();
31+
logger.info("接收到的请求url为{}", uri);
3432
// if (uri.contains("/test")) {
3533
// handlerTest(fullRequest, ctx);
3634
// }
3735

3836
handler.handle(fullRequest, ctx);
39-
4037
} catch(Exception e) {
4138
e.printStackTrace();
4239
} finally {

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundInitializer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.kimmking.gateway.inbound;
22

3+
import io.github.kimmking.gateway.filter.HttpHeadersRequestFilter;
34
import io.netty.channel.ChannelInitializer;
45
import io.netty.channel.ChannelPipeline;
56
import io.netty.channel.socket.SocketChannel;
@@ -23,6 +24,7 @@ public void initChannel(SocketChannel ch) {
2324
p.addLast(new HttpServerCodec());
2425
//p.addLast(new HttpServerExpectContinueHandler());
2526
p.addLast(new HttpObjectAggregator(1024 * 1024));
27+
p.addLast(new HttpHeadersRequestFilter());
2628
p.addLast(new HttpInboundHandler(this.proxyServer));
2729
}
2830
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package io.github.kimmking.gateway.outbound;
2+
3+
import io.github.kimmking.gateway.outbound.netty4.NettyHttpClientInboundHandler;
4+
import io.netty.bootstrap.Bootstrap;
5+
import io.netty.buffer.Unpooled;
6+
import io.netty.channel.*;
7+
import io.netty.channel.nio.NioEventLoopGroup;
8+
import io.netty.channel.socket.SocketChannel;
9+
import io.netty.channel.socket.nio.NioSocketChannel;
10+
import io.netty.handler.codec.http.*;
11+
import io.netty.util.concurrent.FutureListener;
12+
13+
import java.net.URI;
14+
import java.util.function.Consumer;
15+
import java.util.regex.Matcher;
16+
import java.util.regex.Pattern;
17+
18+
public class NettyHttpClient {
19+
20+
private final Bootstrap b;
21+
private final EventLoopGroup workerGroup;
22+
23+
public NettyHttpClient(){
24+
workerGroup = new NioEventLoopGroup();
25+
b = new Bootstrap()
26+
.group(workerGroup)
27+
.channel(NioSocketChannel.class)
28+
.option(ChannelOption.SO_KEEPALIVE, true);
29+
}
30+
31+
public void connect(final Consumer<Object> httpResponseConsumer, final HttpHeaders headers, final String backendUrl) {
32+
Matcher matcher = Pattern.compile("http://(.+):([0-9]+)").matcher(backendUrl);
33+
String host;
34+
int port;
35+
if (matcher.find()) {
36+
host = matcher.group(1);
37+
port = Integer.parseInt(matcher.group(2));
38+
} else {
39+
throw new IllegalArgumentException("illegal backendUrl");
40+
}
41+
try {
42+
// Start the client.
43+
b.handler(new ChannelInitializer<SocketChannel>() {
44+
@Override
45+
public void initChannel(SocketChannel ch) throws Exception {
46+
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
47+
ch.pipeline().addLast(new HttpResponseDecoder());
48+
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
49+
ch.pipeline().addLast(new HttpRequestEncoder())
50+
.addLast(new NettyHttpClientInboundHandler(httpResponseConsumer));
51+
// ch.pipeline().addLast(new HttpClientOutboundHandler(ctx, inbound));
52+
}
53+
});
54+
ChannelFuture f = b.connect(host, port).sync();
55+
String url = "http://" + host + ":" + port + "/api/hello";
56+
URI uri = new URI(url);
57+
DefaultFullHttpRequest request = new DefaultFullHttpRequest(
58+
HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(),
59+
Unpooled.buffer(0), headers, new DefaultHttpHeaders(true));
60+
// 构建http请求
61+
request.headers().set(HttpHeaderNames.HOST, host);
62+
request.headers().set(HttpHeaderNames.CONNECTION,
63+
HttpHeaderNames.CONNECTION);
64+
request.headers().set(HttpHeaderNames.CONTENT_LENGTH,
65+
request.content().readableBytes());
66+
f.channel().write(request);
67+
f.channel().flush();
68+
f.channel().closeFuture().sync();
69+
}catch (Exception e){
70+
throw new RuntimeException(e.getCause());
71+
} finally{
72+
workerGroup.shutdownGracefully();
73+
}
74+
}
75+
76+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.github.kimmking.gateway.outbound.netty4;
2+
3+
import io.github.kimmking.gateway.outbound.NettyHttpClient;
4+
import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.buffer.Unpooled;
7+
import io.netty.channel.ChannelFutureListener;
8+
import io.netty.channel.ChannelHandlerContext;
9+
import io.netty.handler.codec.http.*;
10+
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.concurrent.*;
13+
14+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
15+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
16+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
17+
18+
public class HttpOutboundHandler {
19+
private ExecutorService proxyService;
20+
private String backendUrl;
21+
private NettyHttpClient httpclient;
22+
23+
public HttpOutboundHandler(String backendUrl) {
24+
this.backendUrl = backendUrl.endsWith("/") ? backendUrl.substring(0, backendUrl.length() - 1) : backendUrl;
25+
int cores = Runtime.getRuntime().availableProcessors() * 2;
26+
long keepAliveTime = 1000;
27+
int queueSize = 2048;
28+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
29+
proxyService = new ThreadPoolExecutor(cores, cores,
30+
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
31+
new NamedThreadFactory("proxyService"), handler);
32+
33+
httpclient = new NettyHttpClient();
34+
}
35+
36+
public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
37+
final String url = this.backendUrl + fullRequest.uri();
38+
proxyService.submit(() -> fetchGet(fullRequest, ctx, url));
39+
}
40+
41+
private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) {
42+
httpclient.connect(httpResponse -> handleResponse(inbound, ctx, httpResponse), inbound.headers(), url);
43+
}
44+
45+
private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final Object msg) {
46+
FullHttpResponse fullHttpResponse = null;
47+
try {
48+
String value = "hello,kimmking";
49+
fullHttpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes(StandardCharsets.UTF_8)));
50+
fullHttpResponse.headers().set("Content-Type", "application/json");
51+
fullHttpResponse.headers().setInt("Content-Length", value.length());
52+
53+
54+
if (msg instanceof HttpResponse)
55+
{
56+
HttpResponse response = (HttpResponse) msg;
57+
System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
58+
}
59+
if(msg instanceof HttpContent)
60+
{
61+
HttpContent content = (HttpContent)msg;
62+
ByteBuf buf = content.content();
63+
System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
64+
buf.release();
65+
}
66+
} catch (Exception e) {
67+
e.printStackTrace();
68+
fullHttpResponse = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
69+
exceptionCaught(ctx, e);
70+
} finally {
71+
if (fullRequest != null) {
72+
if (!HttpUtil.isKeepAlive(fullRequest)) {
73+
ctx.write(fullHttpResponse).addListener(ChannelFutureListener.CLOSE);
74+
} else {
75+
ctx.write(fullHttpResponse);
76+
}
77+
}
78+
ctx.flush();
79+
//ctx.close();
80+
}
81+
82+
}
83+
84+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
85+
cause.printStackTrace();
86+
ctx.close();
87+
}
88+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java

Lines changed: 0 additions & 51 deletions
This file was deleted.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.github.kimmking.gateway.outbound.netty4;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.SimpleChannelInboundHandler;
5+
6+
import java.util.function.Consumer;
7+
8+
public class NettyHttpClientInboundHandler extends SimpleChannelInboundHandler {
9+
private final Consumer<Object> httpResponseConsumer;
10+
11+
12+
public NettyHttpClientInboundHandler(Consumer<Object> httpResponseConsumer) {
13+
this.httpResponseConsumer = httpResponseConsumer;
14+
}
15+
16+
@Override
17+
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
18+
httpResponseConsumer.accept(msg);
19+
}
20+
21+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClientOutboundHandler.java

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)