diff --git a/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml
new file mode 100644
index 000000000000..9a8c20b18ed0
--- /dev/null
+++ b/hapi-fhir-docs/src/main/resources/ca/uhn/hapi/fhir/changelog/8_2_0/6784-remove-batch2-paginator.yaml
@@ -0,0 +1,5 @@
+---
+type: fix
+issue: 6783
+title: "The batch2 job processor no longer uses a paginator when queueing 'READY' work chunks, because paging created
+a performance bottleneck that could leave the available work chunk processors (consumers) unsaturated."
diff --git a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java
index 21d85b9f3d25..e406758f7dac 100644
--- a/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java
+++ b/hapi-fhir-storage-batch2-test-utilities/src/main/java/ca/uhn/hapi/fhir/batch2/test/IWorkChunkErrorActionsTests.java
@@ -119,7 +119,7 @@ default void errorRetry_maxErrors_movesToFailed() {
 		assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
 		assertEquals(4, chunk.getErrorCount());
 		assertThat(chunk.getErrorMessage()).as("Error message contains last error").contains(ERROR_MESSAGE_C);
-		assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors: 4");
+		assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors (4)");
 	}
 }
diff --git a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
index 199686301c82..6539de7f9a3d 100644
--- a/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
+++ b/hapi-fhir-storage-batch2/src/main/java/ca/uhn/fhir/batch2/maintenance/JobInstanceProcessor.java
@@ -33,7 +33,6 @@
 import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
 import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
 import ca.uhn.fhir.model.api.IModelJson;
-import ca.uhn.fhir.model.api.PagingIterator;
 import ca.uhn.fhir.util.Logs;
 import ca.uhn.fhir.util.StopWatch;
 import org.apache.commons.lang3.time.DateUtils;
@@ -41,7 +40,6 @@
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Pageable;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -266,17 +264,6 @@ private boolean canAdvanceGatedJob(JobInstance theInstance) {
 		return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
 	}
 
-	protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
-		return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (index, batchsize, consumer) -> {
-			Pageable pageable = Pageable.ofSize(batchsize).withPage(index);
-			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
-					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
-			for (WorkChunkMetadata metadata : results) {
-				consumer.accept(metadata);
-			}
-		});
-	}
-
 	/**
 	 * Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
 	 */
@@ -292,27 +279,55 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?> theJobWorkCursor) {
 	 * for processing.
 	 */
 	private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
-		Iterator<WorkChunkMetadata> iter = getReadyChunks();
-
-		int counter = 0;
-		while (iter.hasNext()) {
-			WorkChunkMetadata metadata = iter.next();
-
-			/*
-			 * For each chunk id
-			 * * Move to QUEUE'd
-			 * * Send to topic
-			 * * flush changes
-			 * * commit
-			 */
-			updateChunkAndSendToQueue(metadata);
-			counter++;
-		}
-		ourLog.debug(
-				"Encountered {} READY work chunks for job {} of type {}",
-				counter,
-				theJobInstance.getInstanceId(),
-				theJobDefinition.getJobDefinitionId());
+		// Timebox to one minute so a single maintenance pass cannot spend an unusual amount of time updating resources
+		long deadline = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
+		boolean done = false;
+		do {
+			// The paginator could not keep worker nodes saturated because it processed the workload one page at a time.
+			// Page number '0' is hardcoded instead: each fetch returns the next WORK_CHUNK_METADATA_BATCH_SIZE (10k)
+			// records needing updating, because queueing a chunk moves it out of READY and out of this query's results.
+			// Recommend this eventually moves to a Java Stream once the architecture supports the change.
+			Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
+			Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
+					pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
+			if (results.isEmpty()) {
+				done = true;
+			}
+			int counter = 0;
+
+			if (!done) {
+				for (WorkChunkMetadata metadata : results) {
+					/*
+					 * For each chunk id
+					 * * Move to QUEUE'd
+					 * * Send to topic
+					 * * flush changes
+					 * * commit
+					 */
+					updateChunkAndSendToQueue(metadata);
+					counter++;
+				}
+			}
+			// A short page means no READY chunks remain; stop to prevent unlimited looping
+			if (counter < WORK_CHUNK_METADATA_BATCH_SIZE) {
+				done = true;
+			}
+			ourLog.debug(
+					"Encountered {} READY work chunks for job {} of type {}",
+					counter,
+					theJobInstance.getInstanceId(),
+					theJobDefinition.getJobDefinitionId());
+
+			// Log a warning and exit if the deadline is exceeded
+			if (System.currentTimeMillis() >= deadline) {
+				ourLog.warn(
+						"Deadline exceeded while processing job {} of type {}. Some chunks may not have been processed.",
+						theJobInstance.getInstanceId(),
+						theJobDefinition.getJobDefinitionId());
+				break;
+			}
+
+		} while (!done);
 	}
 
 	/**
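
Why fetching page 0 in a loop drains the whole backlog: updateChunkAndSendToQueue() moves each chunk from READY to QUEUED, so every processed batch falls out of the READY query, and the next page-0 fetch returns the next unprocessed batch with no page-index bookkeeping and none of the row-skipping that classic pagination suffers when its result set shrinks underneath it. The standalone Java sketch below illustrates the pattern outside HAPI FHIR; InMemoryChunkStore, fetchFirstReadyPage, and markQueued are hypothetical stand-ins for IJobPersistence and the work chunk state machine, and BATCH_SIZE stands in for WORK_CHUNK_METADATA_BATCH_SIZE.

import java.util.ArrayList;
import java.util.List;

public class DrainReadyChunksSketch {

	// Stand-in for WORK_CHUNK_METADATA_BATCH_SIZE (10,000 in the real processor).
	static final int BATCH_SIZE = 3;

	// Toy persistence layer (hypothetical, not HAPI FHIR API). Queueing a chunk removes
	// it from the READY set, which is the property that makes "always fetch page 0" work.
	static class InMemoryChunkStore {
		private final List<String> myReadyChunks = new ArrayList<>();

		InMemoryChunkStore(int theChunkCount) {
			for (int i = 0; i < theChunkCount; i++) {
				myReadyChunks.add("chunk-" + i);
			}
		}

		// Analogue of fetchAllWorkChunkMetadataForJobInStates(page 0, ..., READY).
		List<String> fetchFirstReadyPage(int thePageSize) {
			int end = Math.min(thePageSize, myReadyChunks.size());
			return new ArrayList<>(myReadyChunks.subList(0, end));
		}

		// Analogue of updateChunkAndSendToQueue(): READY -> QUEUED.
		void markQueued(String theChunkId) {
			myReadyChunks.remove(theChunkId);
		}
	}

	public static void main(String[] args) {
		InMemoryChunkStore store = new InMemoryChunkStore(8);
		long deadline = System.currentTimeMillis() + 60_000L; // one-minute timebox, as in the patch

		boolean done = false;
		do {
			// Always page 0: every chunk queued below leaves the READY set, so the
			// next fetch naturally returns the following batch.
			List<String> page = store.fetchFirstReadyPage(BATCH_SIZE);
			for (String chunkId : page) {
				store.markQueued(chunkId);
			}
			System.out.println("Queued " + page.size() + " chunk(s)");

			// A short page means the backlog is drained; the deadline bounds one
			// pass so a huge backlog cannot monopolize the maintenance run.
			done = page.size() < BATCH_SIZE || System.currentTimeMillis() >= deadline;
		} while (!done);
	}
}

Note that the pattern is only correct because processing invalidates the fetch predicate (status = READY). If the query matched rows that processing left untouched, re-fetching page 0 would spin on the same rows forever, which is exactly the failure mode the short-page check and the one-minute deadline in the patch guard against.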