Skip to content
This repository was archived by the owner on Jul 25, 2024. It is now read-only.

Commit b94d285

Browse files
authored
fix: crawler update to new MessagesInBlock event (#59)
# Description `crawler` service was stlil using the deprected `MessagesStored` event. Updated/refactored to use common logic with `scanner` to look for `MessagesInBlock` events. Also changed all occurrences of Schema IDs and Block Numbers to JS `number` types, as SchemaId is `u16` and Block Number is `u32` (no need to handle `u64`/`bigint`, so no need to transmit as strings)
1 parent e2830d1 commit b94d285

File tree

13 files changed

+208
-377
lines changed

13 files changed

+208
-377
lines changed

apps/api/src/metadata.ts

Lines changed: 7 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,9 @@
11
/* eslint-disable */
22
export default async () => {
3-
const t = {
4-
['../../../libs/common/src/dtos/activity.dto']: await import('../../../libs/common/src/dtos/activity.dto'),
5-
['../../../libs/common/src/dtos/announcement.dto']: await import('../../../libs/common/src/dtos/announcement.dto'),
6-
};
7-
return {
8-
'@nestjs/swagger': {
9-
models: [
10-
[
11-
import('../../../libs/common/src/dtos/common.dto'),
12-
{
13-
DsnpUserIdParam: { userDsnpId: { required: true, type: () => String } },
14-
AnnouncementResponseDto: { referenceId: { required: true, type: () => String } },
15-
UploadResponseDto: { assetIds: { required: true, type: () => [String] } },
16-
FilesUploadDto: { files: { required: true, type: () => [Object] } },
17-
},
18-
],
19-
[
20-
import('../../../libs/common/src/dtos/activity.dto'),
21-
{
22-
LocationDto: {
23-
name: { required: true, type: () => String, minLength: 1 },
24-
accuracy: { required: false, type: () => Number, minimum: 0, maximum: 100 },
25-
altitude: { required: false, type: () => Number },
26-
latitude: { required: false, type: () => Number },
27-
longitude: { required: false, type: () => Number },
28-
radius: { required: false, type: () => Number, minimum: 0 },
29-
units: { required: false, enum: t['../../../libs/common/src/dtos/activity.dto'].UnitTypeDto },
30-
},
31-
AssetReferenceDto: {
32-
referenceId: { required: true, type: () => String, minLength: 1 },
33-
height: { required: false, type: () => Number, minimum: 1 },
34-
width: { required: false, type: () => Number, minimum: 1 },
35-
duration: { required: false, type: () => String, pattern: 'DURATION_REGEX' },
36-
},
37-
TagDto: {
38-
type: { required: true, enum: t['../../../libs/common/src/dtos/activity.dto'].TagTypeDto },
39-
name: { required: false, type: () => String, minLength: 1 },
40-
mentionedId: { required: false, type: () => String, minLength: 1, pattern: 'DSNP_USER_URI_REGEX' },
41-
},
42-
AssetDto: {
43-
type: { required: true, enum: t['../../../libs/common/src/dtos/activity.dto'].AttachmentTypeDto },
44-
references: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetReferenceDto] },
45-
name: { required: false, type: () => String, minLength: 1 },
46-
href: { required: false, type: () => String, minLength: 1 },
47-
},
48-
BaseActivityDto: {
49-
name: { required: false, type: () => String },
50-
tag: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].TagDto] },
51-
location: { required: false, type: () => t['../../../libs/common/src/dtos/activity.dto'].LocationDto },
52-
},
53-
NoteActivityDto: {
54-
content: { required: true, type: () => String, minLength: 1 },
55-
published: { required: true, type: () => String, pattern: 'ISO8601_REGEX' },
56-
assets: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetDto] },
57-
},
58-
ProfileActivityDto: {
59-
icon: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetReferenceDto] },
60-
summary: { required: false, type: () => String },
61-
published: { required: false, type: () => String, pattern: 'ISO8601_REGEX' },
62-
},
63-
},
64-
],
65-
[
66-
import('../../../libs/common/src/dtos/announcement.dto'),
67-
{
68-
BroadcastDto: { content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto } },
69-
ReplyDto: {
70-
inReplyTo: { required: true, type: () => String, pattern: 'DSNP_CONTENT_URI_REGEX' },
71-
content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto },
72-
},
73-
TombstoneDto: {
74-
targetContentHash: { required: true, type: () => String, pattern: 'DSNP_CONTENT_HASH_REGEX' },
75-
targetAnnouncementType: { required: true, enum: t['../../../libs/common/src/dtos/announcement.dto'].ModifiableAnnouncementTypeDto },
76-
},
77-
UpdateDto: {
78-
targetContentHash: { required: true, type: () => String, pattern: 'DSNP_CONTENT_HASH_REGEX' },
79-
targetAnnouncementType: { required: true, enum: t['../../../libs/common/src/dtos/announcement.dto'].ModifiableAnnouncementTypeDto },
80-
content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto },
81-
},
82-
ReactionDto: {
83-
emoji: { required: true, type: () => String, minLength: 1, pattern: 'DSNP_EMOJI_REGEX' },
84-
apply: { required: true, type: () => Number, minimum: 0, maximum: 255 },
85-
inReplyTo: { required: true, type: () => String, pattern: 'DSNP_CONTENT_URI_REGEX' },
86-
},
87-
ProfileDto: { profile: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].ProfileActivityDto } },
88-
},
89-
],
90-
],
91-
controllers: [[import('./api.controller'), { ApiController: { health: {} } }]],
92-
},
93-
};
94-
};
3+
const t = {
4+
["../../../libs/common/src/dtos/activity.dto"]: await import("../../../libs/common/src/dtos/activity.dto"),
5+
["../../../libs/common/src/dtos/announcement.dto"]: await import("../../../libs/common/src/dtos/announcement.dto"),
6+
["../../../libs/common/src/dtos/chain.watch.dto"]: await import("../../../libs/common/src/dtos/chain.watch.dto")
7+
};
8+
return { "@nestjs/swagger": { "models": [[import("../../../libs/common/src/dtos/common.dto"), { "DsnpUserIdParam": { userDsnpId: { required: true, type: () => String } }, "AnnouncementResponseDto": { referenceId: { required: true, type: () => String } }, "UploadResponseDto": { assetIds: { required: true, type: () => [String] } }, "FilesUploadDto": { files: { required: true, type: () => [Object] } }, "ResetScannerDto": { blockNumber: { required: true, type: () => Number } } }], [import("../../../libs/common/src/dtos/activity.dto"), { "LocationDto": { name: { required: true, type: () => String, minLength: 1 }, accuracy: { required: false, type: () => Number, minimum: 0, maximum: 100 }, altitude: { required: false, type: () => Number }, latitude: { required: false, type: () => Number }, longitude: { required: false, type: () => Number }, radius: { required: false, type: () => Number, minimum: 0 }, units: { required: false, enum: t["../../../libs/common/src/dtos/activity.dto"].UnitTypeDto } }, "AssetReferenceDto": { referenceId: { required: true, type: () => String, minLength: 1 }, height: { required: false, type: () => Number, minimum: 1 }, width: { required: false, type: () => Number, minimum: 1 }, duration: { required: false, type: () => String, pattern: "DURATION_REGEX" } }, "TagDto": { type: { required: true, enum: t["../../../libs/common/src/dtos/activity.dto"].TagTypeDto }, name: { required: false, type: () => String, minLength: 1 }, mentionedId: { required: false, type: () => String, minLength: 1, pattern: "DSNP_USER_URI_REGEX" } }, "AssetDto": { type: { required: true, enum: t["../../../libs/common/src/dtos/activity.dto"].AttachmentTypeDto }, references: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetReferenceDto] }, name: { required: false, type: () => String, minLength: 1 }, href: { required: false, type: () => String, minLength: 1 } }, "BaseActivityDto": { name: { required: false, type: () => String }, tag: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].TagDto] }, location: { required: false, type: () => t["../../../libs/common/src/dtos/activity.dto"].LocationDto } }, "NoteActivityDto": { content: { required: true, type: () => String, minLength: 1 }, published: { required: true, type: () => String, pattern: "ISO8601_REGEX" }, assets: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetDto] } }, "ProfileActivityDto": { icon: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetReferenceDto] }, summary: { required: false, type: () => String }, published: { required: false, type: () => String, pattern: "ISO8601_REGEX" } } }], [import("../../../libs/common/src/dtos/announcement.dto"), { "BroadcastDto": { content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "ReplyDto": { inReplyTo: { required: true, type: () => String, pattern: "DSNP_CONTENT_URI_REGEX" }, content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "TombstoneDto": { targetContentHash: { required: true, type: () => String, pattern: "DSNP_CONTENT_HASH_REGEX" }, targetAnnouncementType: { required: true, enum: t["../../../libs/common/src/dtos/announcement.dto"].ModifiableAnnouncementTypeDto } }, "UpdateDto": { targetContentHash: { required: true, type: () => String, pattern: "DSNP_CONTENT_HASH_REGEX" }, targetAnnouncementType: { required: true, enum: t["../../../libs/common/src/dtos/announcement.dto"].ModifiableAnnouncementTypeDto }, content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "ReactionDto": { emoji: { required: true, type: () => String, minLength: 1, pattern: "DSNP_EMOJI_REGEX" }, apply: { required: true, type: () => Number, minimum: 0, maximum: 255 }, inReplyTo: { required: true, type: () => String, pattern: "DSNP_CONTENT_URI_REGEX" } }, "ProfileDto": { profile: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].ProfileActivityDto } } }], [import("../../../libs/common/src/dtos/chain.watch.dto"), { "ChainWatchOptionsDto": { schemaIds: { required: true, type: () => [Number] }, dsnpIds: { required: true, type: () => [String] } } }], [import("../../../libs/common/src/dtos/request-job.dto"), { "ContentSearchRequestDto": { id: { required: true, type: () => String }, startBlock: { required: true, type: () => Number, minimum: 1 }, endBlock: { required: true, type: () => Number, minimum: 1 }, filters: { required: true, type: () => t["../../../libs/common/src/dtos/chain.watch.dto"].ChainWatchOptionsDto } } }], [import("../../../libs/common/src/dtos/subscription.webhook.dto"), { "WebhookRegistrationDto": { url: { required: true, type: () => String }, announcementTypes: { required: true, type: () => [String] } } }]], "controllers": [[import("./api.controller"), { "ApiController": { "health": {}, "resetScanner": { type: String }, "setWatchOptions": {}, "pauseScanner": {}, "startScanner": {}, "search": {}, "registerWebhook": {}, "clearAllWebHooks": {}, "getRegisteredWebhooks": {} } }]] } };
9+
};

