Skip to content

Commit d8724af

Browse files
author
cutie
committed
cyy 1、提交OkhttpOutboundHandler
1 parent 01d9794 commit d8724af

File tree

3 files changed

+111
-3
lines changed

3 files changed

+111
-3
lines changed

02nio/nio02/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@
5252
<artifactId>httpasyncclient</artifactId>
5353
<version>4.1.4</version>
5454
</dependency>
55+
<dependency>
56+
<groupId>com.squareup.okhttp3</groupId>
57+
<artifactId>okhttp</artifactId>
58+
<version>3.8.1</version>
59+
</dependency>
5560

5661
<!--
5762
<dependency>

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.github.kimmking.gateway.inbound;
22

3-
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
3+
import io.github.kimmking.gateway.outbound.okhttp.OkhttpOutboundHandler;
44
import io.netty.channel.ChannelHandlerContext;
55
import io.netty.channel.ChannelInboundHandlerAdapter;
66
import io.netty.handler.codec.http.FullHttpRequest;
@@ -12,11 +12,11 @@ public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
1212

1313
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
1414
private final String proxyServer;
15-
private HttpOutboundHandler handler;
15+
private OkhttpOutboundHandler handler;
1616

1717
public HttpInboundHandler(String proxyServer) {
1818
this.proxyServer = proxyServer;
19-
handler = new HttpOutboundHandler(this.proxyServer);
19+
handler = new OkhttpOutboundHandler(this.proxyServer);
2020
}
2121

2222
@Override
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,107 @@
11
package io.github.kimmking.gateway.outbound.okhttp;
22

3+
import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
8+
import io.netty.handler.codec.http.FullHttpRequest;
9+
import io.netty.handler.codec.http.FullHttpResponse;
10+
import io.netty.handler.codec.http.HttpUtil;
11+
import okhttp3.Interceptor;
12+
import okhttp3.OkHttpClient;
13+
import okhttp3.Request;
14+
import okhttp3.Response;
15+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
16+
import org.apache.http.protocol.HTTP;
17+
18+
import java.io.IOException;
19+
import java.util.concurrent.*;
20+
21+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
22+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
23+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
24+
325
public class OkhttpOutboundHandler {
26+
private CloseableHttpAsyncClient httpclient;
27+
private ExecutorService proxyService;
28+
private String backendUrl;
29+
private OkHttpClient client;
30+
31+
public OkhttpOutboundHandler(String backendUrl){
32+
this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl;
33+
int cores = Runtime.getRuntime().availableProcessors() * 2;
34+
long keepAliveTime = 1000;
35+
int queueSize = 2048;
36+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy();
37+
proxyService = new ThreadPoolExecutor(cores, cores,
38+
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
39+
new NamedThreadFactory("proxyService"), handler);
40+
}
41+
42+
public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
43+
final String url = this.backendUrl + fullRequest.uri();
44+
proxyService.submit(()->fetchGet(fullRequest, ctx, url));
45+
}
46+
47+
private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) {
48+
client = new OkHttpClient.Builder()
49+
.addInterceptor(new HandleRespInterceptor(inbound, ctx))
50+
.build();
51+
52+
Request request = new Request.Builder()
53+
.header(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE)
54+
.url(url)
55+
.build();
56+
try {
57+
Response response = client.newCall(request).execute();
58+
System.out.println(response);
59+
} catch (IOException e) {
60+
e.printStackTrace();
61+
}
62+
}
63+
64+
65+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
66+
cause.printStackTrace();
67+
ctx.close();
68+
}
69+
70+
class HandleRespInterceptor implements Interceptor {
71+
FullHttpResponse response = null;
72+
FullHttpRequest inbound;
73+
ChannelHandlerContext ctx;
74+
75+
public HandleRespInterceptor(FullHttpRequest inbound, ChannelHandlerContext ctx) {
76+
this.ctx = ctx;
77+
this.inbound = inbound;
78+
}
79+
80+
@Override
81+
public Response intercept(Chain chain) throws IOException {
82+
Request request = chain.request();
83+
Response okHttpResponse = chain.proceed(request);
84+
try {
85+
byte[] respBytes = okHttpResponse.body().bytes();
86+
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(respBytes));
87+
response.headers().set("Content-Type", "application/json");
88+
response.headers().setInt("Content-Length", Integer.parseInt(okHttpResponse.header("Content-Length")));
89+
}catch (Exception e) {
90+
e.printStackTrace();
91+
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
92+
exceptionCaught(ctx, e);
93+
} finally {
94+
if (inbound != null) {
95+
if (!HttpUtil.isKeepAlive(inbound)) {
96+
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
97+
} else {
98+
ctx.write(response);
99+
}
100+
}
101+
ctx.flush();
102+
}
103+
return okHttpResponse;
104+
}
105+
106+
}
4107
}

0 commit comments

Comments
 (0)