1use super::Project;
2use anyhow::Result;
3use client::Client;
4use collections::{HashMap, HashSet};
5use futures::{FutureExt, StreamExt};
6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
7use postage::stream::Stream;
8use rpc::proto;
9use std::{sync::Arc, time::Duration};
10use util::ResultExt;
11
12impl Global for GlobalManager {}
13struct GlobalManager(Entity<Manager>);
14
/// Upper bound on how long the manager waits for the client to reconnect and
/// rejoin its projects before it gives up and disconnects them.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
16
17pub struct Manager {
18 client: Arc<Client>,
19 maintain_connection: Option<Task<Option<()>>>,
20 projects: HashSet<WeakEntity<Project>>,
21}
22
23pub fn init(client: Arc<Client>, cx: &mut App) {
24 let manager = cx.new(|_| Manager {
25 client,
26 maintain_connection: None,
27 projects: HashSet::default(),
28 });
29 cx.set_global(GlobalManager(manager));
30}
31
32impl Manager {
33 pub fn global(cx: &App) -> Entity<Manager> {
34 cx.global::<GlobalManager>().0.clone()
35 }
36
37 pub fn maintain_project_connection(
38 &mut self,
39 project: &Entity<Project>,
40 cx: &mut Context<Self>,
41 ) {
42 let manager = cx.weak_entity();
43 project.update(cx, |_, cx| {
44 let manager = manager.clone();
45 cx.on_release(move |project, cx| {
46 manager
47 .update(cx, |manager, cx| {
48 manager.projects.retain(|p| {
49 if let Some(p) = p.upgrade() {
50 p.read(cx).remote_id() != project.remote_id()
51 } else {
52 false
53 }
54 });
55 if manager.projects.is_empty() {
56 manager.maintain_connection.take();
57 }
58 })
59 .ok();
60 })
61 .detach();
62 });
63
64 self.projects.insert(project.downgrade());
65 if self.maintain_connection.is_none() {
66 self.maintain_connection = Some(cx.spawn({
67 let client = self.client.clone();
68 async move |_, cx| {
69 Self::maintain_connection(manager, client.clone(), cx)
70 .await
71 .log_err()
72 }
73 }));
74 }
75 }
76
77 fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
78 let mut projects = HashMap::default();
79
80 let request = self.client.request_envelope(proto::RejoinRemoteProjects {
81 rejoined_projects: self
82 .projects
83 .iter()
84 .filter_map(|project| {
85 if let Some(handle) = project.upgrade() {
86 let project = handle.read(cx);
87 let project_id = project.remote_id()?;
88 projects.insert(project_id, handle.clone());
89 let mut worktrees = Vec::new();
90 let mut repositories = Vec::new();
91 for (id, repository) in project.repositories(cx) {
92 repositories.push(proto::RejoinRepository {
93 id: id.to_proto(),
94 scan_id: repository.read(cx).scan_id,
95 });
96 }
97 for worktree in project.worktrees(cx) {
98 let worktree = worktree.read(cx);
99 worktrees.push(proto::RejoinWorktree {
100 id: worktree.id().to_proto(),
101 scan_id: worktree.completed_scan_id() as u64,
102 });
103 }
104 Some(proto::RejoinProject {
105 id: project_id,
106 worktrees,
107 repositories,
108 })
109 } else {
110 None
111 }
112 })
113 .collect(),
114 });
115
116 cx.spawn(async move |this, cx| {
117 let response = request.await?;
118 let message_id = response.message_id;
119
120 this.update(cx, |_, cx| {
121 for rejoined_project in response.payload.rejoined_projects {
122 if let Some(project) = projects.get(&rejoined_project.id) {
123 project.update(cx, |project, cx| {
124 project.rejoined(rejoined_project, message_id, cx).log_err();
125 });
126 }
127 }
128 })
129 })
130 }
131
132 fn connection_lost(&mut self, cx: &mut Context<Self>) {
133 for project in self.projects.drain() {
134 if let Some(project) = project.upgrade() {
135 project.update(cx, |project, cx| {
136 project.disconnected_from_host(cx);
137 project.close(cx);
138 });
139 }
140 }
141 self.maintain_connection.take();
142 }
143
144 async fn maintain_connection(
145 this: WeakEntity<Self>,
146 client: Arc<Client>,
147 cx: &mut AsyncApp,
148 ) -> Result<()> {
149 let mut client_status = client.status();
150 loop {
151 let _ = client_status.try_recv();
152
153 let is_connected = client_status.borrow().is_connected();
154 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
155 if !is_connected || client_status.next().await.is_some() {
156 log::info!("detected client disconnection");
157
158 // Wait for client to re-establish a connection to the server.
159 {
160 let mut reconnection_timeout =
161 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
162 let client_reconnection = async {
163 let mut remaining_attempts = 3;
164 while remaining_attempts > 0 {
165 if client_status.borrow().is_connected() {
166 log::info!("client reconnected, attempting to rejoin projects");
167
168 let Some(this) = this.upgrade() else { break };
169 match this.update(cx, |this, cx| this.reconnected(cx)) {
170 Ok(task) => {
171 if task.await.log_err().is_some() {
172 return true;
173 } else {
174 remaining_attempts -= 1;
175 }
176 }
177 Err(_app_dropped) => return false,
178 }
179 } else if client_status.borrow().is_signed_out() {
180 return false;
181 }
182
183 log::info!(
184 "waiting for client status change, remaining attempts {}",
185 remaining_attempts
186 );
187 client_status.next().await;
188 }
189 false
190 }
191 .fuse();
192 futures::pin_mut!(client_reconnection);
193
194 futures::select_biased! {
195 reconnected = client_reconnection => {
196 if reconnected {
197 log::info!("successfully reconnected");
198 // If we successfully joined the room, go back around the loop
199 // waiting for future connection status changes.
200 continue;
201 }
202 }
203 _ = reconnection_timeout => {
204 log::info!("rejoin project reconnection timeout expired");
205 }
206 }
207 }
208
209 break;
210 }
211 }
212
213 // The client failed to re-establish a connection to the server
214 // or an error occurred while trying to re-join the room. Either way
215 // we leave the room and return an error.
216 if let Some(this) = this.upgrade() {
217 log::info!("reconnection failed, disconnecting projects");
218 this.update(cx, |this, cx| this.connection_lost(cx))?;
219 }
220
221 Ok(())
222 }
223}