Skip to content

Commit 2a3f6b0

Browse files
author
Samuel Vazquez
committed
feat: batching working again
1 parent ca13568 commit 2a3f6b0

File tree

22 files changed

+342
-626
lines changed

22 files changed

+342
-626
lines changed

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/exceptions/MissingKotlinDataLoaderRegistryException.kt

-25
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 Expedia, Inc
2+
* Copyright 2025 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,35 +16,31 @@
1616

1717
package com.expediagroup.graphql.dataloader.instrumentation.extensions
1818

19-
import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistry
2019
import com.expediagroup.graphql.dataloader.instrumentation.exceptions.MissingInstrumentationStateException
21-
import com.expediagroup.graphql.dataloader.instrumentation.exceptions.MissingKotlinDataLoaderRegistryException
2220
import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.state.SyncExecutionExhaustedState
2321
import graphql.schema.DataFetchingEnvironment
2422
import org.dataloader.DataLoader
23+
import org.dataloader.DataLoaderRegistry
2524
import java.util.concurrent.CompletableFuture
2625

2726
/**
28-
* Check if all futures collected on [KotlinDataLoaderRegistry.dispatchAll] were handled
27+
* Check if all futures collected on [DataLoaderRegistry.dispatchAll] were handled
2928
* and if we have more futures than we had when we started to dispatch, if so,
3029
* means that [DataLoader]s were chained, so we need to dispatch the dataLoaderRegistry.
30+
*
31+
* @throws MissingInstrumentationStateException if a [SyncExecutionExhaustedState] instance is not present in the graphQLContext
3132
*/
3233
fun <V> CompletableFuture<V>.dispatchIfNeeded(
3334
environment: DataFetchingEnvironment
3435
): CompletableFuture<V> {
35-
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()
36+
val dataLoaderRegistry = environment.dataLoaderRegistry
37+
val syncExecutionExhaustedState = environment
38+
.graphQlContext
39+
.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
40+
?: throw MissingInstrumentationStateException()
3641

37-
if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
38-
when {
39-
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
40-
environment
41-
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
42-
.ifAllSyncExecutionsExhausted {
43-
dataLoaderRegistry.dispatchAll()
44-
}
45-
}
46-
else -> throw MissingInstrumentationStateException()
47-
}
42+
if (syncExecutionExhaustedState.dataLoadersInvokedAfterDispatch() && syncExecutionExhaustedState.allSyncExecutionsExhausted()) {
43+
dataLoaderRegistry.dispatchAll()
4844
}
4945
return this
5046
}
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,29 @@
1616

1717
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion
1818

19-
import com.expediagroup.graphql.dataloader.KotlinDataLoaderInstrumentation
2019
import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.state.SyncExecutionExhaustedState
20+
import org.dataloader.DataLoader
21+
import org.dataloader.instrumentation.DataLoaderInstrumentation
2122
import org.dataloader.instrumentation.DataLoaderInstrumentationContext
2223

