Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/**
* This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec
* when it's got CFs that complete at different times
* when it's got CFs that complete at different times.
* <p>
* Uses a shared single-thread executor per publisher so CFs complete sequentially — see
* CompletionStageMappingOrderedPublisherTckVerificationTest for details on why.
*/
@Test
public class CompletionStageMappingOrderedPublisherRandomCompleteTckVerificationTest extends PublisherVerification<String> {
Expand Down Expand Up @@ -50,6 +54,7 @@ public boolean skipStochasticTests() {

@NonNull
private static Function<Integer, CompletionStage<String>> mapperFunc() {
ExecutorService executor = Executors.newSingleThreadExecutor();
return i -> CompletableFuture.supplyAsync(() -> {
int ms = rand(0, 5);
try {
Expand All @@ -58,7 +63,7 @@ private static Function<Integer, CompletionStage<String>> mapperFunc() {
throw new RuntimeException(e);
}
return i + "!";
}, Executors.newSingleThreadExecutor());
}, executor);
}

static Random rn = new Random();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/**
* This uses the reactive streams TCK to test that our CompletionStageMappingPublisher meets spec
* when it's got CFs that complete off thread
* when it's got CFs that complete off thread.
* <p>
* Uses a shared single-thread executor per publisher so CFs complete sequentially.
* The ordered subscriber drains completed CFs in a while loop — with concurrent executors,
* multiple CFs can complete before the drain starts, causing multiple onNext calls on the
* same thread which the TCK flags as a spec303 (unbounded recursion) violation.
*/
@Test
public class CompletionStageMappingOrderedPublisherTckVerificationTest extends PublisherVerification<String> {
Expand All @@ -32,14 +38,16 @@ public long maxElementsFromPublisher() {
@Override
public Publisher<String> createPublisher(long elements) {
Publisher<Integer> publisher = Flowable.range(0, (int) elements);
Function<Integer, CompletionStage<String>> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor());
ExecutorService executor = Executors.newSingleThreadExecutor();
Function<Integer, CompletionStage<String>> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor);
return new CompletionStageMappingOrderedPublisher<>(publisher, mapper);
}

@Override
public Publisher<String> createFailedPublisher() {
Publisher<Integer> publisher = Flowable.error(() -> new RuntimeException("Bang"));
Function<Integer, CompletionStage<String>> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", Executors.newSingleThreadExecutor());
ExecutorService executor = Executors.newSingleThreadExecutor();
Function<Integer, CompletionStage<String>> mapper = i -> CompletableFuture.supplyAsync(() -> i + "!", executor);
return new CompletionStageMappingOrderedPublisher<>(publisher, mapper);
}

Expand Down