forked from pyb1993/JavaRedis
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRedisServer.java
More file actions
146 lines (120 loc) · 6.14 KB
/
RedisServer.java
File metadata and controls
146 lines (120 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package RedisServer;
import CommandDispatcher.CommandDispatcher;
import MessageRegister.MessageRegister;
import RedisCommand.MessageEncoder;
import RedisDataBase.AbstractObjectPool;
import RedisDataBase.RedisString;
import RedisFuture.RedisRunnable;
import RedisDataBase.RedisDb;
import RedisDataBase.RedisTimerWheel;
import RedisFuture.RedisFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.*;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import Common.*;
import RedisCommand.*;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.SystemPropertyUtil;
import java.util.LinkedList;
import java.util.Locale;
import java.util.concurrent.*;
public class RedisServer {
private String ip;
private int port;
public static EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
private static LinkedList<RedisFuture> queue = new LinkedList<>(); // 用来处理定时任务结果的
static public final ScheduledExecutorService ExpireHelper = Executors.newScheduledThreadPool(1);// 用来在处理大量过期事件时候进行帮助的线程
static Thread mThread;
/** 如果需要添加自己的命令,只需要继承原来的RedisServer
* 然后在构造函数里面调用 MessageRegister.registerDefault().register(xxx).register(yyy) 就好**/
public RedisServer(String ip, int port){
this.ip = ip;
this.port = port;
//rehashThread = Executors.newCachedThreadPool();// 用来在rehash的时候提交的
MessageRegister.registerDefault();// 注册默认的那些命令比如set get incr
}
// todo IdleStateHandler是不会自动关闭的,需要自己实现心跳机制
public void start() throws Exception{
try{
ServerBootstrap b = new ServerBootstrap();// 接受链接一个group,IO一个group
// 设置所有的属性, serverBootstrap实际上会调用group(group,group),因为需要两个group来分配EventLoop
b.group(acceptGroup).
channel(NioServerSocketChannel.class).
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new IdleStateHandler(60,60,180))
.addLast(new MessageEncoder());
ch.pipeline()
.addLast(new ByteToMessageInputDecoder())
//.addLast(new MessageDecoder())
.addLast(new CommandDispatcher());
}});
b.option(ChannelOption.SO_BACKLOG, 2048) // socket接受队列大小
.option(ChannelOption.SO_REUSEADDR, true) // 避免端口冲突
.option(ChannelOption.TCP_NODELAY, true) // 关闭小流合并,保证消息的及时性
.childOption(ChannelOption.SO_KEEPALIVE, true); // 长时间没动静的链接自动关闭
ChannelFuture f = b.bind(this.ip,this.port).sync();
Logger.log(RedisServer.class.getName() + "start and listen on " + f.channel().localAddress());
acceptGroup.submit(()->{mThread = Thread.currentThread();}).sync();// 获取EventLoop的thread
// 10ms执行一次,用来更新系统时间
acceptGroup.scheduleAtFixedRate(()->RedisTimerWheel.updateSystemTime(),0,10,TimeUnit.MILLISECONDS);
// 每250ms执行一次对过期数据的删除
acceptGroup.scheduleAtFixedRate(new RedisRunnable(()->RedisDb.processExpires()),1,250,TimeUnit.MILLISECONDS);
// 每xxms执行一次,用来执行 移除过期key 任务完成的回调
acceptGroup.scheduleAtFixedRate(new RedisRunnable(()->RedisServer.onComplete()),2,137,TimeUnit.MILLISECONDS);
// 先输出一下统计情况,观察一下
acceptGroup.scheduleAtFixedRate(new RedisRunnable(()->RedisServer.statisticForPool()),5,1000,TimeUnit.MILLISECONDS);
// 衰减一下
acceptGroup.scheduleAtFixedRate(new RedisRunnable(()->RedisServer.scaleDown()),5,1000,TimeUnit.MILLISECONDS);
// 需要将删除的数据放回原来的地方
acceptGroup.scheduleAtFixedRate(new RedisRunnable(()->RedisString.pool.releaseFromRemovedDeque()),5,500,TimeUnit.MILLISECONDS);
f.channel().closeFuture().sync();
Logger.log("close done");
}finally {
acceptGroup.shutdownGracefully().sync();
}
}
// 每过时间T就会执行,这里的T设定为1S
public static void scaleDown(){
AbstractObjectPool pool = RedisString.pool;
pool.scaleDown();
}
// 每过时间T就会执行,这里的T设置为1S
public static void statisticForPool(){
// 首先要对RedisStringPool进行统计
AbstractObjectPool pool = RedisString.pool;
pool.usePoolWhenNeed(3);
RedisString.pool.print();
}
public static boolean isCurrentThread(){
assert mThread != null;
return Thread.currentThread() == mThread;
}
public static void addFuture(RedisFuture future){
queue.add(future);
}
// 用来检查所有的回调有没有执行完全
// 每次处理最多25个任务的回调,由于回调的任务一般都相对比较简单,所以应该很快就执行完了
// 之所以设置25,是因为有些任务存在锁的竞争,如果暂时不能获取,就先不获取
public static void onComplete() {
int size = 15;
RedisFuture ef;
while (size-- > 0 && (ef = queue.poll())!= null){
if(ef.isDone()){
ef.onComplete();
}else {
queue.add(ef);
}
}
}
}