Skip to content

Commit 8b69984

Browse files
fmeum and jjudd authored
[7.6.0] Fix race condition with multiplex sandboxed workers (#25491)
Prior to this change, multiplex sandboxed workers shared a working directory per mnemonic. This caused a race condition when a new multiplex sandboxed worker with the same mnemonic was launched, because launching it cleaned the working directory, which could cause problems for any actions already executing in that directory.

This change gives each multiplex sandboxed worker process a unique working directory, while ensuring that each SandboxedWorkerProxy and its associated sandbox are still tied to the correct multiplexer process and working directory.

Resolves #22589

I couldn't figure out if the Bazel repo has an autoformatter somewhere, so I did my best to manually format the code with a style that follows the existing code.

Closes #25400.

PiperOrigin-RevId: 732256101
Change-Id: If8deea240fda77780feaeac352cf099fb9bfcee3

(cherry picked from commit d54fc62)

Fixes #25460

Co-authored-by: jjudd <[email protected]>
1 parent 519b5fd commit 8b69984

File tree

4 files changed

+135
-14
lines changed

4 files changed

+135
-14
lines changed

src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public Worker create(WorkerKey key) throws IOException {
9696
if (key.isSandboxed()) {
9797
if (key.isMultiplex()) {
9898
WorkerMultiplexer workerMultiplexer = WorkerMultiplexerManager.getInstance(key, logFile);
99-
Path workDir = getSandboxedWorkerPath(key);
99+
int multiplexerId = workerMultiplexer.getMultiplexerId();
100+
Path workDir = getMultiplexSandboxedWorkerPath(key, multiplexerId);
100101
worker = new SandboxedWorkerProxy(key, workerId, logFile, workerMultiplexer, workDir);
101102
} else {
102103
Path workDir = getSandboxedWorkerPath(key, workerId);
@@ -136,10 +137,11 @@ Path getSandboxedWorkerPath(WorkerKey key, int workerId) {
136137
.getRelative(workspaceName);
137138
}
138139

139-
Path getSandboxedWorkerPath(WorkerKey key) {
140+
Path getMultiplexSandboxedWorkerPath(WorkerKey key, int multiplexerId) {
140141
String workspaceName = key.getExecRoot().getBaseName();
141142
return workerBaseDir
142-
.getRelative(key.getMnemonic() + "-" + key.getWorkerTypeName() + "-workdir")
143+
.getRelative(
144+
key.getMnemonic() + "-" + key.getWorkerTypeName() + "-" + multiplexerId + "-workdir")
143145
.getRelative(workspaceName);
144146
}
145147

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,34 +54,47 @@
5454
* WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response.
5555
*/
5656
public class WorkerMultiplexer {
57+
/**
58+
* An ID for this multiplexer that can be used by sandboxed multiplex workers to generate their
59+
* workdir. The workdir needs to be the same for all {@code SandboxedWorkerProxy} instances
60+
* associated with a {@code WorkerMultiplexer}, but needs to be unique across multiplexers for the
61+
* same mnemonic. This is analogous to the {@code workerId} created in {@code WorkerFactory}.
62+
*/
63+
private final int multiplexerId;
64+
5765
/**
5866
* A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link
5967
* WorkerProxy} instances add to this queue, while the requestSender subthread remove requests and
6068
* send them to the worker. This prevents dynamic execution interrupts from corrupting the {@code
6169
* stdin} of the worker process.
6270
*/
6371
@VisibleForTesting final BlockingQueue<WorkRequest> pendingRequests = new LinkedBlockingQueue<>();
72+
6473
/**
6574
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
6675
* keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
6776
*/
6877
private final ConcurrentMap<Integer, WorkResponse> workerProcessResponse =
6978
new ConcurrentHashMap<>();
79+
7080
/**
7181
* A map of semaphores corresponding to {@code WorkRequest}s. After sending the {@code
7282
* WorkRequest}, {@code WorkerProxy} will wait on a semaphore to be released. {@code
7383
* WorkerMultiplexer} is responsible for releasing the corresponding semaphore in order to signal
7484
* {@code WorkerProxy} that the {@code WorkerResponse} has been received.
7585
*/
7686
private final ConcurrentMap<Integer, Semaphore> responseChecker = new ConcurrentHashMap<>();
87+
7788
/**
7889
* The worker process that this WorkerMultiplexer should be talking to. This should only be set
7990
* once, when creating a new process. If the process dies or its stdio streams get corrupted, the
8091
* {@code WorkerMultiplexer} gets discarded as well and a new one gets created as needed.
8192
*/
8293
@VisibleForTesting Subprocess process;
94+
8395
/** The implementation of the worker protocol (JSON or Proto). */
8496
private WorkerProtocolImpl workerProtocol;
97+
8598
/** InputStream from the worker process. */
8699
@LazyInit private RecordingInputStream recordingStream;
87100
/** True if this multiplexer was explicitly destroyed. */
@@ -117,9 +130,10 @@ public class WorkerMultiplexer {
117130
*/
118131
private Thread shutdownHook;
119132

120-
WorkerMultiplexer(Path logFile, WorkerKey workerKey) {
133+
WorkerMultiplexer(Path logFile, WorkerKey workerKey, int multiplexerId) {
121134
this.logFile = logFile;
122135
this.workerKey = workerKey;
136+
this.multiplexerId = multiplexerId;
123137
}
124138

125139
/** Sets or clears the reporter for outputting verbose info. */
@@ -157,7 +171,7 @@ public synchronized void createSandboxedProcess(
157171
SandboxOutputs.getEmptyInstance());
158172
SandboxHelpers.cleanExisting(
159173
workDir.getParentDirectory(), inputFiles, inputsToCreate, dirsToCreate, workDir);
160-
SandboxHelpers.createDirectories(dirsToCreate, workDir, /* strict=*/ false);
174+
SandboxHelpers.createDirectories(dirsToCreate, workDir, /* strict= */ false);
161175
WorkerExecRoot.createInputs(inputsToCreate, inputFiles.limitedCopy(workerFiles), workDir);
162176
createProcess(workDir);
163177
}
@@ -455,4 +469,8 @@ public long getProcessId() {
455469
}
456470
return process.getProcessId();
457471
}
472+
473+
public int getMultiplexerId() {
474+
return this.multiplexerId;
475+
}
458476
}

src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.devtools.build.lib.vfs.Path;
2525
import java.util.HashMap;
2626
import java.util.Map;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728
import javax.annotation.Nullable;
2829

2930
/**
@@ -40,6 +41,15 @@ public class WorkerMultiplexerManager {
4041

4142
private WorkerMultiplexerManager() {}
4243

44+
/**
45+
* A counter used to provide unique IDs across sandboxed multiplexer instances. It is used in
46+
* determining the workdir for the multiplexer process. This is analogous to the {@code
47+
* pidCounter} in {@code WorkerFactory}. It is ok to use an {@code AtomicInteger} here for the
48+
* same reasons as it is there: the counter is only incremented when spawning a new multiplexer,
49+
* so even in the worst case of workers quitting after each action it shouldn't overflow.
50+
*/
51+
private static final AtomicInteger multiplexerIdCounter = new AtomicInteger(1);
52+
4353
/**
4454
* Returns a {@code WorkerMultiplexer} instance to {@code WorkerProxy}. {@code WorkerProxy}
4555
* objects with the same {@code WorkerKey} talk to the same {@code WorkerMultiplexer}. Also,
@@ -48,7 +58,10 @@ private WorkerMultiplexerManager() {}
4858
public static synchronized WorkerMultiplexer getInstance(WorkerKey key, Path logFile) {
4959
InstanceInfo instanceInfo =
5060
multiplexerInstance.computeIfAbsent(
51-
key, k -> new InstanceInfo(new WorkerMultiplexer(logFile, k)));
61+
key,
62+
k ->
63+
new InstanceInfo(
64+
new WorkerMultiplexer(logFile, k, multiplexerIdCounter.getAndIncrement())));
5265
instanceInfo.increaseRefCount();
5366
return instanceInfo.getWorkerMultiplexer();
5467
}

src/test/java/com/google/devtools/build/lib/worker/SandboxedWorkerProxyTest.java

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222
import com.google.common.base.Joiner;
2323
import com.google.common.collect.ImmutableMap;
24+
import com.google.common.collect.Lists;
2425
import com.google.devtools.build.lib.actions.Spawn;
26+
import com.google.devtools.build.lib.actions.UserExecException;
2527
import com.google.devtools.build.lib.vfs.DigestHashFunction;
2628
import com.google.devtools.build.lib.vfs.FileSystem;
2729
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -31,6 +33,7 @@
3133
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
3234
import java.io.IOException;
3335
import java.io.PipedInputStream;
36+
import java.util.ArrayList;
3437
import org.junit.Before;
3538
import org.junit.Test;
3639
import org.junit.runner.RunWith;
@@ -58,8 +61,12 @@ public void setUp() throws IOException {
5861

5962
@Test
6063
public void prepareExecution_createsFilesInSandbox() throws IOException, InterruptedException {
61-
SandboxedWorkerProxy proxy = createSandboxedWorkerProxy();
62-
Path workDir = workerBaseDir.getChild("Mnem-multiplex-worker-workdir").getChild("execroot");
64+
SandboxedWorkerProxy proxy = createSandboxedWorkerProxies("Mnem", 1).get(0);
65+
int multiplexerId = proxy.workerMultiplexer.getMultiplexerId();
66+
Path workDir =
67+
workerBaseDir
68+
.getChild("Mnem-multiplex-worker-" + multiplexerId + "-workdir")
69+
.getChild("execroot");
6370
Path sandboxDir =
6471
workDir
6572
.getChild("__sandbox")
@@ -94,7 +101,11 @@ public void prepareExecution_createsFilesInSandbox() throws IOException, Interru
94101
@Test
95102
public void putRequest_setsSandboxDir() throws IOException, InterruptedException {
96103
SandboxedWorkerProxy worker = createFakedSandboxedWorkerProxy();
97-
Path workDir = workerBaseDir.getChild("Mnem-multiplex-worker-workdir").getChild("execroot");
104+
int multiplexerId = worker.workerMultiplexer.getMultiplexerId();
105+
Path workDir =
106+
workerBaseDir
107+
.getChild("Mnem-multiplex-worker-" + multiplexerId + "-workdir")
108+
.getChild("execroot");
98109
SandboxHelper sandboxHelper =
99110
new SandboxHelper(globalExecRoot, workDir)
100111
.addAndCreateInputFile("anInputFile", "anInputFile", "Just stuff")
@@ -115,7 +126,11 @@ public void putRequest_setsSandboxDir() throws IOException, InterruptedException
115126
@Test
116127
public void finishExecution_copiesOutputs() throws IOException, InterruptedException {
117128
SandboxedWorkerProxy worker = createFakedSandboxedWorkerProxy();
118-
Path workDir = workerBaseDir.getChild("Mnem-multiplex-worker-workdir").getChild("execroot");
129+
int multiplexerId = worker.workerMultiplexer.getMultiplexerId();
130+
Path workDir =
131+
workerBaseDir
132+
.getChild("Mnem-multiplex-worker-" + multiplexerId + "-workdir")
133+
.getChild("execroot");
119134
SandboxHelper sandboxHelper =
120135
new SandboxHelper(globalExecRoot, workDir)
121136
.addAndCreateInputFile("anInputFile", "anInputFile", "Just stuff")
@@ -150,8 +165,37 @@ public void finishExecution_copiesOutputs() throws IOException, InterruptedExcep
150165
.isEqualTo("some output");
151166
}
152167

153-
private SandboxedWorkerProxy createSandboxedWorkerProxy() throws IOException {
154-
ImmutableMap.Builder<String, String> req = TestUtils.execRequirementsBuilder("Mnem");
168+
@Test
169+
public void differentProxiesSameMultiplexerHaveSameWorkDir()
170+
throws IOException, InterruptedException {
171+
ArrayList<SandboxedWorkerProxy> proxies = createSandboxedWorkerProxies("Mnem", 2);
172+
SandboxedWorkerProxy proxyOne = proxies.get(0);
173+
SandboxedWorkerProxy proxyTwo = proxies.get(1);
174+
175+
int multiplexerIdProxyOne = proxyOne.workerMultiplexer.getMultiplexerId();
176+
Path expectedWorkDirProxyOne =
177+
workerBaseDir
178+
.getChild("Mnem-multiplex-worker-" + multiplexerIdProxyOne + "-workdir")
179+
.getChild("execroot");
180+
181+
int multiplexerIdProxyTwo = proxyTwo.workerMultiplexer.getMultiplexerId();
182+
Path expectedWorkDirProxyTwo =
183+
workerBaseDir
184+
.getChild("Mnem-multiplex-worker-" + multiplexerIdProxyTwo + "-workdir")
185+
.getChild("execroot");
186+
187+
assertThat(proxyOne.workDir).isEqualTo(proxyTwo.workDir);
188+
assertThat(proxyOne.workDir).isEqualTo(expectedWorkDirProxyOne);
189+
assertThat(proxyTwo.workDir).isEqualTo(expectedWorkDirProxyTwo);
190+
}
191+
192+
@Test
193+
public void differentProxiesDifferentMultiplexerSameMnemHaveDifferentWorkDirs()
194+
throws IOException, InterruptedException, UserExecException {
195+
String sharedMnemonic = "Mnem";
196+
197+
// Create a proxy on the first multiplexer
198+
ImmutableMap.Builder<String, String> req = TestUtils.execRequirementsBuilder(sharedMnemonic);
155199
req.put(SUPPORTS_MULTIPLEX_SANDBOXING, "1");
156200
Spawn spawn = TestUtils.createSpawn(req.buildOrThrow());
157201

@@ -163,7 +207,51 @@ private SandboxedWorkerProxy createSandboxedWorkerProxy() throws IOException {
163207
TestUtils.createWorkerKeyFromOptions(
164208
PROTO, globalOutputBase, options, true, spawn, "worker.sh");
165209
WorkerFactory factory = new WorkerFactory(workerBaseDir);
166-
return (SandboxedWorkerProxy) factory.create(key);
210+
211+
SandboxedWorkerProxy proxyOneMultiplexerOne = (SandboxedWorkerProxy) factory.create(key);
212+
int multiplexerIdOne = proxyOneMultiplexerOne.workerMultiplexer.getMultiplexerId();
213+
Path expectedWorkDirOne =
214+
workerBaseDir
215+
.getChild(sharedMnemonic + "-multiplex-worker-" + multiplexerIdOne + "-workdir")
216+
.getChild("execroot");
217+
218+
// Shut down the first multiplexer, so we get a different multiplexer for the next proxy
219+
WorkerMultiplexerManager.removeInstance(key);
220+
221+
// Create a proxy on the second multiplexer
222+
SandboxedWorkerProxy proxyOneMultiplexerTwo = (SandboxedWorkerProxy) factory.create(key);
223+
int multiplexerIdTwo = proxyOneMultiplexerTwo.workerMultiplexer.getMultiplexerId();
224+
Path expectedWorkDirTwo =
225+
workerBaseDir
226+
.getChild(sharedMnemonic + "-multiplex-worker-" + multiplexerIdTwo + "-workdir")
227+
.getChild("execroot");
228+
229+
assertThat(proxyOneMultiplexerOne.workDir).isNotEqualTo(proxyOneMultiplexerTwo.workDir);
230+
assertThat(proxyOneMultiplexerOne.workDir).isEqualTo(expectedWorkDirOne);
231+
assertThat(proxyOneMultiplexerTwo.workDir).isEqualTo(expectedWorkDirTwo);
232+
}
233+
234+
private ArrayList<SandboxedWorkerProxy> createSandboxedWorkerProxies(
235+
String mnemonic, int numProxiesToCreate) throws IOException {
236+
ImmutableMap.Builder<String, String> req = TestUtils.execRequirementsBuilder(mnemonic);
237+
req.put(SUPPORTS_MULTIPLEX_SANDBOXING, "1");
238+
Spawn spawn = TestUtils.createSpawn(req.buildOrThrow());
239+
240+
WorkerOptions options = new WorkerOptions();
241+
options.workerMultiplex = true;
242+
options.multiplexSandboxing = true;
243+
244+
WorkerKey key =
245+
TestUtils.createWorkerKeyFromOptions(
246+
PROTO, globalOutputBase, options, true, spawn, "worker.sh");
247+
WorkerFactory factory = new WorkerFactory(workerBaseDir);
248+
249+
assertThat(numProxiesToCreate).isGreaterThan(0);
250+
ArrayList<SandboxedWorkerProxy> proxies = Lists.newArrayListWithCapacity(numProxiesToCreate);
251+
for (int i = 0; i < numProxiesToCreate; i++) {
252+
proxies.add((SandboxedWorkerProxy) factory.create(key));
253+
}
254+
return proxies;
167255
}
168256

169257
private SandboxedWorkerProxy createFakedSandboxedWorkerProxy() throws IOException {
@@ -180,7 +268,7 @@ private SandboxedWorkerProxy createFakedSandboxedWorkerProxy() throws IOExceptio
180268
PROTO, globalOutputBase, options, true, spawn, "worker.sh");
181269
WorkerMultiplexerManager.injectForTesting(
182270
key,
183-
new WorkerMultiplexer(globalExecRoot.getChild("testWorker.log"), key) {
271+
new WorkerMultiplexer(globalExecRoot.getChild("testWorker.log"), key, 0) {
184272
@Override
185273
public synchronized void createProcess(Path workDir) throws IOException {
186274
PipedInputStream serverInputStream = new PipedInputStream();

0 commit comments

Comments
 (0)