Skip to content
This repository was archived by the owner on Jul 25, 2024. It is now read-only.

Commit 5c68bf7

Browse files
authored
make reset/pause/restart scan work correctly (#61)
# Description Makes changing the current block number while the scan is paused work correctly. * Adds additional request options to `resetScan` endpoint: * `rewindOffset`: number; indicates number of blocks to rewind from either end of chain or block number in request * `immediate`: boolean; indicates whether to start the scan immediately from the new position, or allow it to take effect at the next scheduled interval Refactors the scan loop a bit to make more compact & readable, as well as to be able to honor scan reset requests even if a scan is in-progress.
1 parent b94d285 commit 5c68bf7

File tree

9 files changed

+120
-76
lines changed

9 files changed

+120
-76
lines changed

apps/api/src/api.controller.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Body, Controller, Delete, Get, HttpStatus, Logger, Post, Put } from '@nestjs/common';
2-
import { ApiBody, ApiOkResponse } from '@nestjs/swagger';
1+
import { Body, Controller, Delete, Get, HttpStatus, Logger, Post, Put, Query } from '@nestjs/common';
2+
import { ApiBody, ApiOkResponse, ApiQuery } from '@nestjs/swagger';
33
import { ApiService } from './api.service';
44
import { ResetScannerDto, ContentSearchRequestDto } from '../../../libs/common/src';
55
import { ChainWatchOptionsDto } from '../../../libs/common/src/dtos/chain.watch.dto';
@@ -27,7 +27,7 @@ export class ApiController {
2727
type: ResetScannerDto,
2828
})
2929
resetScanner(@Body() resetScannerDto: ResetScannerDto) {
30-
return this.apiService.setLastSeenBlockNumber(BigInt(resetScannerDto.blockNumber ?? 0n));
30+
return this.apiService.resetScanner(resetScannerDto);
3131
}
3232

3333
@Post('setWatchOptions')
@@ -45,8 +45,15 @@ export class ApiController {
4545
}
4646

