Skip to content

Commit ddc81be

Browse files
feat(batching): Exclude operations that failed pre-executions (#1942)
### 📝 Description Remove operations that failed pre-executions from batching state, an operation can become invalid if parsing, validation or any other logic from preparsed document provider failed (such as persisted queries).
1 parent 7838268 commit ddc81be

File tree

8 files changed

+185
-69
lines changed

8 files changed

+185
-69
lines changed

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/DataLoaderLevelDispatchedInstrumentation.kt

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,9 +37,8 @@ import org.dataloader.DataLoader
3737
class DataLoaderLevelDispatchedInstrumentation : AbstractExecutionLevelDispatchedInstrumentation() {
3838
override fun getOnLevelDispatchedCallback(
3939
parameters: ExecutionLevelDispatchedInstrumentationParameters
40-
): OnLevelDispatchedCallback = { _, executions: List<ExecutionInput> ->
41-
executions
42-
.getOrNull(0)
40+
): OnLevelDispatchedCallback = { _, _ ->
41+
parameters.executionContext.executionInput
4342
?.dataLoaderRegistry
4443
?.dispatchAll()
4544
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/execution/AbstractExecutionLevelDispatchedInstrumentation.kt

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,19 +22,20 @@ import com.expediagroup.graphql.dataloader.instrumentation.level.state.Level
2222
import graphql.ExecutionInput
2323
import graphql.ExecutionResult
2424
import graphql.execution.ExecutionContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2627
import graphql.execution.instrumentation.InstrumentationContext
2728
import graphql.execution.instrumentation.InstrumentationState
2829
import graphql.execution.instrumentation.SimplePerformantInstrumentation
29-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
30+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
3031
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
3132
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
3233
import graphql.schema.DataFetcher
3334

3435
/**
3536
* Represents the signature of a callback that will be executed when a [Level] is dispatched
3637
*/
37-
internal typealias OnLevelDispatchedCallback = (Level, List<ExecutionInput>) -> Unit
38+
internal typealias OnLevelDispatchedCallback = (Level, List<ExecutionId>) -> Unit
3839
/**
3940
* Custom GraphQL [graphql.execution.instrumentation.Instrumentation] that calculate the state of executions
4041
* of all queries sharing the same GraphQLContext map
@@ -52,13 +53,13 @@ abstract class AbstractExecutionLevelDispatchedInstrumentation : SimplePerforman
5253
parameters: ExecutionLevelDispatchedInstrumentationParameters
5354
): OnLevelDispatchedCallback
5455

55-
override fun beginExecuteOperation(
56-
parameters: InstrumentationExecuteOperationParameters,
56+
override fun beginExecution(
57+
parameters: InstrumentationExecutionParameters,
5758
state: InstrumentationState?
5859
): InstrumentationContext<ExecutionResult>? =
59-
parameters.executionContext.takeUnless(ExecutionContext::isMutation)
60+
parameters.executionInput
6061
?.graphQLContext?.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
61-
?.beginExecuteOperation(parameters)
62+
?.beginExecution(parameters)
6263

6364
override fun beginExecutionStrategy(
6465
parameters: InstrumentationExecutionStrategyParameters,

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/level/state/ExecutionLevelDispatchedState.kt

+49-26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,38 +20,63 @@ import com.expediagroup.graphql.dataloader.instrumentation.extensions.getExpecte
2020
import com.expediagroup.graphql.dataloader.instrumentation.level.execution.OnLevelDispatchedCallback
2121
import graphql.ExecutionInput
2222
import graphql.ExecutionResult
23+
import graphql.execution.ExecutionId
2324
import graphql.execution.FieldValueInfo
2425
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2526
import graphql.execution.instrumentation.InstrumentationContext
26-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
27+
import graphql.execution.instrumentation.SimpleInstrumentationContext
28+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
2729
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
2830
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
2931
import graphql.schema.DataFetcher
3032
import java.util.concurrent.CompletableFuture
3133
import java.util.concurrent.ConcurrentHashMap
34+
import java.util.concurrent.atomic.AtomicReference
3235

3336
/**
3437
* Orchestrate the [ExecutionBatchState] of all [ExecutionInput] sharing the same graphQLContext map,
3538
* when a certain state is reached will invoke [OnLevelDispatchedCallback]
3639
*/
3740
class ExecutionLevelDispatchedState(
38-
private val totalExecutions: Int
41+
totalOperations: Int
3942
) {
40-
val executions = ConcurrentHashMap<ExecutionInput, ExecutionBatchState>()
43+
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
44+
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()
45+
46+
/**
47+
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for execution,
48+
* for example:
49+
* parsing, validation, execution errors
50+
* persisted query errors
51+
*/
52+
private fun removeExecution(executionId: ExecutionId) {
53+
if (executions.containsKey(executionId)) {
54+
executions.remove(executionId)
55+
totalExecutions.set(totalExecutions.get() - 1)
56+
}
57+
}
4158

4259
/**
4360
* Initialize the [ExecutionBatchState] of this [ExecutionInput]
4461
*
4562
* @param parameters contains information of which [ExecutionInput] will start his execution
4663
* @return a nullable [InstrumentationContext]
4764
*/
48-
fun beginExecuteOperation(
49-
parameters: InstrumentationExecuteOperationParameters
50-
): InstrumentationContext<ExecutionResult>? {
51-
executions.computeIfAbsent(parameters.executionContext.executionInput) {
65+
fun beginExecution(
66+
parameters: InstrumentationExecutionParameters
67+
): InstrumentationContext<ExecutionResult> {
68+
executions.computeIfAbsent(parameters.executionInput.executionId) {
5269
ExecutionBatchState()
5370
}
54-
return null
71+
return object : SimpleInstrumentationContext<ExecutionResult>() {
72+
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
73+
result?.let {
74+
if (result.errors.size > 0) {
75+
removeExecution(parameters.executionInput.executionId)
76+
}
77+
}
78+
}
79+
}
5580
}
5681

5782
/**
@@ -64,11 +89,11 @@ class ExecutionLevelDispatchedState(
6489
parameters: InstrumentationExecutionStrategyParameters,
6590
onLevelDispatched: OnLevelDispatchedCallback
6691
): ExecutionStrategyInstrumentationContext {
67-
val executionInput = parameters.executionContext.executionInput
92+
val executionId = parameters.executionContext.executionInput.executionId
6893
val level = Level(parameters.executionStrategyParameters.path.level + 1)
6994
val fieldCount = parameters.executionStrategyParameters.fields.size()
7095

71-
executions.computeIfPresent(executionInput) { _, executionState ->
96+
executions.computeIfPresent(executionId) { _, executionState ->
7297
executionState.also {
7398
it.initializeLevelStateIfNeeded(level)
7499
it.increaseExpectedFetches(level, fieldCount)
@@ -86,7 +111,7 @@ class ExecutionLevelDispatchedState(
86111
override fun onFieldValuesInfo(fieldValueInfoList: List<FieldValueInfo>) {
87112
val nextLevel = level.next()
88113

89-
executions.computeIfPresent(executionInput) { _, executionState ->
114+
executions.computeIfPresent(executionId) { _, executionState ->
90115
executionState.also {
91116
it.increaseOnFieldValueInfos(level)
92117
it.increaseExpectedExecutionStrategies(
@@ -104,7 +129,7 @@ class ExecutionLevelDispatchedState(
104129
}
105130

106131
override fun onFieldValuesException() {
107-
executions.computeIfPresent(executionInput) { _, executionState ->
132+
executions.computeIfPresent(executionId) { _, executionState ->
108133
executionState.also {
109134
it.increaseOnFieldValueInfos(level)
110135
}
@@ -123,14 +148,13 @@ class ExecutionLevelDispatchedState(
123148
parameters: InstrumentationFieldFetchParameters,
124149
onLevelDispatched: OnLevelDispatchedCallback
125150
): InstrumentationContext<Any> {
126-
val executionInput = parameters.executionContext.executionInput
151+
val executionId = parameters.executionContext.executionInput.executionId
127152
val path = parameters.executionStepInfo.path
128153
val level = Level(path.level)
129154

130-
return object : InstrumentationContext<Any> {
155+
return object : SimpleInstrumentationContext<Any>() {
131156
override fun onDispatched(result: CompletableFuture<Any?>) {
132-
133-
executions.computeIfPresent(executionInput) { _, executionState ->
157+
executions.computeIfPresent(executionId) { _, executionState ->
134158
executionState.also { it.increaseDispatchedFetches(level) }
135159
}
136160

@@ -140,9 +164,6 @@ class ExecutionLevelDispatchedState(
140164
executions.forEach { (_, executionState) -> executionState.completeDataFetchers(level) }
141165
}
142166
}
143-
144-
override fun onCompleted(result: Any?, t: Throwable?) {
145-
}
146167
}
147168
}
148169

@@ -161,7 +182,7 @@ class ExecutionLevelDispatchedState(
161182
parameters: InstrumentationFieldFetchParameters
162183
): DataFetcher<*> {
163184
var manuallyCompletableDataFetcher: DataFetcher<*> = dataFetcher
164-
executions.computeIfPresent(parameters.executionContext.executionInput) { _, executionState ->
185+
executions.computeIfPresent(parameters.executionContext.executionInput.executionId) { _, executionState ->
165186
executionState.also {
166187
manuallyCompletableDataFetcher = it.toManuallyCompletableDataFetcher(
167188
Level(parameters.executionStepInfo.path.level),
@@ -180,9 +201,11 @@ class ExecutionLevelDispatchedState(
180201
* @param level that execution state will be calculated
181202
* @return Boolean for allExecutionsDispatched statement
182203
*/
183-
fun allExecutionsDispatched(level: Level): Boolean =
184-
executions
185-
.takeIf { executions -> executions.size == totalExecutions }
186-
?.all { (_, executionState) -> executionState.isLevelDispatched(level) }
187-
?: false
204+
fun allExecutionsDispatched(level: Level): Boolean = synchronized(executions) {
205+
val operationsToExecute = totalExecutions.get()
206+
when {
207+
executions.size < operationsToExecute -> false
208+
else -> executions.all { (_, executionState) -> executionState.isLevelDispatched(level) }
209+
}
210+
}
188211
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentation.kt

+7-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execut
2222
import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution.SyncExecutionExhaustedInstrumentationParameters
2323
import graphql.ExecutionInput
2424
import graphql.GraphQLContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.Instrumentation
2627
import graphql.schema.DataFetcher
2728
import org.dataloader.DataLoader
@@ -37,10 +38,10 @@ import java.util.concurrent.CompletableFuture
3738
class DataLoaderSyncExecutionExhaustedInstrumentation : AbstractSyncExecutionExhaustedInstrumentation() {
3839
override fun getOnSyncExecutionExhaustedCallback(
3940
parameters: SyncExecutionExhaustedInstrumentationParameters
40-
): OnSyncExecutionExhaustedCallback = { executions: List<ExecutionInput> ->
41-
executions
42-
.getOrNull(0)
43-
?.dataLoaderRegistry
44-
?.dispatchAll()
41+
): OnSyncExecutionExhaustedCallback = { _: List<ExecutionId> ->
42+
parameters
43+
.executionContext.executionInput
44+
.dataLoaderRegistry
45+
.dispatchAll()
4546
}
4647
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/execution/AbstractSyncExecutionExhaustedInstrumentation.kt

+9-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 Expedia, Inc
2+
* Copyright 2024 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,19 +22,20 @@ import graphql.ExecutionInput
2222
import graphql.ExecutionResult
2323
import graphql.GraphQLContext
2424
import graphql.execution.ExecutionContext
25+
import graphql.execution.ExecutionId
2526
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2627
import graphql.execution.instrumentation.Instrumentation
2728
import graphql.execution.instrumentation.InstrumentationContext
2829
import graphql.execution.instrumentation.InstrumentationState
2930
import graphql.execution.instrumentation.SimplePerformantInstrumentation
30-
import graphql.execution.instrumentation.parameters.InstrumentationExecuteOperationParameters
31+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
3132
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
3233
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
3334

3435
/**
3536
* typealias that represents the signature of a callback that will be executed when sync execution is exhausted
3637
*/
37-
internal typealias OnSyncExecutionExhaustedCallback = (List<ExecutionInput>) -> Unit
38+
internal typealias OnSyncExecutionExhaustedCallback = (List<ExecutionId>) -> Unit
3839

3940
/**
4041
* Custom GraphQL [Instrumentation] that calculate the synchronous execution exhaustion
@@ -52,13 +53,13 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
5253
parameters: SyncExecutionExhaustedInstrumentationParameters
5354
): OnSyncExecutionExhaustedCallback
5455

55-
override fun beginExecuteOperation(
56-
parameters: InstrumentationExecuteOperationParameters,
56+
override fun beginExecution(
57+
parameters: InstrumentationExecutionParameters,
5758
state: InstrumentationState?
5859
): InstrumentationContext<ExecutionResult>? =
59-
parameters.executionContext.takeUnless(ExecutionContext::isMutation)
60-
?.graphQLContext?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
61-
?.beginExecuteOperation(parameters)
60+
parameters.graphQLContext
61+
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
62+
?.beginExecution(parameters)
6263

6364
override fun beginExecutionStrategy(
6465
parameters: InstrumentationExecutionStrategyParameters,

0 commit comments

Comments
 (0)