@@ -13,19 +13,19 @@ use {
13
13
wormhole:: Wormhole ,
14
14
} ,
15
15
} ,
16
- anyhow:: { anyhow, Result } ,
16
+ anyhow:: { anyhow, bail , Result } ,
17
17
borsh:: BorshDeserialize ,
18
18
futures:: stream:: StreamExt ,
19
19
pyth_sdk:: PriceIdentifier ,
20
- pyth_sdk_solana:: state:: { load_mapping_account , load_product_account} ,
20
+ pyth_sdk_solana:: state:: load_product_account,
21
21
solana_account_decoder:: UiAccountEncoding ,
22
22
solana_client:: {
23
23
nonblocking:: { pubsub_client:: PubsubClient , rpc_client:: RpcClient } ,
24
24
rpc_config:: { RpcAccountInfoConfig , RpcProgramAccountsConfig } ,
25
25
rpc_filter:: { Memcmp , MemcmpEncodedBytes , RpcFilterType } ,
26
26
} ,
27
27
solana_sdk:: {
28
- account:: Account , bs58 , commitment_config:: CommitmentConfig , pubkey:: Pubkey , system_program,
28
+ account:: Account , commitment_config:: CommitmentConfig , pubkey:: Pubkey , system_program,
29
29
} ,
30
30
std:: { collections:: BTreeMap , sync:: Arc , time:: Duration } ,
31
31
tokio:: time:: Instant ,
@@ -230,6 +230,104 @@ where
230
230
Ok ( ( ) )
231
231
}
232
232
233
+ pub async fn fetch_and_store_price_feeds_metadata < S > (
234
+ state : & S ,
235
+ oracle_program_address : & Pubkey ,
236
+ rpc_client : & RpcClient ,
237
+ ) -> Result < Vec < PriceFeedMetadata > >
238
+ where
239
+ S : PriceFeedMeta + Aggregates ,
240
+ {
241
+ let price_feeds_metadata =
242
+ fetch_price_feeds_metadata ( oracle_program_address, rpc_client) . await ?;
243
+
244
+ // Wait for the crosschain price feed ids to be available in the state
245
+ // This is to prune the price feeds that are not available crosschain yet (i.e. they are coming soon)
246
+ let mut all_ids;
247
+ let mut retry_count = 0 ;
248
+ loop {
249
+ all_ids = Aggregates :: get_price_feed_ids ( state) . await ;
250
+ if !all_ids. is_empty ( ) {
251
+ break ;
252
+ }
253
+ tracing:: info!( "Waiting for price feed ids..." ) ;
254
+ tokio:: time:: sleep ( Duration :: from_secs ( retry_count + 1 ) ) . await ;
255
+ retry_count += 1 ;
256
+ if retry_count > 10 {
257
+ bail ! ( "Failed to fetch price feed ids after 10 retries" ) ;
258
+ }
259
+ }
260
+
261
+ // Filter price_feeds_metadata to only include entries with IDs in all_ids
262
+ let filtered_metadata: Vec < PriceFeedMetadata > = price_feeds_metadata
263
+ . into_iter ( )
264
+ . filter ( |metadata| all_ids. contains ( & PriceIdentifier :: from ( metadata. id ) ) )
265
+ . collect ( ) ;
266
+
267
+ state. store_price_feeds_metadata ( & filtered_metadata) . await ?;
268
+ Ok ( filtered_metadata)
269
+ }
270
+
271
+ async fn fetch_price_feeds_metadata (
272
+ oracle_program_address : & Pubkey ,
273
+ rpc_client : & RpcClient ,
274
+ ) -> Result < Vec < PriceFeedMetadata > > {
275
+ let product_accounts = rpc_client
276
+ . get_program_accounts_with_config (
277
+ oracle_program_address,
278
+ RpcProgramAccountsConfig {
279
+ filters : Some ( vec ! [ RpcFilterType :: Memcmp ( Memcmp :: new(
280
+ 0 , // offset
281
+ // Product account header: <magic:u32le:0xa1b2c3d4> <version:u32le:0x02> <account_type:u32le:0x02>
282
+ // The string literal in hex::decode is represented as be (big endian).
283
+ MemcmpEncodedBytes :: Bytes ( hex:: decode( "d4c3b2a10200000002000000" ) . unwrap( ) ) ,
284
+ ) ) ] ) ,
285
+ account_config : RpcAccountInfoConfig {
286
+ encoding : Some ( UiAccountEncoding :: Base64Zstd ) ,
287
+ commitment : Some ( CommitmentConfig :: confirmed ( ) ) ,
288
+ ..Default :: default ( )
289
+ } ,
290
+ ..Default :: default ( )
291
+ } ,
292
+ )
293
+ . await ?;
294
+
295
+ let price_feeds_metadata: Vec < PriceFeedMetadata > = product_accounts
296
+ . into_iter ( )
297
+ . filter_map (
298
+ |( pubkey, account) | match load_product_account ( & account. data ) {
299
+ Ok ( product_account) => {
300
+ if product_account. px_acc == Pubkey :: default ( ) {
301
+ return None ;
302
+ }
303
+
304
+ let attributes = product_account
305
+ . iter ( )
306
+ . filter ( |( key, _) | !key. is_empty ( ) )
307
+ . map ( |( key, val) | ( key. to_string ( ) , val. to_string ( ) ) )
308
+ . collect :: < BTreeMap < String , String > > ( ) ;
309
+
310
+ Some ( PriceFeedMetadata {
311
+ id : RpcPriceIdentifier :: new ( product_account. px_acc . to_bytes ( ) ) ,
312
+ attributes,
313
+ } )
314
+ }
315
+ Err ( e) => {
316
+ tracing:: warn!( error = ?e, pubkey = ?pubkey, "Error loading product account" ) ;
317
+ None
318
+ }
319
+ } ,
320
+ )
321
+ . collect ( ) ;
322
+
323
+ tracing:: info!(
324
+ len = price_feeds_metadata. len( ) ,
325
+ "Fetched price feeds metadata"
326
+ ) ;
327
+
328
+ Ok ( price_feeds_metadata)
329
+ }
330
+
233
331
#[ tracing:: instrument( skip( opts, state) ) ]
234
332
pub async fn spawn < S > ( opts : RunOptions , state : Arc < S > ) -> Result < ( ) >
235
333
where
@@ -300,9 +398,10 @@ where
300
398
let mut exit = crate :: EXIT . subscribe ( ) ;
301
399
tokio:: spawn ( async move {
302
400
// Run fetch and store once before the loop
401
+ tracing:: info!( "Fetching and storing price feeds metadata..." ) ;
303
402
if let Err ( e) = fetch_and_store_price_feeds_metadata (
304
403
price_feeds_state. as_ref ( ) ,
305
- & opts. pythnet . mapping_addr ,
404
+ & opts. pythnet . oracle_program_addr ,
306
405
& rpc_client,
307
406
)
308
407
. await
@@ -316,9 +415,10 @@ where
316
415
tokio:: select! {
317
416
_ = exit. changed( ) => break ,
318
417
_ = tokio:: time:: sleep( Duration :: from_secs( DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL ) ) => {
418
+ tracing:: info!( "Fetching and storing price feeds metadata..." ) ;
319
419
if let Err ( e) = fetch_and_store_price_feeds_metadata(
320
420
price_feeds_state. as_ref( ) ,
321
- & opts. pythnet. mapping_addr ,
421
+ & opts. pythnet. oracle_program_addr ,
322
422
& rpc_client,
323
423
)
324
424
. await
@@ -338,92 +438,3 @@ where
338
438
) ;
339
439
Ok ( ( ) )
340
440
}
341
-
342
- pub async fn fetch_and_store_price_feeds_metadata < S > (
343
- state : & S ,
344
- mapping_address : & Pubkey ,
345
- rpc_client : & RpcClient ,
346
- ) -> Result < Vec < PriceFeedMetadata > >
347
- where
348
- S : PriceFeedMeta + Aggregates ,
349
- {
350
- let price_feeds_metadata = fetch_price_feeds_metadata ( mapping_address, rpc_client) . await ?;
351
- let all_ids = Aggregates :: get_price_feed_ids ( state) . await ;
352
-
353
- // Filter price_feeds_metadata to only include entries with IDs in all_ids
354
- let filtered_metadata: Vec < PriceFeedMetadata > = price_feeds_metadata
355
- . into_iter ( )
356
- . filter ( |metadata| all_ids. contains ( & PriceIdentifier :: from ( metadata. id ) ) )
357
- . collect ( ) ;
358
-
359
- state. store_price_feeds_metadata ( & filtered_metadata) . await ?;
360
- Ok ( filtered_metadata)
361
- }
362
-
363
- async fn fetch_price_feeds_metadata (
364
- mapping_address : & Pubkey ,
365
- rpc_client : & RpcClient ,
366
- ) -> Result < Vec < PriceFeedMetadata > > {
367
- let mut price_feeds_metadata = Vec :: < PriceFeedMetadata > :: new ( ) ;
368
- let mapping_data = rpc_client. get_account_data ( mapping_address) . await ?;
369
- let mapping_acct = load_mapping_account ( & mapping_data) ?;
370
-
371
- // Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24))
372
- for product_keys_chunk in mapping_acct
373
- . products
374
- . iter ( )
375
- . filter ( |& prod_pkey| * prod_pkey != Pubkey :: default ( ) )
376
- . collect :: < Vec < _ > > ( )
377
- . chunks ( 150 )
378
- {
379
- // Prepare a list of futures for fetching product account data for each chunk
380
- let fetch_product_data_futures = product_keys_chunk
381
- . iter ( )
382
- . map ( |prod_pkey| rpc_client. get_account_data ( prod_pkey) )
383
- . collect :: < Vec < _ > > ( ) ;
384
-
385
- // Await all futures concurrently within the chunk
386
- let products_data_results = futures:: future:: join_all ( fetch_product_data_futures) . await ;
387
-
388
- for prod_data_result in products_data_results {
389
- match prod_data_result {
390
- Ok ( prod_data) => {
391
- let prod_acct = match load_product_account ( & prod_data) {
392
- Ok ( prod_acct) => prod_acct,
393
- Err ( e) => {
394
- println ! ( "Error loading product account: {}" , e) ;
395
- continue ;
396
- }
397
- } ;
398
-
399
- // TODO: Add stricter type checking for attributes
400
- let attributes = prod_acct
401
- . iter ( )
402
- . filter ( |( key, _) | !key. is_empty ( ) )
403
- . map ( |( key, val) | ( key. to_string ( ) , val. to_string ( ) ) )
404
- . collect :: < BTreeMap < String , String > > ( ) ;
405
-
406
- if prod_acct. px_acc != Pubkey :: default ( ) {
407
- let px_pkey = prod_acct. px_acc ;
408
- let px_pkey_bytes = bs58:: decode ( & px_pkey. to_string ( ) ) . into_vec ( ) ?;
409
- let px_pkey_array: [ u8 ; 32 ] = px_pkey_bytes
410
- . try_into ( )
411
- . expect ( "Invalid length for PriceIdentifier" ) ;
412
-
413
- let price_feed_metadata = PriceFeedMetadata {
414
- id : RpcPriceIdentifier :: new ( px_pkey_array) ,
415
- attributes,
416
- } ;
417
-
418
- price_feeds_metadata. push ( price_feed_metadata) ;
419
- }
420
- }
421
- Err ( e) => {
422
- println ! ( "Error loading product account: {}" , e) ;
423
- continue ;
424
- }
425
- }
426
- }
427
- }
428
- Ok ( price_feeds_metadata)
429
- }
0 commit comments