Skip to content

Commit 0bddcab

Browse files
authored
[7.6.1] Don't leak in-flight execution in RemoteSpawnCache (#25688)
The in-flight executions also need to be cleaned up in case of a cache hit or an exception while interacting with the cache; otherwise, `inFlightExecutions` grows over time, retaining large `Spawn`s. Closes #25668. PiperOrigin-RevId: 740058969 Change-Id: I42045219510df0347e38234d6d3bd1ebb6167d9b (cherry picked from commit 0f957c1)
1 parent 080d7d9 commit 0bddcab

File tree

3 files changed

+180
-18
lines changed

3 files changed

+180
-18
lines changed

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,9 +1455,11 @@ public InMemoryOutput downloadOutputs(RemoteAction action, RemoteActionResult re
14551455
}
14561456

14571457
/** An ongoing local execution of a spawn. */
1458-
public static final class LocalExecution {
1458+
public static final class LocalExecution implements SilentCloseable {
14591459
private final RemoteAction action;
14601460
private final SettableFuture<SpawnResult> spawnResultFuture;
1461+
private final Runnable onClose;
1462+
private final AtomicBoolean closeManually = new AtomicBoolean(false);
14611463
private final Phaser spawnResultConsumers =
14621464
new Phaser(1) {
14631465
@Override
@@ -1467,9 +1469,10 @@ protected boolean onAdvance(int phase, int registeredParties) {
14671469
}
14681470
};
14691471

1470-
private LocalExecution(RemoteAction action) {
1472+
private LocalExecution(RemoteAction action, Runnable onClose) {
14711473
this.action = action;
14721474
this.spawnResultFuture = SettableFuture.create();
1475+
this.onClose = onClose;
14731476
}
14741477

14751478
/**
@@ -1482,11 +1485,11 @@ private LocalExecution(RemoteAction action) {
14821485
* builds and clients.
14831486
*/
14841487
@Nullable
1485-
public static LocalExecution createIfDeduplicatable(RemoteAction action) {
1488+
public static LocalExecution createIfDeduplicatable(RemoteAction action, Runnable onClose) {
14861489
if (action.getSpawn().getPathMapper().isNoop()) {
14871490
return null;
14881491
}
1489-
return new LocalExecution(action);
1492+
return new LocalExecution(action, onClose);
14901493
}
14911494

14921495
/**
@@ -1522,10 +1525,30 @@ public void awaitAllOutputReuse() {
15221525

15231526
/**
15241527
* Signals to all potential consumers of the {@link #spawnResultFuture} that this execution has
1525-
* been cancelled and that the result will not be available.
1528+
* finished or been canceled and that the result will no longer be available.
15261529
*/
1527-
public void cancel() {
1530+
@Override
1531+
public void close() {
1532+
if (!closeManually.get()) {
1533+
doClose();
1534+
}
1535+
}
1536+
1537+
/**
1538+
* Returns a {@link Runnable} that will close this {@link LocalExecution} instance when called.
1539+
* After this method is called, the {@link LocalExecution} instance will not be closed by the
1540+
* {@link #close()} method.
1541+
*/
1542+
public Runnable delayClose() {
1543+
if (!closeManually.compareAndSet(false, true)) {
1544+
throw new IllegalStateException("delayClose has already been called");
1545+
}
1546+
return this::doClose;
1547+
}
1548+
1549+
private void doClose() {
15281550
spawnResultFuture.cancel(true);
1551+
onClose.run();
15291552
}
15301553
}
15311554

src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ RemoteExecutionService getRemoteExecutionService() {
7878
return remoteExecutionService;
7979
}
8080

81+
@VisibleForTesting
82+
int getInFlightExecutionsSize() {
83+
return inFlightExecutions.size();
84+
}
85+
8186
@Override
8287
public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
8388
throws InterruptedException, IOException, ExecException, ForbiddenActionInputException {
@@ -110,7 +115,9 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
110115
// first one.
111116
LocalExecution previousExecution = null;
112117
try {
113-
thisExecution = LocalExecution.createIfDeduplicatable(action);
118+
thisExecution =
119+
LocalExecution.createIfDeduplicatable(
120+
action, () -> inFlightExecutions.remove(action.getActionKey()));
114121
if (shouldUploadLocalResults && thisExecution != null) {
115122
LocalExecution previousOrThisExecution =
116123
inFlightExecutions.merge(
@@ -126,8 +133,12 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
126133
return thisExecutionArg;
127134
}
128135
});
129-
previousExecution =
130-
previousOrThisExecution == thisExecution ? null : previousOrThisExecution;
136+
if (previousOrThisExecution != thisExecution) {
137+
// The current execution is not the first one to be registered for this action key, so
138+
// we need to wait for the previous one to finish before we can reuse its result.
139+
previousExecution = previousOrThisExecution;
140+
thisExecution = null;
141+
}
131142
}
132143
try {
133144
RemoteActionResult result;
@@ -138,6 +149,9 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
138149
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
139150
// cache miss
140151
if (result != null && result.getExitCode() == 0) {
152+
if (thisExecution != null) {
153+
thisExecution.close();
154+
}
141155
Stopwatch fetchTime = Stopwatch.createStarted();
142156
InMemoryOutput inMemoryOutput;
143157
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
@@ -259,7 +273,9 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
259273
// indefinitely is important to avoid excessive memory pressure - Spawns can be very
260274
// large.
261275
remoteExecutionService.uploadOutputs(
262-
action, result, () -> inFlightExecutions.remove(action.getActionKey()));
276+
action,
277+
result,
278+
thisExecutionFinal != null ? thisExecutionFinal.delayClose() : () -> {});
263279
if (thisExecutionFinal != null
264280
&& action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
265281
// In this case outputs have been uploaded synchronously and the callback above has run,
@@ -290,7 +306,7 @@ private void checkForConcurrentModifications()
290306
@Override
291307
public void close() {
292308
if (thisExecutionFinal != null) {
293-
thisExecutionFinal.cancel();
309+
thisExecutionFinal.close();
294310
}
295311
}
296312
};

src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java

Lines changed: 130 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,14 @@ public void pathMappedActionIsDeduplicated() throws Exception {
772772
// Simulate a very slow upload to the remote cache to ensure that the second spawn is
773773
// deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing
774774
// concurrency to this test.
775-
Mockito.doNothing().when(remoteExecutionService).uploadOutputs(any(), any(), any());
775+
AtomicReference<Runnable> onUploadComplete = new AtomicReference<>();
776+
Mockito.doAnswer(
777+
invocationOnMock -> {
778+
onUploadComplete.set(invocationOnMock.getArgument(2));
779+
return null;
780+
})
781+
.when(remoteExecutionService)
782+
.uploadOutputs(any(), any(), any());
776783

777784
// act
778785
try (CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy)) {
@@ -795,6 +802,8 @@ public void pathMappedActionIsDeduplicated() throws Exception {
795802
fs.getPath("/exec/root/bazel-bin/k8-opt/bin/output"), UTF_8))
796803
.isEqualTo("hello");
797804
assertThat(secondCacheHandle.willStore()).isFalse();
805+
onUploadComplete.get().run();
806+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
798807
}
799808

800809
@Test
@@ -840,10 +849,12 @@ public boolean mayModifySpawnOutputsAfterExecution() {
840849
// Simulate a very slow upload to the remote cache to ensure that the second spawn is
841850
// deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing
842851
// more concurrency to this test.
852+
AtomicReference<Runnable> onUploadComplete = new AtomicReference<>();
843853
Mockito.doAnswer(
844854
(Answer<Void>)
845855
invocation -> {
846856
enteredUploadOutputs.countDown();
857+
onUploadComplete.set(invocation.getArgument(2));
847858
return null;
848859
})
849860
.when(remoteExecutionService)
@@ -910,6 +921,8 @@ public boolean mayModifySpawnOutputsAfterExecution() {
910921
fs.getPath("/exec/root/bazel-bin/k8-opt/bin/output"), UTF_8))
911922
.isEqualTo("hello");
912923
assertThat(secondCacheHandle.willStore()).isFalse();
924+
onUploadComplete.get().run();
925+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
913926
}
914927

915928
@Test
@@ -934,7 +947,14 @@ public void pathMappedActionWithInMemoryOutputIsDeduplicated() throws Exception
934947
// Simulate a very slow upload to the remote cache to ensure that the second spawn is
935948
// deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing
936949
// concurrency to this test.
937-
Mockito.doNothing().when(remoteExecutionService).uploadOutputs(any(), any(), any());
950+
AtomicReference<Runnable> onUploadComplete = new AtomicReference<>();
951+
Mockito.doAnswer(
952+
invocationOnMock -> {
953+
onUploadComplete.set(invocationOnMock.getArgument(2));
954+
return null;
955+
})
956+
.when(remoteExecutionService)
957+
.uploadOutputs(any(), any(), any());
938958

939959
// act
940960
try (CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy)) {
@@ -957,6 +977,8 @@ public void pathMappedActionWithInMemoryOutputIsDeduplicated() throws Exception
957977
.isEqualTo("in-memory");
958978
assertThat(execRoot.getRelative(inMemoryOutput.getExecPath()).exists()).isFalse();
959979
assertThat(secondCacheHandle.willStore()).isFalse();
980+
onUploadComplete.get().run();
981+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
960982
}
961983

962984
@Test
@@ -978,10 +1000,6 @@ public void deduplicatedActionWithNonZeroExitCodeIsACacheMiss() throws Exception
9781000

9791001
RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService();
9801002
Mockito.doCallRealMethod().when(remoteExecutionService).waitForAndReuseOutputs(any(), any());
981-
// Simulate a very slow upload to the remote cache to ensure that the second spawn is
982-
// deduplicated rather than a cache hit. This is a slight hack, but also avoid introducing
983-
// concurrency to this test.
984-
Mockito.doNothing().when(remoteExecutionService).uploadOutputs(any(), any(), any());
9851003

9861004
// act
9871005
try (CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy)) {
@@ -1001,11 +1019,14 @@ public void deduplicatedActionWithNonZeroExitCodeIsACacheMiss() throws Exception
10011019
.setRunnerName("test")
10021020
.build());
10031021
}
1022+
Mockito.verify(remoteExecutionService, never()).uploadOutputs(any(), any(), any());
10041023
CacheHandle secondCacheHandle = cache.lookup(secondSpawn, secondPolicy);
10051024

