Skip to content

refactor(hermes/server): use program accounts instead of mapping acct #2643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.8.6"
version = "0.9.0"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down Expand Up @@ -30,7 +30,7 @@ nonzero_ext = { version = "0.3.0" }
prometheus-client = { version = "0.21.2" }
prost = { version = "0.12.1" }
pyth-sdk = { version = "0.8.0" }
pyth-sdk-solana = { version = "0.10.3" }
pyth-sdk-solana = { version = "0.10.4" }
pythnet-sdk = { path = "../../../pythnet/pythnet_sdk/", version = "2.0.0", features = ["strum"] }
rand = { version = "0.8.5" }
reqwest = { version = "0.11.14", features = ["blocking", "json"] }
Expand Down
12 changes: 6 additions & 6 deletions apps/hermes/server/src/config/pythnet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {clap::Args, solana_sdk::pubkey::Pubkey};

const DEFAULT_PYTHNET_MAPPING_ADDR: &str = "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J";
const DEFAULT_PYTHNET_ORACLE_PROGRAM_ADDR: &str = "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH";

#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Pythnet Options")]
Expand All @@ -16,9 +16,9 @@ pub struct Options {
#[arg(env = "PYTHNET_HTTP_ADDR")]
pub http_addr: String,

/// Pyth mapping account address on Pythnet.
#[arg(long = "pythnet-mapping-addr")]
#[arg(default_value = DEFAULT_PYTHNET_MAPPING_ADDR)]
#[arg(env = "PYTHNET_MAPPING_ADDR")]
pub mapping_addr: Pubkey,
/// Pythnet oracle program address.
#[arg(long = "pythnet-oracle-program-addr")]
#[arg(default_value = DEFAULT_PYTHNET_ORACLE_PROGRAM_ADDR)]
#[arg(env = "PYTHNET_ORACLE_PROGRAM_ADDR")]
pub oracle_program_addr: Pubkey,
}
201 changes: 106 additions & 95 deletions apps/hermes/server/src/network/pythnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,25 @@ use {
wormhole::Wormhole,
},
},
anyhow::{anyhow, Result},
anyhow::{anyhow, bail, Result},
borsh::BorshDeserialize,
futures::stream::StreamExt,
pyth_sdk::PriceIdentifier,
pyth_sdk_solana::state::{load_mapping_account, load_product_account},
pyth_sdk_solana::state::load_product_account,
solana_account_decoder::UiAccountEncoding,
solana_client::{
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient},
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
},
solana_sdk::{
account::Account, bs58, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program,
account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey, system_program,
},
std::{
collections::{BTreeMap, HashSet},
sync::Arc,
time::Duration,
},
std::{collections::BTreeMap, sync::Arc, time::Duration},
tokio::time::Instant,
};

Expand Down Expand Up @@ -230,6 +234,100 @@ where
Ok(())
}

pub async fn fetch_and_store_price_feeds_metadata<S>(
state: &S,
oracle_program_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>>
where
S: PriceFeedMeta + Aggregates,
{
let price_feeds_metadata =
fetch_price_feeds_metadata(oracle_program_address, rpc_client).await?;

// Wait for the crosschain price feed ids to be available in the state
// This is to prune the price feeds that are not available crosschain yet (i.e. they are coming soon)
let mut all_ids = HashSet::new();
let mut retry_count = 0;
while all_ids.is_empty() {
all_ids = Aggregates::get_price_feed_ids(state).await;
tracing::info!("Waiting for price feed ids...");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the ordering of these ops means that we'll always have to wait at least 1 second right? should we check all_ids.is_empty() before sleeping?

tokio::time::sleep(Duration::from_secs(retry_count + 1)).await;
retry_count += 1;
if retry_count > 10 {
bail!("Failed to fetch price feed ids after 10 retries");
}
}

// Filter price_feeds_metadata to only include entries with IDs in all_ids
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata
.into_iter()
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id)))
.collect();

state.store_price_feeds_metadata(&filtered_metadata).await?;
Ok(filtered_metadata)
}

async fn fetch_price_feeds_metadata(
oracle_program_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>> {
let product_accounts = rpc_client
.get_program_accounts_with_config(
oracle_program_address,
RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new(
0, // offset
// Product account header: <magic:u32le:0xa1b2c3d4> <version:u32le:0x02> <account_type:u32le:0x02>
MemcmpEncodedBytes::Bytes(hex::decode("d4c3b2a10200000002000000").unwrap()),
))]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the string literal not match the comment because its BE vs LE? maybe worth mentioning that in the comment

account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64Zstd),
commitment: Some(CommitmentConfig::confirmed()),
..Default::default()
},
..Default::default()
},
)
.await?;

