Skip to content

Commit cf3ebdf

Browse files
bazel-iofmeum
andauthored
[8.1.0] Use Single.using instead of onError/onSuccess for async uploads (#25251)
Users have reported hangs in Bazel's asynchronous remote cache uploads that may be happening because neither `onSuccess` nor `onError` is called on the observer. Work towards #25232 Closes #25231. PiperOrigin-RevId: 725235495 Change-Id: I20c3aaa2ee57a52041dea0b3c17356445f2bbc34 Commit d4c9b92 Co-authored-by: Fabian Meumertzheim <[email protected]>
1 parent 3b0d75a commit cf3ebdf

File tree

1 file changed

+28
-42
lines changed

1 file changed

+28
-42
lines changed

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,8 @@
129129
import com.google.protobuf.ExtensionRegistry;
130130
import com.google.protobuf.Message;
131131
import io.grpc.Status.Code;
132-
import io.reactivex.rxjava3.annotations.NonNull;
133132
import io.reactivex.rxjava3.core.Scheduler;
134133
import io.reactivex.rxjava3.core.Single;
135-
import io.reactivex.rxjava3.core.SingleObserver;
136-
import io.reactivex.rxjava3.disposables.Disposable;
137134
import io.reactivex.rxjava3.schedulers.Schedulers;
138135
import java.io.FileNotFoundException;
139136
import java.io.IOException;
@@ -160,6 +157,7 @@
160157
import java.util.concurrent.Phaser;
161158
import java.util.concurrent.Semaphore;
162159
import java.util.concurrent.atomic.AtomicBoolean;
160+
import java.util.concurrent.atomic.AtomicLong;
163161
import java.util.concurrent.atomic.AtomicReference;
164162
import java.util.stream.Stream;
165163
import javax.annotation.Nullable;
@@ -1794,45 +1792,33 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable
17941792

17951793
if (remoteOptions.remoteCacheAsync
17961794
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
1797-
Single.using(
1798-
combinedCache::retain,
1799-
combinedCache ->
1800-
buildUploadManifestAsync(action, spawnResult)
1801-
.flatMap(
1802-
manifest ->
1803-
manifest.uploadAsync(
1804-
action.getRemoteActionExecutionContext(),
1805-
combinedCache,
1806-
reporter)),
1807-
CombinedCache::release)
1808-
.subscribeOn(scheduler)
1809-
.subscribe(
1810-
new SingleObserver<ActionResult>() {
1811-
long startTime = 0;
1812-
1813-
@Override
1814-
public void onSubscribe(@NonNull Disposable d) {
1815-
backgroundTaskPhaser.register();
1816-
startTime = Profiler.nanoTimeMaybe();
1817-
}
1818-
1819-
@Override
1820-
public void onSuccess(@NonNull ActionResult actionResult) {
1821-
Profiler.instance()
1822-
.completeTask(startTime, ProfilerTask.UPLOAD_TIME, "upload outputs");
1823-
backgroundTaskPhaser.arriveAndDeregister();
1824-
onUploadComplete.run();
1825-
}
1826-
1827-
@Override
1828-
public void onError(@NonNull Throwable e) {
1829-
Profiler.instance()
1830-
.completeTask(startTime, ProfilerTask.UPLOAD_TIME, "upload outputs");
1831-
backgroundTaskPhaser.arriveAndDeregister();
1832-
reportUploadError(e);
1833-
onUploadComplete.run();
1834-
}
1835-
});
1795+
AtomicLong startTime = new AtomicLong();
1796+
var unused =
1797+
Single.using(
1798+
() -> {
1799+
backgroundTaskPhaser.register();
1800+
CombinedCache cache = combinedCache.retain();
1801+
startTime.set(Profiler.nanoTimeMaybe());
1802+
return cache;
1803+
},
1804+
combinedCache ->
1805+
buildUploadManifestAsync(action, spawnResult)
1806+
.flatMap(
1807+
manifest ->
1808+
manifest.uploadAsync(
1809+
action.getRemoteActionExecutionContext(),
1810+
combinedCache,
1811+
reporter)),
1812+
cacheResource -> {
1813+
Profiler.instance()
1814+
.completeTask(startTime.get(), ProfilerTask.UPLOAD_TIME, "upload outputs");
1815+
backgroundTaskPhaser.arriveAndDeregister();
1816+
onUploadComplete.run();
1817+
cacheResource.release();
1818+
},
1819+
/* eager= */ false)
1820+
.subscribeOn(scheduler)
1821+
.subscribe(result -> {}, this::reportUploadError);
18361822
} else {
18371823
try (SilentCloseable c =
18381824
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {

0 commit comments

Comments
 (0)