Skip to content

Commit 2aa6f93

Browse files
Failing but working :)
Diarmuid enright concurrent downloads
2 parents b79969b + 21b5cbe commit 2aa6f93

File tree

12 files changed

+814
-210
lines changed

12 files changed

+814
-210
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ enum-map = "2.5.0"
5252
env_proxy = { version = "0.4.1", optional = true }
5353
flate2 = { version = "1.1.1", default-features = false, features = ["zlib-rs"] }
5454
fs_at = "0.2.1"
55+
futures = "0.3.31"
5556
git-testament = "0.2"
5657
home = "0.5.4"
5758
itertools = "0.14"
@@ -78,6 +79,7 @@ serde = { version = "1.0", features = ["derive"] }
7879
sha2 = "0.10"
7980
sharded-slab = "0.1.1"
8081
strsim = "0.11"
82+
sys-info = "0.9.1"
8183
tar = "0.4.26"
8284
tempfile = "3.8"
8385
termcolor = "1.2"

src/cli/rustup_mode.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use anyhow::{Context, Error, Result, anyhow};
1010
use clap::{Args, CommandFactory, Parser, Subcommand, ValueEnum, builder::PossibleValue};
1111
use clap_complete::Shell;
1212
use itertools::Itertools;
13-
use tracing::{info, trace, warn};
13+
use tracing::{info, trace, warn, debug};
1414
use tracing_subscriber::{EnvFilter, Registry, reload::Handle};
15+
use sys_info;
1516

