@@ -36,17 +36,22 @@ public class HttpResendQueue {
3636 */
3737 private static final Logger LOGGER = LoggerFactory .getLogger (HttpResendQueue .class );
3838
39+ /**
40+ * Try posting message 3 times before skipping it
41+ */
42+ private static final int MAX_POST_ATTEMPTS = 3 ;
43+
3944 /**
4045 * The queue of requests to be retransmitted
4146 */
42- private final Queue <byte [] > resendQueue ;
47+ private final Queue <HttpResendQueueItem > resendQueue ;
4348
4449 /**
4550 * Constructor
4651 * @param maxSize Maximum size of the queue
4752 */
4853 public HttpResendQueue (final int maxSize ) {
49- this .resendQueue = new SynchronizedEvictingQueue <byte [] >(maxSize );
54+ this .resendQueue = new SynchronizedEvictingQueue <HttpResendQueueItem >(maxSize );
5055 }
5156
5257 /**
@@ -62,7 +67,7 @@ public int size() {
6267 * @param e IOException
6368 */
6469 public void offer (final byte [] request , final IOException e ) {
65- resendQueue .offer (request );
70+ resendQueue .offer (new HttpResendQueueItem ( request ) );
6671 }
6772
6873 /**
@@ -72,7 +77,7 @@ public void offer(final byte[] request, final IOException e) {
7277 */
7378 public void offer (final byte [] request , final HttpException e ) {
7479 if (!e .isClientError ()) {
75- resendQueue .offer (request );
80+ resendQueue .offer (new HttpResendQueueItem ( request ) );
7681 }
7782 }
7883
@@ -92,15 +97,54 @@ public void drain(final HttpClient httpClient, final String path) {
9297 * @param gzip True if the post should be gzipped, false otherwise
9398 */
9499 public void drain (final HttpClient httpClient , final String path , final boolean gzip ) {
100+
95101 if (!resendQueue .isEmpty ()) {
102+
103+ // queued items are available for retransmission
104+
96105 try {
106+ // drain resend queue until empty or first exception
107+
97108 LOGGER .info ("Attempting to retransmit {} requests" , resendQueue .size ());
98109
99110 while (!resendQueue .isEmpty ()) {
100- byte [] jsonBytes = resendQueue .peek ();
101- httpClient .post (path , jsonBytes , gzip );
102- resendQueue .remove ();
103- Threads .sleepQuietly (250 , TimeUnit .MILLISECONDS );
111+
112+ // get next item off queue
113+
114+ HttpResendQueueItem item = resendQueue .peek ();
115+
116+ try {
117+
118+ // retransmit queued request
119+
120+ byte [] jsonBytes = item .getJsonBytes ();
121+ httpClient .post (path , jsonBytes , gzip );
122+
123+ // retransmission successful
124+ // remove from queue and sleep for 250ms
125+
126+ resendQueue .remove ();
127+
128+ Threads .sleepQuietly (250 , TimeUnit .MILLISECONDS );
129+
130+ } catch (Throwable t ) {
131+
132+ // retransmission failed
133+ // increment the item's counter
134+
135+ item .failed ();
136+
137+ // remove it from the queue if we have had MAX_POST_ATTEMPTS (3) failures for the same request
138+
139+ if (MAX_POST_ATTEMPTS <= item .getNumFailures ())
140+ {
141+ resendQueue .remove ();
142+ }
143+
144+ // rethrow original exception from retransmission
145+
146+ throw t ;
147+ }
104148 }
105149 } catch (Throwable t ) {
106150 LOGGER .info ("Failure retransmitting queued requests" , t );
0 commit comments