1use super::Project;
2use anyhow::Result;
3use client::Client;
4use collections::{HashMap, HashSet};
5use futures::{FutureExt, StreamExt};
6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
7use postage::stream::Stream;
8use rpc::proto;
9use std::{sync::Arc, time::Duration};
10use util::{ResultExt, TryFutureExt};
11
12impl Global for GlobalManager {}
13struct GlobalManager(Entity<Manager>);
14
/// How long the connection watcher waits, after detecting a disconnection, for
/// the client to reconnect and for the tracked projects to be rejoined before
/// it gives up and disconnects them.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
16
/// Tracks a set of projects and keeps a background task running that watches
/// the client's connection status on their behalf.
pub struct Manager {
    /// Client whose status is observed and through which rejoin requests are sent.
    client: Arc<Client>,
    /// Handle to the spawned connection-watching task. `None` until the first
    /// project is registered, and reset to `None` when the last tracked project
    /// is released or when the connection is considered lost.
    maintain_connection: Option<Task<Option<()>>>,
    /// Weak handles to the projects registered via `maintain_project_connection`.
    projects: HashSet<WeakEntity<Project>>,
}
22
23pub fn init(client: Arc<Client>, cx: &mut App) {
24 let manager = cx.new(|_| Manager {
25 client,
26 maintain_connection: None,
27 projects: HashSet::default(),
28 });
29 cx.set_global(GlobalManager(manager));
30}
31
32impl Manager {
33 pub fn global(cx: &App) -> Entity<Manager> {
34 cx.global::<GlobalManager>().0.clone()
35 }
36
37 pub fn maintain_project_connection(
38 &mut self,
39 project: &Entity<Project>,
40 cx: &mut Context<Self>,
41 ) {
42 let manager = cx.weak_entity();
43 project.update(cx, |_, cx| {
44 let manager = manager.clone();
45 cx.on_release(move |project, cx| {
46 manager
47 .update(cx, |manager, cx| {
48 manager.projects.retain(|p| {
49 if let Some(p) = p.upgrade() {
50 p.read(cx).remote_id() != project.remote_id()
51 } else {
52 false
53 }
54 });
55 if manager.projects.is_empty() {
56 manager.maintain_connection.take();
57 }
58 })
59 .ok();
60 })
61 .detach();
62 });
63
64 self.projects.insert(project.downgrade());
65 if self.maintain_connection.is_none() {
66 self.maintain_connection = Some(cx.spawn({
67 let client = self.client.clone();
68 move |_, cx| Self::maintain_connection(manager, client.clone(), cx).log_err()
69 }));
70 }
71 }
72
73 fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
74 let mut projects = HashMap::default();
75
76 let request = self.client.request_envelope(proto::RejoinRemoteProjects {
77 rejoined_projects: self
78 .projects
79 .iter()
80 .filter_map(|project| {
81 if let Some(handle) = project.upgrade() {
82 let project = handle.read(cx);
83 let project_id = project.remote_id()?;
84 projects.insert(project_id, handle.clone());
85 Some(proto::RejoinProject {
86 id: project_id,
87 worktrees: project
88 .worktrees(cx)
89 .map(|worktree| {
90 let worktree = worktree.read(cx);
91 proto::RejoinWorktree {
92 id: worktree.id().to_proto(),
93 scan_id: worktree.completed_scan_id() as u64,
94 }
95 })
96 .collect(),
97 })
98 } else {
99 None
100 }
101 })
102 .collect(),
103 });
104
105 cx.spawn(|this, mut cx| async move {
106 let response = request.await?;
107 let message_id = response.message_id;
108
109 this.update(&mut cx, |_, cx| {
110 for rejoined_project in response.payload.rejoined_projects {
111 if let Some(project) = projects.get(&rejoined_project.id) {
112 project.update(cx, |project, cx| {
113 project.rejoined(rejoined_project, message_id, cx).log_err();
114 });
115 }
116 }
117 })
118 })
119 }
120
121 fn connection_lost(&mut self, cx: &mut Context<Self>) {
122 for project in self.projects.drain() {
123 if let Some(project) = project.upgrade() {
124 project.update(cx, |project, cx| {
125 project.disconnected_from_host(cx);
126 project.close(cx);
127 });
128 }
129 }
130 self.maintain_connection.take();
131 }
132
133 async fn maintain_connection(
134 this: WeakEntity<Self>,
135 client: Arc<Client>,
136 mut cx: AsyncApp,
137 ) -> Result<()> {
138 let mut client_status = client.status();
139 loop {
140 let _ = client_status.try_recv();
141
142 let is_connected = client_status.borrow().is_connected();
143 // Even if we're initially connected, any future change of the status means we momentarily disconnected.
144 if !is_connected || client_status.next().await.is_some() {
145 log::info!("detected client disconnection");
146
147 // Wait for client to re-establish a connection to the server.
148 {
149 let mut reconnection_timeout =
150 cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
151 let client_reconnection = async {
152 let mut remaining_attempts = 3;
153 while remaining_attempts > 0 {
154 if client_status.borrow().is_connected() {
155 log::info!("client reconnected, attempting to rejoin projects");
156
157 let Some(this) = this.upgrade() else { break };
158 match this.update(&mut cx, |this, cx| this.reconnected(cx)) {
159 Ok(task) => {
160 if task.await.log_err().is_some() {
161 return true;
162 } else {
163 remaining_attempts -= 1;
164 }
165 }
166 Err(_app_dropped) => return false,
167 }
168 } else if client_status.borrow().is_signed_out() {
169 return false;
170 }
171
172 log::info!(
173 "waiting for client status change, remaining attempts {}",
174 remaining_attempts
175 );
176 client_status.next().await;
177 }
178 false
179 }
180 .fuse();
181 futures::pin_mut!(client_reconnection);
182
183 futures::select_biased! {
184 reconnected = client_reconnection => {
185 if reconnected {
186 log::info!("successfully reconnected");
187 // If we successfully joined the room, go back around the loop
188 // waiting for future connection status changes.
189 continue;
190 }
191 }
192 _ = reconnection_timeout => {
193 log::info!("rejoin project reconnection timeout expired");
194 }
195 }
196 }
197
198 break;
199 }
200 }
201
202 // The client failed to re-establish a connection to the server
203 // or an error occurred while trying to re-join the room. Either way
204 // we leave the room and return an error.
205 if let Some(this) = this.upgrade() {
206 log::info!("reconnection failed, disconnecting projects");
207 this.update(&mut cx, |this, cx| this.connection_lost(cx))?;
208 }
209
210 Ok(())
211 }
212}