1617
use crate::dist::AutoInstallMode;
1718
use crate::{
@@ -78,6 +79,10 @@ struct Rustup {
7879
#[arg(short, long, conflicts_with = "verbose")]
7980
quiet: bool,
8081

82+
/// Enable concurrent downloads and I/O operations
83+
#[arg(long)]
84+
concurrent: bool,
85+
8186
/// Release channel (e.g. +stable) or custom toolchain to set override
8287
#[arg(
8388
name = "+toolchain",
@@ -594,6 +599,34 @@ pub async fn main(
594599

595600
update_console_filter(process, &console_filter, matches.quiet, matches.verbose);
596601

602+
// Configure concurrency mode if specified
603+
if matches.concurrent {
604+
// Set up environment variables to enable concurrent operations
605+
info!("Enabling concurrent downloads and I/O operations");
606+
unsafe {
607+
std::env::set_var("RUSTUP_IO_THREADS", "0"); // Use auto-detected optimal thread count
608+
}
609+
610+
// Based on system memory, set a reasonable RAM budget for I/O operations
611+
if std::env::var("RUSTUP_RAM_BUDGET").is_err() {
612+
if let Ok(mem_info) = sys_info::mem_info() {
613+
let total_ram = mem_info.total as usize * 1024; // Convert to bytes
614+
let ram_budget = total_ram / 10; // Use 10% of system memory
615+
unsafe {
616+
std::env::set_var("RUSTUP_RAM_BUDGET", ram_budget.to_string());
617+
}
618+
debug!("Auto-configured RAM budget to {} MB", ram_budget / (1024 * 1024));
619+
}
620+
}
621+
} else {
622+
// Ensure we use the non-concurrent mode for compatibility
623+
if std::env::var("RUSTUP_IO_THREADS").is_err() {
624+
unsafe {
625+
std::env::set_var("RUSTUP_IO_THREADS", "1"); // Use single-threaded I/O
626+
}
627+
}
628+
}
629+
597630
let cfg = &mut common::set_globals(current_dir, matches.quiet, process)?;
598631

599632
if let Some(t) = &matches.plus_toolchain {
@@ -1701,6 +1734,7 @@ impl clap::ValueEnum for CompletionCommand {
17011734
}
17021735
}
17031736

1737+
// Implementation outside of the ValueEnum trait
17041738
impl fmt::Display for CompletionCommand {
17051739
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17061740
match self.to_possible_value() {
@@ -1728,7 +1762,6 @@ fn output_completion_script(
17281762
if let Shell::Zsh = shell {
17291763
writeln!(process.stdout().lock(), "#compdef cargo")?;
17301764
}
1731-
17321765
let script = match shell {
17331766
Shell::Bash => "/etc/bash_completion.d/cargo",
17341767
Shell::Zsh => "/share/zsh/site-functions/_cargo",
@@ -1740,7 +1773,6 @@ fn output_completion_script(
17401773
));
17411774
}
17421775
};
1743-
17441776
writeln!(
17451777
process.stdout().lock(),
17461778
"if command -v rustc >/dev/null 2>&1; then\n\
@@ -1749,6 +1781,5 @@ fn output_completion_script(
17491781
)?;
17501782
}
17511783
}
1752-
17531784
Ok(utils::ExitCode(0))
17541785
}

src/diskio/mod.rs

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ use std::sync::mpsc::Receiver;
6363
use std::thread::available_parallelism;
6464
use std::time::{Duration, Instant};
6565
use std::{fmt::Debug, fs::OpenOptions};
66+
use tracing::debug;
6667

6768
use anyhow::{Context, Result};
69+
use sys_info;
6870

6971
use crate::process::Process;
7072
use crate::utils::notifications::Notification;
@@ -78,6 +80,26 @@ pub(crate) enum FileBuffer {
7880
Threaded(PoolReference),
7981
}
8082

83+
impl PartialEq for FileBuffer {
84+
fn eq(&self, other: &Self) -> bool {
85+
self.deref() == other.deref()
86+
}
87+
}
88+
89+
impl Eq for FileBuffer {}
90+
91+
impl PartialOrd for FileBuffer {
92+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
93+
Some(self.cmp(other))
94+
}
95+
}
96+
97+
impl Ord for FileBuffer {
98+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
99+
self.deref().cmp(other.deref())
100+
}
101+
}
102+
81103
impl FileBuffer {
82104
/// All the buffers space to be re-used when the last reference to it is dropped.
83105
pub(crate) fn clear(&mut self) {
@@ -137,6 +159,37 @@ pub(crate) enum IncrementalFile {
137159
ThreadedReceiver(Receiver<FileBuffer>),
138160
}
139161

162+
impl PartialEq for IncrementalFile {
163+
fn eq(&self, other: &Self) -> bool {
164+
// Just compare discriminants since Receiver cannot be compared
165+
matches!(
166+
(self, other),
167+
(Self::ImmediateReceiver, Self::ImmediateReceiver) |
168+
(Self::ThreadedReceiver(_), Self::ThreadedReceiver(_))
169+
)
170+
}
171+
}
172+
173+
impl Eq for IncrementalFile {}
174+
175+
impl PartialOrd for IncrementalFile {
176+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
177+
Some(self.cmp(other))
178+
}
179+
}
180+
181+
impl Ord for IncrementalFile {
182+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
183+
// ImmediateReceiver is "less than" ThreadedReceiver
184+
match (self, other) {
185+
(Self::ImmediateReceiver, Self::ImmediateReceiver) => std::cmp::Ordering::Equal,
186+
(Self::ImmediateReceiver, Self::ThreadedReceiver(_)) => std::cmp::Ordering::Less,
187+
(Self::ThreadedReceiver(_), Self::ImmediateReceiver) => std::cmp::Ordering::Greater,
188+
(Self::ThreadedReceiver(_), Self::ThreadedReceiver(_)) => std::cmp::Ordering::Equal,
189+
}
190+
}
191+
}
192+
140193
// The basic idea is that in single threaded mode we get this pattern:
141194
// package budget io-layer
142195
// +<-claim->
@@ -176,13 +229,28 @@ pub(crate) enum IncrementalFile {
176229
// Error reporting is passed through the regular completion port, to avoid creating a new special case.
177230

178231
/// What kind of IO operation to perform
179-
#[derive(Debug)]
232+
#[derive(Debug, Eq, Ord, PartialEq, PartialOrd)]
180233
pub(crate) enum Kind {
181234
Directory,
182235
File(FileBuffer),
183236
IncrementalFile(IncrementalFile),
184237
}
185238

239+
/// Priority level for I/O operations
240+
/// Higher values indicate higher priority
241+
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
242+
pub enum IOPriority {
243+
Critical,
244+
Normal,
245+
Background,
246+
}
247+
248+
impl Default for IOPriority {
249+
fn default() -> Self {
250+
Self::Normal
251+
}
252+
}
253+
186254
/// The details of the IO operation
187255
#[derive(Debug)]
188256
pub(crate) struct Item {
@@ -198,6 +266,32 @@ pub(crate) struct Item {
198266
pub(crate) result: io::Result<()>,
199267
/// The mode to apply
200268
mode: u32,
269+
/// Priority of this operation
270+
priority: IOPriority,
271+
}
272+
273+
impl PartialEq for Item {
274+
fn eq(&self, other: &Self) -> bool {
275+
self.priority == other.priority && self.full_path == other.full_path
276+
}
277+
}
278+
279+
impl Eq for Item {}
280+
281+
impl PartialOrd for Item {
282+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
283+
Some(self.cmp(other))
284+
}
285+
}
286+
287+
impl Ord for Item {
288+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
289+
// Sort by priority first (higher priority comes first)
290+
match other.priority.cmp(&self.priority) {
291+
std::cmp::Ordering::Equal => self.full_path.cmp(&other.full_path),
292+
ordering => ordering,
293+
}
294+
}
201295
}
202296

