Skip to content
This repository was archived by the owner on Jul 25, 2024. It is now read-only.

Commit 7bd872a

Browse files
authored
102 historical content scan should not impede current content scan (#103)
This PR does the following: - add cache prefix for Redis and BullMQ keys to avoid conflicts with other apps - refactor NestJS modules to avoid multiple instantiations - add webhook URL to search request and propagate through queues - Remove STARTING_BLOCK env; scan should always start from last known scan position, or end-of-chain if no cached position - Fix search crawler to process correct block list - update OpenAPI generated document - process search block list in chunks to avoid OOM Closes #102
1 parent 41175db commit 7bd872a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+552
-795
lines changed

.env.docker.dev

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ IPFS_GATEWAY_URL="http://ipfs:8080/ipfs/[CID]"
1818
# Blockchain node address
1919
FREQUENCY_URL=ws://frequency:9944
2020

21-
# Block number from which the service will start scanning the chain
22-
STARTING_BLOCK=1
23-
2421
# Redis URL
2522
REDIS_URL=redis://redis:6379
2623

ENVIRONMENT.md

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22

33
This application recognizes the following environment variables:
44

5-
| Name | Description | Range/Type | Required? | Default |
6-
| ---------------------------------- | ------------------------------------------------------------------------------------------------------------------ | :--------------------: | :-------: | :-----: |
7-
| `API_PORT` | HTTP port that the application listens on | 1025 - 65535 | | 3000 |
8-
| `BLOCKCHAIN_SCAN_INTERVAL_SECONDS` | How many seconds to delay between successive scans of the chain for new content (after end of chain is reached) | > 0 | | 12 |
9-
| `FREQUENCY_URL` | Blockchain node address | http(s): or ws(s): URL | Y | |
10-
| `IPFS_BASIC_AUTH_SECRET` | If required for read requests, put Infura auth token here, or leave blank for default Kubo RPC | string | N | blank |
11-
| `IPFS_BASIC_AUTH_USER` | If required for read requests, put Infura Project ID here, or leave blank for default Kubo RPC | string | N | blank |
12-
| `IPFS_ENDPOINT` | URL to IPFS endpoint | URL | Y | |
13-
| `IPFS_GATEWAY_URL` | IPFS gateway URL. '[CID]' is a token that will be replaced with an actual content ID | URL template | Y | |
14-
| `QUEUE_HIGH_WATER` | Max number of jobs allowed on the '' before blockchain scan will be paused to allow queue to drain | >= 100 | | 1000 |
15-
| `REDIS_URL` | Connection URL for Redis | URL | Y |
16-
| `STARTING_BLOCK` | Block number from which the service will start scanning the chain | > 0 | | 1 |
17-
| `WEBHOOK_FAILURE_THRESHOLD` | Number of failures allowing in the provider webhook before the service is marked down | > 0 | | 3 |
18-
| `WEBHOOK_RETRY_INTERVAL_SECONDS` | Number of seconds between provider webhook retry attempts when failing | > 0 | | 10 |
5+
| Name | Description | Range/Type | Required? | Default |
6+
| ---------------------------------- | --------------------------------------------------------------------------------------------------------------- | :--------------------: | :-------: | :--------------: |
7+
| `API_PORT` | HTTP port that the application listens on | 1025 - 65535 | | 3000 |
8+
| `BLOCKCHAIN_SCAN_INTERVAL_SECONDS` | How many seconds to delay between successive scans of the chain for new content (after end of chain is reached) | > 0 | | 12 |
9+
| `CACHE_KEY_PREFIX` | Prefix to use for Redis cache keys | string | | content-watcher: |
10+
| `FREQUENCY_URL` | Blockchain node address | http(s): or ws(s): URL | Y | |
11+
| `IPFS_BASIC_AUTH_SECRET` | If required for read requests, put Infura auth token here, or leave blank for default Kubo RPC | string | N | blank |
12+
| `IPFS_BASIC_AUTH_USER` | If required for read requests, put Infura Project ID here, or leave blank for default Kubo RPC | string | N | blank |
13+
| `IPFS_ENDPOINT` | URL to IPFS endpoint | URL | Y | |
14+
| `IPFS_GATEWAY_URL` | IPFS gateway URL. '[CID]' is a token that will be replaced with an actual content ID | URL template | Y | |
15+
| `QUEUE_HIGH_WATER` | Max number of jobs allowed on the '' before blockchain scan will be paused to allow queue to drain | >= 100 | | 1000 |
16+
| `REDIS_URL` | Connection URL for Redis | URL | Y |
17+
| `STARTING_BLOCK` | Block number from which the service will start scanning the chain | > 0 | | 1 |
18+
| `WEBHOOK_FAILURE_THRESHOLD` | Number of failures allowing in the provider webhook before the service is marked down | > 0 | | 3 |
19+
| `WEBHOOK_RETRY_INTERVAL_SECONDS` | Number of seconds between provider webhook retry attempts when failing | > 0 | | 10 |

apps/api/src/api.module.ts

Lines changed: 25 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Module } from '@nestjs/common';
22
import { EventEmitterModule } from '@nestjs/event-emitter';
3-
import { BullModule } from '@nestjs/bullmq';
43
import { ScheduleModule } from '@nestjs/schedule';
54
import { RedisModule } from '@songkeys/nestjs-redis';
65
import { BullBoardModule } from '@bull-board/nestjs';
@@ -13,131 +12,71 @@ import { CrawlerModule } from '@libs/common/crawler/crawler.module';
1312
import { IPFSProcessorModule } from '@libs/common/ipfs/ipfs.module';
1413
import { PubSubModule } from '@libs/common/pubsub/pubsub.module';
1514
import { ScannerModule } from '@libs/common/scanner/scanner.module';
16-
import { ConfigModule } from '@libs/common/config/config.module';
17-
import { ConfigService } from '@libs/common/config/config.service';
15+
import { AppConfigModule } from '@libs/common/config/config.module';
16+
import { AppConfigService } from '@libs/common/config/config.service';
1817
import * as QueueConstants from '@libs/common';
18+
import { QueueModule } from '@libs/common/queues/queue.module';
1919