content-announcement.openapi.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
"description": "An optional identifier for the request, may be used for tracking or correlation"
6262
},
6363
"schemaId": {
64-
"type": "string",
64+
"type": "integer",
6565
"description": "Identifier for the schema being used or referenced"
6666
},
6767
"blockNumber": {

libs/common/src/blockchain/blockchain.module.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ https://docs.nestjs.com/modules
55
import { Module } from '@nestjs/common';
66
import { BlockchainService } from './blockchain.service';
77
import { ConfigModule } from '../config/config.module';
8+
import { ChainEventProcessorService } from './chain-event-processor.service';
89

910
@Module({
1011
imports: [ConfigModule],
1112
controllers: [],
12-
providers: [BlockchainService],
13-
exports: [BlockchainService],
13+
providers: [BlockchainService, ChainEventProcessorService],
14+
exports: [BlockchainService, ChainEventProcessorService],
1415
})
1516
export class BlockchainModule {}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { Injectable } from '@nestjs/common';
2+
import { BlockchainService } from './blockchain.service';
3+
import { ChainWatchOptionsDto } from '../dtos/chain.watch.dto';
4+
import { ApiDecoration } from '@polkadot/api/types';
5+
import { FrameSystemEventRecord } from '@polkadot/types/lookup';
6+
import { Vec } from '@polkadot/types';
7+
import { BlockPaginationResponseMessage, MessageResponse } from '@frequency-chain/api-augment/interfaces';
8+
import { MessageResponseWithSchemaId } from '../interfaces/message_response_with_schema_id';
9+
import { createIPFSQueueJob } from '../interfaces/ipfs.job.interface';
10+
import { Queue } from 'bullmq';
11+
12+
@Injectable()
13+
export class ChainEventProcessorService {
14+
constructor(private readonly blockchainService: BlockchainService) {}
15+
16+
public async getMessagesInBlock(blockNumber: number, filter?: ChainWatchOptionsDto): Promise<MessageResponseWithSchemaId[]> {
17+
const blockHash = await this.blockchainService.getBlockHash(blockNumber);
18+
if (blockHash.isEmpty) {
19+
return [];
20+
}
21+
const apiAt = await this.blockchainService.apiPromise.at(blockHash);
22+
const events = await apiAt.query.system.events();
23+
return this.getMessagesFromEvents(apiAt, blockNumber, events, filter);
24+
}
25+
26+
private async getMessagesFromEvents(
27+
apiAt: ApiDecoration<'promise'>,
28+
blockNumber: number,
29+
events: Vec<FrameSystemEventRecord>,
30+
filter?: ChainWatchOptionsDto,
31+
): Promise<MessageResponseWithSchemaId[]> {
32+
const hasMessages = events.some(({ event }) => apiAt.events.messages.MessagesInBlock.is(event));
33+
if (!hasMessages) {
34+
return [];
35+
}
36+
37+
const keys = await apiAt.query.messages.messagesV2.keys(blockNumber);
38+
let schemaIds = keys.map((key) => key.args[1].toNumber());
39+
schemaIds = Array.from(new Set<number>(schemaIds));
40+
const filteredEvents: (MessageResponseWithSchemaId | null)[] = await Promise.all(
41+
schemaIds.map(async (schemaId) => {
42+
if (filter && filter?.schemaIds?.length > 0 && !filter.schemaIds.includes(schemaId)) {
43+
return null;
44+
}
45+
let paginationRequest = {
46+
from_block: blockNumber,
47+
from_index: 0,
48+
page_size: 1000,
49+
to_block: blockNumber + 1,
50+
};
51+
52+
let messageResponse: BlockPaginationResponseMessage = await this.blockchainService.apiPromise.rpc.messages.getBySchemaId(schemaId, paginationRequest);
53+
const messages: Vec<MessageResponse> = messageResponse.content;
54+
while (messageResponse.has_next.toHuman()) {
55+
paginationRequest = {
56+
from_block: blockNumber,
57+
from_index: messageResponse.next_index.isSome ? messageResponse.next_index.unwrap().toNumber() : 0,
58+
page_size: 1000,
59+
to_block: blockNumber + 1,
60+
};
61+
// eslint-disable-next-line no-await-in-loop
62+
messageResponse = await this.blockchainService.apiPromise.rpc.messages.getBySchemaId(schemaId, paginationRequest);
63+
if (messageResponse.content.length > 0) {
64+
messages.push(...messageResponse.content);
65+
}
66+
}
67+
const messagesWithSchemaId: MessageResponseWithSchemaId = {
68+
schemaId: schemaId,
69+
messages,
70+
};
71+
return messagesWithSchemaId;
72+
}),
73+
);
74+
const collectedMessages: MessageResponseWithSchemaId[] = [];
75+
filteredEvents.forEach((event) => {
76+
if (event) {
77+
collectedMessages.push(event);
78+
}
79+
});
80+
return collectedMessages;
81+
}
82+
83+
public async queueIPFSJobs(messages: MessageResponseWithSchemaId[], queue: Queue, requestId?: string): Promise<void> {
84+
const jobs = messages.flatMap((messageResponse) =>
85+
messageResponse.messages
86+
.filter((message) => message.cid && message.cid.isSome)
87+
.map((message) => {
88+
const job = createIPFSQueueJob(
89+
message.block_number.toNumber(),
90+
message.msa_id.isNone ? message.provider_msa_id.toString() : message.msa_id.unwrap().toString(),
91+
message.provider_msa_id.toString(),
92+
messageResponse.schemaId,
93+
message.cid.unwrap().toString(),
94+
message.index.toNumber(),
95+
requestId
96+
);
97+
98+
return {
99+
name: `IPFS Job: ${job.key}`,
100+
data: job.data,
101+
opts: { jobId: job.key },
102+
};
103+
}),
104+
);
105+
106+
if (jobs && jobs.length > 0) {
107+
await queue.addBulk(jobs);
108+
}
109+
}
110+
111+
}

0 commit comments

Comments
 (0)