203297
#[derive(Debug)]
@@ -218,6 +312,7 @@ impl Item {
218312
finish: None,
219313
result: Ok(()),
220314
mode,
315+
priority: IOPriority::default(),
221316
}
222317
}
223318

@@ -229,6 +324,7 @@ impl Item {
229324
finish: None,
230325
result: Ok(()),
231326
mode,
327+
priority: IOPriority::default(),
232328
}
233329
}
234330

@@ -245,9 +341,23 @@ impl Item {
245341
finish: None,
246342
result: Ok(()),
247343
mode,
344+
priority: IOPriority::default(),
248345
};
249346
Ok((result, Box::new(chunk_submit)))
250347
}
348+
349+
/// Set the priority of this I/O operation
350+
/// remove for now
351+
#[allow(dead_code)]
352+
pub(crate) fn with_priority(mut self, priority: IOPriority) -> Self {
353+
self.priority = priority;
354+
self
355+
}
356+
357+
/// Get the priority of this I/O operation
358+
pub(crate) fn priority(&self) -> IOPriority {
359+
self.priority
360+
}
251361
}
252362

253363
// This could be a boxed trait object perhaps... but since we're looking at
@@ -448,15 +558,56 @@ pub(crate) fn get_executor<'a>(
448558
ram_budget: usize,
449559
process: &Process,
450560
) -> Result<Box<dyn Executor + 'a>> {
561+
// Calculate optimal thread count based on system characteristics
562+
// Default is CPU count for CPU-bound systems, or 2x CPU count for I/O-bound operations
563+
let default_thread_count = available_parallelism()
564+
.map(|p| {
565+
let cpu_count = p.get();
566+
// Use more threads for I/O bound operations to hide latency
567+
// but cap it to avoid too much overhead
568+
std::cmp::min(cpu_count * 2, 16)
569+
})
570+
.unwrap_or(2);
571+
451572
// If this gets lots of use, consider exposing via the config file.
452573
let thread_count = match process.var("RUSTUP_IO_THREADS") {
453-
Err(_) => available_parallelism().map(|p| p.get()).unwrap_or(1),
574+
Err(_) => default_thread_count,
454575
Ok(n) => n
455576
.parse::<usize>()
456577
.context("invalid value in RUSTUP_IO_THREADS. Must be a natural number")?,
457578
};
579+
580+
// Calculate optimal memory budget based on system memory
581+
// Default to 10% of system memory, or fallback to 256MB
582+
let default_ram_budget = if ram_budget == 0 {
583+
match sys_info::mem_info() {
584+
Ok(mem) => {
585+
let total_mem = mem.total as usize * 1024; // Convert to bytes
586+
total_mem / 10 // Use 10% of system memory
587+
}
588+
Err(_) => 256 * 1024 * 1024, // Fallback to 256MB
589+
}
590+
} else {
591+
ram_budget
592+
};
593+
594+
// Allow overriding the memory budget via environment variable (maybe keep this but useful for testing on different systems right now)
595+
let actual_ram_budget = match process.var("RUSTUP_RAM_BUDGET") {
596+
Err(_) => default_ram_budget,
597+
Ok(n) => n
598+
.parse::<usize>()
599+
.context("invalid value in RUSTUP_RAM_BUDGET. Must be in bytes")?,
600+
};
601+
602+
// Log the chosen configuration for debugging
603+
debug!(
604+
"Using IO executor with thread_count={} and ram_budget={}MB",
605+
thread_count,
606+
actual_ram_budget / (1024 * 1024)
607+
);
608+
458609
Ok(match thread_count {
459610
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
460-
n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)),
611+
n => Box::new(threaded::Threaded::new(notify_handler, n, actual_ram_budget)),
461612
})
462613
}

0 commit comments

Comments
 (0)