-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Async Execution Strategy #242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f4fe81e
f650aae
bd3818e
9874437
97fc494
7a5addb
b375ffe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| package graphql; | ||
|
|
||
| import graphql.execution.ExecutionStrategy; | ||
| import graphql.schema.GraphQLSchema; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletionStage; | ||
|
|
||
| import static java.util.concurrent.CompletableFuture.completedFuture; | ||
|
|
||
| public class GraphQLAsync extends GraphQL { | ||
|
|
||
| public GraphQLAsync(GraphQLSchema graphQLSchema) { | ||
| super(graphQLSchema); | ||
| } | ||
|
|
||
| public GraphQLAsync(GraphQLSchema graphQLSchema, ExecutionStrategy queryStrategy) { | ||
| super(graphQLSchema, queryStrategy); | ||
| } | ||
|
|
||
| public GraphQLAsync(GraphQLSchema graphQLSchema, ExecutionStrategy queryStrategy, ExecutionStrategy mutationStrategy) { | ||
| super(graphQLSchema, queryStrategy, mutationStrategy); | ||
| } | ||
|
|
||
| public CompletionStage<ExecutionResult> executeAsync(String requestString) { | ||
| return executeAsync(requestString, null); | ||
|
|
||
| } | ||
|
|
||
| public CompletionStage<ExecutionResult> executeAsync(String requestString, Object context) { | ||
| return executeAsync(requestString, context, Collections.emptyMap()); | ||
|
|
||
| } | ||
|
|
||
| public CompletionStage<ExecutionResult> executeAsync(String requestString, String operationName, Object context) { | ||
| return executeAsync(requestString, operationName, context, Collections.emptyMap()); | ||
|
|
||
| } | ||
|
|
||
| public CompletionStage<ExecutionResult> executeAsync(String requestString, Object context, Map<String, Object> arguments) { | ||
| return executeAsync(requestString, null, context, arguments); | ||
|
|
||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| public CompletionStage<ExecutionResult> executeAsync(String requestString, String operationName, Object context, Map<String, Object> arguments) { | ||
| ExecutionResult result = execute(requestString, operationName, context, arguments); | ||
| Object data1 = result.getData(); | ||
| if (data1 instanceof CompletionStage) { | ||
| return ((CompletionStage<Map<String, Object>>) data1).thenApply(data -> new ExecutionResultImpl(data, result.getErrors())); | ||
| } | ||
| return completedFuture(result); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,20 +6,18 @@ | |
| import graphql.language.OperationDefinition; | ||
| import graphql.schema.GraphQLSchema; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.*; | ||
|
|
||
| public class ExecutionContext { | ||
|
|
||
| private GraphQLSchema graphQLSchema; | ||
| private ExecutionStrategy executionStrategy; | ||
| private ExecutionStrategy queryStrategy; | ||
| private ExecutionStrategy mutationStrategy; | ||
| private Map<String, FragmentDefinition> fragmentsByName = new LinkedHashMap<String, FragmentDefinition>(); | ||
| private OperationDefinition operationDefinition; | ||
| private Map<String, Object> variables = new LinkedHashMap<String, Object>(); | ||
| private Object root; | ||
| private List<GraphQLError> errors = new ArrayList<GraphQLError>(); | ||
| private List<GraphQLError> errors = Collections.synchronizedList(new ArrayList<GraphQLError>()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CopyOnWriteArrayList is pretty efficient list
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure Collections.synchronizedList() is good here anyway. Thanks, I'll check out CopyOnWriteArrayList |
||
|
|
||
| public GraphQLSchema getGraphQLSchema() { | ||
| return graphQLSchema; | ||
|
|
@@ -73,11 +71,19 @@ public List<GraphQLError> getErrors() { | |
| return errors; | ||
| } | ||
|
|
||
| public ExecutionStrategy getExecutionStrategy() { | ||
| return executionStrategy; | ||
| public ExecutionStrategy getQueryStrategy() { | ||
| return queryStrategy; | ||
| } | ||
|
|
||
| public void setExecutionStrategy(ExecutionStrategy executionStrategy) { | ||
| this.executionStrategy = executionStrategy; | ||
| public void setQueryStrategy(ExecutionStrategy queryStrategy) { | ||
| this.queryStrategy = queryStrategy; | ||
| } | ||
|
|
||
| public ExecutionStrategy getMutationStrategy() { | ||
| return mutationStrategy; | ||
| } | ||
|
|
||
| public void setMutationStrategy(ExecutionStrategy mutationStrategy) { | ||
| this.mutationStrategy = mutationStrategy; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| package graphql.execution.async; | ||
|
|
||
| import graphql.ExceptionWhileDataFetching; | ||
| import graphql.ExecutionResult; | ||
| import graphql.GraphQLError; | ||
| import graphql.execution.ExecutionContext; | ||
| import graphql.execution.ExecutionStrategy; | ||
| import graphql.language.Field; | ||
| import graphql.schema.*; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionStage; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static com.spotify.futures.CompletableFutures.successfulAsList; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you really want Spotify as a dependency. I thought graphql-java was trying to be as lightweight as possible
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's just in there while I work this out. That artifact is tiny (50 LOC?) and can be copy/pasted into this project license-permitting.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for taking a look! |
||
| import static java.util.concurrent.CompletableFuture.completedFuture; | ||
| import static java.util.stream.Collectors.toList; | ||
| import static java.util.stream.Collectors.toMap; | ||
|
|
||
|
|
||
| public final class AsyncExecutionStrategy extends ExecutionStrategy { | ||
|
|
||
| public static AsyncExecutionStrategy serial() { | ||
| return new AsyncExecutionStrategy(true); | ||
| } | ||
|
|
||
| public static AsyncExecutionStrategy parallel() { | ||
| return new AsyncExecutionStrategy(false); | ||
| } | ||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(AsyncExecutionStrategy.class); | ||
|
|
||
| private final boolean serial; | ||
|
|
||
| private AsyncExecutionStrategy(boolean serial) { | ||
| this.serial = serial; | ||
| } | ||
|
|
||
| @Override | ||
| public ExecutionResult execute(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, Map<String, List<Field>> fields) { | ||
| Map<String, Supplier<CompletionStage<ExecutionResult>>> fieldResolvers = fields.entrySet() | ||
| .stream() | ||
| .collect(toMap(Map.Entry::getKey, entry -> () -> resolveFieldAsync(executionContext, parentType, source, entry.getValue()))); | ||
|
|
||
| AsyncFieldsCoordinator coordinator = new AsyncFieldsCoordinator(fieldResolvers); | ||
|
|
||
| CompletionStage<Map<String, Object>> data = serial ? coordinator.executeSerially() : coordinator.executeParallelly(); | ||
| return new ExecutionResultImpl(data, executionContext.getErrors()); | ||
| } | ||
|
|
||
| private CompletionStage<ExecutionResult> resolveFieldAsync(ExecutionContext executionContext, GraphQLObjectType parentType, Object source, List<Field> fieldList) { | ||
| GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, fieldList.get(0)); | ||
| GraphQLOutputType fieldType = fieldDef.getType(); | ||
|
|
||
| DataFetchingEnvironment env = new DataFetchingEnvironment( | ||
| source, | ||
| valuesResolver.getArgumentValues(fieldDef.getArguments(), fieldList.get(0).getArguments(), executionContext.getVariables()), | ||
| executionContext.getRoot(), | ||
| fieldList, | ||
| fieldDef.getType(), | ||
| parentType, | ||
| executionContext.getGraphQLSchema() | ||
| ); | ||
|
|
||
| try { | ||
| Object obj1 = fieldDef.getDataFetcher().get(env); | ||
| if (obj1 instanceof CompletionStage) { | ||
| return ((CompletionStage<?>) obj1) | ||
| .exceptionally(e -> { | ||
| logExceptionWhileFetching(e, fieldList.get(0)); | ||
| executionContext.addError(new ExceptionWhileDataFetching(e)); | ||
| return null; | ||
| }) | ||
| .thenCompose(obj2 -> completeValueAsync(executionContext, fieldType, fieldList, obj2)); | ||
| } else { | ||
| return completeValueAsync(executionContext, fieldType, fieldList, obj1); | ||
| } | ||
| } catch (Exception e) { | ||
| logExceptionWhileFetching(e, fieldList.get(0)); | ||
| executionContext.addError(new ExceptionWhileDataFetching(e)); | ||
| return completedFuture(new ExecutionResultImpl(null, null)); | ||
| } | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private CompletionStage<ExecutionResult> completeValueAsync(ExecutionContext executionContext, GraphQLType fieldType, List<Field> fieldList, Object result) { | ||
| if (fieldType instanceof GraphQLList) { | ||
| List<Object> collect = ((List<Object>) result); | ||
|
|
||
| List<CompletionStage<ExecutionResult>> collect1 = collect.stream() | ||
| .map(object -> { | ||
| CompletionStage<Object> stage = object instanceof CompletionStage ? (CompletionStage<Object>) object : completedFuture(object); | ||
| return stage | ||
| .thenCompose(result5 -> completeValueAsync(executionContext, ((GraphQLList) fieldType).getWrappedType(), fieldList, result5)); | ||
| }) | ||
| .collect(toList()); | ||
|
|
||
| return successfulAsList(collect1, t -> null).thenApply(results -> { | ||
| List<Object> list = new ArrayList<>(); | ||
| List<GraphQLError> errors = new ArrayList<>(); | ||
| for (ExecutionResult executionResult : results) { | ||
| list.add(executionResult.getData()); | ||
| errors.addAll(executionResult.getErrors()); | ||
| } | ||
| return new ExecutionResultImpl(list, errors); | ||
| }); | ||
|
|
||
| } else { | ||
| ExecutionResult completed = completeValue(executionContext, fieldType, fieldList, result); | ||
| // Happens when the data fetcher returns null for nullable field | ||
| if (completed == null) { | ||
| return completedFuture(new ExecutionResultImpl(null, null)); | ||
| } | ||
| if (!(completed.getData() instanceof CompletionStage)) { | ||
| return completedFuture(completed); | ||
| } | ||
| return ((CompletionStage<?>) completed.getData()) | ||
| .thenApply(data -> new ExecutionResultImpl(data, completed.getErrors())); | ||
| } | ||
| } | ||
|
|
||
| private void logExceptionWhileFetching(Throwable e, Field field) { | ||
| log.debug("Exception while fetching data for field {}", field.getName(), e); | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The GraphQL spec says that mutations WILL ALWAYS be serial. So why allow people to wire in anything BUT a serial implementation?
I guess it makes it more configureable than today but you have no way of knowing that they are following spec or not.
Maybe its a case of "dont shoot yourself" by wiring in a asynch mutation execution strategy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My serial implementation is async. The current serial implementation is not async. Both are serial though.