1use super::Project;
2use anyhow::Result;
3use client::Client;
4use collections::{HashMap, HashSet};
5use futures::{FutureExt, StreamExt};
6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
7use postage::stream::Stream;
8use rpc::proto;
9use std::{sync::Arc, time::Duration};
10use util::ResultExt;
11
12impl Global for GlobalManager {}
13struct GlobalManager(Entity<Manager>);
14
/// How long `Manager::maintain_connection` waits, after detecting a disconnection, for the
/// client to reconnect and the projects to be rejoined before it gives up.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
16
/// Tracks projects that have a remote id and tries to rejoin them after the client reconnects.
pub struct Manager {
    /// Client whose connection status is watched and through which rejoin requests are sent.
    client: Arc<Client>,
    /// Handle to the background reconnection loop. `None` until a project is registered, and
    /// cleared again when no projects remain or the connection is considered lost.
    maintain_connection: Option<Task<Option<()>>>,
    /// Weak handles to the projects registered through `maintain_project_connection`.
    projects: HashSet<WeakEntity<Project>>,
}
22
23pub fn init(client: Arc<Client>, cx: &mut App) {
24 let manager = cx.new(|_| Manager {
25 client,
26 maintain_connection: None,
27 projects: HashSet::default(),
28 });
29 cx.set_global(GlobalManager(manager));
30}
31
32impl Manager {
33 pub fn global(cx: &App) -> Entity<Manager> {
34 cx.global::<GlobalManager>().0.clone()
35 }
36
37 pub fn maintain_project_connection(
38 &mut self,
39 project: &Entity<Project>,
40 cx: &mut Context<Self>,
41 ) {
42 let manager = cx.weak_entity();
43 project.update(cx, |_, cx| {
44 let manager = manager.clone();
45 cx.on_release(move |project, cx| {
46 manager
47 .update(cx, |manager, cx| {
48 manager.projects.retain(|p| {
49 if let Some(p) = p.upgrade() {
50 p.read(cx).remote_id() != project.remote_id()
51 } else {
52 false
53 }
54 });
55 if manager.projects.is_empty() {
56 manager.maintain_connection.take();
57 }
58 })
59 .ok();
60 })
61 .detach();
62 });
63
64 self.projects.insert(project.downgrade());
65 if self.maintain_connection.is_none() {
66 self.maintain_connection = Some(cx.spawn({
67 let client = self.client.clone();
68 async move |_, cx| {
69 Self::maintain_connection(manager, client.clone(), cx)
70 .await
71 .log_err()
72 }
73 }));
74 }
75 }
76
77 fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
78 let mut projects = HashMap::default();
79
80 let request = self.client.request_envelope(proto::RejoinRemoteProjects {
81 rejoined_projects: self
82 .projects
83 .iter()
84 .filter_map(|project| {
85 if let Some(handle) = project.upgrade() {
86 let project = handle.read(cx);
87 let project_id = project.remote_id()?;
88 projects.insert(project_id, handle.clone());
89 Some(proto::RejoinProject {
90 id: project_id,
91 worktrees: project
92 .worktrees(cx)
93 .map(|worktree| {
94 let worktree = worktree.read(cx);
95 proto::RejoinWorktree {
96 id: worktree.id().to_proto(),
97 scan_id: worktree.completed_scan_id() as u64,
98 }
99 })
100 .collect(),
101 })
102 } else {
103 None
104 }
105 })
106 .collect(),
107 });
108
109 cx.spawn(async move |this, cx| {
110 let response = request.await?;
111 let message_id = response.message_id;
112
113 this.update(cx, |_, cx| {
114 for rejoined_project in response.payload.rejoined_projects {
115 if let Some(project) = projects.get(&rejoined_project.id) {
116 project.update(cx, |project, cx| {
117 project.rejoined(rejoined_project, message_id, cx).log_err();
118 });
119 }
120 }
121 })
122 })
123 }
124
125 fn connection_lost(&mut self, cx: &mut Context<Self>) {
126 for project in self.projects.drain() {
127 if let Some(project) = project.upgrade() {
128 project.update(cx, |project, cx| {
129 project.disconnected_from_host(cx);
130 project.close(cx);
131 });
132 }
133 }
134 self.maintain_connection.take();
135 }
136
    /// Background loop that watches the client's connection status and tries to rejoin the
    /// tracked projects after a disconnection.
    ///
    /// Each time a disconnection is detected, two futures are raced: one waits for the client
    /// to report being connected again and then calls `reconnected` (giving up after three
    /// failed rejoin attempts, when the client is signed out, or when the manager can no
    /// longer be reached), the other is a `RECONNECT_TIMEOUT` timer. A successful rejoin
    /// resumes watching; any other outcome ends the loop and, if the manager still exists,
    /// calls `connection_lost`.
    ///
    /// Returns `Ok(())` unless the final update of the manager fails.
    async fn maintain_connection(
        this: WeakEntity<Self>,
        client: Arc<Client>,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // NOTE(review): presumably this consumes a status value that is already pending so
            // that `next()` below only resolves on a later change — confirm against the status
            // stream's semantics.
            let _ = client_status.try_recv();

            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the projects were rejoined, and to `false` when
                    // reconnecting is abandoned.
                    let client_reconnection = async {
                        // Only failed rejoin requests consume an attempt; status changes that
                        // do not lead to a connected state do not.
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin projects");

                                // The manager was dropped; leaving the loop yields `false`.
                                let Some(this) = this.upgrade() else { break };
                                match this.update(cx, |this, cx| this.reconnected(cx)) {
                                    Ok(task) => {
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // The reconnection future is listed first, so `select_biased!` polls it
                    // before the timeout.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected");
                                // If we successfully rejoined the projects, go back around the
                                // loop waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("rejoin project reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to rejoin the projects. Either way
        // we disconnect and close every tracked project.
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, disconnecting projects");
            this.update(cx, |this, cx| this.connection_lost(cx))?;
        }

        Ok(())
    }
216}