diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index d4b4a350f61b5bd1249b33ff3925dd281e9d529c..d0d30f72d7aea7d7f6cf0355caf12b1f2a36eedb 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -339,59 +339,70 @@ fn main() -> Result<()> { "Dev servers were removed in v0.157.x please upgrade to SSH remoting: https://zed.dev/docs/remote-development" ); - let sender: JoinHandle> = thread::spawn({ - let exit_status = exit_status.clone(); - let user_data_dir_for_thread = user_data_dir.clone(); - move || { - let (_, handshake) = server.accept().context("Handshake after Zed spawn")?; - let (tx, rx) = (handshake.requests, handshake.responses); - - #[cfg(target_os = "windows")] - let wsl = args.wsl; - #[cfg(not(target_os = "windows"))] - let wsl = None; - - tx.send(CliRequest::Open { - paths, - urls, - diff_paths, - wsl, - wait: args.wait, - open_new_workspace, - env, - user_data_dir: user_data_dir_for_thread, - })?; - - while let Ok(response) = rx.recv() { - match response { - CliResponse::Ping => {} - CliResponse::Stdout { message } => println!("{message}"), - CliResponse::Stderr { message } => eprintln!("{message}"), - CliResponse::Exit { status } => { - exit_status.lock().replace(status); - return Ok(()); + let sender: JoinHandle> = thread::Builder::new() + .name("CliReceiver".to_string()) + .spawn({ + let exit_status = exit_status.clone(); + let user_data_dir_for_thread = user_data_dir.clone(); + move || { + let (_, handshake) = server.accept().context("Handshake after Zed spawn")?; + let (tx, rx) = (handshake.requests, handshake.responses); + + #[cfg(target_os = "windows")] + let wsl = args.wsl; + #[cfg(not(target_os = "windows"))] + let wsl = None; + + tx.send(CliRequest::Open { + paths, + urls, + diff_paths, + wsl, + wait: args.wait, + open_new_workspace, + env, + user_data_dir: user_data_dir_for_thread, + })?; + + while let Ok(response) = rx.recv() { + match response { + CliResponse::Ping => {} + CliResponse::Stdout { message } => println!("{message}"), + CliResponse::Stderr { message } => eprintln!("{message}"), + CliResponse::Exit { status } => { + exit_status.lock().replace(status); + return Ok(()); + } } } - } - Ok(()) - } - }); + Ok(()) + } + }) + .unwrap(); let stdin_pipe_handle: Option>> = stdin_tmp_file.map(|mut tmp_file| { - thread::spawn(move || { - let mut stdin = std::io::stdin().lock(); - if !io::IsTerminal::is_terminal(&stdin) { - io::copy(&mut stdin, &mut tmp_file)?; - } - Ok(()) - }) + thread::Builder::new() + .name("CliStdin".to_string()) + .spawn(move || { + let mut stdin = std::io::stdin().lock(); + if !io::IsTerminal::is_terminal(&stdin) { + io::copy(&mut stdin, &mut tmp_file)?; + } + Ok(()) + }) + .unwrap() }); let anonymous_fd_pipe_handles: Vec<_> = anonymous_fd_tmp_files .into_iter() - .map(|(mut file, mut tmp_file)| thread::spawn(move || io::copy(&mut file, &mut tmp_file))) + .map(|(mut file, mut tmp_file)| { + thread::Builder::new() + .name("CliAnonymousFd".to_string()) + .spawn(move || io::copy(&mut file, &mut tmp_file)) + .unwrap() + }) .collect(); if args.foreground { diff --git a/crates/crashes/src/crashes.rs b/crates/crashes/src/crashes.rs index 98db4bfc73f157994f3f7286c0764cfb0778e4a4..8312638e2a811767ee245f53c356eca15ef852f1 100644 --- a/crates/crashes/src/crashes.rs +++ b/crates/crashes/src/crashes.rs @@ -321,16 +321,19 @@ pub fn crash_server(socket: &Path) { let shutdown = Arc::new(AtomicBool::new(false)); let has_connection = Arc::new(AtomicBool::new(false)); - std::thread::spawn({ - let shutdown = shutdown.clone(); - let has_connection = has_connection.clone(); - move || { - std::thread::sleep(CRASH_HANDLER_CONNECT_TIMEOUT); - if !has_connection.load(Ordering::SeqCst) { - shutdown.store(true, Ordering::SeqCst); + thread::Builder::new() + .name("CrashServerTimeout".to_owned()) + .spawn({ + let shutdown = shutdown.clone(); + let has_connection = has_connection.clone(); + move || { + std::thread::sleep(CRASH_HANDLER_CONNECT_TIMEOUT); + if !has_connection.load(Ordering::SeqCst) { + shutdown.store(true, Ordering::SeqCst); + } } - } - }); + }) + .unwrap(); server .run( diff --git a/crates/denoise/src/lib.rs b/crates/denoise/src/lib.rs index 3673b28c1c837ee265199b9be2d9aea4a422decf..1422c81a4b915d571d35585447165c04d3695b73 100644 --- a/crates/denoise/src/lib.rs +++ b/crates/denoise/src/lib.rs @@ -79,9 +79,12 @@ impl Denoiser { let (input_tx, input_rx) = mpsc::channel(); let (denoised_tx, denoised_rx) = mpsc::channel(); - thread::spawn(move || { - run_neural_denoiser(denoised_tx, input_rx); - }); + thread::Builder::new() + .name("NeuralDenoiser".to_owned()) + .spawn(move || { + run_neural_denoiser(denoised_tx, input_rx); + }) + .unwrap(); Ok(Self { inner: source, diff --git a/crates/fs/src/mac_watcher.rs b/crates/fs/src/mac_watcher.rs index 7bd176639f1dccef2da4c4ae8dcb317d0be602cb..698014de9716f6505ccd23cd344a62815d9ba0f7 100644 --- a/crates/fs/src/mac_watcher.rs +++ b/crates/fs/src/mac_watcher.rs @@ -6,6 +6,7 @@ use parking_lot::Mutex; use std::{ path::{Path, PathBuf}, sync::Weak, + thread, time::Duration, }; @@ -48,9 +49,12 @@ impl Watcher for MacWatcher { let (stream, handle) = EventStream::new(&[path], self.latency); let tx = self.events_tx.clone(); - std::thread::spawn(move || { - stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); - }); + thread::Builder::new() + .name("MacWatcher".to_owned()) + .spawn(move || { + stream.run(move |events| smol::block_on(tx.send(events)).is_ok()); + }) + .unwrap(); handles.insert(path.into(), handle); Ok(()) diff --git a/crates/gpui/src/platform/linux/dispatcher.rs b/crates/gpui/src/platform/linux/dispatcher.rs index 3d32dbd2fdece5259f48e52550f6983b6a8c5b1d..2f6cd83756054bdbca2c764b046b0c37f51d3515 100644 --- a/crates/gpui/src/platform/linux/dispatcher.rs +++ b/crates/gpui/src/platform/linux/dispatcher.rs @@ -37,51 +37,57 @@ impl LinuxDispatcher { let mut background_threads = (0..thread_count) .map(|i| { let receiver = background_receiver.clone(); - std::thread::spawn(move || { - for runnable in receiver { - let start = Instant::now(); - - runnable.run(); - - log::trace!( - "background thread {}: ran runnable. took: {:?}", - i, - start.elapsed() - ); - } - }) + std::thread::Builder::new() + .name(format!("Worker-{i}")) + .spawn(move || { + for runnable in receiver { + let start = Instant::now(); + + runnable.run(); + + log::trace!( + "background thread {}: ran runnable. took: {:?}", + i, + start.elapsed() + ); + } + }) + .unwrap() }) .collect::>(); let (timer_sender, timer_channel) = calloop::channel::channel::(); - let timer_thread = std::thread::spawn(|| { - let mut event_loop: EventLoop<()> = - EventLoop::try_new().expect("Failed to initialize timer loop!"); - - let handle = event_loop.handle(); - let timer_handle = event_loop.handle(); - handle - .insert_source(timer_channel, move |e, _, _| { - if let channel::Event::Msg(timer) = e { - // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once. - let mut runnable = Some(timer.runnable); - timer_handle - .insert_source( - calloop::timer::Timer::from_duration(timer.duration), - move |_, _, _| { - if let Some(runnable) = runnable.take() { - runnable.run(); - } - TimeoutAction::Drop - }, - ) - .expect("Failed to start timer"); - } - }) - .expect("Failed to start timer thread"); - - event_loop.run(None, &mut (), |_| {}).log_err(); - }); + let timer_thread = std::thread::Builder::new() + .name("Timer".to_owned()) + .spawn(|| { + let mut event_loop: EventLoop<()> = + EventLoop::try_new().expect("Failed to initialize timer loop!"); + + let handle = event_loop.handle(); + let timer_handle = event_loop.handle(); + handle + .insert_source(timer_channel, move |e, _, _| { + if let channel::Event::Msg(timer) = e { + // This has to be in an option to satisfy the borrow checker. The callback below should only be scheduled once. + let mut runnable = Some(timer.runnable); + timer_handle + .insert_source( + calloop::timer::Timer::from_duration(timer.duration), + move |_, _, _| { + if let Some(runnable) = runnable.take() { + runnable.run(); + } + TimeoutAction::Drop + }, + ) + .expect("Failed to start timer"); + } + }) + .expect("Failed to start timer thread"); + + event_loop.run(None, &mut (), |_| {}).log_err(); + }) + .unwrap(); background_threads.push(timer_thread); diff --git a/crates/gpui/src/platform/linux/x11/clipboard.rs b/crates/gpui/src/platform/linux/x11/clipboard.rs index a6f96d38c4254da5a2f92261700126962c16e91c..65ad16e82bf103c4ef08e79c692196d3fae58777 100644 --- a/crates/gpui/src/platform/linux/x11/clipboard.rs +++ b/crates/gpui/src/platform/linux/x11/clipboard.rs @@ -957,15 +957,17 @@ impl Clipboard { } // At this point we know that the clipboard does not exist. let ctx = Arc::new(Inner::new()?); - let join_handle; - { - let ctx = Arc::clone(&ctx); - join_handle = std::thread::spawn(move || { - if let Err(error) = serve_requests(ctx) { - log::error!("Worker thread errored with: {}", error); + let join_handle = std::thread::Builder::new() + .name("Clipboard".to_owned()) + .spawn({ + let ctx = Arc::clone(&ctx); + move || { + if let Err(error) = serve_requests(ctx) { + log::error!("Worker thread errored with: {}", error); + } } - }); - } + }) + .unwrap(); *global_cb = Some(GlobalClipboard { inner: Arc::clone(&ctx), server_handle: join_handle, diff --git a/crates/gpui/src/platform/windows/platform.rs b/crates/gpui/src/platform/windows/platform.rs index b1e2c123e881cecadcdcb6582056c0e5fba64c14..2eb1862f36a26592e18dc2e44875e08319361cc8 100644 --- a/crates/gpui/src/platform/windows/platform.rs +++ b/crates/gpui/src/platform/windows/platform.rs @@ -243,29 +243,32 @@ impl WindowsPlatform { let validation_number = self.inner.validation_number; let all_windows = Arc::downgrade(&self.raw_window_handles); let text_system = Arc::downgrade(&self.text_system); - std::thread::spawn(move || { - let vsync_provider = VSyncProvider::new(); - loop { - vsync_provider.wait_for_vsync(); - if check_device_lost(&directx_device.device) { - handle_gpu_device_lost( - &mut directx_device, - platform_window.as_raw(), - validation_number, - &all_windows, - &text_system, - ); - } - let Some(all_windows) = all_windows.upgrade() else { - break; - }; - for hwnd in all_windows.read().iter() { - unsafe { - let _ = RedrawWindow(Some(hwnd.as_raw()), None, None, RDW_INVALIDATE); + std::thread::Builder::new() + .name("VSyncProvider".to_owned()) + .spawn(move || { + let vsync_provider = VSyncProvider::new(); + loop { + vsync_provider.wait_for_vsync(); + if check_device_lost(&directx_device.device) { + handle_gpu_device_lost( + &mut directx_device, + platform_window.as_raw(), + validation_number, + &all_windows, + &text_system, + ); + } + let Some(all_windows) = all_windows.upgrade() else { + break; + }; + for hwnd in all_windows.read().iter() { + unsafe { + let _ = RedrawWindow(Some(hwnd.as_raw()), None, None, RDW_INVALIDATE); + } } } - } - }); + }) + .unwrap(); } } diff --git a/crates/livekit_client/src/livekit_client/playback.rs b/crates/livekit_client/src/livekit_client/playback.rs index 7c866113103a883e7e7a2d9d3f5651d833d7e637..df8b5ea54fb1ce11bf871faa912757bbff1fd7f9 100644 --- a/crates/livekit_client/src/livekit_client/playback.rs +++ b/crates/livekit_client/src/livekit_client/playback.rs @@ -188,12 +188,15 @@ impl AudioStack { let voip_parts = audio::VoipParts::new(cx)?; // Audio needs to run real-time and should never be paused. That is why we are using a // normal std::thread and not a background task - thread::spawn(move || { - // microphone is non send on mac - let microphone = audio::Audio::open_microphone(voip_parts)?; - send_to_livekit(frame_tx, microphone); - Ok::<(), anyhow::Error>(()) - }); + thread::Builder::new() + .name("AudioCapture".to_string()) + .spawn(move || { + // microphone is non send on mac + let microphone = audio::Audio::open_microphone(voip_parts)?; + send_to_livekit(frame_tx, microphone); + Ok::<(), anyhow::Error>(()) + }) + .unwrap(); Task::ready(Ok(())) } else { self.executor.spawn(async move { @@ -229,57 +232,60 @@ impl AudioStack { let mut resampler = audio_resampler::AudioResampler::default(); let mut buf = Vec::new(); - thread::spawn(move || { - let output_stream = output_device.build_output_stream( - &output_config.config(), - { - move |mut data, _info| { - while data.len() > 0 { - if data.len() <= buf.len() { - let rest = buf.split_off(data.len()); - data.copy_from_slice(&buf); - buf = rest; - return; - } - if buf.len() > 0 { - let (prefix, suffix) = data.split_at_mut(buf.len()); - prefix.copy_from_slice(&buf); - data = suffix; - } + thread::Builder::new() + .name("AudioPlayback".to_owned()) + .spawn(move || { + let output_stream = output_device.build_output_stream( + &output_config.config(), + { + move |mut data, _info| { + while data.len() > 0 { + if data.len() <= buf.len() { + let rest = buf.split_off(data.len()); + data.copy_from_slice(&buf); + buf = rest; + return; + } + if buf.len() > 0 { + let (prefix, suffix) = data.split_at_mut(buf.len()); + prefix.copy_from_slice(&buf); + data = suffix; + } - let mut mixer = mixer.lock(); - let mixed = mixer.mix(output_config.channels() as usize); - let sampled = resampler.remix_and_resample( - mixed, - sample_rate / 100, - num_channels, - sample_rate, - output_config.channels() as u32, - output_config.sample_rate().0, - ); - buf = sampled.to_vec(); - apm.lock() - .process_reverse_stream( - &mut buf, - output_config.sample_rate().0 as i32, - output_config.channels() as i32, - ) - .ok(); + let mut mixer = mixer.lock(); + let mixed = mixer.mix(output_config.channels() as usize); + let sampled = resampler.remix_and_resample( + mixed, + sample_rate / 100, + num_channels, + sample_rate, + output_config.channels() as u32, + output_config.sample_rate().0, + ); + buf = sampled.to_vec(); + apm.lock() + .process_reverse_stream( + &mut buf, + output_config.sample_rate().0 as i32, + output_config.channels() as i32, + ) + .ok(); + } } - } - }, - |error| log::error!("error playing audio track: {:?}", error), - Some(Duration::from_millis(100)), - ); + }, + |error| log::error!("error playing audio track: {:?}", error), + Some(Duration::from_millis(100)), + ); - let Some(output_stream) = output_stream.log_err() else { - return; - }; + let Some(output_stream) = output_stream.log_err() else { + return; + }; - output_stream.play().log_err(); - // Block forever to keep the output stream alive - end_on_drop_rx.recv().ok(); - }); + output_stream.play().log_err(); + // Block forever to keep the output stream alive + end_on_drop_rx.recv().ok(); + }) + .unwrap(); device_change_listener.next().await; drop(end_on_drop_tx) @@ -300,77 +306,81 @@ impl AudioStack { let frame_tx = frame_tx.clone(); let mut resampler = audio_resampler::AudioResampler::default(); - thread::spawn(move || { - maybe!({ - if let Some(name) = device.name().ok() { - log::info!("Using microphone: {}", name) - } else { - log::info!("Using microphone: "); - } - - let ten_ms_buffer_size = - (config.channels() as u32 * config.sample_rate().0 / 100) as usize; - let mut buf: Vec = Vec::with_capacity(ten_ms_buffer_size); - - let stream = device - .build_input_stream_raw( - &config.config(), - config.sample_format(), - move |data, _: &_| { - let data = - crate::get_sample_data(config.sample_format(), data).log_err(); - let Some(data) = data else { - return; - }; - let mut data = data.as_slice(); + thread::Builder::new() + .name("AudioCapture".to_owned()) + .spawn(move || { + maybe!({ + if let Some(name) = device.name().ok() { + log::info!("Using microphone: {}", name) + } else { + log::info!("Using microphone: "); + } - while data.len() > 0 { - let remainder = (buf.capacity() - buf.len()).min(data.len()); - buf.extend_from_slice(&data[..remainder]); - data = &data[remainder..]; - - if buf.capacity() == buf.len() { - let mut sampled = resampler - .remix_and_resample( - buf.as_slice(), - config.sample_rate().0 / 100, - config.channels() as u32, - config.sample_rate().0, - num_channels, - sample_rate, - ) - .to_owned(); - apm.lock() - .process_stream( - &mut sampled, - sample_rate as i32, - num_channels as i32, - ) - .log_err(); - buf.clear(); - frame_tx - .unbounded_send(AudioFrame { - data: Cow::Owned(sampled), - sample_rate, - num_channels, - samples_per_channel: sample_rate / 100, - }) - .ok(); + let ten_ms_buffer_size = + (config.channels() as u32 * config.sample_rate().0 / 100) as usize; + let mut buf: Vec = Vec::with_capacity(ten_ms_buffer_size); + + let stream = device + .build_input_stream_raw( + &config.config(), + config.sample_format(), + move |data, _: &_| { + let data = crate::get_sample_data(config.sample_format(), data) + .log_err(); + let Some(data) = data else { + return; + }; + let mut data = data.as_slice(); + + while data.len() > 0 { + let remainder = + (buf.capacity() - buf.len()).min(data.len()); + buf.extend_from_slice(&data[..remainder]); + data = &data[remainder..]; + + if buf.capacity() == buf.len() { + let mut sampled = resampler + .remix_and_resample( + buf.as_slice(), + config.sample_rate().0 / 100, + config.channels() as u32, + config.sample_rate().0, + num_channels, + sample_rate, + ) + .to_owned(); + apm.lock() + .process_stream( + &mut sampled, + sample_rate as i32, + num_channels as i32, + ) + .log_err(); + buf.clear(); + frame_tx + .unbounded_send(AudioFrame { + data: Cow::Owned(sampled), + sample_rate, + num_channels, + samples_per_channel: sample_rate / 100, + }) + .ok(); + } } - } - }, - |err| log::error!("error capturing audio track: {:?}", err), - Some(Duration::from_millis(100)), - ) - .context("failed to build input stream")?; - - stream.play()?; - // Keep the thread alive and holding onto the `stream` - end_on_drop_rx.recv().ok(); - anyhow::Ok(Some(())) + }, + |err| log::error!("error capturing audio track: {:?}", err), + Some(Duration::from_millis(100)), + ) + .context("failed to build input stream")?; + + stream.play()?; + // Keep the thread alive and holding onto the `stream` + end_on_drop_rx.recv().ok(); + anyhow::Ok(Some(())) + }) + .log_err(); }) - .log_err(); - }); + .unwrap(); device_change_listener.next().await; drop(end_on_drop_tx) diff --git a/crates/sqlez/src/thread_safe_connection.rs b/crates/sqlez/src/thread_safe_connection.rs index 482905ac817bf94fcb64cb858b784c94283b686c..966f14a9c2f244780da7190aebac88e95c7ac068 100644 --- a/crates/sqlez/src/thread_safe_connection.rs +++ b/crates/sqlez/src/thread_safe_connection.rs @@ -249,11 +249,14 @@ pub fn background_thread_queue() -> WriteQueueConstructor { Box::new(|| { let (sender, receiver) = channel::(); - thread::spawn(move || { - while let Ok(write) = receiver.recv() { - write() - } - }); + thread::Builder::new() + .name("sqlezWorker".to_string()) + .spawn(move || { + while let Ok(write) = receiver.recv() { + write() + } + }) + .unwrap(); let sender = UnboundedSyncSender::new(sender); Box::new(move |queued_write| { diff --git a/crates/zed/src/zed/mac_only_instance.rs b/crates/zed/src/zed/mac_only_instance.rs index cb9641e9dfe55660e301faa46d47e1a4b8511466..b7898fae176d3a68f0664a6ed4dddc0a59b87cec 100644 --- a/crates/zed/src/zed/mac_only_instance.rs +++ b/crates/zed/src/zed/mac_only_instance.rs @@ -107,18 +107,21 @@ pub fn ensure_only_instance() -> IsOnlyInstance { } }; - thread::spawn(move || { - for stream in listener.incoming() { - let mut stream = match stream { - Ok(stream) => stream, - Err(_) => return, - }; - - _ = stream.set_nodelay(true); - _ = stream.set_read_timeout(Some(SEND_TIMEOUT)); - _ = stream.write_all(instance_handshake().as_bytes()); - } - }); + thread::Builder::new() + .name("EnsureSingleton".to_string()) + .spawn(move || { + for stream in listener.incoming() { + let mut stream = match stream { + Ok(stream) => stream, + Err(_) => return, + }; + + _ = stream.set_nodelay(true); + _ = stream.set_read_timeout(Some(SEND_TIMEOUT)); + _ = stream.write_all(instance_handshake().as_bytes()); + } + }) + .unwrap(); IsOnlyInstance::Yes } diff --git a/crates/zed/src/zed/windows_only_instance.rs b/crates/zed/src/zed/windows_only_instance.rs index 1dd51b5ffbd7c11cce0346142834581c022f512d..d377f06ede778b47dbac3257069d2b1c647935ae 100644 --- a/crates/zed/src/zed/windows_only_instance.rs +++ b/crates/zed/src/zed/windows_only_instance.rs @@ -42,14 +42,17 @@ pub fn handle_single_instance(opener: OpenListener, args: &Args) -> bool { let is_first_instance = is_first_instance(); if is_first_instance { // We are the first instance, listen for messages sent from other instances - std::thread::spawn(move || { - with_pipe(|url| { - opener.open(RawOpenRequest { - urls: vec![url], - ..Default::default() + std::thread::Builder::new() + .name("EnsureSingleton".to_owned()) + .spawn(move || { + with_pipe(|url| { + opener.open(RawOpenRequest { + urls: vec![url], + ..Default::default() + }) }) }) - }); + .unwrap(); } else if !args.foreground { // We are not the first instance, send args to the first instance send_args_to_instance(args).log_err(); @@ -161,28 +164,31 @@ fn send_args_to_instance(args: &Args) -> anyhow::Result<()> { }; let exit_status = Arc::new(Mutex::new(None)); - let sender: JoinHandle> = std::thread::spawn({ - let exit_status = exit_status.clone(); - move || { - let (_, handshake) = server.accept().context("Handshake after Zed spawn")?; - let (tx, rx) = (handshake.requests, handshake.responses); - - tx.send(request)?; - - while let Ok(response) = rx.recv() { - match response { - CliResponse::Ping => {} - CliResponse::Stdout { message } => log::info!("{message}"), - CliResponse::Stderr { message } => log::error!("{message}"), - CliResponse::Exit { status } => { - exit_status.lock().replace(status); - return Ok(()); + let sender: JoinHandle> = std::thread::Builder::new() + .name("CliReceiver".to_owned()) + .spawn({ + let exit_status = exit_status.clone(); + move || { + let (_, handshake) = server.accept().context("Handshake after Zed spawn")?; + let (tx, rx) = (handshake.requests, handshake.responses); + + tx.send(request)?; + + while let Ok(response) = rx.recv() { + match response { + CliResponse::Ping => {} + CliResponse::Stdout { message } => log::info!("{message}"), + CliResponse::Stderr { message } => log::error!("{message}"), + CliResponse::Exit { status } => { + exit_status.lock().replace(status); + return Ok(()); + } } } + Ok(()) } - Ok(()) - } - }); + }) + .unwrap(); write_message_to_instance_pipe(url.as_bytes())?; sender.join().unwrap()?;