-
Notifications
You must be signed in to change notification settings - Fork 260
refactor(hermes/server): use program accounts instead of mapping acct #2643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,21 +13,25 @@ use { | |
wormhole::Wormhole, | ||
}, | ||
}, | ||
anyhow::{anyhow, Result}, | ||
anyhow::{anyhow, bail, Result}, | ||
borsh::BorshDeserialize, | ||
futures::stream::StreamExt, | ||
pyth_sdk::PriceIdentifier, | ||
pyth_sdk_solana::state::{load_mapping_account, load_product_account}, | ||
pyth_sdk_solana::state::load_product_account, | ||
solana_account_decoder::UiAccountEncoding, | ||
solana_client::{ | ||
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient}, | ||
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, | ||
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, | ||
}, | ||
solana_sdk::{ | ||
account::Account, bs58, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, | ||
account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program, | ||
}, | ||
std::{ | ||
collections::{BTreeMap, HashSet}, | ||
sync::Arc, | ||
time::Duration, | ||
}, | ||
std::{collections::BTreeMap, sync::Arc, time::Duration}, | ||
tokio::time::Instant, | ||
}; | ||
|
||
|
@@ -230,6 +234,100 @@ where | |
Ok(()) | ||
} | ||
|
||
pub async fn fetch_and_store_price_feeds_metadata<S>( | ||
state: &S, | ||
oracle_program_address: &Pubkey, | ||
rpc_client: &RpcClient, | ||
) -> Result<Vec<PriceFeedMetadata>> | ||
where | ||
S: PriceFeedMeta + Aggregates, | ||
{ | ||
let price_feeds_metadata = | ||
fetch_price_feeds_metadata(oracle_program_address, rpc_client).await?; | ||
|
||
// Wait for the crosschain price feed ids to be available in the state | ||
// This is to prune the price feeds that are not available crosschain yet (i.e. they are coming soon) | ||
let mut all_ids = HashSet::new(); | ||
let mut retry_count = 0; | ||
while all_ids.is_empty() { | ||
all_ids = Aggregates::get_price_feed_ids(state).await; | ||
tracing::info!("Waiting for price feed ids..."); | ||
tokio::time::sleep(Duration::from_secs(retry_count + 1)).await; | ||
retry_count += 1; | ||
if retry_count > 10 { | ||
bail!("Failed to fetch price feed ids after 10 retries"); | ||
} | ||
} | ||
|
||
// Filter price_feeds_metadata to only include entries with IDs in all_ids | ||
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata | ||
.into_iter() | ||
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id))) | ||
.collect(); | ||
|
||
state.store_price_feeds_metadata(&filtered_metadata).await?; | ||
Ok(filtered_metadata) | ||
} | ||
|
||
async fn fetch_price_feeds_metadata( | ||
oracle_program_address: &Pubkey, | ||
rpc_client: &RpcClient, | ||
) -> Result<Vec<PriceFeedMetadata>> { | ||
let product_accounts = rpc_client | ||
.get_program_accounts_with_config( | ||
oracle_program_address, | ||
RpcProgramAccountsConfig { | ||
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new( | ||
0, // offset | ||
// Product account header: <magic:u32le:0xa1b2c3d4> <version:u32le:0x02> <account_type:u32le:0x02> | ||
MemcmpEncodedBytes::Bytes(hex::decode("d4c3b2a10200000002000000").unwrap()), | ||
))]), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does the string literal not match the comment because its BE vs LE? maybe worth mentioning that in the comment |
||
account_config: RpcAccountInfoConfig { | ||
encoding: Some(UiAccountEncoding::Base64Zstd), | ||
commitment: Some(CommitmentConfig::confirmed()), | ||
..Default::default() | ||
}, | ||
..Default::default() | ||
}, | ||
) | ||
.await?; | ||
|
||
let price_feeds_metadata: Vec<PriceFeedMetadata> = product_accounts | ||
.into_iter() | ||
.filter_map( | ||
|(pubkey, account)| match load_product_account(&account.data) { | ||
Ok(product_account) => { | ||
if product_account.px_acc == Pubkey::default() { | ||
return None; | ||
} | ||
|
||
let attributes = product_account | ||
.iter() | ||
.filter(|(key, _)| !key.is_empty()) | ||
.map(|(key, val)| (key.to_string(), val.to_string())) | ||
.collect::<BTreeMap<String, String>>(); | ||
|
||
Some(PriceFeedMetadata { | ||
id: RpcPriceIdentifier::new(product_account.px_acc.to_bytes()), | ||
attributes, | ||
}) | ||
} | ||
Err(e) => { | ||
tracing::warn!(error = ?e, pubkey = ?pubkey, "Error loading product account"); | ||
None | ||
} | ||
}, | ||
) | ||
.collect(); | ||
|
||
tracing::info!( | ||
len = price_feeds_metadata.len(), | ||
"Fetched price feeds metadata" | ||
); | ||
|
||
Ok(price_feeds_metadata) | ||
} | ||
|
||
#[tracing::instrument(skip(opts, state))] | ||
pub async fn spawn<S>(opts: RunOptions, state: Arc<S>) -> Result<()> | ||
where | ||
|
@@ -300,9 +398,10 @@ where | |
let mut exit = crate::EXIT.subscribe(); | ||
tokio::spawn(async move { | ||
// Run fetch and store once before the loop | ||
tracing::info!("Fetching and storing price feeds metadata..."); | ||
if let Err(e) = fetch_and_store_price_feeds_metadata( | ||
price_feeds_state.as_ref(), | ||
&opts.pythnet.mapping_addr, | ||
&opts.pythnet.oracle_program_addr, | ||
&rpc_client, | ||
) | ||
.await | ||
|
@@ -316,9 +415,10 @@ where | |
tokio::select! { | ||
_ = exit.changed() => break, | ||
_ = tokio::time::sleep(Duration::from_secs(DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL)) => { | ||
tracing::info!("Fetching and storing price feeds metadata..."); | ||
if let Err(e) = fetch_and_store_price_feeds_metadata( | ||
price_feeds_state.as_ref(), | ||
&opts.pythnet.mapping_addr, | ||
&opts.pythnet.oracle_program_addr, | ||
&rpc_client, | ||
) | ||
.await | ||
|
@@ -338,92 +438,3 @@ where | |
); | ||
Ok(()) | ||
} | ||
|
||
pub async fn fetch_and_store_price_feeds_metadata<S>( | ||
state: &S, | ||
mapping_address: &Pubkey, | ||
rpc_client: &RpcClient, | ||
) -> Result<Vec<PriceFeedMetadata>> | ||
where | ||
S: PriceFeedMeta + Aggregates, | ||
{ | ||
let price_feeds_metadata = fetch_price_feeds_metadata(mapping_address, rpc_client).await?; | ||
let all_ids = Aggregates::get_price_feed_ids(state).await; | ||
|
||
// Filter price_feeds_metadata to only include entries with IDs in all_ids | ||
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata | ||
.into_iter() | ||
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id))) | ||
.collect(); | ||
|
||
state.store_price_feeds_metadata(&filtered_metadata).await?; | ||
Ok(filtered_metadata) | ||
} | ||
|
||
async fn fetch_price_feeds_metadata( | ||
mapping_address: &Pubkey, | ||
rpc_client: &RpcClient, | ||
) -> Result<Vec<PriceFeedMetadata>> { | ||
let mut price_feeds_metadata = Vec::<PriceFeedMetadata>::new(); | ||
let mapping_data = rpc_client.get_account_data(mapping_address).await?; | ||
let mapping_acct = load_mapping_account(&mapping_data)?; | ||
|
||
// Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24)) | ||
for product_keys_chunk in mapping_acct | ||
.products | ||
.iter() | ||
.filter(|&prod_pkey| *prod_pkey != Pubkey::default()) | ||
.collect::<Vec<_>>() | ||
.chunks(150) | ||
{ | ||
// Prepare a list of futures for fetching product account data for each chunk | ||
let fetch_product_data_futures = product_keys_chunk | ||
.iter() | ||
.map(|prod_pkey| rpc_client.get_account_data(prod_pkey)) | ||
.collect::<Vec<_>>(); | ||
|
||
// Await all futures concurrently within the chunk | ||
let products_data_results = futures::future::join_all(fetch_product_data_futures).await; | ||
|
||
for prod_data_result in products_data_results { | ||
match prod_data_result { | ||
Ok(prod_data) => { | ||
let prod_acct = match load_product_account(&prod_data) { | ||
Ok(prod_acct) => prod_acct, | ||
Err(e) => { | ||
println!("Error loading product account: {}", e); | ||
continue; | ||
} | ||
}; | ||
|
||
// TODO: Add stricter type checking for attributes | ||
let attributes = prod_acct | ||
.iter() | ||
.filter(|(key, _)| !key.is_empty()) | ||
.map(|(key, val)| (key.to_string(), val.to_string())) | ||
.collect::<BTreeMap<String, String>>(); | ||
|
||
if prod_acct.px_acc != Pubkey::default() { | ||
let px_pkey = prod_acct.px_acc; | ||
let px_pkey_bytes = bs58::decode(&px_pkey.to_string()).into_vec()?; | ||
let px_pkey_array: [u8; 32] = px_pkey_bytes | ||
.try_into() | ||
.expect("Invalid length for PriceIdentifier"); | ||
|
||
let price_feed_metadata = PriceFeedMetadata { | ||
id: RpcPriceIdentifier::new(px_pkey_array), | ||
attributes, | ||
}; | ||
|
||
price_feeds_metadata.push(price_feed_metadata); | ||
} | ||
} | ||
Err(e) => { | ||
println!("Error loading product account: {}", e); | ||
continue; | ||
} | ||
} | ||
} | ||
} | ||
Ok(price_feeds_metadata) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the ordering of these ops means that we'll always have to wait at least 1 second right? should we check all_ids.is_empty() before sleeping?