2020
@Module({
2121
imports: [
22-
ConfigModule,
22+
AppConfigModule,
23+
ScheduleModule.forRoot(),
2324
BlockchainModule,
2425
ScannerModule,
2526
CrawlerModule,
2627
IPFSProcessorModule,
2728
PubSubModule,
2829
RedisModule.forRootAsync(
2930
{
30-
imports: [ConfigModule],
31-
useFactory: (configService: ConfigService) => ({
32-
config: [{ url: configService.redisUrl.toString() }],
31+
// imports: [ConfigModule],
32+
useFactory: (configService: AppConfigService) => ({
33+
config: [{ url: configService.redisUrl.toString(), keyPrefix: configService.cacheKeyPrefix }],
3334
}),
34-
inject: [ConfigService],
35+
inject: [AppConfigService],
3536
},
3637
true, // isGlobal
3738
),
38-
BullModule.forRootAsync({
39-
imports: [ConfigModule],
40-
useFactory: (configService: ConfigService) => {
41-
// Note: BullMQ doesn't honor a URL for the Redis connection, and
42-
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
43-
// it by changing the URL to use 'http://' in order to parse out
44-
// the host, port, username, password, etc.
45-
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
46-
// trying to keep the # of environment variables from proliferating
47-
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
48-
const { hostname, port, username, password, pathname } = url;
49-
return {
50-
connection: {
51-
host: hostname || undefined,
52-
port: port ? Number(port) : undefined,
53-
username: username || undefined,
54-
password: password || undefined,
55-
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
56-
},
57-
};
58-
},
59-
inject: [ConfigService],
39+
QueueModule,
40+
41+
// Bullboard UI
42+
BullBoardModule.forRoot({
43+
route: '/queues',
44+
adapter: ExpressAdapter,
6045
}),
61-
BullModule.registerQueue(
46+
BullBoardModule.forFeature(
6247
{
6348
name: QueueConstants.REQUEST_QUEUE_NAME,
64-
defaultJobOptions: {
65-
attempts: 3,
66-
backoff: {
67-
type: 'exponential',
68-
},
69-
removeOnComplete: true,
70-
removeOnFail: false,
71-
},
49+
adapter: BullMQAdapter,
7250
},
7351
{
7452
name: QueueConstants.IPFS_QUEUE,
75-
defaultJobOptions: {
76-
attempts: 3,
77-
backoff: {
78-
type: 'exponential',
79-
},
80-
removeOnComplete: true,
81-
removeOnFail: false,
82-
},
53+
adapter: BullMQAdapter,
8354
},
8455
{
8556
name: QueueConstants.BROADCAST_QUEUE_NAME,
57+
adapter: BullMQAdapter,
8658
},
8759
{
8860
name: QueueConstants.REPLY_QUEUE_NAME,
61+
adapter: BullMQAdapter,
8962
},
9063
{
9164
name: QueueConstants.REACTION_QUEUE_NAME,
65+
adapter: BullMQAdapter,
9266
},
9367
{
9468
name: QueueConstants.TOMBSTONE_QUEUE_NAME,
69+
adapter: BullMQAdapter,
9570
},
9671
{
9772
name: QueueConstants.PROFILE_QUEUE_NAME,
73+
adapter: BullMQAdapter,
9874
},
9975
{
10076
name: QueueConstants.UPDATE_QUEUE_NAME,
77+
adapter: BullMQAdapter,
10178
},
10279
),
103-
104-
// Bullboard UI
105-
BullBoardModule.forRoot({
106-
route: '/queues',
107-
adapter: ExpressAdapter,
108-
}),
109-
BullBoardModule.forFeature({
110-
name: QueueConstants.REQUEST_QUEUE_NAME,
111-
adapter: BullMQAdapter,
112-
}),
113-
BullBoardModule.forFeature({
114-
name: QueueConstants.IPFS_QUEUE,
115-
adapter: BullMQAdapter,
116-
}),
117-
BullBoardModule.forFeature({
118-
name: QueueConstants.BROADCAST_QUEUE_NAME,
119-
adapter: BullMQAdapter,
120-
}),
121-
BullBoardModule.forFeature({
122-
name: QueueConstants.REPLY_QUEUE_NAME,
123-
adapter: BullMQAdapter,
124-
}),
125-
BullBoardModule.forFeature({
126-
name: QueueConstants.REACTION_QUEUE_NAME,
127-
adapter: BullMQAdapter,
128-
}),
129-
BullBoardModule.forFeature({
130-
name: QueueConstants.TOMBSTONE_QUEUE_NAME,
131-
adapter: BullMQAdapter,
132-
}),
133-
BullBoardModule.forFeature({
134-
name: QueueConstants.PROFILE_QUEUE_NAME,
135-
adapter: BullMQAdapter,
136-
}),
137-
BullBoardModule.forFeature({
138-
name: QueueConstants.UPDATE_QUEUE_NAME,
139-
adapter: BullMQAdapter,
140-
}),
14180
EventEmitterModule.forRoot({
14281
// Use this instance throughout the application
14382
global: true,
@@ -156,12 +95,11 @@ import * as QueueConstants from '@libs/common';
15695
// disable throwing uncaughtException if an error event is emitted and it has no listeners
15796
ignoreErrors: false,
15897
}),
159-
ScheduleModule.forRoot(),
16098
],
16199
providers: [ApiService],
162100
// Controller order determines the order of display for docs
163101
// v[Desc first][ABC Second], Health, and then Dev only last
164102
controllers: [ScanControllerV1, SearchControllerV1, WebhookControllerV1, HealthController],
165-
exports: [],
103+
exports: [RedisModule, ScheduleModule],
166104
})
167105
export class ApiModule {}

apps/api/src/api.service.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import Redis from 'ioredis';
44
import { InjectQueue } from '@nestjs/bullmq';
55
import { Queue } from 'bullmq';
66
import { ContentSearchRequestDto, REQUEST_QUEUE_NAME, calculateJobId } from '../../../libs/common/src';
7-
import { ScannerService } from '../../../libs/common/src/scanner/scanner';
7+
import { ScannerService } from '../../../libs/common/src/scanner/scanner.service';
88
import { EVENTS_TO_WATCH_KEY, REGISTERED_WEBHOOK_KEY } from '../../../libs/common/src/constants';
99
import { ChainWatchOptionsDto } from '../../../libs/common/src/dtos/chain.watch.dto';
10-
import { WebhookRegistrationDto } from '../../../libs/common/src/dtos/subscription.webhook.dto';
10+
import { IWebhookRegistration } from '../../../libs/common/src/dtos/subscription.webhook.dto';
1111
import * as RedisUtils from '../../../libs/common/src/utils/redis';
1212
import { IScanReset } from '../../../libs/common/src/interfaces/scan-reset.interface';
13+
import { IAnnouncementSubscription } from '@libs/common/interfaces/announcement-subscription.interface';
1314

1415
@Injectable()
1516
export class ApiService {
@@ -50,7 +51,7 @@ export class ApiService {
5051
}
5152

5253
public async searchContent(contentSearchRequestDto: ContentSearchRequestDto) {
53-
const jobId = contentSearchRequestDto.id ?? calculateJobId(contentSearchRequestDto);
54+
const jobId = contentSearchRequestDto.clientReferenceId ?? calculateJobId(contentSearchRequestDto);
5455
this.logger.debug(`Searching for content with request ${JSON.stringify(contentSearchRequestDto)}`);
5556

5657
const job = await this.requestQueue.getJob(jobId);
@@ -60,18 +61,18 @@ export class ApiService {
6061
}
6162
this.requestQueue.remove(jobId);
6263
// eslint-disable-next-line no-param-reassign
63-
contentSearchRequestDto.id = jobId;
64+
contentSearchRequestDto.clientReferenceId = jobId;
6465
const jobPromise = await this.requestQueue.add(`Content Search ${jobId}`, contentSearchRequestDto, { jobId });
6566
const JOB_REQUEST_WATCH_KEY = `${EVENTS_TO_WATCH_KEY}:${jobId}`;
6667
await this.redis.setex(JOB_REQUEST_WATCH_KEY, RedisUtils.STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, JSON.stringify(contentSearchRequestDto.filters));
6768
return jobPromise;
6869
}
6970

70-
public async setWebhook(webhookRegistration: WebhookRegistrationDto) {
71+
public async setWebhook(webhookRegistration: IWebhookRegistration) {
7172
this.logger.debug(`Registering webhook ${JSON.stringify(webhookRegistration)}`);
7273
const currentRegistedWebooks = await this.redis.get(REGISTERED_WEBHOOK_KEY);
7374

74-
let currentWebhookRegistrationDtos: { announcementType: string; urls: string[] }[] = [];
75+
let currentWebhookRegistrationDtos: IAnnouncementSubscription[] = [];
7576
if (currentRegistedWebooks) {
7677
currentWebhookRegistrationDtos = JSON.parse(currentRegistedWebooks);
7778
}
@@ -97,7 +98,7 @@ export class ApiService {
9798
await this.redis.del(REGISTERED_WEBHOOK_KEY);
9899
}
99100

100-
public async getRegisteredWebhooks(): Promise<WebhookRegistrationDto[]> {
101+
public async getRegisteredWebhooks(): Promise<IWebhookRegistration[]> {
101102
this.logger.debug('Getting registered webhooks');
102103
const registeredWebhooks = await this.redis.get(REGISTERED_WEBHOOK_KEY);
103104
return registeredWebhooks ? JSON.parse(registeredWebhooks) : [];

apps/api/src/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Logger, ValidationPipe } from '@nestjs/common';
33
import { EventEmitter2 } from '@nestjs/event-emitter';
44
import { ApiModule } from './api.module';
55
import { initSwagger } from '../../../libs/common/src/config/swagger_config';
6-
import { ConfigService } from '../../../libs/common/src/config/config.service';
6+
import { AppConfigService } from '../../../libs/common/src/config/config.service';
77

88
const logger = new Logger('main');
99

@@ -17,7 +17,7 @@ async function bootstrap() {
1717
const app = await NestFactory.create(ApiModule, {
1818
logger: process.env.DEBUG ? ['error', 'warn', 'log', 'verbose', 'debug'] : ['error', 'warn', 'log'],
1919
});
20-
const configService = app.get<ConfigService>(ConfigService);
20+
const configService = app.get<AppConfigService>(AppConfigService);
2121

2222
// Get event emitter & register a shutdown listener
2323
const eventEmitter = app.get<EventEmitter2>(EventEmitter2);

apps/api/src/metadata.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,14 @@ export default async () => {
9898
{ ChainWatchOptionsDto: { schemaIds: { required: true, type: () => [Number] }, dsnpIds: { required: true, type: () => [String] } } },
9999
],
100100
[
101-
import('../../../libs/common/src/dtos/request-job.dto'),
101+
import('../../../libs/common/src/dtos/content-search-request.dto'),
102102
{
103103
ContentSearchRequestDto: {
104-
id: { required: true, type: () => String },
104+
clientReferenceId: { required: true, type: () => String },
105105
startBlock: { required: true, type: () => Number, minimum: 1 },
106-
endBlock: { required: true, type: () => Number, minimum: 1 },
106+
blockCount: { required: true, type: () => Number, minimum: 1 },
107107
filters: { required: true, type: () => t['../../../libs/common/src/dtos/chain.watch.dto'].ChainWatchOptionsDto },
108+
webhookUrl: { required: true, type: () => String },
108109
},
109110
},
110111
],

dev.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ USER node
1919
EXPOSE 3000
2020

2121
# Run the application.
22-
ENTRYPOINT [ "npm", "run", "start:api:watch" ]
22+
ENTRYPOINT [ "npm", "run", "start:watch" ]

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ services:
9696
tags:
9797
- content-watcher-service:latest
9898
environment:
99-
<< : *content-watcher-environment
99+
<<: *content-watcher-environment
100100
ports:
101101
- 3000:3000
102102
volumes:

env.template

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ IPFS_GATEWAY_URL="http://127.0.0.1:8080/ipfs/[CID]"
1717
# Blockchain node address
1818
FREQUENCY_URL=ws://0.0.0.0:9944
1919

20-
# Block number from which the service will start scanning the chain
21-
STARTING_BLOCK=1
22-
2320
# Redis URL
2421
REDIS_URL=redis://0.0.0.0:6379
2522

@@ -39,3 +36,6 @@ WEBHOOK_RETRY_INTERVAL_SECONDS=10
3936

4037
# Port that the application REST endpoints listen on
4138
API_PORT=3000
39+
40+
# Prefix to use for Redis cache keys
41+
CACHE_KEY_PREFIX=content-watcher:

libs/common/src/blockchain/blockchain.module.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
/*
2-
https://docs.nestjs.com/modules
3-
*/
4-
51
import { Module } from '@nestjs/common';
62
import { BlockchainService } from './blockchain.service';
7-
import { ConfigModule } from '../config/config.module';
83
import { ChainEventProcessorService } from './chain-event-processor.service';
94

105
@Module({
11-
imports: [ConfigModule],
6+
imports: [],
127
controllers: [],
138
providers: [BlockchainService, ChainEventProcessorService],
149
exports: [BlockchainService, ChainEventProcessorService],

libs/common/src/blockchain/blockchain.service.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { KeyringPair } from '@polkadot/keyring/types';
77
import { BlockHash, BlockNumber, SignedBlock } from '@polkadot/types/interfaces';
88
import { SubmittableExtrinsic } from '@polkadot/api/types';
99
import { AnyNumber, ISubmittableResult } from '@polkadot/types/types';
10-
import { ConfigService } from '../config/config.service';
10+
import { AppConfigService } from '../config/config.service';
1111
import { Extrinsic } from './extrinsic';
1212

1313
@Injectable()
@@ -16,7 +16,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
1616

1717
public apiPromise: ApiPromise;
1818

19-
private configService: ConfigService;
19+
private configService: AppConfigService;
2020

2121
private logger: Logger;
2222

@@ -54,7 +54,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
5454
await Promise.all(promises);
5555
}
5656

57-
constructor(configService: ConfigService) {
57+
constructor(configService: AppConfigService) {
5858
this.configService = configService;
5959
this.logger = new Logger(this.constructor.name);
6060
}
@@ -63,7 +63,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
6363
return firstValueFrom(this.api.rpc.chain.getBlockHash(block));
6464
}
6565

66-
public getBlock(block: BlockHash): Promise<SignedBlock> {
66+
public getBlock(block?: BlockHash): Promise<SignedBlock> {
6767
return firstValueFrom(this.api.rpc.chain.getBlock(block));
6868
}
6969

0 commit comments

Comments
 (0)