Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 1 addition & 22 deletions src/main/java/graphql/ExecutionInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ExecutionInput {
private final DataLoaderRegistry dataLoaderRegistry;
private final ExecutionId executionId;
private final Locale locale;
// this is currently not used but we want it back soon after the v23 release
private final AtomicBoolean cancelled;


Expand Down Expand Up @@ -141,28 +142,6 @@ public Map<String, Object> getExtensions() {
return extensions;
}


/**
* The graphql engine will check this frequently and if that is true, it will
* throw a {@link graphql.execution.AbortExecutionException} to cancel the execution.
* <p>
* This is a cooperative cancellation. Some asynchronous data fetching code may still continue to
* run but there will be no more efforts run future field fetches say.
*
* @return true if the execution should be cancelled
*/
public boolean isCancelled() {
return cancelled.get();
}

/**
* This can be called to cancel the graphql execution. Remember this is a cooperative cancellation
* and the graphql engine needs to be running on a thread to allow is to respect this flag.
*/
public void cancel() {
cancelled.set(true);
}

/**
* This helps you transform the current ExecutionInput object into another one by starting a builder with all
* the current values and allows you to transform it how you want.
Expand Down
7 changes: 0 additions & 7 deletions src/main/java/graphql/execution/EngineRunningObserver.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package graphql.execution;

import graphql.ExecutionInput;
import graphql.ExperimentalApi;
import graphql.GraphQLContext;
import org.jspecify.annotations.NullMarked;
Expand All @@ -9,8 +8,6 @@
* This class lets you observe the running state of the graphql-java engine. As it processes and dispatches graphql fields,
* the engine moves in and out of a running and not running state. As it does this, the callback is called with information telling you the current
* state.
* <p>
* If the engine is cancelled via {@link ExecutionInput#cancel()} then the observer will also be called to indicate that.
*/
@ExperimentalApi
@NullMarked
Expand All @@ -25,10 +22,6 @@ enum RunningState {
* Represents that the engine code is asynchronously waiting for fetching to happen
*/
NOT_RUNNING,
/**
* Represents that the engine code has been cancelled via {@link ExecutionInput#cancel()}
*/
CANCELLED
}


Expand Down
22 changes: 0 additions & 22 deletions src/main/java/graphql/execution/ExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.function.Supplier;

import static graphql.Assert.assertTrue;
import static graphql.execution.EngineRunningObserver.RunningState.CANCELLED;
import static graphql.execution.EngineRunningObserver.RunningState.NOT_RUNNING;
import static graphql.execution.EngineRunningObserver.RunningState.RUNNING;

Expand Down Expand Up @@ -378,15 +377,13 @@ public boolean isRunning() {
}

private void incrementRunning(Throwable throwable) {
checkIsCancelled(throwable);
assertTrue(isRunning.get() >= 0);
if (isRunning.incrementAndGet() == 1) {
changeOfState(RUNNING);
}
}

private void decrementRunning(Throwable throwable) {
checkIsCancelled(throwable);
assertTrue(isRunning.get() > 0);
if (isRunning.decrementAndGet() == 0) {
changeOfState(NOT_RUNNING);
Expand Down Expand Up @@ -438,25 +435,6 @@ public void run(Throwable throwable, Runnable runnable) {
}
}

private void checkIsCancelled(Throwable currentThrowable) {
// no need to check we are cancelled if we already have an exception in play
// since it can lead to an exception being thrown when an exception has already been
// thrown
if (currentThrowable == null) {
checkIsCancelled();
}
}

/**
* This will abort the execution via {@link AbortExecutionException} if the {@link ExecutionInput} has been cancelled
*/
private void checkIsCancelled() {
if (executionInput.isCancelled()) {
changeOfState(CANCELLED);
throw new AbortExecutionException("Execution has been asked to be cancelled");
}
}

private void changeOfState(RunningState runningState) {
if (engineRunningObserver != null) {
engineRunningObserver.runningStateChanged(executionId, graphQLContext, runningState);
Expand Down
67 changes: 0 additions & 67 deletions src/test/groovy/graphql/ExecutionInputTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -167,71 +167,4 @@ class ExecutionInputTest extends Specification {
er.errors.isEmpty()
er.data["fetch"] == "{locale=German, executionId=ID123, graphqlContext=b}"
}

def "can cancel the execution"() {
def sdl = '''
type Query {
fetch1 : Inner
fetch2 : Inner
}

type Inner {
f : String
}

'''

CountDownLatch fieldLatch = new CountDownLatch(1)

DataFetcher df1Sec = { DataFetchingEnvironment env ->
println("Entering DF1")
return CompletableFuture.supplyAsync {
println("DF1 async run")
fieldLatch.await()
Thread.sleep(1000)
return [f: "x"]
}
}
DataFetcher df10Sec = { DataFetchingEnvironment env ->
println("Entering DF10")
return CompletableFuture.supplyAsync {
println("DF10 async run")
fieldLatch.await()
Thread.sleep(10000)
return "x"
}
}

def fetcherMap = ["Query": ["fetch1": df1Sec, "fetch2": df1Sec],
"Inner": ["f": df10Sec]
]
def schema = TestUtil.schema(sdl, fetcherMap)
def graphQL = GraphQL.newGraphQL(schema).build()

when:
ExecutionInput executionInput = ExecutionInput.newExecutionInput()
.query("query q { fetch1 { f } fetch2 { f } }")
.build()

def cf = graphQL.executeAsync(executionInput)

Thread.sleep(250) // let it get into the field fetching say

// lets cancel it
println("cancelling")
executionInput.cancel()

// let the DFs run
println("make the fields run")
fieldLatch.countDown()

println("and await for the overall CF to complete")
Awaitility.await().atMost(Duration.ofSeconds(60)).until({ -> cf.isDone() })

def er = cf.join()

then:
!er.errors.isEmpty()
er.errors[0]["message"] == "Execution has been asked to be cancelled"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -710,49 +710,6 @@ class SubscriptionExecutionStrategyTest extends Specification {
}
}

def "we can cancel the operation and the upstream publisher is told"() {
List<Runnable> promises = []
RxJavaMessagePublisher publisher = new RxJavaMessagePublisher(10)

DataFetcher newMessageDF = { env -> return publisher }
DataFetcher senderDF = dfThatDoesNotComplete("sender", promises)
DataFetcher textDF = PropertyDataFetcher.fetching("text")

GraphQL graphQL = buildSubscriptionQL(newMessageDF, senderDF, textDF)

def executionInput = ExecutionInput.newExecutionInput().query("""
subscription NewMessages {
newMessage(roomId: 123) {
sender
text
}
}
""").graphQLContext([(SubscriptionExecutionStrategy.KEEP_SUBSCRIPTION_EVENTS_ORDERED): true]).build()

def executionResult = graphQL.execute(executionInput)

when:
Publisher<ExecutionResult> msgStream = executionResult.getData()
def capturingSubscriber = new CapturingSubscriber<ExecutionResult>(1)
msgStream.subscribe(capturingSubscriber)

// now cancel the operation
executionInput.cancel()

// make things over the subscription
promises.forEach {it.run()}


then:
Awaitility.await().untilTrue(capturingSubscriber.isDone())

def messages = capturingSubscriber.events
messages.size() == 1
def error = messages[0].errors[0]
assert error.message == "Execution has been asked to be cancelled"
publisher.counter == 2
}

private static DataFetcher<?> dfThatDoesNotComplete(String propertyName, List<Runnable> promises) {
{ env ->
def df = PropertyDataFetcher.fetching(propertyName)
Expand Down
Loading