refactor: ThreadPool use build not build_global (#512)

Dangerous: Change how threadpool initialized
This commit is contained in:
andy.boot
2025-07-06 09:42:02 +01:00
committed by GitHub
parent 9cc557cada
commit 17662e8ff1

View File

@@ -13,7 +13,6 @@ mod utils;
use crate::cli::Cli; use crate::cli::Cli;
use crate::config::Config; use crate::config::Config;
use crate::display_node::DisplayNode; use crate::display_node::DisplayNode;
use crate::node::FileTime;
use crate::progress::RuntimeErrors; use crate::progress::RuntimeErrors;
use clap::Parser; use clap::Parser;
use dir_walker::WalkData; use dir_walker::WalkData;
@@ -276,43 +275,49 @@ fn main() {
progress_data: indicator.data.clone(), progress_data: indicator.data.clone(),
errors: errors_for_rayon, errors: errors_for_rayon,
}; };
let threads_to_use = config.get_threads(&options); let threads_to_use = config.get_threads(&options);
let stack_size = config.get_custom_stack_size(&options); let stack_size = config.get_custom_stack_size(&options);
init_rayon(&stack_size, &threads_to_use);
let top_level_nodes = walk_it(simplified_dirs, &walk_data); init_rayon(&stack_size, &threads_to_use).install(|| {
let top_level_nodes = walk_it(simplified_dirs, &walk_data);
let tree = match summarize_file_types { let tree = match summarize_file_types {
true => get_all_file_types(&top_level_nodes, number_of_lines, &by_filetime), true => get_all_file_types(&top_level_nodes, number_of_lines, walk_data.by_filetime),
false => { false => {
let agg_data = AggregateData { let agg_data = AggregateData {
min_size: config.get_min_size(&options), min_size: config.get_min_size(&options),
only_dir: config.get_only_dir(&options), only_dir: config.get_only_dir(&options),
only_file: config.get_only_file(&options), only_file: config.get_only_file(&options),
number_of_lines, number_of_lines,
depth, depth,
using_a_filter: !filter_regexs.is_empty() || !invert_filter_regexs.is_empty(), using_a_filter: !filter_regexs.is_empty() || !invert_filter_regexs.is_empty(),
short_paths: !config.get_full_paths(&options), short_paths: !config.get_full_paths(&options),
}; };
get_biggest(top_level_nodes, agg_data, &by_filetime, keep_collapsed) get_biggest(
} top_level_nodes,
}; agg_data,
walk_data.by_filetime,
keep_collapsed,
)
}
};
// Must have stopped indicator before we print to stderr // Must have stopped indicator before we print to stderr
indicator.stop(); indicator.stop();
let print_errors = config.get_print_errors(&options); let print_errors = config.get_print_errors(&options);
print_any_errors(print_errors, walk_data.errors); print_any_errors(print_errors, walk_data.errors);
print_output( print_output(
config, config,
options, options,
tree, tree,
walk_data.by_filecount, walk_data.by_filecount,
by_filetime, is_colors,
is_colors, terminal_width,
terminal_width, )
) });
} }
fn print_output( fn print_output(
@@ -320,7 +325,6 @@ fn print_output(
options: Cli, options: Cli,
tree: DisplayNode, tree: DisplayNode,
by_filecount: bool, by_filecount: bool,
by_filetime: Option<FileTime>,
is_colors: bool, is_colors: bool,
terminal_width: usize, terminal_width: usize,
) { ) {
@@ -337,7 +341,7 @@ fn print_output(
is_reversed: !config.get_reverse(&options), is_reversed: !config.get_reverse(&options),
colors_on: is_colors, colors_on: is_colors,
by_filecount, by_filecount,
by_filetime, by_filetime: config.get_filetime(&options),
is_screen_reader: config.get_screen_reader(&options), is_screen_reader: config.get_screen_reader(&options),
output_format, output_format,
bars_on_right: config.get_bars_on_right(&options), bars_on_right: config.get_bars_on_right(&options),
@@ -390,44 +394,54 @@ fn print_any_errors(print_errors: bool, errors: Arc<Mutex<RuntimeErrors>>) {
} }
} }
fn init_rayon(stack_size: &Option<usize>, threads: &Option<usize>) { fn init_rayon(stack: &Option<usize>, threads: &Option<usize>) -> rayon::ThreadPool {
// Rayon seems to raise this error on 32-bit builds let stack_size = match stack {
// The global thread pool has not been initialized.: ThreadPoolBuildError { kind: GlobalPoolAlreadyInitialized } Some(s) => Some(*s),
if cfg!(target_pointer_width = "64") { None => {
let result = panic::catch_unwind(|| build_thread_pool(*stack_size, *threads)); // Do not increase the stack size on a 32 bit system, it will fail
if result.is_err() { if cfg!(target_pointer_width = "32") {
eprintln!("Problem initializing rayon, try: export RAYON_NUM_THREADS=1") None
} else {
let large_stack = usize::pow(1024, 3);
let mut s = System::new();
s.refresh_memory();
// Larger stack size if possible to handle cases with lots of nested directories
let available = s.available_memory();
if available > (large_stack * threads.unwrap_or(1)).try_into().unwrap() {
Some(large_stack)
} else {
None
}
}
}
};
match build_thread_pool(stack_size, threads) {
Ok(pool) => pool,
Err(err) => {
eprintln!("Problem initializing rayon, try: export RAYON_NUM_THREADS=1");
if stack.is_none() && stack_size.is_some() {
// stack parameter was none, try with default stack size
if let Ok(pool) = build_thread_pool(None, threads) {
eprintln!("WARNING: not using large stack size, got error: {err}");
return pool;
}
}
panic!("{err}");
} }
} }
} }
fn build_thread_pool( fn build_thread_pool(
stack: Option<usize>, stack_size: Option<usize>,
threads: Option<usize>, threads: &Option<usize>,
) -> Result<(), rayon::ThreadPoolBuildError> { ) -> Result<rayon::ThreadPool, rayon::ThreadPoolBuildError> {
let mut pool = rayon::ThreadPoolBuilder::new(); let mut pool_builder = rayon::ThreadPoolBuilder::new();
if let Some(thread_count) = threads {
pool = pool.num_threads(thread_count);
}
let stack_size = match stack {
Some(s) => Some(s),
None => {
let large_stack = usize::pow(1024, 3);
let mut s = System::new();
s.refresh_memory();
// Larger stack size if possible to handle cases with lots of nested directories
let available = s.available_memory();
if available > large_stack.try_into().unwrap() {
Some(large_stack)
} else {
None
}
}
};
if let Some(stack_size_param) = stack_size { if let Some(stack_size_param) = stack_size {
pool = pool.stack_size(stack_size_param); pool_builder = pool_builder.stack_size(stack_size_param);
} }
pool.build_global() if let Some(thread_count) = threads {
pool_builder = pool_builder.num_threads(*thread_count);
}
pool_builder.build()
} }