Skip to content

Batch2 Remove Paging Iterator #6783

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 6783
title: "The batch2 job processor no longer uses a paging iterator to queue work chunks in the 'ready' state. The paginator
created a performance bottleneck by failing to keep the available work-chunk processors (consumers) saturated."
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ default void errorRetry_maxErrors_movesToFailed() {
assertEquals(WorkChunkStatusEnum.FAILED, chunk.getStatus());
assertEquals(4, chunk.getErrorCount());
assertThat(chunk.getErrorMessage()).as("Error message contains last error").contains(ERROR_MESSAGE_C);
assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors: 4");
assertThat(chunk.getErrorMessage()).as("Error message contains error count and complaint").contains("many errors (4)");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,13 @@
import ca.uhn.fhir.batch2.progress.JobInstanceProgressCalculator;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.model.api.PagingIterator;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -266,17 +264,6 @@ private boolean canAdvanceGatedJob(JobInstance theInstance) {
return workChunkStatuses.equals(Set.of(WorkChunkStatusEnum.COMPLETED));
}

/**
 * Builds a paging iterator over all work chunks for this job instance that are
 * currently in the READY state, fetched in batches of
 * {@code WORK_CHUNK_METADATA_BATCH_SIZE}.
 */
protected PagingIterator<WorkChunkMetadata> getReadyChunks() {
	return new PagingIterator<>(WORK_CHUNK_METADATA_BATCH_SIZE, (thePageIndex, theBatchSize, theConsumer) -> {
		// fetch one page of READY chunk metadata and hand each entry to the iterator's consumer
		Pageable page = Pageable.ofSize(theBatchSize).withPage(thePageIndex);
		Page<WorkChunkMetadata> page2Results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
				page, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
		page2Results.forEach(theConsumer);
	});
}

/**
* Trigger the reduction step for the given job instance. Reduction step chunks should never be queued.
*/
Expand All @@ -292,27 +279,56 @@ private void triggerReductionStep(JobInstance theInstance, JobWorkCursor<?, ?, ?
* for processing.
*/
private void enqueueReadyChunks(JobInstance theJobInstance, JobDefinition<?> theJobDefinition) {
Iterator<WorkChunkMetadata> iter = getReadyChunks();

int counter = 0;
while (iter.hasNext()) {
WorkChunkMetadata metadata = iter.next();

/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
*/
updateChunkAndSendToQueue(metadata);
counter++;
}
ourLog.debug(
"Encountered {} READY work chunks for job {} of type {}",
counter,
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());
// timebox to prevent unusual amount of time updating resources
long minuteMS = 60 * 1000;
long deadline = System.currentTimeMillis() + DateUtils.MILLIS_PER_MINUTE;
boolean done = false;
do {
// Paginator has an issue keeping worker nodes saturated due to processing workloads a page at a time
// PageNumber '0' is hardcoded to re-saturate 10k records to process at a time instead
// Each consecutive request for results will be the next 10k records needing updating (no paging needed)
// Recommend this eventually moves to java stream once architecture supports the change
Pageable pageable = Pageable.ofSize(WORK_CHUNK_METADATA_BATCH_SIZE).withPage(0);
Page<WorkChunkMetadata> results = myJobPersistence.fetchAllWorkChunkMetadataForJobInStates(
pageable, myInstanceId, Set.of(WorkChunkStatusEnum.READY));
if (results.isEmpty()) {
done = true;
}
int counter = 0;

if (!done) {
for (WorkChunkMetadata metadata : results) {
/*
* For each chunk id
* * Move to QUEUE'd
* * Send to topic
* * flush changes
* * commit
*/
updateChunkAndSendToQueue(metadata);
counter++;
}
}
// catch to prevent unlimited looping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trivial: This counting and check probably isn't required since you do the empty check above. The next page after a short result will be empty.

if (counter < WORK_CHUNK_METADATA_BATCH_SIZE) {
done = true;
}
ourLog.debug(
"Encountered {} READY work chunks for job {} of type {}",
counter,
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());

// Log warning and exit if deadline is exceeded
if (System.currentTimeMillis() >= deadline) {
ourLog.warn(
"Deadline exceeded while processing job {} of type {}. Some chunks may not have been processed.",
theJobInstance.getInstanceId(),
theJobDefinition.getJobDefinitionId());
break;
}

} while (!done);
}

/**
Expand Down
Loading