Skip to content

Don't use confirmed transaction because it has serialization inconsistencies #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions src/event-transformer/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "./borsh";
import { Program, Provider, Wallet as NodeWallet } from "@project-serum/anchor";
import { BlockResponse, ConfirmedTransaction, Keypair, PublicKey, Transaction } from "@solana/web3.js";
import { BlockResponse, ConfirmedTransaction, Keypair, PublicKey, Transaction, TransactionResponse } from "@solana/web3.js";
import BN from "bn.js";
import { Message as KafkaMessage, Producer, TopicMessages } from "kafkajs";
import { kafka } from "../setup/kafka";
Expand All @@ -27,11 +27,8 @@ function hasIntersect(set1: Set<any>, set2: Set<any>): boolean {
return [...set1].some(x => set2.has(x));
}

function processTxn(transformers: Transformer[], txn: ConfirmedTransaction): KafkaMessage[] {
const accounts = txn.transaction.compileMessage().accountKeys.map((key) => (
// @ts-ignore
new PublicKey(new BN(key._bn, 'hex'))
));
function processTxn(transformers: Transformer[], txn: TransactionResponse & { signature: string }): KafkaMessage[] {
const accounts = txn.transaction.message.accountKeys.map(k => new PublicKey(k));
const accountsSet = new Set(accounts.map(a => a.toBase58()));

return transformers
Expand All @@ -43,8 +40,9 @@ function processTxn(transformers: Transformer[], txn: ConfirmedTransaction): Kaf
type,
payload,
slot: txn.slot,
recentBlockhash: txn.transaction.recentBlockhash,
blockTime: txn.blockTime
recentBlockhash: txn.transaction.message.recentBlockhash,
blockTime: txn.blockTime,
signatures: txn.signature
}
})
.map((item: any) => ({
Expand Down Expand Up @@ -122,13 +120,9 @@ async function run() {
const results = (await Promise.all(
messages
.map((message: any) => JSON.parse(message.value!.toString()))
.filter(txn => txn.transaction)
.map(txn => ({
...txn,
transaction: Transaction.from(txn.transaction)
}))
.filter((txn: ConfirmedTransaction) => !txn.meta?.err)
.flatMap((txn: ConfirmedTransaction) => processTxn(transformers, txn))
.filter(txn => txn as TransactionResponse & { signature: string })
.filter((txn: TransactionResponse & { signature: string }) => !txn.meta?.err)
.flatMap((txn: TransactionResponse & { signature: string }) => processTxn(transformers, txn))
)).flat()
console.log(`Sending batch of ${results.length} events`)
await publishFixedBatches(producer, {
Expand Down
8 changes: 4 additions & 4 deletions src/event-transformer/transformers/InstructionTransformer.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { CompiledInstruction, ConfirmedTransaction, ConfirmedTransactionMeta, Message, PublicKey, Transaction, TransactionInstruction } from "@solana/web3.js";
import { CompiledInstruction, ConfirmedTransaction, ConfirmedTransactionMeta, Message, PublicKey, Transaction, TransactionInstruction, TransactionResponse } from "@solana/web3.js";
import { BlockTransaction, TransformedMessage, Transformer } from "./Transformer";

export abstract class InstructionTransformer implements Transformer {
abstract get relevantKeys(): Set<string>;

transform(accountKeys: PublicKey[], transaction: ConfirmedTransaction): TransformedMessage[] {
const indexedNormalInstrs = transaction.transaction.compileMessage().instructions
transform(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }): TransformedMessage[] {
const indexedNormalInstrs = transaction.transaction.message.instructions
.map((instruction, index) => ({ instruction, instructionIndex: index, innerInstructionIndex: null }))
const indexedInnerInstrs = (transaction.meta?.innerInstructions || [])
.flatMap((innerInstruction) =>
Expand All @@ -23,5 +23,5 @@ export abstract class InstructionTransformer implements Transformer {
)
}

abstract transformInstruction(accountKeys: PublicKey[], transaction: ConfirmedTransaction, instruction: CompiledInstruction): TransformedMessage[]
abstract transformInstruction(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }, instruction: CompiledInstruction): TransformedMessage[]
}
4 changes: 2 additions & 2 deletions src/event-transformer/transformers/Transformer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConfirmedTransaction, ConfirmedTransactionMeta, Message, PublicKey } from "@solana/web3.js";
import { ConfirmedTransaction, ConfirmedTransactionMeta, Message, PublicKey, TransactionResponse } from "@solana/web3.js";

export type BlockTransaction = {
transaction: {
Expand All @@ -14,5 +14,5 @@ export interface TransformedMessage {

export interface Transformer {
get relevantKeys(): Set<string>
transform(accountKeys: PublicKey[], transaction: ConfirmedTransaction): TransformedMessage[]
transform(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }): TransformedMessage[]
}
4 changes: 2 additions & 2 deletions src/event-transformer/transformers/anchorProgram.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Message, PublicKey, TokenBalance, MessageArgs, CompiledInstruction, ConfirmedTransaction } from "@solana/web3.js";
import { Message, PublicKey, TokenBalance, MessageArgs, CompiledInstruction, ConfirmedTransaction, TransactionResponse } from "@solana/web3.js";
import { BlockTransaction, Transformer } from "./Transformer";
import { BinaryReader, deserializeUnchecked, baseDecode } from "borsh";
import BN from "bn.js";
Expand Down Expand Up @@ -70,7 +70,7 @@ export default class AnchorProgramTransformer extends InstructionTransformer {
}, {} as Record<string, any>)
}

transformInstruction(accountKeys: PublicKey[], transaction: ConfirmedTransaction, instruction: CompiledInstruction | CompiledInstruction): any[] {
transformInstruction(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }, instruction: CompiledInstruction | CompiledInstruction): any[] {
const programId = accountKeys[instruction.programIdIndex].toBase58();
const ixData = bs58.decode(instruction.data);
let codedInstruction = this.coder.instruction.decode(ixData);
Expand Down
6 changes: 3 additions & 3 deletions src/event-transformer/transformers/programSpec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CompiledInstruction, ConfirmedTransaction, PublicKey, TokenBalance } from "@solana/web3.js";
import { CompiledInstruction, ConfirmedTransaction, PublicKey, TokenBalance, TransactionResponse } from "@solana/web3.js";
import { BlockTransaction, Transformer } from "./Transformer";
import { BinaryReader, deserializeUnchecked, baseDecode } from "borsh";
import BN from "bn.js";
Expand Down Expand Up @@ -60,7 +60,7 @@ export default class ProgramSpecTransformer extends InstructionTransformer {
return pids;
}

transformInstruction(accountKeys: PublicKey[], transaction: ConfirmedTransaction, instruction: CompiledInstruction): any[] {
transformInstruction(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }, instruction: CompiledInstruction): any[] {
try {
const index = instruction.data.length == 0 ? 0 : new BinaryReader(baseDecode(instruction.data)).readU8();
const programId = accountKeys[instruction.programIdIndex].toBase58()
Expand All @@ -85,7 +85,7 @@ export default class ProgramSpecTransformer extends InstructionTransformer {
}]
}
} catch (e: any) {
console.log(`Failed to process ${transaction.transaction.signature}`);
console.log(`Failed to process ${transaction.signature}`);
console.error(e);
}

Expand Down
29 changes: 19 additions & 10 deletions src/event-transformer/transformers/tokenAccounts.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { ConfirmedTransaction, PublicKey, TokenBalance } from "@solana/web3.js";
import { BlockTransaction, Transformer } from "./Transformer";
import BN from "bn.js";
import { PublicKey, TokenBalance, TransactionResponse } from "@solana/web3.js";
import { Transformer } from "./Transformer";

// Zip two arrays by some key. Output array of length 2 arrays that are each object with its pair (or undefined)
function zipBy<A, B>(a: A[], b: A[], getKey: (a: A) => B): (A | undefined)[][] {
Expand All @@ -16,29 +15,39 @@ function zipBy<A, B>(a: A[], b: A[], getKey: (a: A) => B): (A | undefined)[][] {
return [...keys].map(key => [aMap.get(key), bMap.get(key)])
}

type PubkeyAmount = { mint: string, amount: string, pubkey: PublicKey, decimals: number };

function groupByPubkey(acc: Record<string, PubkeyAmount>, record: PubkeyAmount): Record<string, PubkeyAmount> {
acc[record.pubkey.toBase58()] = record;

return acc;
}

export default class TokenAccountTransformer implements Transformer {
get relevantKeys(): Set<string> {
return new Set(["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"])
}

transform(accountKeys: PublicKey[], transaction: ConfirmedTransaction): any[] {
function toPubkeyAmount({ accountIndex, mint, uiTokenAmount: { decimals, amount } }: TokenBalance) {
transform(accountKeys: PublicKey[], transaction: TransactionResponse & { signature: string }): any[] {
function toPubkeyAmount({ accountIndex, mint, uiTokenAmount: { decimals, amount } }: TokenBalance): PubkeyAmount {
return {
mint,
pubkey: accountKeys[accountIndex],
amount,
decimals
}
};
const preBalances = transaction.meta?.preTokenBalances?.map(toPubkeyAmount);
const postBalances = transaction.meta?.postTokenBalances?.map(toPubkeyAmount);
const preBalances = transaction.meta?.preTokenBalances?.map(toPubkeyAmount)?.reduce(groupByPubkey, {} as Record<string, PubkeyAmount>);
const postBalances = transaction.meta?.postTokenBalances?.map(toPubkeyAmount)?.reduce(groupByPubkey, {} as Record<string, PubkeyAmount>);
const keys = new Set([...Object.keys(preBalances || {}), ...Object.keys(postBalances || {})])
const emptyItem = { pubkey: null, amount: null, mint: null, decimals: null }
const zipped = zipBy(preBalances || [], postBalances || [], i => i.pubkey.toBase58())
const rawEvents = zipped.map(([preItem = emptyItem, postItem = emptyItem]) => {
const rawEvents = [...keys].map((key) => {
const preItem = (preBalances && preBalances[key]) || emptyItem;
const postItem = (postBalances && postBalances[key]) || emptyItem;
return {
type: "TokenAccountBalanceChange",
// @ts-ignore
pubkey: new PublicKey(new BN((preItem.pubkey || postItem.pubkey)!._bn, 'hex')).toBase58(),
pubkey: (preItem.pubkey || postItem.pubkey).toBase58(),
preAmount: preItem.amount || 0,
mint: preItem.mint || postItem.mint,
postAmount: postItem.amount,
Expand Down
8 changes: 2 additions & 6 deletions src/kafka-signature-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,11 @@ const { KAFKA_TOPIC, KAFKA_INPUT_TOPIC, KAFKA_GROUP_ID } = process.env
const producer = kafka.producer()

async function processSignature(signature: ConfirmedSignatureInfo): Promise<any | null> {
const txn = await connection.getConfirmedTransaction(signature.signature, FINALITY);
const txn = await connection.getTransaction(signature.signature, { commitment: FINALITY });
try {
const data = txn?.transaction.serialize({
requireAllSignatures: false,
verifySignatures: false
}).toJSON().data
const value = JSON.stringify({
...txn,
transaction: data
signature: signature.signature.toString()
})
const size = Buffer.byteLength(value);
if (size > 500000) {
Expand Down
6 changes: 2 additions & 4 deletions tests/event-transformer/transformers/anchor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ describe("anchor-transformer", () => {
const block: BlockResponse & { slot: number } = blockResp as any;
const mapped: any = block.transactions.flatMap(txn => {
const accounts = txn.transaction.message.accountKeys.map((key) => (
// @ts-ignore
new PublicKey(new BN(key._bn, 'hex'))
new PublicKey(key)
));

return transformer.transform(accounts, txn)
Expand All @@ -34,8 +33,7 @@ describe("anchor-transformer", () => {
const block: BlockResponse & { slot: number } = wumboBlockResp as any;
const mapped: any = block.transactions.flatMap(txn => {
const accounts = txn.transaction.message.accountKeys.map((key) => (
// @ts-ignore
new PublicKey(new BN(key._bn, 'hex'))
new PublicKey(key)
));

return transformer.transform(accounts, txn)
Expand Down