let price_feeds_metadata: Vec<PriceFeedMetadata> = product_accounts
.into_iter()
.filter_map(
|(pubkey, account)| match load_product_account(&account.data) {
Ok(product_account) => {
if product_account.px_acc == Pubkey::default() {
return None;
}

let attributes = product_account
.iter()
.filter(|(key, _)| !key.is_empty())
.map(|(key, val)| (key.to_string(), val.to_string()))
.collect::<BTreeMap<String, String>>();

Some(PriceFeedMetadata {
id: RpcPriceIdentifier::new(product_account.px_acc.to_bytes()),
attributes,
})
}
Err(e) => {
tracing::warn!(error = ?e, pubkey = ?pubkey, "Error loading product account");
None
}
},
)
.collect();

tracing::info!(
len = price_feeds_metadata.len(),
"Fetched price feeds metadata"
);

Ok(price_feeds_metadata)
}

#[tracing::instrument(skip(opts, state))]
pub async fn spawn<S>(opts: RunOptions, state: Arc<S>) -> Result<()>
where
Expand Down Expand Up @@ -300,9 +398,10 @@ where
let mut exit = crate::EXIT.subscribe();
tokio::spawn(async move {
// Run fetch and store once before the loop
tracing::info!("Fetching and storing price feeds metadata...");
if let Err(e) = fetch_and_store_price_feeds_metadata(
price_feeds_state.as_ref(),
&opts.pythnet.mapping_addr,
&opts.pythnet.oracle_program_addr,
&rpc_client,
)
.await
Expand All @@ -316,9 +415,10 @@ where
tokio::select! {
_ = exit.changed() => break,
_ = tokio::time::sleep(Duration::from_secs(DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL)) => {
tracing::info!("Fetching and storing price feeds metadata...");
if let Err(e) = fetch_and_store_price_feeds_metadata(
price_feeds_state.as_ref(),
&opts.pythnet.mapping_addr,
&opts.pythnet.oracle_program_addr,
&rpc_client,
)
.await
Expand All @@ -338,92 +438,3 @@ where
);
Ok(())
}

pub async fn fetch_and_store_price_feeds_metadata<S>(
state: &S,
mapping_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>>
where
S: PriceFeedMeta + Aggregates,
{
let price_feeds_metadata = fetch_price_feeds_metadata(mapping_address, rpc_client).await?;
let all_ids = Aggregates::get_price_feed_ids(state).await;

// Filter price_feeds_metadata to only include entries with IDs in all_ids
let filtered_metadata: Vec<PriceFeedMetadata> = price_feeds_metadata
.into_iter()
.filter(|metadata| all_ids.contains(&PriceIdentifier::from(metadata.id)))
.collect();

state.store_price_feeds_metadata(&filtered_metadata).await?;
Ok(filtered_metadata)
}

async fn fetch_price_feeds_metadata(
mapping_address: &Pubkey,
rpc_client: &RpcClient,
) -> Result<Vec<PriceFeedMetadata>> {
let mut price_feeds_metadata = Vec::<PriceFeedMetadata>::new();
let mapping_data = rpc_client.get_account_data(mapping_address).await?;
let mapping_acct = load_mapping_account(&mapping_data)?;

// Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24))
for product_keys_chunk in mapping_acct
.products
.iter()
.filter(|&prod_pkey| *prod_pkey != Pubkey::default())
.collect::<Vec<_>>()
.chunks(150)
{
// Prepare a list of futures for fetching product account data for each chunk
let fetch_product_data_futures = product_keys_chunk
.iter()
.map(|prod_pkey| rpc_client.get_account_data(prod_pkey))
.collect::<Vec<_>>();

// Await all futures concurrently within the chunk
let products_data_results = futures::future::join_all(fetch_product_data_futures).await;

for prod_data_result in products_data_results {
match prod_data_result {
Ok(prod_data) => {
let prod_acct = match load_product_account(&prod_data) {
Ok(prod_acct) => prod_acct,
Err(e) => {
println!("Error loading product account: {}", e);
continue;
}
};

// TODO: Add stricter type checking for attributes
let attributes = prod_acct
.iter()
.filter(|(key, _)| !key.is_empty())
.map(|(key, val)| (key.to_string(), val.to_string()))
.collect::<BTreeMap<String, String>>();

if prod_acct.px_acc != Pubkey::default() {
let px_pkey = prod_acct.px_acc;
let px_pkey_bytes = bs58::decode(&px_pkey.to_string()).into_vec()?;
let px_pkey_array: [u8; 32] = px_pkey_bytes
.try_into()
.expect("Invalid length for PriceIdentifier");

let price_feed_metadata = PriceFeedMetadata {
id: RpcPriceIdentifier::new(px_pkey_array),
attributes,
};

price_feeds_metadata.push(price_feed_metadata);
}
}
Err(e) => {
println!("Error loading product account: {}", e);
continue;
}
}
}
}
Ok(price_feeds_metadata)
}
Loading