Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 223 additions & 0 deletions src/main/java/graphql/execution/AsynchronousExecutionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package graphql.execution;

import graphql.ExecutionResult;
import graphql.ExecutionResultImpl;
import graphql.execution.instrumentation.Instrumentation;
import graphql.execution.instrumentation.InstrumentationContext;
import graphql.execution.instrumentation.parameters.FieldFetchParameters;
import graphql.execution.instrumentation.parameters.FieldParameters;
import graphql.language.Field;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we dont do * imports. Full imports please

import graphql.schema.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import static graphql.execution.FieldCollectorParameters.newParameters;
import static graphql.execution.TypeInfo.newTypeInfo;

public class AsynchronousExecutionStrategy extends ExecutionStrategy {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is an aside to this PR btw.

@andimarek

There is a lot of repeated code in here from the base ExecutionStrategy. Perhaps we should make a ExecutionStrategyHelper and use composition instead of inheritance to allow better reuse??

Hard problem I know


private static final Logger log = LoggerFactory.getLogger(AsynchronousExecutionStrategy.class);

@Override
public ExecutionResult execute(ExecutionContext executionContext,
ExecutionParameters parameters) throws NonNullableFieldWasNullException {

Map<String, List<Field>> fields = parameters.fields();
Map<String,Object> results = Collections.synchronizedMap(new HashMap<>());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentHashMap is a better choice than synchronizedMap

CompletionStage<Void> future = CompletableFuture.completedFuture(null);

for (String fieldName : fields.keySet()) {
final List<Field> fieldList = fields.get(fieldName);
CompletionStage<ExecutionResult> resolveFieldFuture =
resolveFieldAsync(executionContext, parameters, fieldList).
thenApplyAsync(executionResult -> {
if (executionResult != null) {
results.put(fieldName, executionResult.getData());
} else {
results.put(fieldName, null);
}
return executionResult;
});
future = future.thenCombineAsync(resolveFieldFuture,(t,executionResult)-> t);
}

return new ExecutionResultImpl(future.thenApplyAsync(t -> results),executionContext.getErrors());
}

protected CompletionStage<ExecutionResult> resolveFieldAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields) {
GraphQLObjectType type = parameters.typeInfo().castType(GraphQLObjectType.class);
GraphQLFieldDefinition
fieldDef = getFieldDef(executionContext.getGraphQLSchema(), type, fields.get(0));

Map<String, Object> argumentValues = valuesResolver.getArgumentValues(fieldDef.getArguments(), fields.get(0).getArguments(), executionContext.getVariables());

GraphQLOutputType fieldType = fieldDef.getType();
DataFetchingFieldSelectionSet fieldCollector = DataFetchingFieldSelectionSetImpl
.newCollector(executionContext, fieldType, fields);

DataFetchingEnvironment environment = new DataFetchingEnvironmentImpl(
parameters.source(),
argumentValues,
executionContext.getRoot(),
fields,
fieldType,
type,
executionContext.getGraphQLSchema(),
executionContext.getFragmentsByName(),
executionContext.getExecutionId(),
fieldCollector);

Instrumentation instrumentation = executionContext.getInstrumentation();

InstrumentationContext<ExecutionResult>
fieldCtx = instrumentation.beginField(new FieldParameters(executionContext, fieldDef, environment));

InstrumentationContext<Object> fetchCtx = instrumentation.beginFieldFetch(new FieldFetchParameters(executionContext, fieldDef, environment));
Object resolvedValue = null;

CompletableFuture<Object> dataFetcherResult = null;
try {
resolvedValue = fieldDef.getDataFetcher().get(environment);

if(resolvedValue instanceof CompletionStage) {
dataFetcherResult = (CompletableFuture) resolvedValue;
} else {
dataFetcherResult = CompletableFuture.completedFuture(resolvedValue);
}
} catch (Exception e) {
log.warn("Exception while fetching data", e);
dataFetcherResult = new CompletableFuture();
dataFetcherResult.completeExceptionally(e);
}

return dataFetcherResult.handleAsync((value,th)-> {
if(th != null) {
log.warn("Exception while fetching data", th);
handleDataFetchingException(executionContext, fieldDef, argumentValues, new ExecutionException(th));
fetchCtx.onEnd(th);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Instrumentation is not called back here and hence is lost

        fetchCtx.onEnd(resolvedValue);
  • Add in intrumentation call

TypeInfo fieldTypeInfo = newTypeInfo()
.type(fieldType)
.parentInfo(parameters.typeInfo())
.build();

ExecutionParameters newParameters = ExecutionParameters.newParameters()
.typeInfo(fieldTypeInfo)
.fields(parameters.fields())
.arguments(argumentValues)
.source(value).build();

return newParameters;
}).thenComposeAsync(newParameters -> completeValueAsync(executionContext, newParameters,
fields));


}
protected CompletionStage<ExecutionResult> completeValueAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields) {
TypeInfo typeInfo = parameters.typeInfo();
Object result = parameters.source();
GraphQLType fieldType = parameters.typeInfo().type();

if (result == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi - there are a few PRS in the pipe that will make this code break. But thats often always the case

if (typeInfo.typeIsNonNull()) {
// see http://facebook.github.io/graphql/#sec-Errors-and-Non-Nullability
NonNullableFieldWasNullException nonNullException = new NonNullableFieldWasNullException(typeInfo);
executionContext.addError(nonNullException);
throw nonNullException;
}
return CompletableFuture.completedFuture(null);
} else if (fieldType instanceof GraphQLList) {
return completeValueForListAsync(executionContext, parameters, fields, toIterable(result));
} else if (fieldType instanceof GraphQLScalarType) {
return CompletableFuture.completedFuture(completeValueForScalar((GraphQLScalarType) fieldType, result));
} else if (fieldType instanceof GraphQLEnumType) {
return CompletableFuture.completedFuture(completeValueForEnum((GraphQLEnumType) fieldType, result));
}


GraphQLObjectType resolvedType;
if (fieldType instanceof GraphQLInterfaceType) {
TypeResolutionParameters resolutionParams = TypeResolutionParameters.newParameters()
.graphQLInterfaceType((GraphQLInterfaceType) fieldType)
.field(fields.get(0))
.value(parameters.source())
.argumentValues(parameters.arguments())
.schema(executionContext.getGraphQLSchema()).build();
resolvedType = resolveTypeForInterface(resolutionParams);

} else if (fieldType instanceof GraphQLUnionType) {
TypeResolutionParameters resolutionParams = TypeResolutionParameters.newParameters()
.graphQLUnionType((GraphQLUnionType) fieldType)
.field(fields.get(0))
.value(parameters.source())
.argumentValues(parameters.arguments())
.schema(executionContext.getGraphQLSchema()).build();
resolvedType = resolveTypeForUnion(resolutionParams);
} else {
resolvedType = (GraphQLObjectType) fieldType;
}

FieldCollectorParameters collectorParameters = newParameters(executionContext.getGraphQLSchema(), resolvedType)
.fragments(executionContext.getFragmentsByName())
.variables(executionContext.getVariables())
.build();

Map<String, List<Field>> subFields = fieldCollector.collectFields(collectorParameters, fields);

ExecutionParameters newParameters = ExecutionParameters.newParameters()
.typeInfo(typeInfo.asType(resolvedType))
.fields(subFields)
.source(result).build();

// Calling this from the executionContext to ensure we shift back from mutation strategy to the query strategy.

ExecutionResult executionResult = executionContext.getQueryStrategy().execute(executionContext, newParameters);
if(!(executionResult.getData() instanceof CompletionStage)) {
return CompletableFuture.completedFuture(executionResult);
} else {
return ((CompletionStage) executionResult.getData()).handleAsync((resultMap,th) ->
new ExecutionResultImpl(resultMap,executionResult.getErrors())
);

}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly pondering here.

I wonder if we should make the complete value list truly async. Should it join back as one result at this point?

Is there any disadvantage is leaving it purely async for many items in a list? Is there any expectation that is this point you have resolved the value so it can be sent onto future calls?

protected CompletionStage<ExecutionResult> completeValueForListAsync(ExecutionContext executionContext, ExecutionParameters parameters, List<Field> fields, Iterable<Object> result) {
TypeInfo typeInfo = parameters.typeInfo();
GraphQLList fieldType = typeInfo.castType(GraphQLList.class);
List<Object> resultList = Collections.synchronizedList(new ArrayList<>());
CompletionStage<Void> future = CompletableFuture.completedFuture(null);

for (Object item : result) {
ExecutionParameters newParameters = ExecutionParameters.newParameters()
.typeInfo(typeInfo.asType(fieldType.getWrappedType()))
.fields(parameters.fields())
.source(item).build();

CompletionStage<ExecutionResult> completedValueFuture =
completeValueAsync(executionContext, newParameters, fields);

future = future.thenCombineAsync(completedValueFuture, (t,executionResult) -> {
resultList.add(executionResult.getData());
return null;
});
}

return future.thenApplyAsync(t -> new ExecutionResultImpl(resultList,null));
}

private Iterable<Object> toIterable(Object result) {
if (result.getClass().isArray()) {
result = Arrays.asList((Object[]) result);
}
//noinspection unchecked
return (Iterable<Object>) result;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package graphql.execution

import graphql.GraphQL
import graphql.schema.DataFetcher
import graphql.schema.GraphQLObjectType
import graphql.schema.GraphQLSchema
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage

import static graphql.Scalars.GraphQLString
import static graphql.schema.GraphQLFieldDefinition.newFieldDefinition
import static graphql.schema.GraphQLObjectType.newObject

class AsynchronousExecutionStrategyTest extends Specification {

def "Example usage of AsynchronousExecutionStrategy."() {
given:

GraphQLObjectType queryType = newObject()
.name("data")
.field(
newFieldDefinition().type(GraphQLString).name("key1").dataFetcher({env -> CompletableFuture.completedFuture("value1")}))
.field(
newFieldDefinition().type(GraphQLString).name("key2").staticValue("value2"))
.build();

GraphQLSchema schema = GraphQLSchema.newSchema()
.query(queryType)
.build();

def expected = [key1:"value1",key2:"value2"]

when:
GraphQL graphQL = GraphQL.newGraphQL(schema)
.queryExecutionStrategy(new AsynchronousExecutionStrategy())
.build();

Map<String,Object> result = ((CompletionStage<Object>) graphQL.execute("{key1,key2}").data).toCompletableFuture().get();

then:
assert expected == result;
}

def "Ensure the execution order." () {
given:
Timer timer = new Timer();

DataFetcher<CompletionStage<String>> grandFetcher = {
env ->
CompletableFuture<Object> future = new CompletableFuture<>()
timer.schedule({_-> future.complete([field:"grandValue"]) },50)
return future
}

DataFetcher<CompletionStage<String>> parentFetcher = {
env ->
CompletableFuture<Object> future = new CompletableFuture<>()
timer.schedule({_-> future.complete([field:"parentValue"]) },20)
return future
}

DataFetcher<CompletionStage<String>> childFetcher = {
env ->
CompletableFuture<Object> future = new CompletableFuture<>()
timer.schedule({_-> future.complete([field:"childValue"]) },10)
return future
}

GraphQLObjectType childObjectType = newObject().name("ChildObject").
field(newFieldDefinition().name("field").type(GraphQLString)).
build();

GraphQLObjectType parentObjectType = newObject().name("ParentObject").
field(newFieldDefinition().name("field").type(GraphQLString)).
field(newFieldDefinition().name("child").type(childObjectType).dataFetcher(childFetcher)).
build();

GraphQLObjectType grandObjectType = newObject().name("GrandObject").
field(newFieldDefinition().name("field").type(GraphQLString)).
field(newFieldDefinition().name("parent").type(parentObjectType).dataFetcher(parentFetcher)).
build();

GraphQLObjectType rootObjectType = newObject().name("Root").
field(
newFieldDefinition().name("grand").type(grandObjectType).dataFetcher(grandFetcher)
).build();

GraphQLSchema schema = GraphQLSchema.newSchema()
.query(rootObjectType)
.build();
when:

GraphQL graphQL = GraphQL.newGraphQL(schema)
.queryExecutionStrategy(new AsynchronousExecutionStrategy())
.build();

String queryString =
"""
{
grand {
field
parent {
field
child {
field
}
}
}
}
"""
Map<String,Object> result = ((CompletionStage<Object>) graphQL.execute(queryString).data).toCompletableFuture().get();

def expected = [
grand:[
field: "grandValue",
parent:[
field:"parentValue",
child: [
field: "childValue"
]
]
]
]

then:
assert result == expected
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we trying to say that the first things to be resolved (grand) is likely to be the last thing to be completed (50ms) and hence it come back in the right order.?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just trying to work out what the tests really proves