Skip to content

Commit d5c312f

Browse files
feat(batching): Exclude operations that failed pre-executions (#1942) (#1946)
### 📝 Description Remove operations that failed pre-executions from batching state, an operation can become invalid if parsing, validation or any other logic from preparsed document provider failed (such as persisted queries).
1 parent a937b8d commit d5c312f

File tree

8 files changed

+189
-71
lines changed

8 files changed

+189
-71
lines changed

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/DataLoaderLevelDispatchedInstrumentation.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,9 +37,8 @@ import org.dataloader.DataLoader
3737
class DataLoaderLevelDispatchedInstrumentation : AbstractExecutionLevelDispatchedInstrumentation() {
3838
override fun getOnLevelDispatchedCallback(
3939
parameters: ExecutionLevelDispatchedInstrumentationParameters
40-
): OnLevelDispatchedCallback = { _, executions: List<ExecutionInput> ->
41-
executions
42-
.getOrNull(0)
40+
): OnLevelDispatchedCallback = { _, _ ->
41+
parameters.executionContext.executionInput
4342
?.dataLoaderRegistry
4443
?.dispatchAll()
4544
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/execution/AbstractExecutionLevelDispatchedInstrumentation.kt

+10-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,24 +22,26 @@ import com.expediagroup.graphql.dataloader.instrumentation.level.state.Level
2222
import graphql.ExecutionInput
2323
import graphql.ExecutionResult
2424
import graphql.execution.ExecutionContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2627
import graphql.execution.instrumentation.Instrumentation
2728
import graphql.execution.instrumentation.InstrumentationContext
2829
import graphql.execution.instrumentation.InstrumentationState
29-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
30+
import graphql.execution.instrumentation.SimplePerformantInstrumentation
31+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
3032
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
3133
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
3234
import graphql.schema.DataFetcher
3335

3436
/**
3537
* Represents the signature of a callback that will be executed when a [Level] is dispatched
3638
*/
37-
internal typealias OnLevelDispatchedCallback = (Level, List<ExecutionInput>) -> Unit
39+
internal typealias OnLevelDispatchedCallback = (Level, List<ExecutionId>) -> Unit
3840
/**
3941
* Custom GraphQL [graphql.execution.instrumentation.Instrumentation] that calculate the state of executions
4042
* of all queries sharing the same GraphQLContext map
4143
*/
42-
abstract class AbstractExecutionLevelDispatchedInstrumentation : Instrumentation {
44+
abstract class AbstractExecutionLevelDispatchedInstrumentation : SimplePerformantInstrumentation() {
4345
/**
4446
* This is invoked each time instrumentation attempts to calculate a level dispatched state, this can be called from either
4547
* `beginFieldField` or `beginExecutionStrategy`.
@@ -52,13 +54,13 @@ abstract class AbstractExecutionLevelDispatchedInstrumentation : Instrumentation
5254
parameters: ExecutionLevelDispatchedInstrumentationParameters
5355
): OnLevelDispatchedCallback
5456

55-
override fun beginExecuteOperation(
56-
parameters: InstrumentationExecuteOperationParameters,
57+
override fun beginExecution(
58+
parameters: InstrumentationExecutionParameters,
5759
state: InstrumentationState?
5860
): InstrumentationContext<ExecutionResult>? =
59-
parameters.executionContext.takeUnless(ExecutionContext::isMutation)
61+
parameters.executionInput
6062
?.graphQLContext?.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
61-
?.beginExecuteOperation(parameters)
63+
?.beginExecution(parameters)
6264

6365
override fun beginExecutionStrategy(
6466
parameters: InstrumentationExecutionStrategyParameters,

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/state/ExecutionLevelDispatchedState.kt

+49-26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,38 +20,63 @@ import com.expediagroup.graphql.dataloader.instrumentation.extensions.getExpecte
2020
import com.expediagroup.graphql.dataloader.instrumentation.level.execution.OnLevelDispatchedCallback
2121
import graphql.ExecutionInput
2222
import graphql.ExecutionResult
23+
import graphql.execution.ExecutionId
2324
import graphql.execution.FieldValueInfo
2425
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2526
import graphql.execution.instrumentation.InstrumentationContext
26-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
27+
import graphql.execution.instrumentation.SimpleInstrumentationContext
28+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
2729
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
2830
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
2931
import graphql.schema.DataFetcher
3032
import java.util.concurrent.CompletableFuture
3133
import java.util.concurrent.ConcurrentHashMap
34+
import java.util.concurrent.atomic.AtomicReference
3235

3336
/**
3437
* Orchestrate the [ExecutionBatchState] of all [ExecutionInput] sharing the same graphQLContext map,
3538
* when a certain state is reached will invoke [OnLevelDispatchedCallback]
3639
*/
3740
class ExecutionLevelDispatchedState(
38-
private val totalExecutions: Int
41+
totalOperations: Int
3942
) {
40-
val executions = ConcurrentHashMap<ExecutionInput, ExecutionBatchState>()
43+
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
44+
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()
45+
46+
/**
47+
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for execution,
48+
* for example:
49+
* parsing, validation, execution errors
50+
* persisted query errors
51+
*/
52+
private fun removeExecution(executionId: ExecutionId) {
53+
if (executions.containsKey(executionId)) {
54+
executions.remove(executionId)
55+
totalExecutions.set(totalExecutions.get() - 1)
56+
}
57+
}
4158

4259
/**
4360
* Initialize the [ExecutionBatchState] of this [ExecutionInput]
4461
*
4562
* @param parameters contains information of which [ExecutionInput] will start his execution
4663
* @return a nullable [InstrumentationContext]
4764
*/
48-
fun beginExecuteOperation(
49-
parameters: InstrumentationExecuteOperationParameters
50-
): InstrumentationContext<ExecutionResult>? {
51-
executions.computeIfAbsent(parameters.executionContext.executionInput) {
65+
fun beginExecution(
66+
parameters: InstrumentationExecutionParameters
67+
): InstrumentationContext<ExecutionResult> {
68+
executions.computeIfAbsent(parameters.executionInput.executionId) {
5269
ExecutionBatchState()
5370
}
54-
return null
71+
return object : SimpleInstrumentationContext<ExecutionResult>() {
72+
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
73+
result?.let {
74+
if (result.errors.size > 0) {
75+
removeExecution(parameters.executionInput.executionId)
76+
}
77+
}
78+
}
79+
}
5580
}
5681

5782
/**
@@ -64,11 +89,11 @@ class ExecutionLevelDispatchedState(
6489
parameters: InstrumentationExecutionStrategyParameters,
6590
onLevelDispatched: OnLevelDispatchedCallback
6691
): ExecutionStrategyInstrumentationContext {
67-
val executionInput = parameters.executionContext.executionInput
92+
val executionId = parameters.executionContext.executionInput.executionId
6893
val level = Level(parameters.executionStrategyParameters.path.level + 1)
6994
val fieldCount = parameters.executionStrategyParameters.fields.size()
7095

71-
executions.computeIfPresent(executionInput) { _, executionState ->
96+
executions.computeIfPresent(executionId) { _, executionState ->
7297
executionState.also {
7398
it.initializeLevelStateIfNeeded(level)
7499
it.increaseExpectedFetches(level, fieldCount)
@@ -86,7 +111,7 @@ class ExecutionLevelDispatchedState(
86111
override fun onFieldValuesInfo(fieldValueInfoList: List<FieldValueInfo>) {
87112
val nextLevel = level.next()
88113

89-
executions.computeIfPresent(executionInput) { _, executionState ->
114+
executions.computeIfPresent(executionId) { _, executionState ->
90115
executionState.also {
91116
it.increaseOnFieldValueInfos(level)
92117
it.increaseExpectedExecutionStrategies(
@@ -104,7 +129,7 @@ class ExecutionLevelDispatchedState(
104129
}
105130

106131
override fun onFieldValuesException() {
107-
executions.computeIfPresent(executionInput) { _, executionState ->
132+
executions.computeIfPresent(executionId) { _, executionState ->
108133
executionState.also {
109134
it.increaseOnFieldValueInfos(level)
110135
}
@@ -123,14 +148,13 @@ class ExecutionLevelDispatchedState(
123148
parameters: InstrumentationFieldFetchParameters,
124149
onLevelDispatched: OnLevelDispatchedCallback
125150
): InstrumentationContext<Any> {
126-
val executionInput = parameters.executionContext.executionInput
151+
val executionId = parameters.executionContext.executionInput.executionId
127152
val path = parameters.executionStepInfo.path
128153
val level = Level(path.level)
129154

130-
return object : InstrumentationContext<Any> {
155+
return object : SimpleInstrumentationContext<Any>() {
131156
override fun onDispatched(result: CompletableFuture<Any?>) {
132-
133-
executions.computeIfPresent(executionInput) { _, executionState ->
157+
executions.computeIfPresent(executionId) { _, executionState ->
134158
executionState.also { it.increaseDispatchedFetches(level) }
135159
}
136160

@@ -140,9 +164,6 @@ class ExecutionLevelDispatchedState(
140164
executions.forEach { (_, executionState) -> executionState.completeDataFetchers(level) }
141165
}
142166
}
143-
144-
override fun onCompleted(result: Any?, t: Throwable?) {
145-
}
146167
}
147168
}
148169

@@ -161,7 +182,7 @@ class ExecutionLevelDispatchedState(
161182
parameters: InstrumentationFieldFetchParameters
162183
): DataFetcher<*> {
163184
var manuallyCompletableDataFetcher: DataFetcher<*> = dataFetcher
164-
executions.computeIfPresent(parameters.executionContext.executionInput) { _, executionState ->
185+
executions.computeIfPresent(parameters.executionContext.executionInput.executionId) { _, executionState ->
165186
executionState.also {
166187
manuallyCompletableDataFetcher = it.toManuallyCompletableDataFetcher(
167188
Level(parameters.executionStepInfo.path.level),
@@ -180,9 +201,11 @@ class ExecutionLevelDispatchedState(
180201
* @param level that execution state will be calculated
181202
* @return Boolean for allExecutionsDispatched statement
182203
*/
183-
fun allExecutionsDispatched(level: Level): Boolean =
184-
executions
185-
.takeIf { executions -> executions.size == totalExecutions }
186-
?.all { (_, executionState) -> executionState.isLevelDispatched(level) }
187-
?: false
204+
fun allExecutionsDispatched(level: Level): Boolean = synchronized(executions) {
205+
val operationsToExecute = totalExecutions.get()
206+
when {
207+
executions.size < operationsToExecute -> false
208+
else -> executions.all { (_, executionState) -> executionState.isLevelDispatched(level) }
209+
}
210+
}
188211
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentation.kt

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execut
2222
import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution.SyncExecutionExhaustedInstrumentationParameters
2323
import graphql.ExecutionInput
2424
import graphql.GraphQLContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.Instrumentation
2627
import graphql.schema.DataFetcher
2728
import org.dataloader.DataLoader
@@ -37,10 +38,10 @@ import java.util.concurrent.CompletableFuture
3738
class DataLoaderSyncExecutionExhaustedInstrumentation : AbstractSyncExecutionExhaustedInstrumentation() {
3839
override fun getOnSyncExecutionExhaustedCallback(
3940
parameters: SyncExecutionExhaustedInstrumentationParameters
40-
): OnSyncExecutionExhaustedCallback = { executions: List<ExecutionInput> ->
41-
executions
42-
.getOrNull(0)
43-
?.dataLoaderRegistry
44-
?.dispatchAll()
41+
): OnSyncExecutionExhaustedCallback = { _: List<ExecutionId> ->
42+
parameters
43+
.executionContext.executionInput
44+
.dataLoaderRegistry
45+
.dispatchAll()
4546
}
4647
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/execution/AbstractSyncExecutionExhaustedInstrumentation.kt

+11-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,24 +22,26 @@ import graphql.ExecutionInput
2222
import graphql.ExecutionResult
2323
import graphql.GraphQLContext
2424
import graphql.execution.ExecutionContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2627
import graphql.execution.instrumentation.Instrumentation
2728
import graphql.execution.instrumentation.InstrumentationContext
2829
import graphql.execution.instrumentation.InstrumentationState
29-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
30+
import graphql.execution.instrumentation.SimplePerformantInstrumentation
31+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
3032
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
3133
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
3234

3335
/**
3436
* typealias that represents the signature of a callback that will be executed when sync execution is exhausted
3537
*/
36-
internal typealias OnSyncExecutionExhaustedCallback = (List<ExecutionInput>) -> Unit
38+
internal typealias OnSyncExecutionExhaustedCallback = (List<ExecutionId>) -> Unit
3739

3840
/**
3941
* Custom GraphQL [Instrumentation] that calculate the synchronous execution exhaustion
4042
* of all GraphQL operations sharing the same [GraphQLContext]
4143
*/
42-
abstract class AbstractSyncExecutionExhaustedInstrumentation : Instrumentation {
44+
abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantInstrumentation() {
4345
/**
4446
* This is invoked each time instrumentation attempts to calculate exhaustion state, this can be called from either
4547
* `beginFieldField.dispatch` or `beginFieldFetch.complete`.
@@ -51,13 +53,13 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : Instrumentation {
5153
parameters: SyncExecutionExhaustedInstrumentationParameters
5254
): OnSyncExecutionExhaustedCallback
5355

54-
override fun beginExecuteOperation(
55-
parameters: InstrumentationExecuteOperationParameters,
56+
override fun beginExecution(
57+
parameters: InstrumentationExecutionParameters,
5658
state: InstrumentationState?
5759
): InstrumentationContext<ExecutionResult>? =
58-
parameters.executionContext.takeUnless(ExecutionContext::isMutation)
59-
?.graphQLContext?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
60-
?.beginExecuteOperation(parameters)
60+
parameters.graphQLContext
61+
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
62+
?.beginExecution(parameters)
6163

6264
override fun beginExecutionStrategy(
6365
parameters: InstrumentationExecutionStrategyParameters,

0 commit comments

Comments
 (0)