Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand All @@ -32,10 +35,21 @@ public class BatchCompareDataFetchers {

AtomicBoolean useAsyncBatchLoading = new AtomicBoolean(false);

private volatile CountDownLatch rootFetcherRendezvous;
private volatile CountDownLatch completionOverlapLatch;
private final AtomicBoolean shopsOverlapSignaled = new AtomicBoolean(false);
private final AtomicBoolean exShopsOverlapSignaled = new AtomicBoolean(false);
private final ExecutorService executor = Executors.newFixedThreadPool(4);

public void useAsyncBatchLoading(boolean flag) {
useAsyncBatchLoading.set(flag);
}

public void useSynchronizedFetching(int numberOfRootFetchers) {
rootFetcherRendezvous = new CountDownLatch(numberOfRootFetchers);
completionOverlapLatch = new CountDownLatch(numberOfRootFetchers);
}


private static final Map<String, Shop> shops = new LinkedHashMap<>();
private static final Map<String, Shop> expensiveShops = new LinkedHashMap<>();
Expand All @@ -52,10 +66,10 @@ public void useAsyncBatchLoading(boolean flag) {


public DataFetcher<CompletableFuture<List<Shop>>> shopsDataFetcher =
environment -> supplyAsyncWithSleep(() -> new ArrayList<>(shops.values()));
environment -> supplyAsyncWithRendezvous(() -> new ArrayList<>(shops.values()));

public DataFetcher<CompletableFuture<List<Shop>>> expensiveShopsDataFetcher = environment ->
supplyAsyncWithSleep(() -> new ArrayList<>(expensiveShops.values()));
supplyAsyncWithRendezvous(() -> new ArrayList<>(expensiveShops.values()));

// Departments
private static Map<String, Department> departments = new LinkedHashMap<>();
Expand Down Expand Up @@ -101,6 +115,21 @@ private static List<List<Department>> getDepartmentsForShops(List<Shop> shops) {

public DataFetcher<CompletableFuture<List<Department>>> departmentsForShopDataLoaderDataFetcher = environment -> {
Shop shop = environment.getSource();
// When synchronized fetching is enabled, ensure both root fields (shops and expensiveShops)
// are inside their startComplete/stopComplete window before either proceeds.
// This guarantees objectRunningCount never drops to 0 prematurely.
CountDownLatch overlapLatch = completionOverlapLatch;
if (overlapLatch != null) {
AtomicBoolean flag = shop.getId().startsWith("ex") ? exShopsOverlapSignaled : shopsOverlapSignaled;
if (flag.compareAndSet(false, true)) {
overlapLatch.countDown();
try {
overlapLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
return (CompletableFuture) environment.getDataLoader("departments").load(shop.getId());
};

Expand Down Expand Up @@ -149,6 +178,22 @@ private <T> CompletableFuture<T> maybeAsyncWithSleep(Supplier<CompletableFuture<
}
}

private <T> CompletableFuture<T> supplyAsyncWithRendezvous(Supplier<T> supplier) {
CountDownLatch latch = rootFetcherRendezvous;
if (latch != null) {
return CompletableFuture.supplyAsync(() -> {
try {
latch.countDown();
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return supplier.get();
}, executor);
}
return supplyAsyncWithSleep(supplier);
}

private static <T> CompletableFuture<T> supplyAsyncWithSleep(Supplier<T> supplier) {
Supplier<T> sleepSome = sleepSome(supplier);
return CompletableFuture.supplyAsync(sleepSome);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class DataLoaderPerformanceTest extends Specification {

when:

batchCompareDataFetchers.useSynchronizedFetching(2)

ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query(getExpensiveQuery(false))
.dataLoaderRegistry(dataLoaderRegistry)
Expand All @@ -71,8 +73,8 @@ class DataLoaderPerformanceTest extends Specification {
then:
result.data == expectedExpensiveData

batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1

where:
incrementalSupport | contextKey
Expand Down Expand Up @@ -123,6 +125,7 @@ class DataLoaderPerformanceTest extends Specification {
when:

batchCompareDataFetchers.useAsyncBatchLoading(true)
batchCompareDataFetchers.useSynchronizedFetching(2)

ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query(getExpensiveQuery(false))
Expand All @@ -136,8 +139,8 @@ class DataLoaderPerformanceTest extends Specification {
then:
result.data == expectedExpensiveData

batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1

where:
incrementalSupport | contextKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package graphql.execution.instrumentation.dataloader
import graphql.ExecutionInput
import graphql.GraphQL
import org.dataloader.DataLoaderRegistry
import spock.lang.Ignore
import spock.lang.Specification

import static graphql.ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT
Expand Down Expand Up @@ -49,11 +48,12 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification
incrementalSupport << [true, false]
}

@Ignore("This test flakes on Travis for some reason. Clearly this indicates some sort of problem to investigate. However it also stop releases.")
def "chainedInstrumentation: 970 ensure data loader is performant for multiple field with lists"() {

when:

batchCompareDataFetchers.useSynchronizedFetching(2)

ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query(getExpensiveQuery(false))
.dataLoaderRegistry(dataLoaderRegistry)
Expand Down Expand Up @@ -101,6 +101,7 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification
when:

batchCompareDataFetchers.useAsyncBatchLoading(true)
batchCompareDataFetchers.useSynchronizedFetching(2)

ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query(getExpensiveQuery(false))
Expand All @@ -112,8 +113,8 @@ class DataLoaderPerformanceWithChainedInstrumentationTest extends Specification
then:
result.data == expectedExpensiveData

batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() <= 2
batchCompareDataFetchers.departmentsForShopsBatchLoaderCounter.get() == 1
batchCompareDataFetchers.productsForDepartmentsBatchLoaderCounter.get() == 1

where:
incrementalSupport << [true, false]
Expand Down