|
49 | 49 | import java.util.concurrent.BlockingQueue;
|
50 | 50 | import java.util.concurrent.ExecutionException;
|
51 | 51 | import java.util.concurrent.Future;
|
| 52 | +import java.util.concurrent.atomic.AtomicBoolean; |
52 | 53 | import java.util.function.Consumer;
|
53 | 54 | import java.util.regex.Matcher;
|
54 | 55 | import java.util.regex.Pattern;
|
@@ -207,8 +208,7 @@ private Stream<RowResult> runManyStatements(
|
207 | 208 | runSchemaStatementsInTx(
|
208 | 209 | scanner, internalQueue, params, addStatistics, timeout, reportError, fileName);
|
209 | 210 | } else {
|
210 | | - runDataStatementsInTx( |

211 | | - scanner, internalQueue, params, addStatistics, timeout, reportError, fileName); |
| 211 | + runDataStatementsInTx(scanner, internalQueue, params, addStatistics, reportError, fileName); |
212 | 212 | }
|
213 | 213 | },
|
214 | 214 | RowResult.TOMBSTONE);
|
@@ -246,40 +246,47 @@ private void runDataStatementsInTx(
|
246 | 246 | BlockingQueue<RowResult> queue,
|
247 | 247 | Map<String, Object> params,
|
248 | 248 | boolean addStatistics,
|
249 | | - long timeout, |
250 | 249 | boolean reportError,
|
251 | 250 | String fileName) {
|
252 | 251 | while (scanner.hasNext()) {
|
253 | 252 | String stmt = removeShellControlCommands(scanner.next());
|
254 | 253 | if (stmt.trim().isEmpty()) continue;
|
255 | | - boolean schemaOperation; |

256 | | - try { |

257 | | - schemaOperation = isSchemaOperation(stmt); |

258 | | - } catch (Exception e) { |

259 | | - collectError(queue, reportError, e, fileName); |

260 | | - return; |

261 | | - } |
262 | 254 |
|
263 | | - if (!schemaOperation) { |

264 | | - if (isPeriodicOperation(stmt)) { |

265 | | - Util.inThread(pools, () -> { |

266 | | - try { |

267 | | - return db.executeTransactionally( |

268 | | - stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); |

269 | | - } catch (Exception e) { |

270 | | - collectError(queue, reportError, e, fileName); |

271 | | - return null; |

272 | | - } |

273 | | - }); |

274 | | - } else { |
| 255 | + // Periodic operations cannot be schema operations, so no need to check that here (will fail as invalid |
| 256 | + // query) |
| 257 | + if (isPeriodicOperation(stmt)) { |
| 258 | + Util.inThread(pools, () -> { |
| 259 | + try { |
| 260 | + return db.executeTransactionally( |
| 261 | + stmt, params, result -> consumeResult(result, queue, addStatistics, tx, fileName)); |
| 262 | + } catch (Exception e) { |
| 263 | + collectError(queue, reportError, e, fileName); |
| 264 | + return null; |
| 265 | + } |
| 266 | + }); |
| 267 | + } else { |
| 268 | + AtomicBoolean isSchemaError = new AtomicBoolean(false); |
| 269 | + try { |
275 | 270 | Util.inTx(db, pools, threadTx -> {
|
276 | 271 | try (Result result = threadTx.execute(stmt, params)) {
|
277 | 272 | return consumeResult(result, queue, addStatistics, tx, fileName);
|
278 | 273 | } catch (Exception e) {
|
279 | | - collectError(queue, reportError, e, fileName); |
| 274 | + // APOC historically skips schema operations |
| 275 | + if (!(e.getMessage().contains("Schema operations on database") |
| 276 | + && e.getMessage().contains("are not allowed"))) { |
| 277 | + collectError(queue, reportError, e, fileName); |
| 278 | + return null; |
| 279 | + } |
| 280 | + isSchemaError.set(true); |
280 | 281 | return null;
|
281 | 282 | }
|
282 | 283 | });
|
| 284 | + } catch (Exception e) { |
| 285 | + // An error thrown by a schema operation |
| 286 | + if (isSchemaError.get()) { |
| 287 | + continue; |
| 288 | + } |
| 289 | + throw e; |
283 | 290 | }
|
284 | 291 | }
|
285 | 292 | }
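The key change above drops the up-front `isSchemaOperation(stmt)` check in favour of a flag-and-skip pattern: the transaction lambda recognises the "Schema operations on database ... are not allowed" failure by its message, records it in an `AtomicBoolean`, and the enclosing loop then swallows whatever the transaction helper rethrows for that statement instead of reporting it. The sketch below shows that pattern in isolation; `FakeTx`, `inTx`, and the sample statements are made-up stand-ins for Neo4j's transaction API and APOC's `Util.inTx`, not the actual APOC code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

public class SkipSchemaErrorsSketch {

    /** Stand-in for a transaction that remembers a failure until commit time. */
    static class FakeTx {
        private RuntimeException failure;

        void execute(String statement) {
            if (statement.startsWith("CREATE INDEX")) {
                failure = new IllegalStateException(
                        "Schema operations on database 'neo4j' are not allowed in this transaction");
                throw failure;
            }
            System.out.println("executed: " + statement);
        }

        void commit() {
            // A failed transaction surfaces its error again when it is committed/closed.
            if (failure != null) {
                throw new IllegalStateException("Transaction rolled back", failure);
            }
        }
    }

    /** Stand-in for a Util.inTx-style helper: run the work, then commit. */
    static <T> T inTx(Function<FakeTx, T> work) {
        FakeTx tx = new FakeTx();
        T result = work.apply(tx);
        tx.commit();
        return result;
    }

    public static void main(String[] args) {
        List<String> statements = List.of(
                "CREATE (:Person {name: 'Ada'})",
                "CREATE INDEX person_name FOR (p:Person) ON (p.name)", // schema op: skipped
                "MATCH (p:Person) RETURN count(p)");

        for (String stmt : statements) {
            // Written inside the lambda, read by the enclosing loop after the commit fails.
            AtomicBoolean isSchemaError = new AtomicBoolean(false);
            try {
                inTx(tx -> {
                    try {
                        tx.execute(stmt);
                    } catch (Exception e) {
                        String msg = e.getMessage();
                        if (!(msg.contains("Schema operations on database")
                                && msg.contains("are not allowed"))) {
                            // A genuine failure: report it, like collectError does.
                            System.err.println("reported error: " + msg);
                            return null;
                        }
                        // Recognised schema statement: flag it so it can be skipped later.
                        isSchemaError.set(true);
                    }
                    return null;
                });
            } catch (Exception e) {
                if (isSchemaError.get()) {
                    continue; // the statement was a schema operation: skip it silently
                }
                throw e; // anything else is a real problem
            }
        }
    }
}
```

An `AtomicBoolean` (or any mutable holder) is used rather than a plain `boolean` because a local variable captured by the lambda must be effectively final, so the lambda could not assign to it directly.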
|
|