Detailed changes
@@ -339,59 +339,70 @@ fn main() -> Result<()> {
"Dev servers were removed in v0.157.x please upgrade to SSH remoting: https://zed.dev/docs/remote-development"
);
- let sender: JoinHandle<anyhow::Result<()>> = thread::spawn({
- let exit_status = exit_status.clone();
- let user_data_dir_for_thread = user_data_dir.clone();
- move || {
- let (_, handshake) = server.accept().context("Handshake after Zed spawn")?;
- let (tx, rx) = (handshake.requests, handshake.responses);
-
- #[cfg(target_os = "windows")]
- let wsl = args.wsl;
- #[cfg(not(target_os = "windows"))]
- let wsl = None;
-
- tx.send(CliRequest::Open {
- paths,
- urls,
- diff_paths,
- wsl,
- wait: args.wait,
- open_new_workspace,
- env,
- user_data_dir: user_data_dir_for_thread,
- })?;
-
- while let Ok(response) = rx.recv() {
- match response {
- CliResponse::Ping => {}
- CliResponse::Stdout { message } => println!("{message}"),
- CliResponse::Stderr { message } => eprintln!("{message}"),
- CliResponse::Exit { status } => {
- exit_status.lock().replace(status);
- return Ok(());
+ let sender: JoinHandle<anyhow::Result<()>> = thread::Builder::new()
+ .name("CliReceiver".to_string())
+ .spawn({
+ let exit_status = exit_status.clone();
+ let user_data_dir_for_thread = user_data_dir.clone();
+ move || {
+ let (_, handshake) = server.accept().context("Handshake after Zed spawn")?;
+ let (tx, rx) = (handshake.requests, handshake.responses);
+
+ #[cfg(target_os = "windows")]
+ let wsl = args.wsl;
+ #[cfg(not(target_os = "windows"))]
+ let wsl = None;
+
+ tx.send(CliRequest::Open {
+ paths,
+ urls,
+ diff_paths,
+ wsl,
+ wait: args.wait,
+ open_new_workspace,
+ env,
+ user_data_dir: user_data_dir_for_thread,
+ })?;
+
+ while let Ok(response) = rx.recv() {
+ match response {
+ CliResponse::Ping => {}
+ CliResponse::Stdout { message } => println!("{message}"),
+ CliResponse::Stderr { message } => eprintln!("{message}"),
+ CliResponse::Exit { status } => {
+ exit_status.lock().replace(status);
+ return Ok(());
+ }
}
}
- }
- Ok(())
- }
- });
+ Ok(())
+ }
+ })
+ .unwrap();
let stdin_pipe_handle: Option<JoinHandle<anyhow::Result<()>>> =
stdin_tmp_file.map(|mut tmp_file| {
- thread::spawn(move || {
- let mut stdin = std::io::stdin().lock();
- if !io::IsTerminal::is_terminal(&stdin) {
- io::copy(&mut stdin, &mut tmp_file)?;
- }
- Ok(())
- })
+ thread::Builder::new()
+ .name("CliStdin".to_string())
+ .spawn(move || {
+ let mut stdin = std::io::stdin().lock();
+ if !io::IsTerminal::is_terminal(&stdin) {
+ io::copy(&mut stdin, &mut tmp_file)?;
+ }
+ Ok(())
+ })
+ .unwrap()
});
let anonymous_fd_pipe_handles: Vec<_> = anonymous_fd_tmp_files
.into_iter()
- .map(|(mut file, mut tmp_file)| thread::spawn(move || io::copy(&mut file, &mut tmp_file)))
+ .map(|(mut file, mut tmp_file)| {
+ thread::Builder::new()
+ .name("CliAnonymousFd".to_string())
+ .spawn(move || io::copy(&mut file, &mut tmp_file))
+ .unwrap()
+ })
.collect();
if args.foreground {
@@ -321,16 +321,19 @@ pub fn crash_server(socket: &Path) {
let shutdown = Arc::new(AtomicBool::new(false));
let has_connection = Arc::new(AtomicBool::new(false));
- std::thread::spawn({
- let shutdown = shutdown.clone();
- let has_connection = has_connection.clone();
- move || {
- std::thread::sleep(CRASH_HANDLER_CONNECT_TIMEOUT);
- if !has_connection.load(Ordering::SeqCst) {
- shutdown.store(true, Ordering::SeqCst);
+ thread::Builder::new()
+ .name("CrashServerTimeout".to_owned())
+ .spawn({
+ let shutdown = shutdown.clone();
+ let has_connection = has_connection.clone();
+ move || {
+ std::thread::sleep(CRASH_HANDLER_CONNECT_TIMEOUT);
+ if !has_connection.load(Ordering::SeqCst) {
+ shutdown.store(true, Ordering::SeqCst);
+ }
}
- }
- });
+ })
+ .unwrap();
server
.run(
@@ -79,9 +79,12 @@ impl<S: Source> Denoiser<S> {
let (input_tx, input_rx) = mpsc::channel();
let (denoised_tx, denoised_rx) = mpsc::channel();
- thread::spawn(move || {
- run_neural_denoiser(denoised_tx, input_rx);
- });
+ thread::Builder::new()
+ .name("NeuralDenoiser".to_owned())
+ .spawn(move || {
+ run_neural_denoiser(denoised_tx, input_rx);
+ })
+ .unwrap();
Ok(Self {
inner: source,
@@ -6,6 +6,7 @@ use parking_lot::Mutex;
use std::{
path::{Path, PathBuf},
sync::Weak,
+ thread,
time::Duration,
};
@@ -48,9 +49,12 @@ impl Watcher for MacWatcher {
let (stream, handle) = EventStream::new(&[path], self.latency);
let tx = self.events_tx.clone();
- std::thread::spawn(move || {
- stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
- });
+ thread::Builder::new()
+ .name("MacWatcher".to_owned())
+ .spawn(move || {
+ stream.run(move |events| smol::block_on(tx.send(events)).is_ok());
+ })
+ .unwrap();
handles.insert(path.into(), handle);
Ok(())
@@ -37,51 +37,57 @@ impl LinuxDispatcher {
let mut background_threads = (0..thread_count)
.map(|i| {
let receiver = background_receiver.clone();
- std::thread::spawn(move || {
- for runnable in receiver {
- let start = Instant::now();
-
- runnable.run();
-
- log::trace!(
- "background thread {}: ran runnable. took: {:?}",
- i,
- start.elapsed()
- );
- }
- })
+ std::thread::Builder::new()
+ .name(format!("Worker-{i}"))
+ .spawn(move || {
+ for runnable in receiver {
+ let start = Instant::now();
+
+ runnable.run();
+
+ log::trace!(
+ "background thread {}: ran runnable. took: {:?}",
+ i,
+ start.elapsed()
+ );
+ }
+ })
+ .unwrap()
})
.collect::<Vec<_>>();
let (timer_sender, timer_channel) = calloop::channel::channel::<TimerAfter>();
- let timer_thread = std::thread::spawn(|| {
- let mut event_loop: EventLoop<()> =
- EventLoop::try_new().expect("Failed to initialize timer loop!");
-
- let handle = event_loop.handle();
- let timer_handle = event_loop.handle();
- handle
- .insert_source(timer_channel, move |e, _, _| {
- if let channel::Event::Msg(timer) = e {
- // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
- let mut runnable = Some(timer.runnable);
- timer_handle
- .insert_source(
- calloop::timer::Timer::from_duration(timer.duration),
- move |_, _, _| {
- if let Some(runnable) = runnable.take() {
- runnable.run();
- }
- TimeoutAction::Drop
- },
- )
- .expect("Failed to start timer");
- }
- })
- .expect("Failed to start timer thread");
-
- event_loop.run(None, &mut (), |_| {}).log_err();
- });
+ let timer_thread = std::thread::Builder::new()
+ .name("Timer".to_owned())
+ .spawn(|| {
+ let mut event_loop: EventLoop<()> =
+ EventLoop::try_new().expect("Failed to initialize timer loop!");
+
+ let handle = event_loop.handle();
+ let timer_handle = event_loop.handle();
+ handle
+ .insert_source(timer_channel, move |e, _, _| {
+ if let channel::Event::Msg(timer) = e {
+ // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once.
+ let mut runnable = Some(timer.runnable);
+ timer_handle
+ .insert_source(
+ calloop::timer::Timer::from_duration(timer.duration),
+ move |_, _, _| {
+ if let Some(runnable) = runnable.take() {
+ runnable.run();
+ }
+ TimeoutAction::Drop
+ },
+ )
+ .expect("Failed to start timer");
+ }
+ })
+ .expect("Failed to start timer thread");
+
+ event_loop.run(None, &mut (), |_| {}).log_err();
+ })
+ .unwrap();
background_threads.push(timer_thread);
@@ -957,15 +957,17 @@ impl Clipboard {
}
// At this point we know that the clipboard does not exist.
let ctx = Arc::new(Inner::new()?);
- let join_handle;
- {
- let ctx = Arc::clone(&ctx);
- join_handle = std::thread::spawn(move || {
- if let Err(error) = serve_requests(ctx) {
- log::error!("Worker thread errored with: {}", error);
+ let join_handle = std::thread::Builder::new()
+ .name("Clipboard".to_owned())
+ .spawn({
+ let ctx = Arc::clone(&ctx);
+ move || {
+ if let Err(error) = serve_requests(ctx) {
+ log::error!("Worker thread errored with: {}", error);
+ }
}
- });
- }
+ })
+ .unwrap();
*global_cb = Some(GlobalClipboard {
inner: Arc::clone(&ctx),
server_handle: join_handle,
@@ -243,29 +243,32 @@ impl WindowsPlatform {
let validation_number = self.inner.validation_number;
let all_windows = Arc::downgrade(&self.raw_window_handles);
let text_system = Arc::downgrade(&self.text_system);
- std::thread::spawn(move || {
- let vsync_provider = VSyncProvider::new();
- loop {
- vsync_provider.wait_for_vsync();
- if check_device_lost(&directx_device.device) {
- handle_gpu_device_lost(
- &mut directx_device,
- platform_window.as_raw(),
- validation_number,
- &all_windows,
- &text_system,
- );
- }
- let Some(all_windows) = all_windows.upgrade() else {
- break;
- };
- for hwnd in all_windows.read().iter() {
- unsafe {
- let _ = RedrawWindow(Some(hwnd.as_raw()), None, None, RDW_INVALIDATE);
+ std::thread::Builder::new()
+ .name("VSyncProvider".to_owned())
+ .spawn(move || {
+ let vsync_provider = VSyncProvider::new();
+ loop {
+ vsync_provider.wait_for_vsync();
+ if check_device_lost(&directx_device.device) {
+ handle_gpu_device_lost(
+ &mut directx_device,
+ platform_window.as_raw(),
+ validation_number,
+ &all_windows,
+ &text_system,
+ );
+ }
+ let Some(all_windows) = all_windows.upgrade() else {
+ break;
+ };
+ for hwnd in all_windows.read().iter() {
+ unsafe {
+ let _ = RedrawWindow(Some(hwnd.as_raw()), None, None, RDW_INVALIDATE);
+ }
}
}
- }
- });
+ })
+ .unwrap();
}
}
@@ -188,12 +188,15 @@ impl AudioStack {
let voip_parts = audio::VoipParts::new(cx)?;
// Audio needs to run real-time and should never be paused. That is why we are using a
// normal std::thread and not a background task
- thread::spawn(move || {
- // microphone is non send on mac
- let microphone = audio::Audio::open_microphone(voip_parts)?;
- send_to_livekit(frame_tx, microphone);
- Ok::<(), anyhow::Error>(())
- });
+ thread::Builder::new()
+ .name("AudioCapture".to_string())
+ .spawn(move || {
+ // microphone is non send on mac
+ let microphone = audio::Audio::open_microphone(voip_parts)?;
+ send_to_livekit(frame_tx, microphone);
+ Ok::<(), anyhow::Error>(())
+ })
+ .unwrap();
Task::ready(Ok(()))
} else {
self.executor.spawn(async move {
@@ -229,57 +232,60 @@ impl AudioStack {
let mut resampler = audio_resampler::AudioResampler::default();
let mut buf = Vec::new();
- thread::spawn(move || {
- let output_stream = output_device.build_output_stream(
- &output_config.config(),
- {
- move |mut data, _info| {
- while data.len() > 0 {
- if data.len() <= buf.len() {
- let rest = buf.split_off(data.len());
- data.copy_from_slice(&buf);
- buf = rest;
- return;
- }
- if buf.len() > 0 {
- let (prefix, suffix) = data.split_at_mut(buf.len());
- prefix.copy_from_slice(&buf);
- data = suffix;
- }
+ thread::Builder::new()
+ .name("AudioPlayback".to_owned())
+ .spawn(move || {
+ let output_stream = output_device.build_output_stream(
+ &output_config.config(),
+ {
+ move |mut data, _info| {
+ while data.len() > 0 {
+ if data.len() <= buf.len() {
+ let rest = buf.split_off(data.len());
+ data.copy_from_slice(&buf);
+ buf = rest;
+ return;
+ }
+ if buf.len() > 0 {
+ let (prefix, suffix) = data.split_at_mut(buf.len());
+ prefix.copy_from_slice(&buf);
+ data = suffix;
+ }
- let mut mixer = mixer.lock();
- let mixed = mixer.mix(output_config.channels() as usize);
- let sampled = resampler.remix_and_resample(
- mixed,
- sample_rate / 100,
- num_channels,
- sample_rate,
- output_config.channels() as u32,
- output_config.sample_rate().0,
- );
- buf = sampled.to_vec();
- apm.lock()
- .process_reverse_stream(
- &mut buf,
- output_config.sample_rate().0 as i32,
- output_config.channels() as i32,
- )
- .ok();
+ let mut mixer = mixer.lock();
+ let mixed = mixer.mix(output_config.channels() as usize);
+ let sampled = resampler.remix_and_resample(
+ mixed,
+ sample_rate / 100,
+ num_channels,
+ sample_rate,
+ output_config.channels() as u32,
+ output_config.sample_rate().0,
+ );
+ buf = sampled.to_vec();
+ apm.lock()
+ .process_reverse_stream(
+ &mut buf,
+ output_config.sample_rate().0 as i32,
+ output_config.channels() as i32,
+ )
+ .ok();
+ }
}
- }
- },
- |error| log::error!("error playing audio track: {:?}", error),
- Some(Duration::from_millis(100)),
- );
+ },
+ |error| log::error!("error playing audio track: {:?}", error),
+ Some(Duration::from_millis(100)),
+ );
- let Some(output_stream) = output_stream.log_err() else {
- return;
- };
+ let Some(output_stream) = output_stream.log_err() else {
+ return;
+ };
- output_stream.play().log_err();
- // Block forever to keep the output stream alive
- end_on_drop_rx.recv().ok();
- });
+ output_stream.play().log_err();
+ // Block forever to keep the output stream alive
+ end_on_drop_rx.recv().ok();
+ })
+ .unwrap();
device_change_listener.next().await;
drop(end_on_drop_tx)
@@ -300,77 +306,81 @@ impl AudioStack {
let frame_tx = frame_tx.clone();
let mut resampler = audio_resampler::AudioResampler::default();
- thread::spawn(move || {
- maybe!({
- if let Some(name) = device.name().ok() {
- log::info!("Using microphone: {}", name)
- } else {
- log::info!("Using microphone: <unknown>");
- }
-
- let ten_ms_buffer_size =
- (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
- let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
-
- let stream = device
- .build_input_stream_raw(
- &config.config(),
- config.sample_format(),
- move |data, _: &_| {
- let data =
- crate::get_sample_data(config.sample_format(), data).log_err();
- let Some(data) = data else {
- return;
- };
- let mut data = data.as_slice();
+ thread::Builder::new()
+ .name("AudioCapture".to_owned())
+ .spawn(move || {
+ maybe!({
+ if let Some(name) = device.name().ok() {
+ log::info!("Using microphone: {}", name)
+ } else {
+ log::info!("Using microphone: <unknown>");
+ }
- while data.len() > 0 {
- let remainder = (buf.capacity() - buf.len()).min(data.len());
- buf.extend_from_slice(&data[..remainder]);
- data = &data[remainder..];
-
- if buf.capacity() == buf.len() {
- let mut sampled = resampler
- .remix_and_resample(
- buf.as_slice(),
- config.sample_rate().0 / 100,
- config.channels() as u32,
- config.sample_rate().0,
- num_channels,
- sample_rate,
- )
- .to_owned();
- apm.lock()
- .process_stream(
- &mut sampled,
- sample_rate as i32,
- num_channels as i32,
- )
- .log_err();
- buf.clear();
- frame_tx
- .unbounded_send(AudioFrame {
- data: Cow::Owned(sampled),
- sample_rate,
- num_channels,
- samples_per_channel: sample_rate / 100,
- })
- .ok();
+ let ten_ms_buffer_size =
+ (config.channels() as u32 * config.sample_rate().0 / 100) as usize;
+ let mut buf: Vec<i16> = Vec::with_capacity(ten_ms_buffer_size);
+
+ let stream = device
+ .build_input_stream_raw(
+ &config.config(),
+ config.sample_format(),
+ move |data, _: &_| {
+ let data = crate::get_sample_data(config.sample_format(), data)
+ .log_err();
+ let Some(data) = data else {
+ return;
+ };
+ let mut data = data.as_slice();
+
+ while data.len() > 0 {
+ let remainder =
+ (buf.capacity() - buf.len()).min(data.len());
+ buf.extend_from_slice(&data[..remainder]);
+ data = &data[remainder..];
+
+ if buf.capacity() == buf.len() {
+ let mut sampled = resampler
+ .remix_and_resample(
+ buf.as_slice(),
+ config.sample_rate().0 / 100,
+ config.channels() as u32,
+ config.sample_rate().0,
+ num_channels,
+ sample_rate,
+ )
+ .to_owned();
+ apm.lock()
+ .process_stream(
+ &mut sampled,
+ sample_rate as i32,
+ num_channels as i32,
+ )
+ .log_err();
+ buf.clear();
+ frame_tx
+ .unbounded_send(AudioFrame {
+ data: Cow::Owned(sampled),
+ sample_rate,
+ num_channels,
+ samples_per_channel: sample_rate / 100,
+ })
+ .ok();
+ }
}
- }
- },
- |err| log::error!("error capturing audio track: {:?}", err),
- Some(Duration::from_millis(100)),
- )
- .context("failed to build input stream")?;
-
- stream.play()?;
- // Keep the thread alive and holding onto the `stream`
- end_on_drop_rx.recv().ok();
- anyhow::Ok(Some(()))
+ },
+ |err| log::error!("error capturing audio track: {:?}", err),
+ Some(Duration::from_millis(100)),
+ )
+ .context("failed to build input stream")?;
+
+ stream.play()?;
+ // Keep the thread alive and holding onto the `stream`
+ end_on_drop_rx.recv().ok();
+ anyhow::Ok(Some(()))
+ })
+ .log_err();
})
- .log_err();
- });
+ .unwrap();
device_change_listener.next().await;
drop(end_on_drop_tx)
@@ -249,11 +249,14 @@ pub fn background_thread_queue() -> WriteQueueConstructor {
Box::new(|| {
let (sender, receiver) = channel::<QueuedWrite>();
- thread::spawn(move || {
- while let Ok(write) = receiver.recv() {
- write()
- }
- });
+ thread::Builder::new()
+ .name("sqlezWorker".to_string())
+ .spawn(move || {
+ while let Ok(write) = receiver.recv() {
+ write()
+ }
+ })
+ .unwrap();
let sender = UnboundedSyncSender::new(sender);
Box::new(move |queued_write| {
@@ -107,18 +107,21 @@ pub fn ensure_only_instance() -> IsOnlyInstance {
}
};
- thread::spawn(move || {
- for stream in listener.incoming() {
- let mut stream = match stream {
- Ok(stream) => stream,
- Err(_) => return,
- };
-
- _ = stream.set_nodelay(true);
- _ = stream.set_read_timeout(Some(SEND_TIMEOUT));
- _ = stream.write_all(instance_handshake().as_bytes());
- }
- });
+ thread::Builder::new()
+ .name("EnsureSingleton".to_string())
+ .spawn(move || {
+ for stream in listener.incoming() {
+ let mut stream = match stream {
+ Ok(stream) => stream,
+ Err(_) => return,
+ };
+
+ _ = stream.set_nodelay(true);
+ _ = stream.set_read_timeout(Some(SEND_TIMEOUT));
+ _ = stream.write_all(instance_handshake().as_bytes());
+ }
+ })
+ .unwrap();
IsOnlyInstance::Yes
}
@@ -42,14 +42,17 @@ pub fn handle_single_instance(opener: OpenListener, args: &Args) -> bool {
let is_first_instance = is_first_instance();
if is_first_instance {
// We are the first instance, listen for messages sent from other instances
- std::thread::spawn(move || {
- with_pipe(|url| {
- opener.open(RawOpenRequest {
- urls: vec![url],
- ..Default::default()
+ std::thread::Builder::new()
+ .name("EnsureSingleton".to_owned())
+ .spawn(move || {
+ with_pipe(|url| {
+ opener.open(RawOpenRequest {
+ urls: vec![url],
+ ..Default::default()
+ })
})
})
- });
+ .unwrap();
} else if !args.foreground {
// We are not the first instance, send args to the first instance
send_args_to_instance(args).log_err();
@@ -161,28 +164,31 @@ fn send_args_to_instance(args: &Args) -> anyhow::Result<()> {
};
let exit_status = Arc::new(Mutex::new(None));
- let sender: JoinHandle<anyhow::Result<()>> = std::thread::spawn({
- let exit_status = exit_status.clone();
- move || {
- let (_, handshake) = server.accept().context("Handshake after Zed spawn")?;
- let (tx, rx) = (handshake.requests, handshake.responses);
-
- tx.send(request)?;
-
- while let Ok(response) = rx.recv() {
- match response {
- CliResponse::Ping => {}
- CliResponse::Stdout { message } => log::info!("{message}"),
- CliResponse::Stderr { message } => log::error!("{message}"),
- CliResponse::Exit { status } => {
- exit_status.lock().replace(status);
- return Ok(());
+ let sender: JoinHandle<anyhow::Result<()>> = std::thread::Builder::new()
+ .name("CliReceiver".to_owned())
+ .spawn({
+ let exit_status = exit_status.clone();
+ move || {
+ let (_, handshake) = server.accept().context("Handshake after Zed spawn")?;
+ let (tx, rx) = (handshake.requests, handshake.responses);
+
+ tx.send(request)?;
+
+ while let Ok(response) = rx.recv() {
+ match response {
+ CliResponse::Ping => {}
+ CliResponse::Stdout { message } => log::info!("{message}"),
+ CliResponse::Stderr { message } => log::error!("{message}"),
+ CliResponse::Exit { status } => {
+ exit_status.lock().replace(status);
+ return Ok(());
+ }
}
}
+ Ok(())
}
- Ok(())
- }
- });
+ })
+ .unwrap();
write_message_to_instance_pipe(url.as_bytes())?;
sender.join().unwrap()?;