Skip to content

Commit 7cd8977

Browse files
authored
Merge pull request #2612 from pyth-network/hermes-ws
feat(apps/hermes): add connection timeout for SSE & WebSocket connections
2 parents bbc699f + 2c26a12 commit 7cd8977

File tree

4 files changed

+47
-6
lines changed

4 files changed

+47
-6
lines changed

apps/hermes/server/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/hermes/server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hermes"
3-
version = "0.8.5"
3+
version = "0.8.6"
44
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
55
edition = "2021"
66

apps/hermes/server/src/api/rest/v2/sse.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ use {
1919
pyth_sdk::PriceIdentifier,
2020
serde::Deserialize,
2121
serde_qs::axum::QsQuery,
22-
std::convert::Infallible,
23-
tokio::sync::broadcast,
22+
std::{convert::Infallible, time::Duration},
23+
tokio::{sync::broadcast, time::Instant},
2424
tokio_stream::{wrappers::BroadcastStream, StreamExt as _},
2525
utoipa::IntoParams,
2626
};
2727

28+
// Constants
29+
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours
30+
2831
#[derive(Debug, Deserialize, IntoParams)]
2932
#[into_params(parameter_in = Query)]
3033
pub struct StreamPriceUpdatesQueryParams {
@@ -75,6 +78,9 @@ fn default_true() -> bool {
7578
params(StreamPriceUpdatesQueryParams)
7679
)]
7780
/// SSE route handler for streaming price updates.
81+
///
82+
/// The connection will automatically close after 24 hours to prevent resource leaks.
83+
/// Clients should implement reconnection logic to maintain continuous price updates.
7884
pub async fn price_stream_sse_handler<S>(
7985
State(state): State<ApiState<S>>,
8086
QsQuery(params): QsQuery<StreamPriceUpdatesQueryParams>,
@@ -93,7 +99,11 @@ where
9399
// Convert the broadcast receiver into a Stream
94100
let stream = BroadcastStream::new(update_rx);
95101

102+
// Set connection start time
103+
let start_time = Instant::now();
104+
96105
let sse_stream = stream
106+
.take_while(move |_| start_time.elapsed() < MAX_CONNECTION_DURATION)
97107
.then(move |message| {
98108
let state_clone = state.clone(); // Clone again to use inside the async block
99109
let price_ids_clone = price_ids.clone(); // Clone again for use inside the async block
@@ -122,7 +132,12 @@ where
122132
}
123133
}
124134
})
125-
.filter_map(|x| x);
135+
.filter_map(|x| x)
136+
.chain(futures::stream::once(async {
137+
Ok(Event::default()
138+
.event("error")
139+
.data("Connection timeout reached (24h)"))
140+
}));
126141

127142
Ok(Sse::new(sse_stream).keep_alive(KeepAlive::default()))
128143
}

apps/hermes/server/src/api/ws.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ use {
4040
},
4141
time::Duration,
4242
},
43-
tokio::sync::{broadcast::Receiver, watch},
43+
tokio::{
44+
sync::{broadcast::Receiver, watch},
45+
time::Instant,
46+
},
4447
};
4548

4649
const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30);
4750
const MAX_CLIENT_MESSAGE_SIZE: usize = 100 * 1024; // 100 KiB
51+
const MAX_CONNECTION_DURATION: Duration = Duration::from_secs(24 * 60 * 60); // 24 hours
4852

4953
/// The maximum number of bytes that can be sent per second per IP address.
5054
/// If the limit is exceeded, the connection is closed.
@@ -252,6 +256,7 @@ pub struct Subscriber<S> {
252256
sender: SplitSink<WebSocket, Message>,
253257
price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
254258
ping_interval: tokio::time::Interval,
259+
connection_deadline: Instant,
255260
exit: watch::Receiver<bool>,
256261
responded_to_ping: bool,
257262
}
@@ -280,6 +285,7 @@ where
280285
sender,
281286
price_feeds_with_config: HashMap::new(),
282287
ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
288+
connection_deadline: Instant::now() + MAX_CONNECTION_DURATION,
283289
exit: crate::EXIT.subscribe(),
284290
responded_to_ping: true, // We start with true so we don't close the connection immediately
285291
}
@@ -325,6 +331,26 @@ where
325331
self.sender.send(Message::Ping(vec![])).await?;
326332
Ok(())
327333
},
334+
_ = tokio::time::sleep_until(self.connection_deadline) => {
335+
tracing::info!(
336+
id = self.id,
337+
ip = ?self.ip_addr,
338+
"Connection timeout reached (24h). Closing connection.",
339+
);
340+
self.sender
341+
.send(
342+
serde_json::to_string(&ServerMessage::Response(
343+
ServerResponseMessage::Err {
344+
error: "Connection timeout reached (24h)".to_string(),
345+
},
346+
))?
347+
.into(),
348+
)
349+
.await?;
350+
self.sender.close().await?;
351+
self.closed = true;
352+
Ok(())
353+
},
328354
_ = self.exit.changed() => {
329355
self.sender.close().await?;
330356
self.closed = true;

0 commit comments

Comments
 (0)