Commit 5af8f6c

fixed nonce and transaction receipt (#80)

1 parent 426df4a · commit 5af8f6c
8 files changed: +153 −119 lines changed

services/content-watcher/apps/worker/src/batch_announcer/batch.announcer.service.ts

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ export class BatchAnnouncementService extends BaseConsumer implements OnModuleDe
     try {
       const publisherJob = await this.ipfsPublisher.announce(job.data);
       // eslint-disable-next-line no-promise-executor-return
-      await this.publishQueue.add(publisherJob.id, publisherJob, { jobId: publisherJob.id, removeOnComplete: 1000 });
+      await this.publishQueue.add(publisherJob.id, publisherJob, { jobId: publisherJob.id, removeOnComplete: 1000, attempts: 3 });
       this.logger.log(`Completed job ${job.id} of type ${job.name}`);
       return job.data;
     } catch (e) {

services/content-watcher/apps/worker/src/monitor/status.monitor.module.ts

Lines changed: 0 additions & 12 deletions
@@ -54,24 +54,12 @@ import { QueueConstants } from '../../../../libs/common/src';
       {
         name: QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME,
         defaultJobOptions: {
-          attempts: 3,
-          backoff: {
-            type: 'exponential',
-          },
           removeOnComplete: true,
           removeOnFail: false,
         },
       },
       {
         name: QueueConstants.PUBLISH_QUEUE_NAME,
-        defaultJobOptions: {
-          attempts: 1,
-          backoff: {
-            type: 'exponential',
-          },
-          removeOnComplete: true,
-          removeOnFail: false,
-        },
       },
     ),
   ],
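
With this commit, retry behaviour is no longer declared on the queues themselves; each producer sets it per job when it calls add() (see the batch.announcer.service.ts and publishing.service.ts hunks below). A minimal BullMQ sketch of that precedence, using an illustrative queue name and assuming Redis on localhost:

```ts
import { Queue } from 'bullmq';

// Illustrative queue name; the real names come from QueueConstants.
const receiptQueue = new Queue('transaction-receipt', {
  defaultJobOptions: { removeOnComplete: true, removeOnFail: false },
});

async function enqueueExample(): Promise<void> {
  // Options passed to add() are merged over the queue's defaultJobOptions, so
  // this job retries up to 4 times with exponential backoff even though the
  // queue itself declares no attempts/backoff defaults.
  await receiptQueue.add('receipt-job-example', { txHash: '0xexample' }, {
    attempts: 4,
    backoff: { type: 'exponential', delay: 18_000 },
  });
}
```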

services/content-watcher/apps/worker/src/monitor/tx.status.monitor.service.ts

Lines changed: 35 additions & 28 deletions
@@ -1,7 +1,7 @@
 import { InjectRedis } from '@liaoliaots/nestjs-redis';
 import { Processor, InjectQueue } from '@nestjs/bullmq';
 import { Injectable } from '@nestjs/common';
-import { Job, Queue } from 'bullmq';
+import { Job, Queue, UnrecoverableError } from 'bullmq';
 import Redis from 'ioredis';
 import { MILLISECONDS_PER_SECOND } from 'time-constants';
 import { RegistryError } from '@polkadot/types/types';
@@ -11,6 +11,7 @@ import { QueueConstants } from '../../../../libs/common/src';
 import { SECONDS_PER_BLOCK } from '../../../../libs/common/src/constants';
 import { BlockchainConstants } from '../../../../libs/common/src/blockchain/blockchain-constants';
 import { BaseConsumer } from '../BaseConsumer';
+import { IPublisherJob } from '../interfaces/publisher-job.interface';
 
 @Injectable()
 @Processor(QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME, {
@@ -34,42 +35,44 @@ export class TxStatusMonitoringService extends BaseConsumer {
       const previousKnownBlockNumber = (await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash)).block.header.number.toBigInt();
       const currentFinalizedBlockNumber = await this.blockchainService.getLatestFinalizedBlockNumber();
       const blockList: bigint[] = [];
-      const blockDelay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
 
       for (let i = previousKnownBlockNumber; i <= currentFinalizedBlockNumber && i < previousKnownBlockNumber + numberBlocksToParse; i += 1n) {
         blockList.push(i);
       }
-      const txResult = await this.blockchainService.crawlBlockListForTx(job.data.txHash, blockList, [{ pallet: 'messages', event: 'MessagesStored' }]);
+      const txResult = await this.blockchainService.crawlBlockListForTx(job.data.txHash, blockList, [{ pallet: 'system', event: 'ExtrinsicSuccess' }]);
 
-      this.setEpochCapacity(txCapacityEpoch, BigInt(txResult.capacityWithDrawn ?? 0n));
-
-      if (txResult.success && txResult.blockHash && !txResult.error) {
-        this.logger.verbose(`Successfully found ${job.data.txHash} found in block ${txResult.blockHash}`);
-        return txResult;
-      }
-
-      // if tx has not yet included in a block, throw error to retry till max attempts
-      if (!txResult.blockHash && !txResult.error) {
-        throw new Error(`Tx not found in block list, retrying (attempts=${job.attemptsMade})`);
-      }
+      if (!txResult.found) {
+        if (job.attemptsMade < (job.opts.attempts ?? 3)) {
+          // if tx has not yet included in a block, throw error to retry till max attempts
+          throw new Error(`Tx not found in block list, retrying (attempts=${job.attemptsMade})`);
+        } else {
+          this.logger.warn(`Could not fetch the transaction adding to publish again! ${job.id}`);
+          // could not find the transaction, this might happen if transaction never gets into a block
+          await this.retryPublishJob(job.data.referencePublishJob);
+        }
+      } else {
+        // found the tx
+        await this.setEpochCapacity(txCapacityEpoch, BigInt(txResult.capacityWithDrawn ?? 0n));
+        if (txResult.error) {
+          this.logger.debug(`Error found in tx result: ${JSON.stringify(txResult.error)}`);
+          const errorReport = await this.handleMessagesFailure(job.data.id, txResult.error);
 
-      if (txResult.error && job.attemptsMade <= (job.opts.attempts ?? 3)) {
-        this.logger.debug(`Error found in tx result: ${JSON.stringify(txResult.error)}`);
-        const errorReport = await this.handleMessagesFailure(job.data.id, txResult.error);
-        const failedError = new Error(`Job ${job.data.id} failed with error ${JSON.stringify(txResult.error)}`);
+          if (errorReport.pause) {
+            await this.publishQueue.pause();
+          }
 
-        if (errorReport.pause) {
-          await this.publishQueue.pause();
+          if (errorReport.retry) {
+            await this.retryPublishJob(job.data.referencePublishJob);
+          } else {
+            throw new UnrecoverableError(`Job ${job.data.id} failed with error ${JSON.stringify(txResult.error)}`);
+          }
         }
 
-        if (errorReport.retry) {
-          this.logger.debug(`Retrying job ${job.data.id}`);
-          await this.publishQueue.removeRepeatableByKey(job.data.referencePublishJob.id);
-          await this.publishQueue.add(job.data.referencePublishJob.id, job.data.referencePublishJob, { delay: blockDelay });
+        if (txResult.success) {
+          this.logger.verbose(`Successfully found ${job.data.txHash} found in block ${txResult.blockHash}`);
         }
       }
-      await this.txReceiptQueue.removeRepeatableByKey(job.data.id);
-      throw new Error(`Job ${job.data.id} failed with error ${JSON.stringify(txResult.error)}`);
+      return txResult;
     } catch (e) {
       this.logger.error(e);
       throw e;
@@ -85,8 +88,6 @@ export class TxStatusMonitoringService extends BaseConsumer {
         // Re-try the job in the publish queue
         return { pause: false, retry: true };
       case 'UnAuthorizedDelegate':
-        // Re-try the job in the publish, could be a signing error
-        return { pause: false, retry: true };
       case 'InvalidMessageSourceAccount':
       case 'InvalidSchemaId':
       case 'ExceedsMaxMessagePayloadSizeBytes':
@@ -121,4 +122,10 @@ export class TxStatusMonitoringService extends BaseConsumer {
       this.logger.error(`Error setting epoch capacity: ${error}`);
     }
   }
+
+  private async retryPublishJob(publishJob: IPublisherJob) {
+    this.logger.debug(`Retrying job ${publishJob.id}`);
+    await this.publishQueue.remove(publishJob.id);
+    await this.publishQueue.add(`Retrying publish job - ${publishJob.id}`, publishJob, { jobId: publishJob.id });
+  }
 }
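
The rewritten flow above relies on a BullMQ distinction worth spelling out: throwing a plain Error consumes one of the job's configured attempts and schedules a retry with the job's backoff, while throwing UnrecoverableError fails the job immediately with no further retries. A minimal sketch of the pattern, with hypothetical stand-ins for the receipt lookup and error classification:

```ts
import { Worker, UnrecoverableError } from 'bullmq';

// Hypothetical stand-ins; the real service asks BlockchainService for the receipt.
type Receipt = { found: boolean; error?: string };
const lookUpReceipt = async (_txHash: string): Promise<Receipt> => ({ found: false });
const isRetriable = (_error: string): boolean => false; // treat every on-chain error as fatal in this sketch

const worker = new Worker('receipt-example', async (job) => {
  const receipt = await lookUpReceipt(job.data.txHash);
  if (!receipt.found) {
    // Plain Error: BullMQ retries until job.opts.attempts is exhausted.
    throw new Error(`Tx not found yet (attempt ${job.attemptsMade})`);
  }
  if (receipt.error && !isRetriable(receipt.error)) {
    // UnrecoverableError: the job moves straight to the failed set, no retries.
    throw new UnrecoverableError(`Tx failed on chain: ${receipt.error}`);
  }
  return receipt;
});

console.log(`worker listening on ${worker.name}`);
```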

services/content-watcher/apps/worker/src/publisher/nonce.service.ts

Lines changed: 18 additions & 2 deletions
@@ -20,7 +20,7 @@ export class NonceService implements OnApplicationBootstrap {
   ) {
     this.logger = new Logger(NonceService.name);
     redis.defineCommand('incrementNonce', {
-      numberOfKeys: 1,
+      numberOfKeys: RedisUtils.NUMBER_OF_NONCE_KEYS_TO_CHECK,
       lua: fs.readFileSync('lua/incrementNonce.lua', 'utf8'),
     });
   }
@@ -33,9 +33,25 @@
 
   async getNextNonce(): Promise<number> {
     const nonce = await this.blockchainService.getNonce(this.accountId);
+    const keys = this.getNextPossibleKeys(nonce);
     // @ts-ignore
-    const nextNonce = await this.redis.incrementNonce(RedisUtils.CHAIN_NONCE_KEY, nonce);
+    const nextNonceIndex = await this.redis.incrementNonce(...keys, keys.length, RedisUtils.NONCE_KEY_EXPIRE_SECONDS);
+    if (nextNonceIndex === -1) {
+      this.logger.warn(`nextNonce was full even with ${RedisUtils.NUMBER_OF_NONCE_KEYS_TO_CHECK} ${nonce}`);
+      return Number(nonce) + RedisUtils.NUMBER_OF_NONCE_KEYS_TO_CHECK;
+    }
+    const nextNonce = Number(nonce) + nextNonceIndex - 1;
     this.logger.debug(`nextNonce ${nextNonce}`);
     return nextNonce;
   }
+
+  // eslint-disable-next-line class-methods-use-this
+  getNextPossibleKeys(currentNonce: number): string[] {
+    const keys: string[] = [];
+    for (let i = 0; i < RedisUtils.NUMBER_OF_NONCE_KEYS_TO_CHECK; i += 1) {
+      const key = currentNonce + i;
+      keys.push(RedisUtils.getNonceKey(`${key}`));
+    }
+    return keys;
+  }
 }
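
A worked example of the index arithmetic in getNextNonce, with illustrative numbers: suppose getNonce() returns 100 and NUMBER_OF_NONCE_KEYS_TO_CHECK is 50, so the candidate keys run from chain:nonce:100 through chain:nonce:149. If the first two keys are still reserved by in-flight transactions, the Lua script reserves the third and returns its 1-based index:

```ts
// Illustrative values only; the real nonce comes from the chain.
const chainNonce = 100;
const keys = ['chain:nonce:100', 'chain:nonce:101', 'chain:nonce:102' /* … up to chain:nonce:149 */];

// chain:nonce:100 and chain:nonce:101 already exist in Redis, so the script
// skips them, SETEXes chain:nonce:102, and returns 3 (its position in KEYS).
const nextNonceIndex = 3;
const nextNonce = chainNonce + nextNonceIndex - 1; // 102
console.log(keys[nextNonceIndex - 1], nextNonce);
```

If every candidate key is already taken, the script returns -1 and getNextNonce steps past the whole window to chainNonce + NUMBER_OF_NONCE_KEYS_TO_CHECK rather than blocking.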

services/content-watcher/apps/worker/src/publisher/publishing.service.ts

Lines changed: 5 additions & 4 deletions
@@ -25,7 +25,7 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
 
   constructor(
     @InjectRedis() private cacheManager: Redis,
-    @InjectQueue(QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue,
+    @InjectQueue(QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue: Queue,
     @InjectQueue(QueueConstants.PUBLISH_QUEUE_NAME) private publishQueue: Queue,
     private blockchainService: BlockchainService,
     private configService: ConfigService,
@@ -78,9 +78,10 @@
       txHash,
       referencePublishJob: jobData,
     };
-    // add a delay of 1 block to allow the tx reciept to go through before checking
-    const delay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
-    await this.txReceiptQueue.add(job.id, job, { jobId: job.id, removeOnFail: false, removeOnComplete: 1000, delay });
+    // add a delay of 1 block to allow the tx receipt to go through before checking
+    const initialDelay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
+    const retryDelay = 3 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
+    await this.txReceiptQueue.add(`Receipt Job - ${job.id}`, job, { jobId: job.id, delay: initialDelay, attempts: 4, backoff: { type: 'exponential', delay: retryDelay } });
   }
 
   private async checkCapacity(): Promise<void> {
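
For the receipt job above, BullMQ's built-in exponential backoff waits roughly delay * 2^(n-1) before retry n, so with attempts: 4 the receipt is checked after the initial one-block delay and then re-checked up to three more times at growing intervals. A sketch of the resulting schedule, assuming a 6-second block time (the real value comes from SECONDS_PER_BLOCK in the constants library):

```ts
// Illustrative block time; the project reads this from its constants library.
const SECONDS_PER_BLOCK = 6;
const MILLISECONDS_PER_SECOND = 1000;

const initialDelay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND; // 6s before the first check
const retryDelay = 3 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND; // 18s base backoff

// Approximate waits before retries 1..3 under BullMQ's exponential strategy: 18s, 36s, 72s.
const retryWaits = [1, 2, 3].map((n) => retryDelay * 2 ** (n - 1));
console.log({ initialDelay, retryWaits });
```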

services/content-watcher/libs/common/src/blockchain/blockchain.service.ts

Lines changed: 61 additions & 59 deletions
@@ -165,66 +165,68 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
     txHash: Hash,
     blockList: bigint[],
     successEvents: [{ pallet: string; event: string }],
-  ): Promise<{ success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }> {
-    const txReceiptPromises: Promise<{ success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }>[] = blockList.map(async (blockNumber) => {
-      const blockHash = await this.getBlockHash(blockNumber);
-      const block = await this.getBlock(blockHash);
-      const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString());
-
-      if (!txInfo) {
-        return { success: false };
-      }
-
-      this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
-      const at = await this.api.at(blockHash.toHex());
-      const eventsPromise = firstValueFrom(at.query.system.events());
-
-      let isTxSuccess = false;
-      let totalBlockCapacity: bigint = 0n;
-      let txError: RegistryError | undefined;
-
-      try {
-        const events = await eventsPromise;
-
-        events.forEach((record) => {
-          const { event } = record;
-          const eventName = event.section;
-          const { method } = event;
-          const { data } = event;
-          this.logger.debug(`Received event: ${eventName} ${method} ${data}`);
-
-          // find capacity withdrawn event
-          if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) {
-            // allow lowercase constructor for eslint
-            // eslint-disable-next-line new-cap
-            const currentCapacity: u128 = new u128(this.api.registry, data[1]);
-            totalBlockCapacity += currentCapacity.toBigInt();
-          }
-
-          // check custom success events
-          if (successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)) {
-            this.logger.debug(`Found success event ${eventName} ${method}`);
-            isTxSuccess = true;
-          }
-
-          // check for system extrinsic failure
-          if (eventName.search('system') !== -1 && method.search('ExtrinsicFailed') !== -1) {
-            const dispatchError = data[0] as DispatchError;
-            const moduleThatErrored = dispatchError.asModule;
-            const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
-            txError = moduleError;
-            this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`);
-          }
-        });
-      } catch (error) {
-        this.logger.error(error);
-      }
-      this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`);
-      return { success: isTxSuccess, blockHash, capacityWithDrawn: totalBlockCapacity.toString(), error: txError };
-    });
+  ): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }> {
+    const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }>[] = blockList.map(
+      async (blockNumber) => {
+        const blockHash = await this.getBlockHash(blockNumber);
+        const block = await this.getBlock(blockHash);
+        const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString());
+
+        if (!txInfo) {
+          return { found: false, success: false };
+        }
+
+        this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
+        const at = await this.api.at(blockHash.toHex());
+        const eventsPromise = firstValueFrom(at.query.system.events());
+
+        let isTxSuccess = false;
+        let totalBlockCapacity: bigint = 0n;
+        let txError: RegistryError | undefined;
+
+        try {
+          const events = await eventsPromise;
+
+          events.forEach((record) => {
+            const { event } = record;
+            const eventName = event.section;
+            const { method } = event;
+            const { data } = event;
+            this.logger.debug(`Received event: ${eventName} ${method} ${data}`);
+
+            // find capacity withdrawn event
+            if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) {
+              // allow lowercase constructor for eslint
+              // eslint-disable-next-line new-cap
+              const currentCapacity: u128 = new u128(this.api.registry, data[1]);
+              totalBlockCapacity += currentCapacity.toBigInt();
+            }
+
+            // check custom success events
+            if (successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)) {
+              this.logger.debug(`Found success event ${eventName} ${method}`);
+              isTxSuccess = true;
+            }
+
+            // check for system extrinsic failure
+            if (eventName.search('system') !== -1 && method.search('ExtrinsicFailed') !== -1) {
+              const dispatchError = data[0] as DispatchError;
+              const moduleThatErrored = dispatchError.asModule;
+              const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
+              txError = moduleError;
+              this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`);
+            }
+          });
+        } catch (error) {
+          this.logger.error(error);
+        }
+        this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`);
+        return { found: true, success: isTxSuccess, blockHash, capacityWithDrawn: totalBlockCapacity.toString(), error: txError };
+      },
+    );
     const results = await Promise.all(txReceiptPromises);
-    const result = results.find((receipt) => receipt.blockHash !== undefined);
+    const result = results.find((receipt) => receipt.found);
     this.logger.debug(`Found tx receipt: ${JSON.stringify(result)}`);
-    return result ?? { success: false };
+    return result ?? { found: false, success: false };
   }
 }
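
The added found flag turns the crawl result into an explicit three-way outcome (not mined yet, mined but failed, mined and succeeded) instead of leaving callers to infer it from whether blockHash happens to be set. A simplified sketch of the branching, with string/unknown standing in for the polkadot BlockHash and RegistryError types:

```ts
// Simplified stand-in for the crawlBlockListForTx return type.
type TxCrawlResult = {
  found: boolean;
  success: boolean;
  blockHash?: string;
  capacityWithDrawn?: string;
  error?: unknown;
};

function classify(result: TxCrawlResult): 'not-in-blocks' | 'failed-on-chain' | 'succeeded' | 'unknown' {
  if (!result.found) return 'not-in-blocks'; // keep retrying or re-publish the job
  if (result.error) return 'failed-on-chain'; // inspect the dispatch error
  return result.success ? 'succeeded' : 'unknown';
}

console.log(classify({ found: false, success: false })); // 'not-in-blocks'
```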

services/content-watcher/libs/common/src/utils/redis.ts

Lines changed: 16 additions & 1 deletion
@@ -7,7 +7,18 @@ export namespace RedisUtils {
    * batch Lock expire time which applies during closing operation
    */
   export const BATCH_LOCK_EXPIRE_SECONDS = 6;
-  export const CHAIN_NONCE_KEY = 'chain:nonce';
+  /**
+   * To be able to provide mostly unique nonces to submit transactions on chain we would need to check a number of
+   * temporarily locked keys on redis side and get the first available one. This number defines the number of keys
+   * we should look into before giving up
+   */
+  export const NUMBER_OF_NONCE_KEYS_TO_CHECK = 50;
+  /**
+   * Nonce keys have to get expired shortly so that if any of nonce numbers get skipped we would still have a way to
+   * submit them after expiration
+   */
+  export const NONCE_KEY_EXPIRE_SECONDS = 2;
+  const CHAIN_NONCE_KEY = 'chain:nonce';
   const ASSET_DATA_KEY_PREFIX = 'asset:data';
   const ASSET_METADATA_KEY_PREFIX = 'asset:metadata';
   const BATCH_DATA_KEY_PREFIX = 'batch:data';
@@ -33,4 +44,8 @@ export namespace RedisUtils {
   export function getLockKey(suffix: string) {
     return `${LOCK_KEY_PREFIX}:${suffix}`;
   }
+
+  export function getNonceKey(suffix: string) {
+    return `${CHAIN_NONCE_KEY}:${suffix}`;
+  }
 }
lua/incrementNonce.lua (the script loaded by nonce.service.ts above)

Lines changed: 17 additions & 12 deletions

@@ -1,21 +1,26 @@
 --[[
 Input:
-KEYS[1] nonce key
-ARGV[1] current nonce
+KEYS[N] nonce keys
+ARGV[1] number of keys
+ARGV[2] key expire time in seconds
 Output:
-N OK (current nonce)
+-1 ERROR (none of keys worked)
+N OK (chosen key index)
 ]]
-local nonceKey = KEYS[1]
-local currentNonce = tonumber(ARGV[1])
+local keysSize = tonumber(ARGV[1])
+local expireInSeconds = tonumber(ARGV[2])
 local rcall = redis.call
 
-local nonce = rcall("GET",nonceKey)
-if not nonce or tonumber(nonce) < currentNonce then
-    rcall('SET', nonceKey, currentNonce)
-else
-    rcall('INCR', nonceKey)
-end
-return rcall("GET", nonceKey)
+local i = 1
+repeat
+    local nextKey = KEYS[i]
+    if rcall("EXISTS", nextKey) == 0 then
+        rcall('SETEX', nextKey, expireInSeconds , 1)
+        return i
+    end
+    i = i + 1
+until( i > keysSize)
+return -1
 
 
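
On the TypeScript side this script is wired up through ioredis's defineCommand, where numberOfKeys controls how the call's arguments are split: with numberOfKeys set to NUMBER_OF_NONCE_KEYS_TO_CHECK, the first 50 arguments to incrementNonce become KEYS[1..50] and the two remaining arguments become ARGV[1] (key count) and ARGV[2] (expiry). A standalone sketch of the same wiring outside NestJS, assuming a local Redis and the constants introduced in redis.ts:

```ts
import * as fs from 'fs';
import Redis from 'ioredis';

const NUMBER_OF_NONCE_KEYS_TO_CHECK = 50;
const NONCE_KEY_EXPIRE_SECONDS = 2;

const redis = new Redis(); // assumes Redis on localhost:6379

// Mirrors nonce.service.ts: the first NUMBER_OF_NONCE_KEYS_TO_CHECK arguments
// are exposed to the Lua script as KEYS, everything after them as ARGV.
redis.defineCommand('incrementNonce', {
  numberOfKeys: NUMBER_OF_NONCE_KEYS_TO_CHECK,
  lua: fs.readFileSync('lua/incrementNonce.lua', 'utf8'),
});

async function reserveNonce(chainNonce: number): Promise<number> {
  const keys = Array.from({ length: NUMBER_OF_NONCE_KEYS_TO_CHECK }, (_, i) => `chain:nonce:${chainNonce + i}`);
  // @ts-ignore -- defineCommand adds incrementNonce to the client at runtime only
  const index: number = await redis.incrementNonce(...keys, keys.length, NONCE_KEY_EXPIRE_SECONDS);
  // -1 means every candidate key was taken; step past the whole window like the service does.
  return index === -1 ? chainNonce + NUMBER_OF_NONCE_KEYS_TO_CHECK : chainNonce + index - 1;
}

reserveNonce(100).then((nonce) => console.log(`reserved nonce ${nonce}`));
```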