10061025
// assert
10071026
assertThat(secondCacheHandle.hasResult()).isFalse();
10081027
assertThat(secondCacheHandle.willStore()).isTrue();
1028+
secondCacheHandle.close();
1029+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
10091030
}
10101031

10111032
@Test
@@ -1030,7 +1051,14 @@ public void deduplicatedActionWithMissingOutputIsACacheMiss() throws Exception {
10301051
// Simulate a very slow upload to the remote cache to ensure that the second spawn is
10311052
// deduplicated rather than a cache hit. This is a slight hack, but also avoids introducing
10321053
// concurrency to this test.
1033-
Mockito.doNothing().when(remoteExecutionService).uploadOutputs(any(), any(), any());
1054+
AtomicReference<Runnable> onUploadComplete = new AtomicReference<>();
1055+
Mockito.doAnswer(
1056+
invocationOnMock -> {
1057+
onUploadComplete.set(invocationOnMock.getArgument(2));
1058+
return null;
1059+
})
1060+
.when(remoteExecutionService)
1061+
.uploadOutputs(any(), any(), any());
10341062

10351063
// act
10361064
try (CacheHandle firstCacheHandle = cache.lookup(firstSpawn, firstPolicy)) {
@@ -1047,5 +1075,100 @@ public void deduplicatedActionWithMissingOutputIsACacheMiss() throws Exception {
10471075
// assert
10481076
assertThat(secondCacheHandle.hasResult()).isFalse();
10491077
assertThat(secondCacheHandle.willStore()).isTrue();
1078+
onUploadComplete.get().run();
1079+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
1080+
}
1081+
1082+
@Test
1083+
public void pathMappedActionWithCacheHitRemovesInFlightExecution() throws Exception {
1084+
// arrange
1085+
RemoteSpawnCache cache = createRemoteSpawnCache();
1086+
1087+
SimpleSpawn spawn = simplePathMappedSpawn("k8-fastbuild");
1088+
FakeActionInputFileCache fakeFileCache = new FakeActionInputFileCache(execRoot);
1089+
fakeFileCache.createScratchInput(spawn.getInputFiles().getSingleton(), "xyz");
1090+
SpawnExecutionContext policy =
1091+
createSpawnExecutionContext(spawn, execRoot, fakeFileCache, outErr);
1092+
1093+
RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService();
1094+
Mockito.doReturn(
1095+
RemoteActionResult.createFromCache(
1096+
CachedActionResult.remote(ActionResult.getDefaultInstance())))
1097+
.when(remoteExecutionService)
1098+
.lookupCache(any());
1099+
Mockito.doReturn(null).when(remoteExecutionService).downloadOutputs(any(), any());
1100+
1101+
// act
1102+
try (CacheHandle cacheHandle = cache.lookup(spawn, policy)) {
1103+
checkState(cacheHandle.hasResult());
1104+
}
1105+
1106+
// assert
1107+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
1108+
}
1109+
1110+
@Test
1111+
public void pathMappedActionNotUploadedRemovesInFlightExecution() throws Exception {
1112+
// arrange
1113+
RemoteSpawnCache cache = createRemoteSpawnCache();
1114+
1115+
SimpleSpawn spawn = simplePathMappedSpawn("k8-fastbuild");
1116+
FakeActionInputFileCache fakeFileCache = new FakeActionInputFileCache(execRoot);
1117+
fakeFileCache.createScratchInput(spawn.getInputFiles().getSingleton(), "xyz");
1118+
SpawnExecutionContext policy =
1119+
createSpawnExecutionContext(spawn, execRoot, fakeFileCache, outErr);
1120+
1121+
RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService();
1122+
Mockito.doCallRealMethod()
1123+
.when(remoteExecutionService)
1124+
.commitResultAndDecideWhetherToUpload(any(), any());
1125+
1126+
// act
1127+
try (CacheHandle cacheHandle = cache.lookup(spawn, policy)) {
1128+
cacheHandle.store(
1129+
new SpawnResult.Builder()
1130+
.setExitCode(1)
1131+
.setStatus(Status.NON_ZERO_EXIT)
1132+
.setFailureDetail(
1133+
FailureDetail.newBuilder()
1134+
.setMessage("test spawn failed")
1135+
.setSpawn(
1136+
FailureDetails.Spawn.newBuilder()
1137+
.setCode(FailureDetails.Spawn.Code.NON_ZERO_EXIT))
1138+
.build())
1139+
.setRunnerName("test")
1140+
.build());
1141+
}
1142+
1143+
// assert
1144+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
1145+
}
1146+
1147+
@Test
1148+
public void pathMappedActionWithCacheIoExceptionRemovesInFlightExecution() throws Exception {
1149+
// arrange
1150+
RemoteSpawnCache cache = createRemoteSpawnCache();
1151+
1152+
SimpleSpawn spawn = simplePathMappedSpawn("k8-fastbuild");
1153+
FakeActionInputFileCache fakeFileCache = new FakeActionInputFileCache(execRoot);
1154+
fakeFileCache.createScratchInput(spawn.getInputFiles().getSingleton(), "xyz");
1155+
SpawnExecutionContext policy =
1156+
createSpawnExecutionContext(spawn, execRoot, fakeFileCache, outErr);
1157+
1158+
RemoteExecutionService remoteExecutionService = cache.getRemoteExecutionService();
1159+
Mockito.doReturn(
1160+
RemoteActionResult.createFromCache(
1161+
CachedActionResult.remote(ActionResult.getDefaultInstance())))
1162+
.when(remoteExecutionService)
1163+
.lookupCache(any());
1164+
Mockito.doThrow(new IOException()).when(remoteExecutionService).downloadOutputs(any(), any());
1165+
1166+
// act
1167+
try (CacheHandle cacheHandle = cache.lookup(spawn, policy)) {
1168+
checkState(!cacheHandle.hasResult());
1169+
}
1170+
1171+
// assert
1172+
assertThat(cache.getInFlightExecutionsSize()).isEqualTo(0);
10501173
}
10511174
}

0 commit comments

Comments
 (0)