From 41f832af9d9be8a4c87ce55ee3fa4905a9f4278c Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Fri, 7 Mar 2025 10:12:23 -0700
Subject: [PATCH 01/11] modify enqueueReadyChunks without paginator to keep consumers saturated

---
 .../maintenance/JobInstanceProcessor.java | 56 +++++++++----------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 199686301c82..6a41bc7f774d 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -33,7 +33,6 @@
 import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
 import ca.uhn.fhir.model.api.IModelJson;
-import ca.uhn.fhir.model.api.PagingIterator;
 import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
@@ -266,17 +265,6 @@ private boolean canAdvanceGatedJob(JobInstance theInstance) {
 		return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
 	}
 
-	protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
-		return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
-			Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
-			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
-					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
-			for (WorkChunkMetadata metadata : results) {
-				consumer.accept(metadata);
-			}
-		});
-	}
-
 	/**
 	 * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
 	 */
@@ -291,28 +279,40 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor
 			theJobDefinition) {
-		Iterator<WorkChunkMetadata> iter = getReadyChunks();
-
-		int counter = 0;
-		while (iter.hasNext()) {
-			WorkChunkMetadata metadata = iter.next();
-
-			/*
-			 * For each chunk id
-			 * * Move to QUEUE'd
-			 * * Send to topic
-			 * * flush changes
-			 * * commit
-			 */
-			updateChunkAndSendToQueue(metadata);
-			counter++;
-		}
+		long deadline = System.currentTimeMillis() + 60 * 1000;
+		boolean done = false;
+		do {
+			// PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead of fixed pages
+			// This allows for all nodes to remain busy instead of processing each page's worth of data at a time
+			Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
+			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
+				pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
+			if (results.isEmpty()) {
+				break;
+			}
+			Iterator<WorkChunkMetadata> iter = results.iterator();
+			int counter = 0;
+			while (iter.hasNext()) {
+				/*
+				 * For each chunk id
+				 * * Move to QUEUE'd
+				 * * Send to topic
+				 * * flush changes
+				 * * commit
+				 */
+				WorkChunkMetadata metadata = iter.next();
+				updateChunkAndSendToQueue(metadata);
+				counter++;
+			}
 		ourLog.debug(
 				"Encountered {} READY work chunks for job {} of type {}",
 				counter,
 				theJobInstance.getInstanceId(),
 				theJobDefinition.getJobDefinitionId());
+			// timebox update of 10k records
+		} while(deadline>System.currentTimeMillis() && !done);
 	}
 
 	/**

From 0e8d14646e17db3d547a5eb0d90c3a992b40cc78 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Fri, 7 Mar 2025 10:24:47 -0700
Subject: [PATCH 02/11] spotless edits

---
 .../maintenance/JobInstanceProcessor.java | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 6a41bc7f774d..f0d9d7c51051 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -279,7 +279,6 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor
 			theJobDefinition) {
 		long deadline = System.currentTimeMillis() + 60 * 1000;
 		boolean done = false;
@@ -288,10 +287,10 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 			// This allows for all nodes to remain busy instead of processing each page's worth of data at a time
 			Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
 			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
-				pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
+					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
 			if (results.isEmpty()) {
 				break;
-		}
+			}
 			Iterator<WorkChunkMetadata> iter = results.iterator();
 			int counter = 0;
 			while (iter.hasNext()) {
@@ -306,13 +305,13 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 				updateChunkAndSendToQueue(metadata);
 				counter++;
 			}
-		ourLog.debug(
-				"Encountered {} READY work chunks for job {} of type {}",
-				counter,
-				theJobInstance.getInstanceId(),
-				theJobDefinition.getJobDefinitionId());
-		// timebox update of 10k records
-		} while(deadline>System.currentTimeMillis() && !done);
+			ourLog.debug(
+					"Encountered {} READY work chunks for job {} of type {}",
type {}", + counter, + theJobInstance.getInstanceId(), + theJobDefinition.getJobDefinitionId()); + // timebox update of 10k records + } while (deadline > System.currentTimeMillis() && !done); } /** From ef7fc41c6834600d1f1ba4677aab86fb0ad6f0a6 Mon Sep 17 00:00:00 2001 From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com> Date: Fri, 7 Mar 2025 11:25:33 -0700 Subject: [PATCH 03/11] update assertion for error count --- .../uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java index 21d85b9f3d25..e406758f7dac 100644 --- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java +++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java @@ -119,7 +119,7 @@ default void errorRetry_maxErrors_movesToFailed() { assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus()); assertEquals(4, chunk.getErrorCount()); assertThat(chunk.getErrorMessage()).as("Error message contains last error").contains(ERROR_MESSAGE_C); - assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors: 4"); + assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors (4)"); } } From 1746795b09a9b0113a464bf3dfe0aff28ec72043 Mon Sep 17 00:00:00 2001 From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com> Date: Mon, 10 Mar 2025 10:31:45 -0600 Subject: [PATCH 04/11] add catch to prevent infinite looping --- .../maintenance/JobInstanceProcessor.java | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java index f0d9d7c51051..718b530ed7d1 100644 --- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java +++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java @@ -280,37 +280,47 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor theJobDefinition) { + // timebox to prevent unusual amount of time updating resources long deadline = System.currentTimeMillis() + 60 * 1000; boolean done = false; do { - // PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead of fixed pages - // This allows for all nodes to remain busy instead of processing each page's worth of data at a time + // PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead of fixed smaller + // pages + // once the first processed page of records is completed, the next 10k needing processing will automatically + // be retrieved Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0); Page results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates( pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY)); if (results.isEmpty()) { - break; + done = true; } - Iterator iter = results.iterator(); int counter = 0; - while (iter.hasNext()) { - /* - * For each chunk id - * * Move to QUEUE'd - * * Send to topic - * * 
-				 * * commit
-				 */
-				WorkChunkMetadata metadata = iter.next();
-				updateChunkAndSendToQueue(metadata);
-				counter++;
+			if (!done) {
+				Iterator<WorkChunkMetadata> iter = results.iterator();
+				while (iter.hasNext()) {
+					/*
+					 * For each chunk id
+					 * * Move to QUEUE'd
+					 * * Send to topic
+					 * * flush changes
+					 * * commit
+					 */
+					WorkChunkMetadata metadata = iter.next();
+					updateChunkAndSendToQueue(metadata);
+					counter++;
+				}
+			}
+			// catch to prevent unlimited looping
+			if (counter < WORK_CHUNK_METADATA_BATCH_SIZE) {
+				done = true;
 			}
 			ourLog.debug(
 					"Encountered {} READY work chunks for job {} of type {}",
 					counter,
 					theJobInstance.getInstanceId(),
 					theJobDefinition.getJobDefinitionId());
-			// timebox update of 10k records
+		// timebox update of 10k records
 		} while (deadline > System.currentTimeMillis() && !done);
 	}

From d30575dda48080f72f77707b157490c7d6c1ad17 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 10:46:40 -0600
Subject: [PATCH 05/11] changelog and comments

---
 .../changelog/8_2_0/6784-remove-batch2-paginator.yaml | 5 +++++
 .../uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 8 ++++----
 2 files changed, 9 insertions(+), 4 deletions(-)
 create mode 100644 hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml

diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml
new file mode 100644
index 000000000000..9a8c20b18ed0
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml
@@ -0,0 +1,5 @@
+---
+type: fix
+issue: 6783
+title: "The batch2 job processor will no longer use paginator to queue workchunks as 'ready' due to performance bottleneck
+around saturation of available workchunk-processors or consumers"
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 718b530ed7d1..6ee506d19328 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -284,10 +284,10 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 		long deadline = System.currentTimeMillis() + 60 * 1000;
 		boolean done = false;
 		do {
-			// PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead of fixed smaller
-			// pages
-			// once the first processed page of records is completed, the next 10k needing processing will automatically
-			// be retrieved
+			// Paginator has an issue keeping worker nodes saturated due to processing workloads a page at a time
+			// PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead
+			// Each consecutive request for results will be the next 10k records needing updating (no paging needed)
+			// Recommend this eventually moves to java stream once architecture supports the change
 			Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
 			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
 					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
 			if (results.isEmpty()) {

From c9810e14e36a6014aecfa6b8f79d07ba4ced90e7 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 11:39:33 -0600
Subject: [PATCH 06/11] log warning for missed deadline

---
 .../fhir/batch2/maintenance/JobInstanceProcessor.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 6ee506d19328..26a2f3455da4 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -320,7 +320,15 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 					counter,
 					theJobInstance.getInstanceId(),
 					theJobDefinition.getJobDefinitionId());
-			// timebox update of 10k records
+
+			// Log warning if deadline is exceeded
+			if (System.currentTimeMillis() >= deadline) {
+				ourLog.warn(
+						"Deadline exceeded while processing job {} of type {}. Some chunks may not have been processed.",
+						theJobInstance.getInstanceId(),
+						theJobDefinition.getJobDefinitionId());
+			}
+
 		} while (deadline > System.currentTimeMillis() && !done);
 	}

From ec0183024f72a7f4ce15867a0cb976837eaafe89 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 11:43:26 -0600
Subject: [PATCH 07/11] convert while to for loop

---
 .../ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 26a2f3455da4..3e228314b49c 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -297,8 +297,7 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 			int counter = 0;
 
 			if (!done) {
-				Iterator<WorkChunkMetadata> iter = results.iterator();
-				while (iter.hasNext()) {
+				for (WorkChunkMetadata metadata : results) {
 					/*
 					 * For each chunk id
 					 * * Move to QUEUE'd
@@ -306,7 +305,6 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 					 * * flush changes
 					 * * commit
 					 */
-					WorkChunkMetadata metadata = iter.next();
 					updateChunkAndSendToQueue(metadata);
 					counter++;
 				}

From 891464ca608ac9d57a8d12ac8a6c8897f78a76ed Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 11:45:29 -0600
Subject: [PATCH 08/11] minute variable for deadline

---
 .../ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 3e228314b49c..7e711081cf27 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -281,7 +281,8 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor
 			theJobDefinition) {
 		// timebox to prevent unusual amount of time updating resources
-		long deadline = System.currentTimeMillis() + 60 * 1000;
+		long minuteMS = 60 * 1000;
+		long deadline = System.currentTimeMillis() + minuteMS;
 		boolean done = false;
 		do {

From 6fa34c5111a00a51726058c4a1d829e29bea964a Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 11:47:51 -0600
Subject: [PATCH 09/11] remove unused import

---
 .../ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 7e711081cf27..1e60942b6554 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -40,7 +40,6 @@
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Pageable;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;

From 89cc2aca25266f9312360596304fbe5e46feb190 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Mon, 10 Mar 2025 19:52:16 -0600
Subject: [PATCH 10/11] Update
 hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java

Co-authored-by: James Agnew
---
 .../ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 1e60942b6554..554fa0eb36fb 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -281,7 +281,7 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor
 			theJobDefinition) {
 		// timebox to prevent unusual amount of time updating resources
 		long minuteMS = 60 * 1000;
-		long deadline = System.currentTimeMillis() + minuteMS;
+		long deadline = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
 		boolean done = false;
 		do {

From 4b5b51e9bedbb7435f6ee6d35445043515e0f6a6 Mon Sep 17 00:00:00 2001
From: Justin McKelvy <60718638+Capt-Mac@users.noreply.github.com>
Date: Tue, 11 Mar 2025 07:36:34 -0600
Subject: [PATCH 11/11] add break when processing deadline is met

---
 .../ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 1e60942b6554..c33b5bed591a 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -319,15 +319,16 @@ private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition the
 					theJobInstance.getInstanceId(),
 					theJobDefinition.getJobDefinitionId());
 
-			// Log warning if deadline is exceeded
+			// Log warning and exit if deadline is exceeded
 			if (System.currentTimeMillis() >= deadline) {
 				ourLog.warn(
 						"Deadline exceeded while processing job {} of type {}. Some chunks may not have been processed.",
 						theJobInstance.getInstanceId(),
 						theJobDefinition.getJobDefinitionId());
+				break;
 			}
 
-		} while (deadline > System.currentTimeMillis() && !done);
+		} while (!done);
 	}
 
 	/**
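For reference, the net effect of the eleven patches above on JobInstanceProcessor.enqueueReadyChunks is roughly the following consolidated sketch, assembled from the hunks in this series. The JobDefinition type parameter is an assumption (generics are not preserved above), the now-unused minuteMS local from PATCH 08 is omitted, and members such as myJobPersistence, myInstanceId, WORK_CHUNK_METADATA_BATCH_SIZE, updateChunkAndSendToQueue and ourLog belong to the surrounding class rather than to this sketch:

	private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
		// timebox to prevent unusual amount of time updating resources
		long deadline = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
		boolean done = false;
		do {
			// Page 0 is requested on every iteration: each fetch returns the next batch of
			// still-READY chunks, keeping consumers saturated without walking fixed pages
			Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
			if (results.isEmpty()) {
				done = true;
			}
			int counter = 0;

			if (!done) {
				for (WorkChunkMetadata metadata : results) {
					// move each READY chunk to QUEUED, send it to the topic, flush and commit
					updateChunkAndSendToQueue(metadata);
					counter++;
				}
			}
			// catch to prevent unlimited looping: a short page means nothing is left in READY
			if (counter < WORK_CHUNK_METADATA_BATCH_SIZE) {
				done = true;
			}
			ourLog.debug(
					"Encountered {} READY work chunks for job {} of type {}",
					counter,
					theJobInstance.getInstanceId(),
					theJobDefinition.getJobDefinitionId());

			// Log warning and exit if deadline is exceeded
			if (System.currentTimeMillis() >= deadline) {
				ourLog.warn(
						"Deadline exceeded while processing job {} of type {}. Some chunks may not have been processed.",
						theJobInstance.getInstanceId(),
						theJobDefinition.getJobDefinitionId());
				break;
			}
		} while (!done);
	}

Because each iteration re-queries page 0, chunks that were just moved out of READY drop out of the next result set, so the loop drains the backlog naturally; the short-page check and the one-minute deadline are the two exits that keep a single maintenance pass bounded.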