13
13
// limitations under the License.
14
14
package com .google .devtools .build .lib .remote ;
15
15
16
+ import static com .google .common .base .Preconditions .checkNotNull ;
16
17
import static com .google .common .collect .ImmutableList .toImmutableList ;
17
18
import static com .google .common .util .concurrent .Futures .immediateFailedFuture ;
18
19
import static com .google .common .util .concurrent .Futures .immediateFuture ;
22
23
import static com .google .devtools .build .lib .remote .util .RxUtils .mergeBulkTransfer ;
23
24
import static com .google .devtools .build .lib .remote .util .RxUtils .toTransferResult ;
24
25
import static java .lang .String .format ;
25
- import static java .util .concurrent .TimeUnit .SECONDS ;
26
26
27
27
import build .bazel .remote .execution .v2 .Digest ;
28
28
import build .bazel .remote .execution .v2 .Directory ;
29
+ import com .google .common .annotations .VisibleForTesting ;
29
30
import com .google .common .base .Throwables ;
30
31
import com .google .common .collect .ImmutableList ;
31
32
import com .google .common .collect .ImmutableSet ;
32
33
import com .google .common .collect .Iterables ;
34
+ import com .google .common .flogger .GoogleLogger ;
33
35
import com .google .common .util .concurrent .ListenableFuture ;
36
+ import com .google .devtools .build .lib .events .Reporter ;
34
37
import com .google .devtools .build .lib .profiler .Profiler ;
35
38
import com .google .devtools .build .lib .profiler .SilentCloseable ;
39
+ import com .google .devtools .build .lib .remote .common .CacheNotFoundException ;
40
+ import com .google .devtools .build .lib .remote .common .LostInputsEvent ;
36
41
import com .google .devtools .build .lib .remote .common .RemoteActionExecutionContext ;
37
42
import com .google .devtools .build .lib .remote .common .RemoteCacheClient ;
38
43
import com .google .devtools .build .lib .remote .merkletree .MerkleTree ;
39
44
import com .google .devtools .build .lib .remote .merkletree .MerkleTree .PathOrBytes ;
40
45
import com .google .devtools .build .lib .remote .options .RemoteOptions ;
41
46
import com .google .devtools .build .lib .remote .util .DigestUtil ;
42
47
import com .google .devtools .build .lib .remote .util .RxUtils .TransferResult ;
48
+ import com .google .devtools .build .lib .vfs .Path ;
43
49
import com .google .protobuf .Message ;
44
50
import io .reactivex .rxjava3 .annotations .NonNull ;
45
51
import io .reactivex .rxjava3 .core .Completable ;
59
65
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
60
66
public class RemoteExecutionCache extends RemoteCache {
61
67
68
+ private static final GoogleLogger logger = GoogleLogger .forEnclosingClass ();
69
+
70
+ /**
71
+ * An interface used to check whether a given {@link Path} is stored in a remote or a disk cache.
72
+ */
73
+ public interface RemotePathChecker {
74
+ boolean isRemote (RemoteActionExecutionContext context , Path path ) throws IOException ;
75
+ }
76
+
77
+ private RemotePathChecker remotePathChecker =
78
+ new RemotePathChecker () {
79
+ @ Override
80
+ public boolean isRemote (RemoteActionExecutionContext context , Path path )
81
+ throws IOException {
82
+ var fs = path .getFileSystem ();
83
+ if (fs instanceof RemoteActionFileSystem ) {
84
+ var remoteActionFileSystem = (RemoteActionFileSystem ) fs ;
85
+ if (remoteActionFileSystem .isRemote (path )) {
86
+ if (context .getReadCachePolicy ().allowDiskCache ()) {
87
+ try (var inputStream = path .getInputStream ()) {
88
+ // If the file exists in the disk cache, download it and continue the upload.
89
+ return false ;
90
+ } catch (IOException e ) {
91
+ logger .atWarning ().withCause (e ).log (
92
+ "Failed to get input stream for %s" , path .getPathString ());
93
+ }
94
+ }
95
+ return true ;
96
+ }
97
+ }
98
+ return false ;
99
+ }
100
+ };
101
+
62
102
public RemoteExecutionCache (
63
- RemoteCacheClient protocolImpl ,
64
- RemoteOptions options ,
65
- DigestUtil digestUtil ) {
103
+ RemoteCacheClient protocolImpl , RemoteOptions options , DigestUtil digestUtil ) {
66
104
super (protocolImpl , options , digestUtil );
67
105
}
68
106
107
+ @ VisibleForTesting
108
+ void setRemotePathChecker (RemotePathChecker remotePathChecker ) {
109
+ this .remotePathChecker = remotePathChecker ;
110
+ }
111
+
69
112
/**
70
113
* Ensures that the tree structure of the inputs, the input files themselves, and the command are
71
114
* available in the remote cache, such that the tree can be reassembled and executed on another
@@ -82,7 +125,8 @@ public void ensureInputsPresent(
82
125
RemoteActionExecutionContext context ,
83
126
MerkleTree merkleTree ,
84
127
Map <Digest , Message > additionalInputs ,
85
- boolean force )
128
+ boolean force ,
129
+ Reporter reporter )
86
130
throws IOException , InterruptedException {
87
131
Iterable <Digest > merkleTreeAllDigests ;
88
132
try (SilentCloseable s = Profiler .instance ().profile ("merkleTree.getAllDigests()" )) {
@@ -95,7 +139,7 @@ public void ensureInputsPresent(
95
139
}
96
140
97
141
Flowable <TransferResult > uploads =
98
- createUploadTasks (context , merkleTree , additionalInputs , allDigests , force )
142
+ createUploadTasks (context , merkleTree , additionalInputs , allDigests , force , reporter )
99
143
.flatMapPublisher (
100
144
result ->
101
145
Flowable .using (
@@ -113,10 +157,7 @@ public void ensureInputsPresent(
113
157
}));
114
158
115
159
try {
116
- // Workaround for https://github.com/bazelbuild/bazel/issues/19513.
117
- if (!mergeBulkTransfer (uploads ).blockingAwait (options .remoteTimeout .getSeconds (), SECONDS )) {
118
- throw new IOException ("Timed out when waiting for uploads" );
119
- }
160
+ mergeBulkTransfer (uploads ).blockingAwait ();
120
161
} catch (RuntimeException e ) {
121
162
Throwable cause = e .getCause ();
122
163
if (cause != null ) {
@@ -131,7 +172,8 @@ private ListenableFuture<Void> uploadBlob(
131
172
RemoteActionExecutionContext context ,
132
173
Digest digest ,
133
174
MerkleTree merkleTree ,
134
- Map <Digest , Message > additionalInputs ) {
175
+ Map <Digest , Message > additionalInputs ,
176
+ Reporter reporter ) {
135
177
Directory node = merkleTree .getDirectoryByDigest (digest );
136
178
if (node != null ) {
137
179
return cacheProtocol .uploadBlob (context , digest , node .toByteString ());
@@ -142,7 +184,20 @@ private ListenableFuture<Void> uploadBlob(
142
184
if (file .getBytes () != null ) {
143
185
return cacheProtocol .uploadBlob (context , digest , file .getBytes ());
144
186
}
145
- return cacheProtocol .uploadFile (context , digest , file .getPath ());
187
+
188
+ var path = checkNotNull (file .getPath ());
189
+ try {
190
+ if (remotePathChecker .isRemote (context , path )) {
191
+ // If we get here, the remote input was determined to exist in the remote or disk cache at
192
+ // some point before action execution, but reported to be missing when querying the remote
193
+ // for missing action inputs; possibly because it was evicted in the interim.
194
+ reporter .post (new LostInputsEvent ());
195
+ throw new CacheNotFoundException (digest , path .getPathString ());
196
+ }
197
+ } catch (IOException e ) {
198
+ return immediateFailedFuture (e );
199
+ }
200
+ return cacheProtocol .uploadFile (context , digest , path );
146
201
}
147
202
148
203
Message message = additionalInputs .get (digest );
@@ -169,14 +224,16 @@ private Single<List<UploadTask>> createUploadTasks(
169
224
MerkleTree merkleTree ,
170
225
Map <Digest , Message > additionalInputs ,
171
226
Iterable <Digest > allDigests ,
172
- boolean force ) {
227
+ boolean force ,
228
+ Reporter reporter ) {
173
229
return Single .using (
174
230
() -> Profiler .instance ().profile ("collect digests" ),
175
231
ignored ->
176
232
Flowable .fromIterable (allDigests )
177
233
.flatMapMaybe (
178
234
digest ->
179
- maybeCreateUploadTask (context , merkleTree , additionalInputs , digest , force ))
235
+ maybeCreateUploadTask (
236
+ context , merkleTree , additionalInputs , digest , force , reporter ))
180
237
.collect (toImmutableList ()),
181
238
SilentCloseable ::close );
182
239
}
@@ -186,7 +243,8 @@ private Maybe<UploadTask> maybeCreateUploadTask(
186
243
MerkleTree merkleTree ,
187
244
Map <Digest , Message > additionalInputs ,
188
245
Digest digest ,
189
- boolean force ) {
246
+ boolean force ,
247
+ Reporter reporter ) {
190
248
return Maybe .create (
191
249
emitter -> {
192
250
AsyncSubject <Void > completion = AsyncSubject .create ();
@@ -211,7 +269,11 @@ private Maybe<UploadTask> maybeCreateUploadTask(
211
269
return toCompletable (
212
270
() ->
213
271
uploadBlob (
214
- context , uploadTask .digest , merkleTree , additionalInputs ),
272
+ context ,
273
+ uploadTask .digest ,
274
+ merkleTree ,
275
+ additionalInputs ,
276
+ reporter ),
215
277
directExecutor ());
216
278
}),
217
279
/* onAlreadyRunning= */ () -> emitter .onSuccess (uploadTask ),
0 commit comments