Skip to content

Commit 6364be3

Browse files
author
nononi
committed
第9周作业
1 parent dd54f65 commit 6364be3

File tree

19 files changed

+582
-55
lines changed

19 files changed

+582
-55
lines changed

07rpc/rpc01/rpcfx-core/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@
5959
<artifactId>spring-boot-starter-web</artifactId>
6060
</dependency>
6161

62+
<dependency>
63+
<groupId>org.aspectj</groupId>
64+
<artifactId>aspectjweaver</artifactId>
65+
</dependency>
66+
67+
<dependency>
68+
<groupId>io.netty</groupId>
69+
<artifactId>netty-all</artifactId>
70+
<version>4.1.45.Final</version>
71+
</dependency>
72+
6273
<dependency>
6374
<groupId>org.springframework.boot</groupId>
6475
<artifactId>spring-boot-starter-test</artifactId>

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/client/Rpcfx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static <T> T create(final Class<T> serviceClass, final String url, Filter
4343

4444
// 0. 替换动态代理 -> 字节码生成
4545
return (T) Proxy.newProxyInstance(Rpcfx.class.getClassLoader(), new Class[]{serviceClass}, new RpcfxInvocationHandler(serviceClass, url, filters));
46-
46+
//
4747
}
4848

4949
public static class RpcfxInvocationHandler implements InvocationHandler {
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.kimmking.rpcfx.client.aop;
2+
3+
import java.lang.annotation.*;
4+
5+
6+
@Target({ElementType.METHOD})
7+
@Retention(RetentionPolicy.RUNTIME)
8+
@Documented
9+
public @interface RpcService {
10+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.kimmking.rpcfx.client.aop;
2+
3+
import com.alibaba.fastjson.JSON;
4+
import io.kimmking.rpcfx.api.RpcfxRequest;
5+
import io.kimmking.rpcfx.api.RpcfxResponse;
6+
import io.kimmking.rpcfx.client.netty4.NettyHttpClient;
7+
import io.kimmking.rpcfx.exception.RpcfxException;
8+
import okhttp3.MediaType;
9+
import okhttp3.OkHttpClient;
10+
import okhttp3.Request;
11+
import okhttp3.RequestBody;
12+
import org.aspectj.lang.ProceedingJoinPoint;
13+
import org.aspectj.lang.annotation.Around;
14+
import org.aspectj.lang.annotation.Aspect;
15+
import org.aspectj.lang.annotation.Pointcut;
16+
import org.springframework.stereotype.Component;
17+
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* 定义切面,切点为标记@RpcSerivice注解的方法,环绕型通知
23+
*/
24+
@Aspect
25+
@Component
26+
public class RpcServiceAspect {
27+
private static final MediaType JSONTYPE = MediaType.get("application/json; charset=utf-8");
28+
29+
@Pointcut(value = "@annotation(io.kimmking.rpcfx.client.aop.RpcService)")
30+
public void pointcut(){
31+
32+
}
33+
34+
35+
@Around(value = "pointcut()")
36+
public Object around(ProceedingJoinPoint point) {
37+
String methodName = point.getSignature().getName();
38+
Object[] methodArgs = point.getArgs();
39+
RpcfxRequest request = new RpcfxRequest();
40+
request.setMethod(methodName);
41+
request.setParams(methodArgs);
42+
request.setServiceClass(point.getTarget().getClass().getInterfaces()[0].getName());
43+
44+
RpcfxResponse response;
45+
try {
46+
response = post(request);
47+
} catch (IOException e) {
48+
e.printStackTrace();
49+
throw new RpcfxException(e.getMessage());
50+
}
51+
52+
return JSON.parse(response.getResult().toString());
53+
54+
}
55+
56+
private RpcfxResponse post(RpcfxRequest req) throws IOException {
57+
String reqJson = JSON.toJSONString(req);
58+
System.out.println("req json: "+reqJson);
59+
60+
// 1.可以复用client
61+
// 2.尝试使用httpclient或者netty client
62+
// OkHttpClient client = new OkHttpClient();
63+
// final Request request = new Request.Builder()
64+
// .url("http://127.0.0.1:8080/")
65+
// .post(RequestBody.create(JSONTYPE, reqJson))
66+
// .build();
67+
// String respJson = client.newCall(request).execute().body().string();
68+
// System.out.println("resp json: "+respJson);
69+
// return JSON.parseObject(respJson, RpcfxResponse.class);
70+
71+
try {
72+
NettyHttpClient nettyHttpClient = new NettyHttpClient("127.0.0.1", 8080);
73+
return nettyHttpClient.send(req);
74+
75+
} catch (Exception e) {
76+
e.printStackTrace();
77+
RpcfxResponse response = new RpcfxResponse();
78+
response.setStatus(false);
79+
response.setException(new RpcfxException(e.getMessage()));
80+
response.setResult(null);
81+
return response;
82+
}
83+
}
84+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.kimmking.rpcfx.client.netty4;
2+
3+
import com.alibaba.fastjson.JSON;
4+
import io.kimmking.rpcfx.api.RpcfxResponse;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
import io.netty.channel.ChannelPromise;
9+
import io.netty.handler.codec.http.FullHttpRequest;
10+
import io.netty.handler.codec.http.FullHttpResponse;
11+
import io.netty.util.CharsetUtil;
12+
13+
14+
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
15+
private ChannelHandlerContext ctx;
16+
private ChannelPromise promise;
17+
private volatile RpcfxResponse rpcfxResponse;
18+
19+
public ChannelPromise flushMessage(FullHttpRequest request) {
20+
if (ctx == null)
21+
throw new IllegalStateException();
22+
23+
System.out.println("flush flushMessage");
24+
promise = ctx.writeAndFlush(request).channel().newPromise();
25+
return promise;
26+
}
27+
28+
@Override
29+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
30+
super.channelActive(ctx);
31+
this.ctx = ctx;
32+
System.out.println("已连接");
33+
}
34+
35+
@Override
36+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
37+
38+
if(msg instanceof FullHttpResponse){
39+
FullHttpResponse response = (FullHttpResponse)msg;
40+
ByteBuf buf = response.content();
41+
String result = buf.toString(CharsetUtil.UTF_8);
42+
this.rpcfxResponse = JSON.parseObject(result, RpcfxResponse.class);
43+
this.promise.setSuccess(); //任务完成
44+
}
45+
}
46+
47+
public RpcfxResponse getRpcfxResponse() throws InterruptedException {
48+
return this.rpcfxResponse;
49+
}
50+
51+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package io.kimmking.rpcfx.client.netty4;//package io.github.kimmking.gateway.outbound;
2+
3+
import com.alibaba.fastjson.JSON;
4+
import io.kimmking.rpcfx.api.RpcfxRequest;
5+
import io.kimmking.rpcfx.api.RpcfxResponse;
6+
import io.netty.bootstrap.Bootstrap;
7+
import io.netty.buffer.Unpooled;
8+
import io.netty.channel.*;
9+
import io.netty.channel.nio.NioEventLoopGroup;
10+
import io.netty.channel.socket.SocketChannel;
11+
import io.netty.channel.socket.nio.NioSocketChannel;
12+
import io.netty.handler.codec.http.*;
13+
14+
import java.net.InetSocketAddress;
15+
16+
public class NettyHttpClient {
17+
private HttpClientHandler clientHandler = new HttpClientHandler();
18+
private final String host;
19+
private final int port;
20+
21+
public NettyHttpClient(String host, int port) {
22+
this.host = host;
23+
this.port = port;
24+
}
25+
26+
public RpcfxResponse send(RpcfxRequest rpcfxRequest) throws Exception {
27+
EventLoopGroup workerGroup = new NioEventLoopGroup();
28+
29+
try {
30+
Bootstrap b = new Bootstrap();
31+
b.group(workerGroup)
32+
.channel(NioSocketChannel.class)
33+
.remoteAddress(new InetSocketAddress(host,port))
34+
.option(ChannelOption.SO_KEEPALIVE, true)
35+
.handler(new ChannelInitializer<SocketChannel>() {
36+
@Override
37+
public void initChannel(SocketChannel ch) throws Exception {
38+
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
39+
ch.pipeline().addLast(new HttpResponseDecoder());
40+
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
41+
ch.pipeline().addLast(new HttpRequestEncoder());
42+
ch.pipeline().addLast(new HttpObjectAggregator(1024*1024));
43+
ch.pipeline().addLast(new HttpServerExpectContinueHandler());
44+
ch.pipeline().addLast(clientHandler);
45+
}
46+
});
47+
48+
// Start the client.
49+
ChannelFuture f = b.connect().sync();
50+
51+
52+
RpcfxResponse response = this.post(rpcfxRequest);
53+
f.channel().closeFuture().sync();
54+
55+
return response;
56+
} finally {
57+
workerGroup.shutdownGracefully();
58+
}
59+
60+
}
61+
62+
private RpcfxResponse post(RpcfxRequest rpcfxRequest) throws InterruptedException {
63+
byte[] bytes = JSON.toJSONBytes(rpcfxRequest);
64+
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.POST, "/",
65+
Unpooled.wrappedBuffer(bytes));
66+
request.headers().add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
67+
request.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
68+
request.headers().add(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes());
69+
ChannelPromise channelPromise = clientHandler.flushMessage(request);
70+
channelPromise.await();
71+
return clientHandler.getRpcfxResponse();
72+
}
73+
74+
public static void main(String[] args) throws Exception {
75+
String host = "127.0.0.1";
76+
int port = 8080;
77+
78+
NettyHttpClient nettyHttpClient = new NettyHttpClient(host, port);
79+
80+
RpcfxRequest rpcfxRequest = new RpcfxRequest();
81+
rpcfxRequest.setServiceClass("io.kimmking.rpcfx.demo.api.UserService");
82+
rpcfxRequest.setParams(new Integer[]{1});
83+
rpcfxRequest.setMethod("findById");
84+
RpcfxResponse response = nettyHttpClient.send(rpcfxRequest);
85+
System.out.println(response);
86+
// System.out.println(nettyHttpClient.post(request));
87+
}
88+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.kimmking.rpcfx.exception;
2+
3+
4+
public class RpcfxException extends RuntimeException {
5+
public RpcfxException(String message) {
6+
super(message);
7+
}
8+
9+
public RpcfxException(String message, Throwable cause) {
10+
super(message, cause);
11+
}
12+
}

07rpc/rpc01/rpcfx-core/src/main/java/io/kimmking/rpcfx/server/RpcfxInvoker.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,38 @@
99
import java.lang.reflect.InvocationTargetException;
1010
import java.lang.reflect.Method;
1111
import java.util.Arrays;
12+
import java.util.HashMap;
1213

1314
public class RpcfxInvoker {
1415

1516
private RpcfxResolver resolver;
1617

18+
private HashMap<String, Class<?>> serviceContext;
19+
1720
public RpcfxInvoker(RpcfxResolver resolver){
1821
this.resolver = resolver;
1922
}
2023

24+
public RpcfxInvoker(HashMap<String, Class<?>> serviceContext) {
25+
this.serviceContext = serviceContext;
26+
}
27+
2128
public RpcfxResponse invoke(RpcfxRequest request) {
2229
RpcfxResponse response = new RpcfxResponse();
2330
String serviceClass = request.getServiceClass();
2431

2532
// 作业1:改成泛型和反射
26-
Object service = resolver.resolve(serviceClass);//this.applicationContext.getBean(serviceClass);
33+
// Object service = resolver.resolve(serviceClass);//this.applicationContext.getBean(serviceClass);
34+
Class<?> service = this.serviceContext.get(serviceClass);
2735

2836
try {
29-
Method method = resolveMethodFromClass(service.getClass(), request.getMethod());
30-
Object result = method.invoke(service, request.getParams()); // dubbo, fastjson,
37+
Method method = resolveMethodFromClass(service, request.getMethod());
38+
Object result = method.invoke(service.newInstance(), request.getParams()); // dubbo, fastjson,
3139
// 两次json序列化能否合并成一个
3240
response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName));
3341
response.setStatus(true);
3442
return response;
35-
} catch ( IllegalAccessException | InvocationTargetException e) {
43+
} catch (IllegalAccessException | InvocationTargetException | InstantiationException e) {
3644

3745
// 3.Xstream
3846

07rpc/rpc01/rpcfx-demo-api/src/main/java/io/kimmking/rpcfx/demo/api/OrderService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kimmking.rpcfx.demo.api;
22

3+
34
public interface OrderService {
45

56
Order findOrderById(int id);

0 commit comments

Comments
 (0)