-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathJobQueueTests.java
More file actions
364 lines (320 loc) · 10.3 KB
/
JobQueueTests.java
File metadata and controls
364 lines (320 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
package com.almworks.sqlite4java;
import java.io.File;
import java.util.concurrent.*;
public class JobQueueTests extends SQLiteConnectionFixture {
private TestQueue myQueue;
@Override
protected void setUp() throws Exception {
super.setUp();
myQueue = new TestQueue().start();
}
@Override
protected void tearDown() throws Exception {
myQueue.stop(false).join();
super.tearDown();
}
public void testStartStop() {
assertFalse(myQueue.isStopped());
assertNull(myQueue.getDatabaseFile());
myQueue.stop(false);
assertTrue(myQueue.isStopped());
}
public void testStartStopUnusual() throws InterruptedException {
myQueue.start();
myQueue.start();
myQueue.stop(true);
myQueue.stop(false);
myQueue.join();
myQueue.join();
}
public void testBadThreadFactory() throws InterruptedException {
myQueue.stop(true).join();
myQueue = new TestQueue(null, new ThreadFactory() {
public Thread newThread(Runnable r) {
return null;
}
});
try {
myQueue.start();
fail("started without thread?");
} catch (RuntimeException e) {
// ok
}
}
public void testGracefulStop() {
BarrierJob job1 = myQueue.execute(new BarrierJob());
BarrierJob job2 = myQueue.execute(new BarrierJob());
myQueue.stop(true);
assertTrue(job1.await());
assertTrue(job2.await());
job1.testResult(true);
job2.testResult(true);
}
public void testNonGracefulStop() {
BarrierJob job1 = myQueue.execute(new BarrierJob());
BarrierJob job2 = myQueue.execute(new BarrierJob());
myQueue.stop(false);
job1.await(); // job1 may be executed if it is started before stop() is called - no check
assertFalse(job2.await());
job2.testNoResult(false, true, null);
}
public void testExecuteAfterStop() throws InterruptedException {
myQueue.stop(true);
SimpleJob job = myQueue.execute(new SimpleJob());
job.testNoResult(false, true, null);
myQueue.join();
myQueue = new TestQueue().start();
myQueue.stop(false).join();
job = myQueue.execute(new SimpleJob());
job.testNoResult(false, true, null);
}
public void testExecuteBeforeStart() throws InterruptedException {
myQueue.stop(true).join();
myQueue = new TestQueue();
SimpleJob job = myQueue.execute(new SimpleJob());
myQueue.start();
job.testResult(true);
}
public void testFlush() throws InterruptedException {
for (int i = 0; i < 100; i++) myQueue.execute(new SimpleJob());
SimpleJob job = myQueue.execute(new SimpleJob());
myQueue.flush();
job.testState(true, true, false, false);
}
public void testExecute() {
Boolean r = myQueue.execute(new SQLiteJob<Boolean>() {
@Override
protected Boolean job(SQLiteConnection connection) throws Throwable {
return connection.getAutoCommit();
}
}).complete();
assertEquals((Boolean) true, r);
}
public void testBasicOpen() throws InterruptedException {
SQLiteQueue queue = new SQLiteQueue().start();
assertTrue(queue.execute(new SimpleJob()).complete());
queue.stop(false).join();
}
public void testCancelRollback() {
myQueue.execute(new SQLiteJob<Object>() {
@Override
protected Object job(SQLiteConnection connection) throws Throwable {
connection.exec("create table x (x)");
return null;
}
}).complete();
BarrierJob job = myQueue.execute(new BarrierJob() {
@Override
protected Boolean job(SQLiteConnection connection) throws Throwable {
connection.exec("begin");
connection.exec("insert into x values (1)");
return super.job(connection);
}
});
while (job.barrier.getNumberWaiting() < 1) {}
job.cancel(true);
job.testNoResult(true, true, null);
assertEquals((Integer)0, myQueue.execute(new SQLiteJob<Integer>() {
@Override
protected Integer job(SQLiteConnection connection) throws Throwable {
SQLiteStatement st = connection.prepare("select count(*) from x");
st.step();
int r = st.columnInt(0);
st.dispose();
return r;
}
}).complete());
}
public void testJobError() {
myQueue.execute(new SimpleJob() {
@Override
protected Boolean job(SQLiteConnection connection) throws Throwable {
connection.exec("BEIGN");
return super.job(connection);
}
}).testNoResult(true, false, SQLiteException.class);
}
public void testAbnormalStop() throws InterruptedException {
final Thread[] hijackThread = {null};
myQueue.execute(new SQLiteJob<Object>() {
@Override
protected Object job(SQLiteConnection connection) throws Throwable {
hijackThread[0] = Thread.currentThread();
return null;
}
}).complete();
BarrierJob job1 = myQueue.execute(new BarrierJob());
BarrierJob job2 = myQueue.execute(new BarrierJob());
hijackThread[0].interrupt();
job1.await();
job2.await();
job2.testNoResult(false, true, null);
myQueue.join();
assertTrue(myQueue.isStopped());
}
public void testReincarnation() throws InterruptedException {
myQueue.stop(false).join();
FileQueue q = new FileQueue().start();
q.execute(new SQLiteJob<Object>() {
@Override
protected Object job(SQLiteConnection connection) throws Throwable {
connection.exec("create table x (x)");
return null;
}
}).complete();
// jobs:
// 1. barrier
BarrierJob barrier = q.execute(new BarrierJob());
// 2. break queue
q.execute(new SQLiteJob<Object>() {
@Override
protected Object job(SQLiteConnection connection) throws Throwable {
connection.exec("begin");
connection.exec("insert into x values (1)");
connection.exec("WHOA");
return null;
}
});
// 3. normal job - should be executed after reincarnation
SQLiteJob<Integer> job = q.execute(new SQLiteJob<Integer>() {
@Override
protected Integer job(SQLiteConnection connection) throws Throwable {
SQLiteStatement st = connection.prepare("select count(*) from x");
st.step();
return st.columnInt(0);
}
});
barrier.await();
Thread.sleep(100);
assertFalse(q.isStopped());
assertEquals((Integer)0, job.complete());
q.stop(true).join();
}
public class TestQueue extends SQLiteQueue {
public TestQueue() {
}
@Override
public TestQueue start() {
return (TestQueue) super.start();
}
public TestQueue(File databaseFile, ThreadFactory threadFactory) {
super(databaseFile, threadFactory);
}
@Override
protected SQLiteConnection openConnection() throws SQLiteException {
return memDb().open();
}
}
public class FileQueue extends SQLiteQueue {
public FileQueue() {
}
@Override
public FileQueue start() {
return (FileQueue) super.start();
}
@Override
protected SQLiteConnection openConnection() throws SQLiteException {
return fileDb().open();
}
@Override
protected void handleJobException(SQLiteJob job, Throwable e) throws SQLiteException {
throw new RuntimeException("fail!", e);
}
@Override
protected boolean isReincarnationPossible() {
return true;
}
@Override
protected long getReincarnationTimeout() {
return 200;
}
}
private static abstract class TestJob<T> extends SQLiteJob<T> {
final CountDownLatch started = new CountDownLatch(1);
final CountDownLatch finished = new CountDownLatch(1);
final CountDownLatch errored = new CountDownLatch(1);
final CountDownLatch cancelled = new CountDownLatch(1);
volatile SQLiteConnection connection;
volatile T result;
volatile Throwable error;
public void testState(boolean started, boolean finished, boolean error, boolean cancelled) {
assertEquals(started, this.started.getCount() == 0);
assertEquals(finished, this.finished.getCount() == 0);
assertEquals(error, this.errored.getCount() == 0);
assertEquals(cancelled, this.cancelled.getCount() == 0);
assertEquals(cancelled, isCancelled());
assertEquals(finished, isDone());
assertEquals(error, getError() != null);
}
public void testResult(T result) {
assert result != null;
assertEquals(result, complete());
// additional wait to let callbacks be called
while (getQueue() != null) {}
testState(true, true, false, false);
}
public void testNoResult(boolean started, boolean cancelled, Class errorClass) {
assertNull(complete());
// additional wait to let callbacks be called
while (getQueue() != null) {
}
testState(started, true, errorClass != null, cancelled);
if (errorClass != null) {
assertEquals(errorClass, error == null ? null : error.getClass());
}
}
@Override
protected void jobStarted(SQLiteConnection connection) throws Throwable {
started.countDown();
this.connection = connection;
}
@Override
protected void jobFinished(T result) throws Throwable {
finished.countDown();
this.result = result;
}
@Override
protected void jobError(Throwable error) throws Throwable {
errored.countDown();
this.error = error;
}
@Override
protected void jobCancelled() throws Throwable {
cancelled.countDown();
}
}
private static class SimpleJob extends TestJob<Boolean> {
@Override
protected Boolean job(SQLiteConnection connection) throws Throwable {
return true;
}
}
private static class BarrierJob extends TestJob<Boolean> {
final CyclicBarrier barrier = new CyclicBarrier(2);
public boolean await() {
try {
barrier.await(1000, TimeUnit.MILLISECONDS);
return true;
} catch (BrokenBarrierException e) {
return false;
} catch (TimeoutException e) {
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError();
}
}
@Override
protected Boolean job(SQLiteConnection connection) throws Throwable {
if (!await()) return null;
if (isCancelled()) return null;
return true;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean r = super.cancel(mayInterruptIfRunning);
barrier.reset();
return r;
}
}
}