Skip to content

Commit eb7e469

Browse files
committed
adding queue notifications
1 parent 52cb224 commit eb7e469

5 files changed

Lines changed: 61 additions & 13 deletions

File tree

build.gradle

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ configurations.all {
2121
}
2222

2323
repositories {
24-
25-
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
26-
maven { url "http://repo.maven.apache.org/maven2" }
27-
maven { url "https://plugins.gradle.org/m2/" }
24+
mavenCentral()
2825
}
2926

3027
dependencies {

src/main/java/com/pubnub/api/PNConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ public class PNConfiguration {
117117
@Setter
118118
private Proxy proxy;
119119

120+
/**
121+
* if set, the SDK will alert once the number of messages arrived in one call equal to the threshold
122+
*/
123+
private Integer requestMessageCountThreshold;
124+
120125
/**
121126
* Initialize the PNConfiguration with default values
122127
*/

src/main/java/com/pubnub/api/enums/PNStatusCategory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,6 @@ public enum PNStatusCategory {
1919
PNTLSConnectionFailedCategory,
2020
PNTLSUntrustedCertificateCategory,
2121

22+
PNRequestMessageCountExceededCategory
23+
2224
}

src/main/java/com/pubnub/api/managers/SubscriptionManager.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,21 +220,24 @@ public void onResponse(final SubscribeEnvelope result, final PNStatus status) {
220220
}
221221

222222
if (!subscriptionStatusAnnounced) {
223-
PNStatus pnStatus = PNStatus.builder()
224-
.error(false)
223+
PNStatus pnStatus = createPublicStatus(status)
225224
.category(PNStatusCategory.PNConnectedCategory)
226-
.statusCode(status.getStatusCode())
227-
.authKey(status.getAuthKey())
228-
.operation(status.getOperation())
229-
.clientRequest(status.getClientRequest())
230-
.origin(status.getOrigin())
231-
.tlsEnabled(status.isTlsEnabled())
225+
.error(false)
232226
.build();
233-
234227
subscriptionStatusAnnounced = true;
235228
listenerManager.announce(pnStatus);
236229
}
237230

231+
Integer requestMessageCountThreshold = pubnub.getConfiguration().getRequestMessageCountThreshold();
232+
if (requestMessageCountThreshold != null && requestMessageCountThreshold == result.getMessages().size()) {
233+
PNStatus pnStatus = createPublicStatus(status)
234+
.category(PNStatusCategory.PNRequestMessageCountExceededCategory)
235+
.error(false)
236+
.build();
237+
238+
listenerManager.announce(pnStatus);
239+
}
240+
238241
if (result.getMessages().size() != 0) {
239242
messageQueue.addAll(result.getMessages());
240243
}
@@ -308,4 +311,15 @@ public final synchronized void unsubscribeAll() {
308311
.channels(subscriptionState.prepareChannelList(false))
309312
.build());
310313
}
314+
315+
private PNStatus.PNStatusBuilder createPublicStatus(PNStatus privateStatus) {
316+
return PNStatus.builder()
317+
.statusCode(privateStatus.getStatusCode())
318+
.authKey(privateStatus.getAuthKey())
319+
.operation(privateStatus.getOperation())
320+
.clientRequest(privateStatus.getClientRequest())
321+
.origin(privateStatus.getOrigin())
322+
.tlsEnabled(privateStatus.isTlsEnabled());
323+
}
324+
311325
}

src/test/java/com/pubnub/api/managers/SubscriptionManagerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,36 @@ public void presence(PubNub pubnub, PNPresenceEventResult presence) {
140140

141141
}
142142

143+
@Test
144+
public void testQueueNotificationsBuilder() {
145+
pubnub.getConfiguration().setRequestMessageCountThreshold(1);
146+
final AtomicBoolean gotStatus = new AtomicBoolean();
147+
stubFor(get(urlPathEqualTo("/v2/subscribe/mySubscribeKey/ch2,ch1/0"))
148+
.willReturn(aResponse().withBody("{\"t\":{\"t\":\"14607577960932487\",\"r\":1},\"m\":[{\"a\":\"4\",\"f\":0,\"i\":\"Client-g5d4g\",\"p\":{\"t\":\"14607577960925503\",\"r\":1},\"o\":{\"t\":\"14737141991877032\",\"r\":2},\"k\":\"sub-c-4cec9f8e-01fa-11e6-8180-0619f8945a4f\",\"c\":\"coolChannel\",\"d\":{\"text\":\"Message\"},\"b\":\"coolChannel\"}]}")));
149+
150+
pubnub.addListener(new SubscribeCallback() {
151+
@Override
152+
public void status(PubNub pubnub, PNStatus status) {
153+
if (status.getCategory() == PNStatusCategory.PNRequestMessageCountExceededCategory) {
154+
gotStatus.set(true);
155+
}
156+
}
157+
158+
@Override
159+
public void message(PubNub pubnub, PNMessageResult message) {
160+
}
161+
162+
@Override
163+
public void presence(PubNub pubnub, PNPresenceEventResult presence) {
164+
}
165+
});
166+
167+
168+
pubnub.subscribe().channels(Arrays.asList("ch1", "ch2")).execute();
169+
170+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(gotStatus, org.hamcrest.core.IsEqual.equalTo(true));
171+
}
172+
143173
@Test
144174
public void testSubscribeBuilderWithAccessManager403Error() {
145175
final AtomicInteger gotStatus = new AtomicInteger();

0 commit comments

Comments
 (0)