1mod assets;
2pub mod languages;
3mod only_instance;
4mod open_listener;
5
6pub use assets::*;
7use client2::{Client, UserStore};
8use collections::HashMap;
9use gpui2::{AsyncAppContext, Model};
10pub use only_instance::*;
11pub use open_listener::*;
12
13use anyhow::{Context, Result};
14use cli::{
15 ipc::{self, IpcSender},
16 CliRequest, CliResponse, IpcHandshake,
17};
18use futures::{
19 channel::{mpsc, oneshot},
20 FutureExt, SinkExt, StreamExt,
21};
22use std::{path::Path, sync::Arc, thread, time::Duration};
23use util::{paths::PathLikeWithPosition, ResultExt};
24
25pub fn connect_to_cli(
26 server_name: &str,
27) -> Result<(mpsc::Receiver<CliRequest>, IpcSender<CliResponse>)> {
28 let handshake_tx = cli::ipc::IpcSender::<IpcHandshake>::connect(server_name.to_string())
29 .context("error connecting to cli")?;
30 let (request_tx, request_rx) = ipc::channel::<CliRequest>()?;
31 let (response_tx, response_rx) = ipc::channel::<CliResponse>()?;
32
33 handshake_tx
34 .send(IpcHandshake {
35 requests: request_tx,
36 responses: response_rx,
37 })
38 .context("error sending ipc handshake")?;
39
40 let (mut async_request_tx, async_request_rx) =
41 futures::channel::mpsc::channel::<CliRequest>(16);
42 thread::spawn(move || {
43 while let Ok(cli_request) = request_rx.recv() {
44 if smol::block_on(async_request_tx.send(cli_request)).is_err() {
45 break;
46 }
47 }
48 Ok::<_, anyhow::Error>(())
49 });
50
51 Ok((async_request_rx, response_tx))
52}
53
54pub struct AppState {
55 pub client: Arc<Client>,
56 pub user_store: Model<UserStore>,
57}
58
59pub async fn handle_cli_connection(
60 (mut requests, responses): (mpsc::Receiver<CliRequest>, IpcSender<CliResponse>),
61 app_state: Arc<AppState>,
62 mut cx: AsyncAppContext,
63) {
64 if let Some(request) = requests.next().await {
65 match request {
66 CliRequest::Open { paths, wait } => {
67 let mut caret_positions = HashMap::default();
68
69 let paths = if paths.is_empty() {
70 todo!()
71 // workspace::last_opened_workspace_paths()
72 // .await
73 // .map(|location| location.paths().to_vec())
74 // .unwrap_or_default()
75 } else {
76 paths
77 .into_iter()
78 .filter_map(|path_with_position_string| {
79 let path_with_position = PathLikeWithPosition::parse_str(
80 &path_with_position_string,
81 |path_str| {
82 Ok::<_, std::convert::Infallible>(
83 Path::new(path_str).to_path_buf(),
84 )
85 },
86 )
87 .expect("Infallible");
88 let path = path_with_position.path_like;
89 if let Some(row) = path_with_position.row {
90 if path.is_file() {
91 let row = row.saturating_sub(1);
92 let col =
93 path_with_position.column.unwrap_or(0).saturating_sub(1);
94 caret_positions.insert(path.clone(), Point::new(row, col));
95 }
96 }
97 Some(path)
98 })
99 .collect()
100 };
101
102 let mut errored = false;
103
104 match cx
105 .update(|cx| workspace2::open_paths(&paths, &app_state, None, cx))
106 .await
107 {
108 Ok((workspace, items)) => {
109 let mut item_release_futures = Vec::new();
110
111 for (item, path) in items.into_iter().zip(&paths) {
112 match item {
113 Some(Ok(item)) => {
114 if let Some(point) = caret_positions.remove(path) {
115 todo!()
116 // if let Some(active_editor) = item.downcast::<Editor>() {
117 // active_editor
118 // .downgrade()
119 // .update(&mut cx, |editor, cx| {
120 // let snapshot =
121 // editor.snapshot(cx).display_snapshot;
122 // let point = snapshot
123 // .buffer_snapshot
124 // .clip_point(point, Bias::Left);
125 // editor.change_selections(
126 // Some(Autoscroll::center()),
127 // cx,
128 // |s| s.select_ranges([point..point]),
129 // );
130 // })
131 // .log_err();
132 // }
133 }
134
135 let released = oneshot::channel();
136 cx.update(|cx| {
137 item.on_release(
138 cx,
139 Box::new(move |_| {
140 let _ = released.0.send(());
141 }),
142 )
143 .detach();
144 });
145 item_release_futures.push(released.1);
146 }
147 Some(Err(err)) => {
148 responses
149 .send(CliResponse::Stderr {
150 message: format!("error opening {:?}: {}", path, err),
151 })
152 .log_err();
153 errored = true;
154 }
155 None => {}
156 }
157 }
158
159 if wait {
160 let executor = cx.executor();
161 let wait = async move {
162 if paths.is_empty() {
163 let (done_tx, done_rx) = oneshot::channel();
164 if let Some(workspace) = workspace.upgrade(&cx) {
165 let _subscription = cx.update(|cx| {
166 cx.observe_release(&workspace, move |_, _| {
167 let _ = done_tx.send(());
168 })
169 });
170 drop(workspace);
171 let _ = done_rx.await;
172 }
173 } else {
174 let _ =
175 futures::future::try_join_all(item_release_futures).await;
176 };
177 }
178 .fuse();
179 futures::pin_mut!(wait);
180
181 loop {
182 // Repeatedly check if CLI is still open to avoid wasting resources
183 // waiting for files or workspaces to close.
184 let mut timer = executor.timer(Duration::from_secs(1)).fuse();
185 futures::select_biased! {
186 _ = wait => break,
187 _ = timer => {
188 if responses.send(CliResponse::Ping).is_err() {
189 break;
190 }
191 }
192 }
193 }
194 }
195 }
196 Err(error) => {
197 errored = true;
198 responses
199 .send(CliResponse::Stderr {
200 message: format!("error opening {:?}: {}", paths, error),
201 })
202 .log_err();
203 }
204 }
205
206 responses
207 .send(CliResponse::Exit {
208 status: i32::from(errored),
209 })
210 .log_err();
211 }
212 }
213 }
214}