Skip to content

Commit 42a4f1a

Browse files
committed
添加netty支持,测试未通过
1 parent 31970a4 commit 42a4f1a

File tree

6 files changed

+140
-5
lines changed

6 files changed

+140
-5
lines changed

07rpc/rpc01/rpcfx-core/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@
3838
<version>1.4.0</version>
3939
</dependency>
4040

41+
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
42+
<dependency>
43+
<groupId>io.netty</groupId>
44+
<artifactId>netty-all</artifactId>
45+
<version>4.1.56.Final</version>
46+
</dependency>
47+
4148

4249
<dependency>
4350
<groupId>com.squareup.okhttp3</groupId>

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import io.kimmking.rpcfx.exception.RpcfxException;
88
import io.kimmking.rpcfx.param.RpcfxRequest;
99
import io.kimmking.rpcfx.param.RpcfxResponse;
10+
import io.kimmking.rpcfx.utils.ClientUtils;
11+
import io.kimmking.rpcfx.utils.NettyClientUtils;
1012
import io.kimmking.rpcfx.utils.XStreamUtils;
1113
import okhttp3.MediaType;
12-
import okhttp3.OkHttpClient;
1314
import okhttp3.Request;
1415
import okhttp3.RequestBody;
1516
import org.springframework.cglib.proxy.Enhancer;
@@ -19,7 +20,6 @@
1920
import java.io.IOException;
2021
import java.lang.reflect.InvocationHandler;
2122
import java.lang.reflect.Method;
22-
import java.lang.reflect.Proxy;
2323