4747
@Post('startScanner')
48-
startScanner() {
49-
return this.apiService.resumeScanner();
48+
@ApiQuery({
49+
name: 'immediate',
50+
description: 'immediate: whether to resume scan immediately (true), or wait until next scheduled scan (false)',
51+
type: 'boolean',
52+
required: false,
53+
})
54+
startScanner(@Query('immediate') immediate?: boolean) {
55+
this.logger.debug(`Resuming scan; immediate=${immediate}`);
56+
return this.apiService.resumeScanner(immediate);
5057
}
5158

5259
@Put('search')

apps/api/src/api.service.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ import { InjectRedis } from '@songkeys/nestjs-redis';
33
import Redis from 'ioredis';
44
import { InjectQueue } from '@nestjs/bullmq';
55
import { Queue } from 'bullmq';
6-
import { ContentSearchRequestDto, REQUEST_QUEUE_NAME, calculateJobId } from '../../../libs/common/src';
6+
import { ContentSearchRequestDto, REQUEST_QUEUE_NAME, ResetScannerDto, calculateJobId } from '../../../libs/common/src';
77
import { ScannerService } from '../../../libs/common/src/scanner/scanner';
8-
import { EVENTS_TO_WATCH_KEY, LAST_SEEN_BLOCK_NUMBER_SCANNER_KEY, REGISTERED_WEBHOOK_KEY } from '../../../libs/common/src/constants';
8+
import { EVENTS_TO_WATCH_KEY, REGISTERED_WEBHOOK_KEY } from '../../../libs/common/src/constants';
99
import { ChainWatchOptionsDto } from '../../../libs/common/src/dtos/chain.watch.dto';
1010
import { WebhookRegistrationDto } from '../../../libs/common/src/dtos/subscription.webhook.dto';
1111
import * as RedisUtils from '../../../libs/common/src/utils/redis';
12+
import { IScanReset } from '../../../libs/common/src/interfaces/scan-reset.interface';
1213

1314
@Injectable()
1415
export class ApiService {
@@ -22,11 +23,6 @@ export class ApiService {
2223
this.logger = new Logger(this.constructor.name);
2324
}
2425

25-
public setLastSeenBlockNumber(blockNumber: bigint) {
26-
this.logger.warn(`Setting last seen block number to ${blockNumber}`);
27-
return this.redis.setex(LAST_SEEN_BLOCK_NUMBER_SCANNER_KEY, RedisUtils.STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, blockNumber.toString());
28-
}
29-
3026
public async setWatchOptions(watchOptions: ChainWatchOptionsDto) {
3127
this.logger.warn(`Setting watch options to ${JSON.stringify(watchOptions)}`);
3228
const currentWatchOptions = await this.redis.get(EVENTS_TO_WATCH_KEY);
@@ -36,12 +32,16 @@ export class ApiService {
3632

3733
public pauseScanner() {
3834
this.logger.warn('Pausing scanner');
39-
return this.scannerService.pauseScanner();
35+
this.scannerService.pauseScanner();
4036
}
4137

42-
public resumeScanner() {
38+
public resumeScanner(immediate = false) {
4339
this.logger.warn('Resuming scanner');
44-
return this.scannerService.resumeScanner();
40+
this.scannerService.resumeScanner(immediate);
41+
}
42+
43+
public async resetScanner(resetScannerOptions: IScanReset) {
44+
await this.scannerService.resetScan(resetScannerOptions);
4545
}
4646

4747
public async searchContent(contentSearchRequestDto: ContentSearchRequestDto) {

libs/common/src/blockchain/blockchain.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
7171
return (await this.apiPromise.rpc.chain.getFinalizedHead()) as BlockHash;
7272
}
7373

74-
public async getLatestFinalizedBlockNumber(): Promise<bigint> {
75-
return (await this.apiPromise.rpc.chain.getBlock()).block.header.number.toBigInt();
74+
public async getLatestFinalizedBlockNumber(): Promise<number> {
75+
return (await this.apiPromise.rpc.chain.getBlock()).block.header.number.toNumber();
7676
}
7777

7878
public async getBlockNumberForHash(hash: string): Promise<number | undefined> {

libs/common/src/config/config.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ export class ConfigService {
3737
return this.nestConfigService.get('FREQUENCY_URL')!;
3838
}
3939

40-
public get startingBlock(): string {
41-
return this.nestConfigService.get('STARTING_BLOCK')!;
40+
public get startingBlock(): number | undefined {
41+
return this.nestConfigService.get<number>('STARTING_BLOCK')!;
4242
}
4343

4444
public get blockchainScanIntervalMinutes(): number {

libs/common/src/config/env.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ export const configModuleOptions: ConfigModuleOptions = {
1010
IPFS_BASIC_AUTH_SECRET: Joi.string().allow('').default(''),
1111
REDIS_URL: Joi.string().uri().required(),
1212
FREQUENCY_URL: Joi.string().uri().required(),
13-
STARTING_BLOCK: Joi.number().min(1).default(1),
13+
STARTING_BLOCK: Joi.number().min(1),
1414
BLOCKCHAIN_SCAN_INTERVAL_MINUTES: Joi.number()
1515
.min(1)
1616
.default(3 * 60),

libs/common/src/dtos/announcement.dto.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* File name should always end with `.dto.ts` for swagger metadata generator to get picked up
33
*/
44
// eslint-disable-next-line max-classes-per-file
5-
import { IsEnum, IsHexadecimal, IsInt, IsNotEmpty, IsString, Matches, Max, MaxLength, Min, MinLength, ValidateNested } from 'class-validator';
5+
import { IsEnum, IsInt, IsNotEmpty, IsString, Matches, Max, Min, MinLength, ValidateNested } from 'class-validator';
66
import { Type } from 'class-transformer';
77
import { NoteActivityDto, ProfileActivityDto } from './activity.dto';
88
import { DSNP_CONTENT_HASH_REGEX, DSNP_CONTENT_URI_REGEX, DSNP_EMOJI_REGEX } from './validation.dto';

libs/common/src/dtos/common.dto.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
*/
44
// eslint-disable-next-line max-classes-per-file
55
import { ApiProperty } from '@nestjs/swagger';
6-
import { IsNotEmpty, IsNumberString } from 'class-validator';
6+
import { IsBoolean, IsNotEmpty, IsNumber, IsNumberString, IsOptional, IsPositive, isPositive } from 'class-validator';
7+
import { IScanReset } from '../interfaces/scan-reset.interface';
78

89
export class DsnpUserIdParam {
910
@IsNotEmpty()
@@ -24,9 +25,22 @@ export class FilesUploadDto {
2425
files: any[];
2526
}
2627

27-
export class ResetScannerDto {
28-
@ApiProperty({ type: 'number', description: 'The block number to reset the scanner to', example: 0 })
29-
blockNumber: number;
28+
export class ResetScannerDto implements IScanReset {
29+
@IsOptional()
30+
@IsNumber()
31+
@IsPositive()
32+
@ApiProperty({ required: false, type: 'number', description: 'The block number to reset the scanner to', example: 0 })
33+
blockNumber?: number;
34+
35+
@IsOptional()
36+
@IsNumber()
37+
@ApiProperty({ required: false, type: 'number', description: 'Number of blocks to rewind the scanner to (from `blockNumber` if supplied; else from latest block', example: 100 })
38+
rewindOffset?: number;
39+
40+
@IsOptional()
41+
@IsBoolean()
42+
@ApiProperty({ required: false, type: 'boolean', description: 'Whether to schedule the new scan immediately or wait for the next scheduled interval', example: true })
43+
immediate?: boolean;
3044
}
3145

3246
// eslint-disable-next-line no-shadow
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export interface IScanReset {
2+
blockNumber?: number;
3+
rewindOffset?: number;
4+
immediate?: boolean;
5+
}

libs/common/src/scanner/scanner.ts

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* eslint-disable no-underscore-dangle */
22
/* eslint-disable no-await-in-loop */
33
import '@frequency-chain/api-augment';
4-
import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common';
4+
import { Injectable, Logger, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
55
import { InjectQueue } from '@nestjs/bullmq';
66
import Redis from 'ioredis';
77
import { InjectRedis } from '@songkeys/nestjs-redis';
@@ -15,14 +15,17 @@ import { EVENTS_TO_WATCH_KEY, LAST_SEEN_BLOCK_NUMBER_SCANNER_KEY, REGISTERED_WEB
1515
import { ChainWatchOptionsDto } from '../dtos/chain.watch.dto';
1616
import * as RedisUtils from '../utils/redis';
1717
import { ChainEventProcessorService } from '../blockchain/chain-event-processor.service';
18+
import { IScanReset } from '../interfaces/scan-reset.interface';
19+
20+
const INTERVAL_SCAN_NAME = 'intervalScan';
1821

1922
@Injectable()
20-
export class ScannerService implements OnApplicationBootstrap {
23+
export class ScannerService implements OnApplicationBootstrap, OnApplicationShutdown {
2124
private readonly logger: Logger;
2225

2326
private scanInProgress = false;
24-
2527
private paused = false;
28+
private scanResetBlockNumber: number | undefined;
2629

2730
constructor(
2831
private readonly configService: ConfigService,
@@ -36,32 +39,43 @@ export class ScannerService implements OnApplicationBootstrap {
3639
}
3740

3841
async onApplicationBootstrap() {
39-
const startingBlock = Number(this.configService.startingBlock) - 1;
40-
this.setLastSeenBlockNumber(startingBlock);
41-
this.scheduleInitialScan();
42-
this.scheduleBlockchainScan();
43-
}
44-
45-
private scheduleInitialScan() {
46-
const initialTimeout = setTimeout(() => this.scan(), 0);
47-
this.schedulerRegistry.addTimeout('initialScan', initialTimeout);
48-
}
42+
const startingBlock = this.configService.startingBlock;
43+
if (startingBlock) {
44+
this.logger.log(`Setting initial scan block to ${startingBlock}`);
45+
this.setLastSeenBlockNumber(startingBlock - 1);
46+
}
47+
setImmediate(() => this.scan());
4948

50-
private scheduleBlockchainScan() {
5149
const scanInterval = this.configService.blockchainScanIntervalMinutes * SECONDS_PER_MINUTE * MILLISECONDS_PER_SECOND;
50+
this.schedulerRegistry.addInterval(INTERVAL_SCAN_NAME, setInterval(() => this.scan(), scanInterval));
51+
}
5252

53-
const interval = setInterval(() => this.scan(), scanInterval);
54-
this.schedulerRegistry.addInterval('blockchainScan', interval);
53+
onApplicationShutdown(_signal?: string | undefined) {
54+
const interval = this.schedulerRegistry.getInterval(INTERVAL_SCAN_NAME);
55+
clearInterval(interval);
5556
}
5657

57-
public async pauseScanner() {
58+
public pauseScanner() {
5859
this.logger.debug('Pausing scanner');
5960
this.paused = true;
6061
}
6162

62-
public async resumeScanner() {
63+
public resumeScanner(immediate = false) {
6364
this.logger.debug('Resuming scanner');
6465
this.paused = false;
66+
if (immediate) {
67+
setImmediate(() => this.scan());
68+
}
69+
}
70+
71+
public async resetScan({ blockNumber, rewindOffset, immediate }: IScanReset) {
72+
this.pauseScanner();
73+
let targetBlock = blockNumber ?? await this.blockchainService.getLatestFinalizedBlockNumber();
74+
targetBlock -= rewindOffset ? Math.abs(rewindOffset) : 0;
75+
targetBlock = Math.max(targetBlock, 1);
76+
this.scanResetBlockNumber = targetBlock;
77+
this.logger.log(`Resetting scan to block #${targetBlock}`);
78+
this.resumeScanner(immediate);
6579
}
6680

6781
async scan() {
@@ -73,18 +87,7 @@ export class ScannerService implements OnApplicationBootstrap {
7387
return;
7488
}
7589

76-
if (this.paused) {
77-
this.logger.debug('Scanner is paused');
78-
return;
79-
}
80-
let queueSize = await this.ipfsQueue.count();
81-
82-
if (queueSize > 0) {
83-
this.logger.log('Deferring next blockchain scan until queue is empty');
84-
return;
85-
}
8690
const registeredWebhook = await this.cache.get(REGISTERED_WEBHOOK_KEY);
87-
8891
if (!registeredWebhook) {
8992
this.logger.log('No registered webhooks; no scan performed.');
9093
return;
@@ -93,32 +96,38 @@ export class ScannerService implements OnApplicationBootstrap {
9396
const eventsToWatch: ChainWatchOptionsDto = chainWatchFilters ? JSON.parse(chainWatchFilters) : { msa_ids: [], schemaIds: [] };
9497

9598
this.scanInProgress = true;
96-
let lastScannedBlock = await this.getLastSeenBlockNumber();
97-
const currentBlockNumber = lastScannedBlock + 1;
98-
let currentBlockHash = await this.blockchainService.getBlockHash(currentBlockNumber);
9999

100-
if (currentBlockHash.isEmpty) {
101-
this.logger.log('No new blocks to read; no scan performed.');
102-
this.scanInProgress = false;
103-
return;
104-
}
105-
this.logger.log(`Starting scan from block #${currentBlockNumber} (${currentBlockHash})`);
100+
let first = true;
101+
while (true) {
102+
if (this.paused) {
103+
this.logger.log('Scan paused');
104+
break;
105+
}
106+
107+
const queueSize = await this.ipfsQueue.count();
108+
if (queueSize > this.configService.queueHighWater) {
109+
this.logger.log('Queue soft limit reached; pausing scan until next interval');
110+
break;
111+
}
106112

107-
while (!this.paused && !currentBlockHash.isEmpty && queueSize < this.configService.queueHighWater) {
108-
const messages = await this.chainEventProcessor.getMessagesInBlock(lastScannedBlock, eventsToWatch);
113+
const currentBlockNumber = await this.getNextBlockNumber();
114+
const currentBlockHash = await this.blockchainService.getBlockHash(currentBlockNumber);
115+
if (currentBlockHash.isEmpty) {
116+
this.logger.log(`No new blocks to scan @ block number ${currentBlockNumber}; pausing scan until next interval`);
117+
break;
118+
}
119+
120+
if (first) {
121+
this.logger.log(`Starting scan @ block # ${currentBlockNumber} (${currentBlockHash})`);
122+
first = false;
123+
}
124+
125+
const messages = await this.chainEventProcessor.getMessagesInBlock(currentBlockNumber, eventsToWatch);
109126
if (messages.length > 0) {
110127
this.logger.debug(`Found ${messages.length} messages to process`);
111128
}
112129
await this.chainEventProcessor.queueIPFSJobs(messages, this.ipfsQueue);
113-
await this.saveProgress(lastScannedBlock);
114-
lastScannedBlock += 1;
115-
currentBlockHash = await this.blockchainService.getBlockHash(lastScannedBlock);
116-
queueSize = await this.ipfsQueue.count();
117-
}
118-
if (currentBlockHash.isEmpty) {
119-
this.logger.log(`Scan reached end-of-chain at block ${lastScannedBlock - 1}`);
120-
} else if (queueSize > this.configService.queueHighWater) {
121-
this.logger.log('Queue soft limit reached; pausing scan until next iteration');
130+
await this.saveProgress(currentBlockNumber);
122131
}
123132
} catch (err) {
124133
this.logger.error(err);
@@ -127,8 +136,17 @@ export class ScannerService implements OnApplicationBootstrap {
127136
}
128137
}
129138

130-
private async getLastSeenBlockNumber(): Promise<number> {
131-
return Number((await this.cache.get(LAST_SEEN_BLOCK_NUMBER_SCANNER_KEY)) ?? 0);
139+
private async getNextBlockNumber(): Promise<number> {
140+
let nextBlock: number;
141+
if (this.scanResetBlockNumber) {
142+
await this.setLastSeenBlockNumber(this.scanResetBlockNumber - 1);
143+
nextBlock = this.scanResetBlockNumber;
144+
this.scanResetBlockNumber = undefined;
145+
} else {
146+
nextBlock = (Number(await this.cache.get(LAST_SEEN_BLOCK_NUMBER_SCANNER_KEY)) ?? 0) + 1;
147+
}
148+
149+
return nextBlock;
132150
}
133151

134152
private async saveProgress(blockNumber: number): Promise<void> {

0 commit comments

Comments
 (0)