Skip to content

Commit fab00ca

Browse files
committed
Merge pull request #6 from stackify/read-timeout-retransmission
Increase read timeout from 5s to 15s. Decreasing retransmission queue
2 parents 99da033 + c14867b commit fab00ca

7 files changed

Lines changed: 204 additions & 12 deletions

File tree

src/main/java/com/stackify/api/common/http/HttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class HttpClient {
4444
/**
4545
* READ_TIMEOUT
4646
*/
47-
private static final int READ_TIMEOUT = 5000;
47+
private static final int READ_TIMEOUT = 15000;
4848

4949
/**
5050
* API configuration

src/main/java/com/stackify/api/common/http/HttpResendQueue.java

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,22 @@ public class HttpResendQueue {
3636
*/
3737
private static final Logger LOGGER = LoggerFactory.getLogger(HttpResendQueue.class);
3838

39+
/**
40+
* Try posting message 3 times before skipping it
41+
*/
42+
private static final int MAX_POST_ATTEMPTS = 3;
43+
3944
/**
4045
* The queue of requests to be retransmitted
4146
*/
42-
private final Queue<byte[]> resendQueue;
47+
private final Queue<HttpResendQueueItem> resendQueue;
4348

4449
/**
4550
* Constructor
4651
* @param maxSize Maximum size of the queue
4752
*/
4853
public HttpResendQueue(final int maxSize) {
49-
this.resendQueue = new SynchronizedEvictingQueue<byte[]>(maxSize);
54+
this.resendQueue = new SynchronizedEvictingQueue<HttpResendQueueItem>(maxSize);
5055
}
5156

5257
/**
@@ -62,7 +67,7 @@ public int size() {
6267
* @param e IOException
6368
*/
6469
public void offer(final byte[] request, final IOException e) {
65-
resendQueue.offer(request);
70+
resendQueue.offer(new HttpResendQueueItem(request));
6671
}
6772

6873
/**
@@ -72,7 +77,7 @@ public void offer(final byte[] request, final IOException e) {
7277
*/
7378
public void offer(final byte[] request, final HttpException e) {
7479
if (!e.isClientError()) {
75-
resendQueue.offer(request);
80+
resendQueue.offer(new HttpResendQueueItem(request));
7681
}
7782
}
7883

@@ -92,15 +97,54 @@ public void drain(final HttpClient httpClient, final String path) {
9297
* @param gzip True if the post should be gzipped, false otherwise
9398
*/
9499
public void drain(final HttpClient httpClient, final String path, final boolean gzip) {
100+
95101
if (!resendQueue.isEmpty()) {
102+
103+
// queued items are available for retransmission
104+
96105
try {
106+
// drain resend queue until empty or first exception
107+
97108
LOGGER.info("Attempting to retransmit {} requests", resendQueue.size());
98109

99110
while (!resendQueue.isEmpty()) {
100-
byte[] jsonBytes = resendQueue.peek();
101-
httpClient.post(path, jsonBytes, gzip);
102-
resendQueue.remove();
103-
Threads.sleepQuietly(250, TimeUnit.MILLISECONDS);
111+
112+
// get next item off queue
113+
114+
HttpResendQueueItem item = resendQueue.peek();
115+
116+
try {
117+
118+
// retransmit queued request
119+
120+
byte[] jsonBytes = item.getJsonBytes();
121+
httpClient.post(path, jsonBytes, gzip);
122+
123+
// retransmission successful
124+
// remove from queue and sleep for 250ms
125+
126+
resendQueue.remove();
127+
128+
Threads.sleepQuietly(250, TimeUnit.MILLISECONDS);
129+
130+
} catch (Throwable t) {
131+
132+
// retransmission failed
133+
// increment the item's counter
134+
135+
item.failed();
136+
137+
// remove it from the queue if we have had MAX_POST_ATTEMPTS (3) failures for the same request
138+
139+
if (MAX_POST_ATTEMPTS <= item.getNumFailures())
140+
{
141+
resendQueue.remove();
142+
}
143+
144+
// rethrow original exception from retransmission
145+
146+
throw t;
147+
}
104148
}
105149
} catch (Throwable t) {
106150
LOGGER.info("Failure retransmitting queued requests", t);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2015 Stackify
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.stackify.api.common.http;
17+
18+
/**
19+
* HttpResendQueueItem
20+
* @author Eric Martin
21+
*/
22+
public class HttpResendQueueItem {
23+
24+
/**
25+
* JSON bytes
26+
*/
27+
private final byte[] jsonBytes;
28+
29+
/**
30+
* Number of failures for the item;
31+
*/
32+
private int numFailures;
33+
34+
/**
35+
* Constructor
36+
* @param jsonBytes JSON bytes
37+
*/
38+
public HttpResendQueueItem(final byte[] jsonBytes) {
39+
this.jsonBytes = jsonBytes;
40+
this.numFailures = 1;
41+
}
42+
43+
/**
44+
* @return the jsonBytes
45+
*/
46+
public byte[] getJsonBytes() {
47+
return jsonBytes;
48+
}
49+
50+
/**
51+
* @return the numFailures
52+
*/
53+
public int getNumFailures() {
54+
return numFailures;
55+
}
56+
57+
/**
58+
* Increment the number of failures
59+
*/
60+
public void failed() {
61+
++numFailures;
62+
}
63+
}

src/main/java/com/stackify/api/common/log/LogAppender.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
*/
3333
public class LogAppender<T> implements Closeable {
3434

35+
/**
36+
* Internal package prefix
37+
*/
38+
private static final String COM_DOT_STACKIFY = "com.stackify.";
39+
3540
/**
3641
* Logger project name
3742
*/
@@ -132,7 +137,7 @@ public void append(final T event) {
132137
String className = eventAdapter.getClassName(event);
133138

134139
if (className != null) {
135-
if (className.startsWith("com.stackify.api.")) {
140+
if (className.startsWith(COM_DOT_STACKIFY)) {
136141
return;
137142
}
138143
}

src/main/java/com/stackify/api/common/log/LogSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ public class LogSender {
5656
private final ObjectMapper objectMapper;
5757

5858
/**
59-
* The queue of requests to be retransmitted (max of 100 batches of 100 messages)
59+
* The queue of requests to be retransmitted (max of 20 batches of 100 messages)
6060
*/
61-
private final HttpResendQueue resendQueue = new HttpResendQueue(100);
61+
private final HttpResendQueue resendQueue = new HttpResendQueue(20);
6262

6363
/**
6464
* Default constructor
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2015 Stackify
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.stackify.api.common.http;
17+
18+
import org.junit.Assert;
19+
import org.junit.Test;
20+
21+
/**
22+
* HttpResendQueueItem JUnit Test
23+
* @author Eric Martin
24+
*/
25+
public class HttpResendQueueItemTest {
26+
27+
/**
28+
* testConstrcutorAndGetters
29+
*/
30+
@Test
31+
public void testConstrcutorAndGetters() {
32+
byte[] jsonBytes = "{\"method\": \"testConstrcutorAndGetters\"}".getBytes();
33+
34+
HttpResendQueueItem item = new HttpResendQueueItem(jsonBytes);
35+
Assert.assertNotNull(item);
36+
37+
Assert.assertEquals(jsonBytes, item.getJsonBytes());
38+
Assert.assertEquals(1, item.getNumFailures());
39+
}
40+
41+
/**
42+
* testIncrementFailures
43+
*/
44+
@Test
45+
public void testIncrementFailures() {
46+
byte[] jsonBytes = "{\"method\": \"testIncrementRetries\"}".getBytes();
47+
48+
HttpResendQueueItem item = new HttpResendQueueItem(jsonBytes);
49+
Assert.assertEquals(1, item.getNumFailures());
50+
51+
item.failed();
52+
Assert.assertEquals(2, item.getNumFailures());
53+
54+
item.failed();
55+
Assert.assertEquals(3, item.getNumFailures());
56+
}
57+
}

src/test/java/com/stackify/api/common/http/HttpResendQueueTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,27 @@ public void testDrainWithException() throws Exception {
112112

113113
Assert.assertEquals(0, resendQueue.size());
114114
}
115+
116+
/**
117+
* testDrainWithExceptionAndSkip
118+
* @throws Exception
119+
*/
120+
@Test
121+
public void testDrainWithExceptionAndSkip() throws Exception {
122+
byte[] request1 = new byte[]{1};
123+
124+
HttpResendQueue resendQueue = new HttpResendQueue(3);
125+
resendQueue.offer(request1, new IOException());
126+
127+
Assert.assertEquals(1, resendQueue.size());
128+
129+
HttpClient httpClient = Mockito.mock(HttpClient.class);
130+
Mockito.when(httpClient.post("/path", request1, false)).thenThrow(new RuntimeException());
131+
132+
resendQueue.drain(httpClient, "/path");
133+
Assert.assertEquals(1, resendQueue.size());
134+
135+
resendQueue.drain(httpClient, "/path");
136+
Assert.assertEquals(0, resendQueue.size());
137+
}
115138
}

0 commit comments

Comments
 (0)