2424
public final class Rpcfx {
2525

@@ -40,18 +40,36 @@ public static <T> T create(final Class<T> serviceClass, final String url) {
4040

4141
}
4242

43+
public static <T> T create(final Class<T> serviceClass, final String host, final Integer port) {
44+
//Gglib方式
45+
Enhancer enhancer = new Enhancer();
46+
//enhancer.setCallback(new RpcfxInvocationHandler(serviceClass, host, port));
47+
enhancer.setSuperclass(serviceClass);
48+
return (T) enhancer.create();
49+
// 0. 替换动态代理 -> AOP
50+
//return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url));
51+
52+
}
53+
4354
public static class RpcfxInvocationHandler implements InvocationHandler, MethodInterceptor {
4455

4556
public static final MediaType JSONTYPE = MediaType.get("application/json; charset=utf-8");
4657

4758
private final Class<?> serviceClass;
4859
private final String url;
60+
// private final String host;
61+
// private final Integer port;
4962
private final XStream stream = XStreamUtils.createToJson();
5063

5164
public <T> RpcfxInvocationHandler(Class<T> serviceClass, String url) {
5265
this.serviceClass = serviceClass;
5366
this.url = url;
5467
}
68+
// public <T> RpcfxInvocationHandler(Class<T> serviceClass, String host, Integer port) {
69+
// this.serviceClass = serviceClass;
70+
// this.host = host;
71+
// this.port = port;
72+
// }
5573

5674
// 可以尝试,自己去写对象序列化,二进制还是文本的,,,rpcfx是xml自定义序列化、反序列化,json: code.google.com/p/rpcfx
5775
// int byte char float double long bool
@@ -62,7 +80,7 @@ public Object invoke(Object proxy, Method method, Object[] params) throws Throwa
6280
return post(method, params, url);
6381
}
6482

65-
private Object post(Method method, Object[] params, String url) throws IOException, RpcfxException {
83+
private Object post(Method method, Object[] params, String url) throws IOException, RpcfxException, InterruptedException {
6684
RpcfxRequest rpcfxRequest = new RpcfxRequest();
6785
rpcfxRequest.setServiceClass(this.serviceClass.getName());
6886
rpcfxRequest.setMethod(method.getName());
@@ -72,12 +90,14 @@ private Object post(Method method, Object[] params, String url) throws IOExcepti
7290

7391
// 1.可以复用client
7492
// 2.尝试使用httpclient或者netty client
75-
OkHttpClient client = new OkHttpClient();
93+
//OkHttpClient client = new OkHttpClient();
7694
final Request request = new Request.Builder()
7795
.url(url)
7896
.post(RequestBody.create(JSONTYPE, reqJson))
7997
.build();
80-
String respJson = client.newCall(request).execute().body().string();
98+
String respJson = ClientUtils.execute(request).body().string();
99+
/*NettyClientUtils nettyClientUtils = new NettyClientUtils(host, port, reqJson);
100+
String respJson = nettyClientUtils.start();*/
81101
System.out.println("resp json: " + respJson);
82102
RpcfxResponse response = JSON.parseObject(respJson, RpcfxResponse.class);
83103
// 这里判断response.status,处理异常
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.kimmking.rpcfx.handler;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelHandler;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.SimpleChannelInboundHandler;
8+
import io.netty.util.CharsetUtil;
9+
10+
@ChannelHandler.Sharable
11+
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
12+
13+
private final String req;
14+
15+
public ClientHandler(String req) {
16+
this.req = req;
17+
}
18+
19+
@Override
20+
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
21+
22+
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
23+
24+
}
25+
26+
@Override
27+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
28+
ctx.writeAndFlush(Unpooled.copiedBuffer(req, CharsetUtil.UTF_8));
29+
}
30+
31+
@Override
32+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
33+
cause.printStackTrace();
34+
ctx.close();
35+
}
36+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.kimmking.rpcfx.utils;
2+
3+
import okhttp3.OkHttpClient;
4+
import okhttp3.Request;
5+
import okhttp3.Response;
6+
7+
import java.io.IOException;
8+
9+
public class ClientUtils {
10+
11+
private static final OkHttpClient client = new OkHttpClient();
12+
13+
public static Response execute(Request request) throws IOException {
14+
return client.newCall(request).execute();
15+
}
16+
17+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.kimmking.rpcfx.utils;
2+
3+
import io.kimmking.rpcfx.handler.ClientHandler;
4+
import io.netty.bootstrap.Bootstrap;
5+
import io.netty.channel.ChannelFuture;
6+
import io.netty.channel.ChannelInitializer;
7+
import io.netty.channel.EventLoopGroup;
8+
import io.netty.channel.nio.NioEventLoopGroup;
9+
import io.netty.channel.socket.SocketChannel;
10+
import io.netty.channel.socket.nio.NioSocketChannel;
11+
import io.netty.util.CharsetUtil;
12+
13+
public class NettyClientUtils {
14+
15+
private final String host;
16+
17+
private final Integer port;
18+
19+
private final String req;
20+
21+
22+
public NettyClientUtils(String host, Integer port, String req) {
23+
this.host = host;
24+
this.port = port;
25+
this.req = req;
26+
}
27+
28+
public String start() throws InterruptedException {
29+
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
30+
String resp;
31+
try {
32+
Bootstrap bootstrap = new Bootstrap();
33+
bootstrap.
34+
group(eventLoopGroup).
35+
channel(NioSocketChannel.class).
36+
remoteAddress(host, port).
37+
handler(new ChannelInitializer<SocketChannel>() {
38+
@Override
39+
protected void initChannel(SocketChannel socketChannel) throws Exception {
40+
socketChannel.pipeline().addLast(new ClientHandler(req));
41+
}
42+
});
43+
ChannelFuture future = bootstrap.connect().sync();
44+
resp = future.channel().alloc().buffer().toString(CharsetUtil.UTF_8);
45+
System.out.println(resp);
46+
future.channel().closeFuture().sync();
47+
} finally {
48+
eventLoopGroup.shutdownGracefully();
49+
}
50+
return resp;
51+
}
52+
53+
}

07rpc/rpc01/rpcfx-demo-consumer/src/main/java/io/kimmking/rpcfx/demo/consumer/RpcfxClientApplication.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ public static void main(String[] args) {
2121
// service.findById
2222

2323
UserService userService = Rpcfx.create(UserService.class, "http://localhost:8080/");
24+
//UserService userService = Rpcfx.create(UserService.class, "127.0.0.1", 8080);
2425
User user = userService.findById(1);
2526
System.out.println("find user id=1 from server: " + user.getName());
2627

2728
OrderService orderService = Rpcfx.create(OrderService.class, "http://localhost:8080/");
29+
//OrderService orderService = Rpcfx.create(OrderService.class, "127.0.0.1", 8080);
2830
Order order = orderService.findOrderById(1992129);
2931
System.out.println(String.format("find order name=%s, amount=%f",order.getName(),order.getAmount()));
3032

0 commit comments

Comments
 (0)