23-
internal class DataLoaderInstrumentationContext(
24+
class DataLoaderSyncExecutionExhaustedDataLoaderDispatcher(
2425
private val syncExecutionExhaustedState: SyncExecutionExhaustedState
25-
): DataLoaderInstrumentationContext<Any?> {
26-
override fun onDispatched() {
27-
syncExecutionExhaustedState.onDataLoaderPromiseDispatched()
28-
}
29-
override fun onCompleted(result: Any?, t: Throwable?) {
30-
syncExecutionExhaustedState.onDataLoaderPromiseCompleted()
31-
}
32-
}
26+
): DataLoaderInstrumentation {
27+
28+
private val contextForSyncExecutionExhausted: DataLoaderInstrumentationContext<Any?> =
29+
object: DataLoaderInstrumentationContext<Any?> {
30+
override fun onDispatched() {
31+
syncExecutionExhaustedState.onDataLoaderPromiseDispatched()
32+
}
33+
override fun onCompleted(result: Any?, t: Throwable?) {
34+
syncExecutionExhaustedState.onDataLoaderPromiseCompleted(result, t)
35+
}
36+
}
3337

34-
class DataLoaderInstrumentationForSyncExecutionExhausted(
35-
syncExecutionExhaustedState: SyncExecutionExhaustedState
36-
) : KotlinDataLoaderInstrumentation(
37-
DataLoaderInstrumentationContext(syncExecutionExhaustedState)
38-
)
38+
override fun beginLoad(
39+
dataLoader: DataLoader<*, *>,
40+
key: Any,
41+
loadContext: Any?
42+
): DataLoaderInstrumentationContext<Any?> =
43+
contextForSyncExecutionExhausted
44+
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentation.kt

-47
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 Expedia, Inc
2+
* Copyright 2025 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,15 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.execution
17+
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion
1818

1919
import com.expediagroup.graphql.dataloader.instrumentation.extensions.isMutation
2020
import com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.state.SyncExecutionExhaustedState
21-
import graphql.ExecutionInput
2221
import graphql.ExecutionResult
2322
import graphql.GraphQLContext
2423
import graphql.execution.ExecutionContext
25-
import graphql.execution.ExecutionId
2624
import graphql.execution.instrumentation.ExecuteObjectInstrumentationContext
2725
import graphql.execution.instrumentation.ExecutionStrategyInstrumentationContext
2826
import graphql.execution.instrumentation.FieldFetchingInstrumentationContext
@@ -32,41 +30,22 @@ import graphql.execution.instrumentation.InstrumentationState
3230
import graphql.execution.instrumentation.SimplePerformantInstrumentation
3331
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
3432
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
33+
import graphql.execution.instrumentation.parameters.InstrumentationFieldCompleteParameters
3534
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters
3635

37-
/**
38-
* typealias that represents the signature of a callback that will be executed when sync execution is exhausted
39-
*/
40-
internal typealias OnSyncExecutionExhaustedCallback = (List<ExecutionId>) -> Unit
41-
4236
/**
4337
* Custom GraphQL [Instrumentation] that calculate the synchronous execution exhaustion
4438
* of all GraphQL operations sharing the same [GraphQLContext]
4539
*/
46-
abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantInstrumentation() {
47-
/**
48-
* This is invoked each time instrumentation attempts to calculate exhaustion state, this can be called from either
49-
* `beginFieldField.dispatch` or `beginFieldFetch.complete`.
50-
*
51-
* @param parameters contains information of which [ExecutionInput] caused the calculation
52-
* @return [OnSyncExecutionExhaustedCallback] to invoke when the synchronous execution of all operations was exhausted
53-
*/
54-
abstract fun getOnSyncExecutionExhaustedCallback(
55-
parameters: SyncExecutionExhaustedInstrumentationParameters
56-
): OnSyncExecutionExhaustedCallback
40+
class GraphQLSyncExecutionExhaustedDataLoaderDispatcher : SimplePerformantInstrumentation() {
5741

5842
override fun beginExecution(
5943
parameters: InstrumentationExecutionParameters,
6044
state: InstrumentationState?
6145
): InstrumentationContext<ExecutionResult>? =
6246
parameters.graphQLContext
6347
?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
64-
?.beginExecution(
65-
parameters,
66-
this.getOnSyncExecutionExhaustedCallback(
67-
SyncExecutionExhaustedInstrumentationParameters(parameters.executionInput)
68-
)
69-
)
48+
?.beginExecution(parameters)
7049

7150
override fun beginExecutionStrategy(
7251
parameters: InstrumentationExecutionStrategyParameters,
@@ -94,10 +73,13 @@ abstract class AbstractSyncExecutionExhaustedInstrumentation : SimplePerformantI
9473
): FieldFetchingInstrumentationContext? =
9574
parameters.executionContext.takeUnless(ExecutionContext::isMutation)
9675
?.graphQLContext?.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
97-
?.beginFieldFetching(
98-
parameters,
99-
this.getOnSyncExecutionExhaustedCallback(
100-
SyncExecutionExhaustedInstrumentationParameters(parameters.executionContext.executionInput)
101-
)
102-
)
76+
?.beginFieldFetching(parameters)
77+
78+
override fun beginFieldCompletion(
79+
parameters: InstrumentationFieldCompleteParameters,
80+
state: InstrumentationState?
81+
): InstrumentationContext<Any>? {
82+
println("field completed: ${parameters.fetchedValue}")
83+
return super.beginFieldCompletion(parameters, state)
84+
}
10385
}

executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/execution/SyncExecutionExhaustedInstrumentationParameters.kt

-27
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.dataloader.instrumentation.syncexhaustion.state
18+
19+
import java.util.concurrent.CompletableFuture
20+
import java.util.concurrent.atomic.AtomicInteger
21+
22+
class DataLoaderDispatchState {
23+
24+
// count number of loads
25+
private val onLoadCounter: AtomicInteger = AtomicInteger(0)
26+
// take snapshot of load counts when dispatchAll is invoked, then on every load complete decrease it
27+
private val onDispatchLoadCounter: AtomicInteger = AtomicInteger(0)
28+
29+
fun takeSnapshot() {
30+
onDispatchLoadCounter.set(onLoadCounter.get())
31+
onLoadCounter.set(0)
32+
}
33+
34+
/**
35+
* Will signal when all dependants of all [onDispatchFutures] were invoked,
36+
* [onDispatchFutures] is the list of all [CompletableFuture]s that will complete because the [dispatchAll]
37+
* method was invoked
38+
*
39+
* @return weather or not all futures gathered before [dispatchAll] were handled
40+
*/
41+
fun onDispatchFuturesHandled(): Boolean =
42+
onDispatchLoadCounter.get() == 0
43+
44+
/**
45+
* Will signal if more dataLoaders where invoked during the [dispatchAll] invocation
46+
* @return weather or not futures where loaded during [dispatchAll]
47+
*/
48+
fun dataLoadersInvokedAfterDispatch(): Boolean =
49+
onLoadCounter.get() > 0
50+
51+
fun onDataLoaderPromiseDispatched() {
52+
onLoadCounter.incrementAndGet()
53+
}
54+
55+
fun onDataLoaderPromiseCompleted(result: Any?, t: Throwable?) {
56+
onDispatchLoadCounter.decrementAndGet()
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 Expedia, Inc
2+
* Copyright 2025 Expedia, Inc
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,7 @@ import java.util.concurrent.ConcurrentHashMap
4141
* }
4242
* ```
4343
*
44-
* When the [ExecutionBatchState] will be considered exhausted will have this state:
44+
* The [ExecutionInputState] will be considered exhausted will have this state:
4545
*
4646
* ```
4747
* {
@@ -61,7 +61,7 @@ import java.util.concurrent.ConcurrentHashMap
6161
*
6262
* once astronaut [DataFetcher] completes his value starting a new ExecutionStrategy for `astronaut` field for
6363
* the resolution of `id`, and `name`, once these fields are dispatched and completed
64-
* and exhaustion will be calculated again and the state of [ExecutionBatchState] will be the following:
64+
* and exhaustion will be calculated again and the state of [ExecutionInputState] will be the following:
6565
*
6666
* ```
6767
* {
@@ -94,8 +94,9 @@ import java.util.concurrent.ConcurrentHashMap
9494
* }
9595
* ```
9696
*/
97-
class ExecutionBatchState {
98-
97+
class ExecutionInputState(
98+
val executionInput: ExecutionInput,
99+
) {
99100
private val executionStrategiesState: ConcurrentHashMap<String, ExecutionStrategyState> = ConcurrentHashMap()
100101

101102
/**
@@ -177,7 +178,7 @@ class ExecutionBatchState {
177178
}
178179

179180
/**
180-
* Recursively calculate the sync state of this [ExecutionBatchState],
181+
* Recursively calculate the sync state of this [ExecutionInputState],
181182
* by traversing all [executionStrategiesState].
182183
*
183184
* @param executionStrategyPath the string representation of the executionStrategy which state will be calculated,

0 commit comments

Comments
 (0)