-
Notifications
You must be signed in to change notification settings - Fork 1.1k
AsynchronousExecutionStrategy without changing the interface of GraphQL #481
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| package graphql.execution; | ||
|
|
||
| import graphql.ExecutionResult; | ||
| import graphql.ExecutionResultImpl; | ||
| import graphql.execution.instrumentation.Instrumentation; | ||
| import graphql.execution.instrumentation.InstrumentationContext; | ||
| import graphql.execution.instrumentation.parameters.FieldFetchParameters; | ||
| import graphql.execution.instrumentation.parameters.FieldParameters; | ||
| import graphql.language.Field; | ||
| import graphql.schema.*; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.util.*; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.CompletionStage; | ||
| import java.util.concurrent.ExecutionException; | ||
|
|
||
| import static graphql.execution.FieldCollectorParameters.newParameters; | ||
| import static graphql.execution.TypeInfo.newTypeInfo; | ||
|
|
||
| public class AsynchronousExecutionStrategy extends ExecutionStrategy { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is an aside to this PR btw. There is a lot of repeated code in here from the base ExecutionStrategy. Perhaps we should make a ExecutionStrategyHelper and use composition instead of inheritance to allow better reuse?? Hard problem I know |
||
|
|
||
| private static final Logger log = LoggerFactory.getLogger(AsynchronousExecutionStrategy.class); | ||
|
|
||
| @Override | ||
| public ExecutionResult execute(ExecutionContext executionContext, | ||
| ExecutionParameters parameters) throws NonNullableFieldWasNullException { | ||
|
|
||
| Map<String, List<Field>> fields = parameters.fields(); | ||
| Map<String,Object> results = Collections.synchronizedMap(new HashMap<>()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ConcurrentHashMap is a better choice than synchronizedMap |
||
| CompletionStage<Void> future = CompletableFuture.completedFuture(null); | ||
|
|
||
| for (String fieldName : fields.keySet()) { | ||
| final List<Field> fieldList = fields.get(fieldName); | ||
| CompletionStage<ExecutionResult> resolveFieldFuture = | ||
| resolveFieldAsync(executionContext, parameters, fieldList). | ||
| thenApplyAsync(executionResult -> { | ||
| if (executionResult != null) { | ||
| results.put(fieldName, executionResult.getData()); | ||
| } else { | ||
| results.put(fieldName, null); | ||
| } | ||
| return executionResult; | ||
| }); | ||
| future = future.thenCombineAsync(resolveFieldFuture,(t,executionResult)-> t); | ||
| } | ||
|
|
||
| return new ExecutionResultImpl(future.thenApplyAsync(t -> results),executionContext.getErrors()); | ||
| } | ||
|
|
||
| protected CompletionStage<ExecutionResult> resolveFieldAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields) { | ||
| GraphQLObjectType type = parameters.typeInfo().castType(GraphQLObjectType.class); | ||
| GraphQLFieldDefinition | ||
| fieldDef = getFieldDef(executionContext.getGraphQLSchema(), type, fields.get(0)); | ||
|
|
||
| Map<String, Object> argumentValues = valuesResolver.getArgumentValues(fieldDef.getArguments(), fields.get(0).getArguments(), executionContext.getVariables()); | ||
|
|
||
| GraphQLOutputType fieldType = fieldDef.getType(); | ||
| DataFetchingFieldSelectionSet fieldCollector = DataFetchingFieldSelectionSetImpl | ||
| .newCollector(executionContext, fieldType, fields); | ||
|
|
||
| DataFetchingEnvironment environment = new DataFetchingEnvironmentImpl( | ||
| parameters.source(), | ||
| argumentValues, | ||
| executionContext.getRoot(), | ||
| fields, | ||
| fieldType, | ||
| type, | ||
| executionContext.getGraphQLSchema(), | ||
| executionContext.getFragmentsByName(), | ||
| executionContext.getExecutionId(), | ||
| fieldCollector); | ||
|
|
||
| Instrumentation instrumentation = executionContext.getInstrumentation(); | ||
|
|
||
| InstrumentationContext<ExecutionResult> | ||
| fieldCtx = instrumentation.beginField(new FieldParameters(executionContext, fieldDef, environment)); | ||
|
|
||
| InstrumentationContext<Object> fetchCtx = instrumentation.beginFieldFetch(new FieldFetchParameters(executionContext, fieldDef, environment)); | ||
| Object resolvedValue = null; | ||
|
|
||
| CompletableFuture<Object> dataFetcherResult = null; | ||
| try { | ||
| resolvedValue = fieldDef.getDataFetcher().get(environment); | ||
|
|
||
| if(resolvedValue instanceof CompletionStage) { | ||
| dataFetcherResult = (CompletableFuture) resolvedValue; | ||
| } else { | ||
| dataFetcherResult = CompletableFuture.completedFuture(resolvedValue); | ||
| } | ||
| } catch (Exception e) { | ||
| log.warn("Exception while fetching data", e); | ||
| dataFetcherResult = new CompletableFuture(); | ||
| dataFetcherResult.completeExceptionally(e); | ||
| } | ||
|
|
||
| return dataFetcherResult.handleAsync((value,th)-> { | ||
| if(th != null) { | ||
| log.warn("Exception while fetching data", th); | ||
| handleDataFetchingException(executionContext, fieldDef, argumentValues, new ExecutionException(th)); | ||
| fetchCtx.onEnd(th); | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Instrumentation is not called back here and hence is lost
|
||
| TypeInfo fieldTypeInfo = newTypeInfo() | ||
| .type(fieldType) | ||
| .parentInfo(parameters.typeInfo()) | ||
| .build(); | ||
|
|
||
| ExecutionParameters newParameters = ExecutionParameters.newParameters() | ||
| .typeInfo(fieldTypeInfo) | ||
| .fields(parameters.fields()) | ||
| .arguments(argumentValues) | ||
| .source(value).build(); | ||
|
|
||
| return newParameters; | ||
| }).thenComposeAsync(newParameters -> completeValueAsync(executionContext, newParameters, | ||
| fields)); | ||
|
|
||
|
|
||
| } | ||
| protected CompletionStage<ExecutionResult> completeValueAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields) { | ||
| TypeInfo typeInfo = parameters.typeInfo(); | ||
| Object result = parameters.source(); | ||
| GraphQLType fieldType = parameters.typeInfo().type(); | ||
|
|
||
| if (result == null) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi - there are a few PRS in the pipe that will make this code break. But thats often always the case |
||
| if (typeInfo.typeIsNonNull()) { | ||
| // see http://facebook.github.io/graphql/#sec-Errors-and-Non-Nullability | ||
| NonNullableFieldWasNullException nonNullException = new NonNullableFieldWasNullException(typeInfo); | ||
| executionContext.addError(nonNullException); | ||
| throw nonNullException; | ||
| } | ||
| return CompletableFuture.completedFuture(null); | ||
| } else if (fieldType instanceof GraphQLList) { | ||
| return completeValueForListAsync(executionContext, parameters, fields, toIterable(result)); | ||
| } else if (fieldType instanceof GraphQLScalarType) { | ||
| return CompletableFuture.completedFuture(completeValueForScalar((GraphQLScalarType) fieldType, result)); | ||
| } else if (fieldType instanceof GraphQLEnumType) { | ||
| return CompletableFuture.completedFuture(completeValueForEnum((GraphQLEnumType) fieldType, result)); | ||
| } | ||
|
|
||
|
|
||
| GraphQLObjectType resolvedType; | ||
| if (fieldType instanceof GraphQLInterfaceType) { | ||
| TypeResolutionParameters resolutionParams = TypeResolutionParameters.newParameters() | ||
| .graphQLInterfaceType((GraphQLInterfaceType) fieldType) | ||
| .field(fields.get(0)) | ||
| .value(parameters.source()) | ||
| .argumentValues(parameters.arguments()) | ||
| .schema(executionContext.getGraphQLSchema()).build(); | ||
| resolvedType = resolveTypeForInterface(resolutionParams); | ||
|
|
||
| } else if (fieldType instanceof GraphQLUnionType) { | ||
| TypeResolutionParameters resolutionParams = TypeResolutionParameters.newParameters() | ||
| .graphQLUnionType((GraphQLUnionType) fieldType) | ||
| .field(fields.get(0)) | ||
| .value(parameters.source()) | ||
| .argumentValues(parameters.arguments()) | ||
| .schema(executionContext.getGraphQLSchema()).build(); | ||
| resolvedType = resolveTypeForUnion(resolutionParams); | ||
| } else { | ||
| resolvedType = (GraphQLObjectType) fieldType; | ||
| } | ||
|
|
||
| FieldCollectorParameters collectorParameters = newParameters(executionContext.getGraphQLSchema(), resolvedType) | ||
| .fragments(executionContext.getFragmentsByName()) | ||
| .variables(executionContext.getVariables()) | ||
| .build(); | ||
|
|
||
| Map<String, List<Field>> subFields = fieldCollector.collectFields(collectorParameters, fields); | ||
|
|
||
| ExecutionParameters newParameters = ExecutionParameters.newParameters() | ||
| .typeInfo(typeInfo.asType(resolvedType)) | ||
| .fields(subFields) | ||
| .source(result).build(); | ||
|
|
||
| // Calling this from the executionContext to ensure we shift back from mutation strategy to the query strategy. | ||
|
|
||
| ExecutionResult executionResult = executionContext.getQueryStrategy().execute(executionContext, newParameters); | ||
| if(!(executionResult.getData() instanceof CompletionStage)) { | ||
| return CompletableFuture.completedFuture(executionResult); | ||
| } else { | ||
| return ((CompletionStage) executionResult.getData()).handleAsync((resultMap,th) -> | ||
| new ExecutionResultImpl(resultMap,executionResult.getErrors()) | ||
| ); | ||
|
|
||
| } | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mostly pondering here. I wonder if we should make the complete value list truly async. Should it join back as one result at this point? Is there any disadvantage is leaving it purely async for many items in a list? Is there any expectation that is this point you have resolved the value so it can be sent onto future calls? |
||
| protected CompletionStage<ExecutionResult> completeValueForListAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields, Iterable<Object> result) { | ||
| TypeInfo typeInfo = parameters.typeInfo(); | ||
| GraphQLList fieldType = typeInfo.castType(GraphQLList.class); | ||
| List<Object> resultList = Collections.synchronizedList(new ArrayList<>()); | ||
| CompletionStage<Void> future = CompletableFuture.completedFuture(null); | ||
|
|
||
| for (Object item : result) { | ||
| ExecutionParameters newParameters = ExecutionParameters.newParameters() | ||
| .typeInfo(typeInfo.asType(fieldType.getWrappedType())) | ||
| .fields(parameters.fields()) | ||
| .source(item).build(); | ||
|
|
||
| CompletionStage<ExecutionResult> completedValueFuture = | ||
| completeValueAsync(executionContext, newParameters, fields); | ||
|
|
||
| future = future.thenCombineAsync(completedValueFuture, (t,executionResult) -> { | ||
| resultList.add(executionResult.getData()); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
||
| return future.thenApplyAsync(t -> new ExecutionResultImpl(resultList,null)); | ||
| } | ||
|
|
||
| private Iterable<Object> toIterable(Object result) { | ||
| if (result.getClass().isArray()) { | ||
| result = Arrays.asList((Object[]) result); | ||
| } | ||
| //noinspection unchecked | ||
| return (Iterable<Object>) result; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,130 @@ | ||
| package graphql.execution | ||
|
|
||
| import graphql.GraphQL | ||
| import graphql.schema.DataFetcher | ||
| import graphql.schema.GraphQLObjectType | ||
| import graphql.schema.GraphQLSchema | ||
| import spock.lang.Specification | ||
|
|
||
| import java.util.concurrent.CompletableFuture | ||
| import java.util.concurrent.CompletionStage | ||
|
|
||
| import static graphql.Scalars.GraphQLString | ||
| import static graphql.schema.GraphQLFieldDefinition.newFieldDefinition | ||
| import static graphql.schema.GraphQLObjectType.newObject | ||
|
|
||
| class AsynchronousExecutionStrategyTest extends Specification { | ||
|
|
||
| def "Example usage of AsynchronousExecutionStrategy."() { | ||
| given: | ||
|
|
||
| GraphQLObjectType queryType = newObject() | ||
| .name("data") | ||
| .field( | ||
| newFieldDefinition().type(GraphQLString).name("key1").dataFetcher({env -> CompletableFuture.completedFuture("value1")})) | ||
| .field( | ||
| newFieldDefinition().type(GraphQLString).name("key2").staticValue("value2")) | ||
| .build(); | ||
|
|
||
| GraphQLSchema schema = GraphQLSchema.newSchema() | ||
| .query(queryType) | ||
| .build(); | ||
|
|
||
| def expected = [key1:"value1",key2:"value2"] | ||
|
|
||
| when: | ||
| GraphQL graphQL = GraphQL.newGraphQL(schema) | ||
| .queryExecutionStrategy(new AsynchronousExecutionStrategy()) | ||
| .build(); | ||
|
|
||
| Map<String,Object> result = ((CompletionStage<Object>) graphQL.execute("{key1,key2}").data).toCompletableFuture().get(); | ||
|
|
||
| then: | ||
| assert expected == result; | ||
| } | ||
|
|
||
| def "Ensure the execution order." () { | ||
| given: | ||
| Timer timer = new Timer(); | ||
|
|
||
| DataFetcher<CompletionStage<String>> grandFetcher = { | ||
| env -> | ||
| CompletableFuture<Object> future = new CompletableFuture<>() | ||
| timer.schedule({_-> future.complete([field:"grandValue"]) },50) | ||
| return future | ||
| } | ||
|
|
||
| DataFetcher<CompletionStage<String>> parentFetcher = { | ||
| env -> | ||
| CompletableFuture<Object> future = new CompletableFuture<>() | ||
| timer.schedule({_-> future.complete([field:"parentValue"]) },20) | ||
| return future | ||
| } | ||
|
|
||
| DataFetcher<CompletionStage<String>> childFetcher = { | ||
| env -> | ||
| CompletableFuture<Object> future = new CompletableFuture<>() | ||
| timer.schedule({_-> future.complete([field:"childValue"]) },10) | ||
| return future | ||
| } | ||
|
|
||
| GraphQLObjectType childObjectType = newObject().name("ChildObject"). | ||
| field(newFieldDefinition().name("field").type(GraphQLString)). | ||
| build(); | ||
|
|
||
| GraphQLObjectType parentObjectType = newObject().name("ParentObject"). | ||
| field(newFieldDefinition().name("field").type(GraphQLString)). | ||
| field(newFieldDefinition().name("child").type(childObjectType).dataFetcher(childFetcher)). | ||
| build(); | ||
|
|
||
| GraphQLObjectType grandObjectType = newObject().name("GrandObject"). | ||
| field(newFieldDefinition().name("field").type(GraphQLString)). | ||
| field(newFieldDefinition().name("parent").type(parentObjectType).dataFetcher(parentFetcher)). | ||
| build(); | ||
|
|
||
| GraphQLObjectType rootObjectType = newObject().name("Root"). | ||
| field( | ||
| newFieldDefinition().name("grand").type(grandObjectType).dataFetcher(grandFetcher) | ||
| ).build(); | ||
|
|
||
| GraphQLSchema schema = GraphQLSchema.newSchema() | ||
| .query(rootObjectType) | ||
| .build(); | ||
| when: | ||
|
|
||
| GraphQL graphQL = GraphQL.newGraphQL(schema) | ||
| .queryExecutionStrategy(new AsynchronousExecutionStrategy()) | ||
| .build(); | ||
|
|
||
| String queryString = | ||
| """ | ||
| { | ||
| grand { | ||
| field | ||
| parent { | ||
| field | ||
| child { | ||
| field | ||
| } | ||
| } | ||
| } | ||
| } | ||
| """ | ||
| Map<String,Object> result = ((CompletionStage<Object>) graphQL.execute(queryString).data).toCompletableFuture().get(); | ||
|
|
||
| def expected = [ | ||
| grand:[ | ||
| field: "grandValue", | ||
| parent:[ | ||
| field:"parentValue", | ||
| child: [ | ||
| field: "childValue" | ||
| ] | ||
| ] | ||
| ] | ||
| ] | ||
|
|
||
| then: | ||
| assert result == expected | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we trying to say that the first things to be resolved (grand) is likely to be the last thing to be completed (50ms) and hence it come back in the right order.?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just trying to work out what the tests really proves |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we dont do * imports. Full imports please