1mod assets;
2mod only_instance;
3mod open_listener;
4
5pub use assets::*;
6use client2::{Client, UserStore};
7use collections::HashMap;
8use gpui2::{AsyncAppContext, Handle, Point};
9pub use only_instance::*;
10pub use open_listener::*;
11
12use anyhow::{Context, Result};
13use cli::{
14 ipc::{self, IpcSender},
15 CliRequest, CliResponse, IpcHandshake,
16};
17use futures::{
18 channel::{mpsc, oneshot},
19 FutureExt, SinkExt, StreamExt,
20};
21use std::{path::Path, sync::Arc, thread, time::Duration};
22use util::{paths::PathLikeWithPosition, ResultExt};
23
24pub fn connect_to_cli(
25 server_name: &str,
26) -> Result<(mpsc::Receiver<CliRequest>, IpcSender<CliResponse>)> {
27 let handshake_tx = cli::ipc::IpcSender::<IpcHandshake>::connect(server_name.to_string())
28 .context("error connecting to cli")?;
29 let (request_tx, request_rx) = ipc::channel::<CliRequest>()?;
30 let (response_tx, response_rx) = ipc::channel::<CliResponse>()?;
31
32 handshake_tx
33 .send(IpcHandshake {
34 requests: request_tx,
35 responses: response_rx,
36 })
37 .context("error sending ipc handshake")?;
38
39 let (mut async_request_tx, async_request_rx) =
40 futures::channel::mpsc::channel::<CliRequest>(16);
41 thread::spawn(move || {
42 while let Ok(cli_request) = request_rx.recv() {
43 if smol::block_on(async_request_tx.send(cli_request)).is_err() {
44 break;
45 }
46 }
47 Ok::<_, anyhow::Error>(())
48 });
49
50 Ok((async_request_rx, response_tx))
51}
52
53pub struct AppState {
54 pub client: Arc<Client>,
55 pub user_store: Handle<UserStore>,
56}
57
58pub async fn handle_cli_connection(
59 (mut requests, responses): (mpsc::Receiver<CliRequest>, IpcSender<CliResponse>),
60 app_state: Arc<AppState>,
61 mut cx: AsyncAppContext,
62) {
63 if let Some(request) = requests.next().await {
64 match request {
65 CliRequest::Open { paths, wait } => {
66 let mut caret_positions = HashMap::default();
67
68 let paths = if paths.is_empty() {
69 todo!()
70 // workspace::last_opened_workspace_paths()
71 // .await
72 // .map(|location| location.paths().to_vec())
73 // .unwrap_or_default()
74 } else {
75 paths
76 .into_iter()
77 .filter_map(|path_with_position_string| {
78 let path_with_position = PathLikeWithPosition::parse_str(
79 &path_with_position_string,
80 |path_str| {
81 Ok::<_, std::convert::Infallible>(
82 Path::new(path_str).to_path_buf(),
83 )
84 },
85 )
86 .expect("Infallible");
87 let path = path_with_position.path_like;
88 if let Some(row) = path_with_position.row {
89 if path.is_file() {
90 let row = row.saturating_sub(1);
91 let col =
92 path_with_position.column.unwrap_or(0).saturating_sub(1);
93 caret_positions.insert(path.clone(), Point::new(row, col));
94 }
95 }
96 Some(path)
97 })
98 .collect()
99 };
100
101 let mut errored = false;
102
103 match cx
104 .update(|cx| workspace2::open_paths(&paths, &app_state, None, cx))
105 .await
106 {
107 Ok((workspace, items)) => {
108 let mut item_release_futures = Vec::new();
109
110 for (item, path) in items.into_iter().zip(&paths) {
111 match item {
112 Some(Ok(item)) => {
113 if let Some(point) = caret_positions.remove(path) {
114 todo!()
115 // if let Some(active_editor) = item.downcast::<Editor>() {
116 // active_editor
117 // .downgrade()
118 // .update(&mut cx, |editor, cx| {
119 // let snapshot =
120 // editor.snapshot(cx).display_snapshot;
121 // let point = snapshot
122 // .buffer_snapshot
123 // .clip_point(point, Bias::Left);
124 // editor.change_selections(
125 // Some(Autoscroll::center()),
126 // cx,
127 // |s| s.select_ranges([point..point]),
128 // );
129 // })
130 // .log_err();
131 // }
132 }
133
134 let released = oneshot::channel();
135 cx.update(|cx| {
136 item.on_release(
137 cx,
138 Box::new(move |_| {
139 let _ = released.0.send(());
140 }),
141 )
142 .detach();
143 });
144 item_release_futures.push(released.1);
145 }
146 Some(Err(err)) => {
147 responses
148 .send(CliResponse::Stderr {
149 message: format!("error opening {:?}: {}", path, err),
150 })
151 .log_err();
152 errored = true;
153 }
154 None => {}
155 }
156 }
157
158 if wait {
159 let executor = cx.executor();
160 let wait = async move {
161 if paths.is_empty() {
162 let (done_tx, done_rx) = oneshot::channel();
163 if let Some(workspace) = workspace.upgrade(&cx) {
164 let _subscription = cx.update(|cx| {
165 cx.observe_release(&workspace, move |_, _| {
166 let _ = done_tx.send(());
167 })
168 });
169 drop(workspace);
170 let _ = done_rx.await;
171 }
172 } else {
173 let _ =
174 futures::future::try_join_all(item_release_futures).await;
175 };
176 }
177 .fuse();
178 futures::pin_mut!(wait);
179
180 loop {
181 // Repeatedly check if CLI is still open to avoid wasting resources
182 // waiting for files or workspaces to close.
183 let mut timer = executor.timer(Duration::from_secs(1)).fuse();
184 futures::select_biased! {
185 _ = wait => break,
186 _ = timer => {
187 if responses.send(CliResponse::Ping).is_err() {
188 break;
189 }
190 }
191 }
192 }
193 }
194 }
195 Err(error) => {
196 errored = true;
197 responses
198 .send(CliResponse::Stderr {
199 message: format!("error opening {:?}: {}", paths, error),
200 })
201 .log_err();
202 }
203 }
204
205 responses
206 .send(CliResponse::Exit {
207 status: i32::from(errored),
208 })
209 .log_err();
210 }
211 }
212 }
213}