1use super::Project;
  2use anyhow::Result;
  3use client::Client;
  4use collections::{HashMap, HashSet};
  5use futures::{FutureExt, StreamExt};
  6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
  7use postage::stream::Stream;
  8use rpc::proto;
  9use std::{sync::Arc, time::Duration};
 10use util::ResultExt;
 11
 12impl Global for GlobalManager {}
 13struct GlobalManager(Entity<Manager>);
 14
/// Upper bound on how long the manager waits, after detecting a client
/// disconnection, for the client to reconnect and the tracked projects to be
/// rejoined before giving up.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 16
/// Tracks a set of projects and tries to rejoin them whenever the client's
/// connection to the server is lost and re-established.
pub struct Manager {
    /// Client whose connection status is observed and through which the rejoin
    /// request is sent.
    client: Arc<Client>,
    /// Handle to the task that watches the connection status; `None` while no
    /// such task has been started or after it has been discarded.
    maintain_connection: Option<Task<Option<()>>>,
    /// Weak handles to the projects whose connection is being maintained.
    projects: HashSet<WeakEntity<Project>>,
}
 22
 23pub fn init(client: Arc<Client>, cx: &mut App) {
 24    let manager = cx.new(|_| Manager {
 25        client,
 26        maintain_connection: None,
 27        projects: HashSet::default(),
 28    });
 29    cx.set_global(GlobalManager(manager));
 30}
 31
 32impl Manager {
 33    pub fn global(cx: &App) -> Entity<Manager> {
 34        cx.global::<GlobalManager>().0.clone()
 35    }
 36
 37    pub fn maintain_project_connection(
 38        &mut self,
 39        project: &Entity<Project>,
 40        cx: &mut Context<Self>,
 41    ) {
 42        let manager = cx.weak_entity();
 43        project.update(cx, |_, cx| {
 44            let manager = manager.clone();
 45            cx.on_release(move |project, cx| {
 46                manager
 47                    .update(cx, |manager, cx| {
 48                        manager.projects.retain(|p| {
 49                            if let Some(p) = p.upgrade() {
 50                                p.read(cx).remote_id() != project.remote_id()
 51                            } else {
 52                                false
 53                            }
 54                        });
 55                        if manager.projects.is_empty() {
 56                            manager.maintain_connection.take();
 57                        }
 58                    })
 59                    .ok();
 60            })
 61            .detach();
 62        });
 63
 64        self.projects.insert(project.downgrade());
 65        if self.maintain_connection.is_none() {
 66            self.maintain_connection = Some(cx.spawn({
 67                let client = self.client.clone();
 68                async move |_, cx| {
 69                    Self::maintain_connection(manager, client.clone(), cx)
 70                        .await
 71                        .log_err()
 72                }
 73            }));
 74        }
 75    }
 76
 77    fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 78        let mut projects = HashMap::default();
 79
 80        let request = self.client.request_envelope(proto::RejoinRemoteProjects {
 81            rejoined_projects: self
 82                .projects
 83                .iter()
 84                .filter_map(|project| {
 85                    if let Some(handle) = project.upgrade() {
 86                        let project = handle.read(cx);
 87                        let project_id = project.remote_id()?;
 88                        projects.insert(project_id, handle.clone());
 89                        let mut worktrees = Vec::new();
 90                        let mut repositories = Vec::new();
 91                        for (id, repository) in project.repositories(cx) {
 92                            repositories.push(proto::RejoinRepository {
 93                                id: id.to_proto(),
 94                                scan_id: repository.read(cx).scan_id,
 95                            });
 96                        }
 97                        for worktree in project.worktrees(cx) {
 98                            let worktree = worktree.read(cx);
 99                            worktrees.push(proto::RejoinWorktree {
100                                id: worktree.id().to_proto(),
101                                scan_id: worktree.completed_scan_id() as u64,
102                            });
103                        }
104                        Some(proto::RejoinProject {
105                            id: project_id,
106                            worktrees,
107                            repositories,
108                        })
109                    } else {
110                        None
111                    }
112                })
113                .collect(),
114        });
115
116        cx.spawn(async move |this, cx| {
117            let response = request.await?;
118            let message_id = response.message_id;
119
120            this.update(cx, |_, cx| {
121                for rejoined_project in response.payload.rejoined_projects {
122                    if let Some(project) = projects.get(&rejoined_project.id) {
123                        project.update(cx, |project, cx| {
124                            project.rejoined(rejoined_project, message_id, cx).log_err();
125                        });
126                    }
127                }
128            })
129        })
130    }
131
132    fn connection_lost(&mut self, cx: &mut Context<Self>) {
133        for project in self.projects.drain() {
134            if let Some(project) = project.upgrade() {
135                project.update(cx, |project, cx| {
136                    project.disconnected_from_host(cx);
137                    project.close(cx);
138                });
139            }
140        }
141        self.maintain_connection.take();
142    }
143
    /// Watches the client's connection status and tries to rejoin the tracked
    /// projects after each disconnection.
    ///
    /// The outer loop keeps running for as long as every detected disconnection
    /// is followed by a successful rejoin. After a disconnection, up to three
    /// rejoin attempts are made while the client reports being connected, all
    /// bounded by [`RECONNECT_TIMEOUT`]. The attempt is abandoned early if the
    /// client reports being signed out, or if the manager or app is gone.
    ///
    /// When rejoining fails or times out, the manager (if it still exists) is
    /// told to drop all tracked projects via `connection_lost`.
    ///
    /// Returns `Ok(())` once the loop has ended, or an error if the final update
    /// of the manager entity fails.
    async fn maintain_connection(
        this: WeakEntity<Self>,
        client: Arc<Client>,
        cx: &mut AsyncApp,
    ) -> Result<()> {
        let mut client_status = client.status();
        loop {
            // Non-blocking receive whose result is deliberately ignored;
            // presumably this consumes an already-pending status value so that
            // `next()` below only observes later changes (confirm against the
            // status stream's semantics).
            let _ = client_status.try_recv();

            let is_connected = client_status.borrow().is_connected();
            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
            if !is_connected || client_status.next().await.is_some() {
                log::info!("detected client disconnection");

                // Wait for client to re-establish a connection to the server.
                {
                    let mut reconnection_timeout =
                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
                    // Resolves to `true` once the projects were rejoined, and to
                    // `false` when the attempts are exhausted, the client is
                    // signed out, or the manager/app is no longer available.
                    let client_reconnection = async {
                        let mut remaining_attempts = 3;
                        while remaining_attempts > 0 {
                            if client_status.borrow().is_connected() {
                                log::info!("client reconnected, attempting to rejoin projects");

                                let Some(this) = this.upgrade() else { break };
                                match this.update(cx, |this, cx| this.reconnected(cx)) {
                                    Ok(task) => {
                                        // A failed rejoin is logged and uses up
                                        // one attempt.
                                        if task.await.log_err().is_some() {
                                            return true;
                                        } else {
                                            remaining_attempts -= 1;
                                        }
                                    }
                                    Err(_app_dropped) => return false,
                                }
                            } else if client_status.borrow().is_signed_out() {
                                return false;
                            }

                            log::info!(
                                "waiting for client status change, remaining attempts {}",
                                remaining_attempts
                            );
                            client_status.next().await;
                        }
                        false
                    }
                    .fuse();
                    futures::pin_mut!(client_reconnection);

                    // Race the rejoin attempts against the overall timeout; the
                    // biased select polls the rejoin future first.
                    futures::select_biased! {
                        reconnected = client_reconnection => {
                            if reconnected {
                                log::info!("successfully reconnected");
                                // If we successfully rejoined the projects, go back around
                                // the loop waiting for future connection status changes.
                                continue;
                            }
                        }
                        _ = reconnection_timeout => {
                            log::info!("rejoin project reconnection timeout expired");
                        }
                    }
                }

                break;
            }
        }

        // The client failed to re-establish a connection to the server
        // or an error occurred while trying to rejoin the projects. Either way
        // we disconnect all tracked projects (if the manager still exists).
        if let Some(this) = this.upgrade() {
            log::info!("reconnection failed, disconnecting projects");
            this.update(cx, |this, cx| this.connection_lost(cx))?;
        }

        Ok(())
    }
223}