// connection_manager.rs

  1use super::Project;
  2use anyhow::Result;
  3use client::Client;
  4use collections::{HashMap, HashSet};
  5use futures::{FutureExt, StreamExt};
  6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
  7use postage::stream::Stream;
  8use rpc::proto;
  9use std::{sync::Arc, time::Duration};
 10use util::ResultExt;
 11
 12impl Global for GlobalManager {}
 13struct GlobalManager(Entity<Manager>);
 14
 15pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 16
 17pub struct Manager {
 18    client: Arc<Client>,
 19    maintain_connection: Option<Task<Option<()>>>,
 20    projects: HashSet<WeakEntity<Project>>,
 21}
 22
 23pub fn init(client: Arc<Client>, cx: &mut App) {
 24    let manager = cx.new(|_| Manager {
 25        client,
 26        maintain_connection: None,
 27        projects: HashSet::default(),
 28    });
 29    cx.set_global(GlobalManager(manager));
 30}
 31
 32impl Manager {
 33    pub fn global(cx: &App) -> Entity<Manager> {
 34        cx.global::<GlobalManager>().0.clone()
 35    }
 36
 37    pub fn maintain_project_connection(
 38        &mut self,
 39        project: &Entity<Project>,
 40        cx: &mut Context<Self>,
 41    ) {
 42        let manager = cx.weak_entity();
 43        project.update(cx, |_, cx| {
 44            let manager = manager.clone();
 45            cx.on_release(move |project, cx| {
 46                manager
 47                    .update(cx, |manager, cx| {
 48                        manager.projects.retain(|p| {
 49                            if let Some(p) = p.upgrade() {
 50                                p.read(cx).remote_id() != project.remote_id()
 51                            } else {
 52                                false
 53                            }
 54                        });
 55                        if manager.projects.is_empty() {
 56                            manager.maintain_connection.take();
 57                        }
 58                    })
 59                    .ok();
 60            })
 61            .detach();
 62        });
 63
 64        self.projects.insert(project.downgrade());
 65        if self.maintain_connection.is_none() {
 66            self.maintain_connection = Some(cx.spawn({
 67                let client = self.client.clone();
 68                async move |_, cx| {
 69                    Self::maintain_connection(manager, client.clone(), cx)
 70                        .await
 71                        .log_err()
 72                }
 73            }));
 74        }
 75    }
 76
 77    fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 78        let mut projects = HashMap::default();
 79
 80        let request = self.client.request_envelope(proto::RejoinRemoteProjects {
 81            rejoined_projects: self
 82                .projects
 83                .iter()
 84                .filter_map(|project| {
 85                    if let Some(handle) = project.upgrade() {
 86                        let project = handle.read(cx);
 87                        let project_id = project.remote_id()?;
 88                        projects.insert(project_id, handle.clone());
 89                        Some(proto::RejoinProject {
 90                            id: project_id,
 91                            worktrees: project
 92                                .worktrees(cx)
 93                                .map(|worktree| {
 94                                    let worktree = worktree.read(cx);
 95                                    proto::RejoinWorktree {
 96                                        id: worktree.id().to_proto(),
 97                                        scan_id: worktree.completed_scan_id() as u64,
 98                                    }
 99                                })
100                                .collect(),
101                        })
102                    } else {
103                        None
104                    }
105                })
106                .collect(),
107        });
108
109        cx.spawn(async move |this, cx| {
110            let response = request.await?;
111            let message_id = response.message_id;
112
113            this.update(cx, |_, cx| {
114                for rejoined_project in response.payload.rejoined_projects {
115                    if let Some(project) = projects.get(&rejoined_project.id) {
116                        project.update(cx, |project, cx| {
117                            project.rejoined(rejoined_project, message_id, cx).log_err();
118                        });
119                    }
120                }
121            })
122        })
123    }
124
125    fn connection_lost(&mut self, cx: &mut Context<Self>) {
126        for project in self.projects.drain() {
127            if let Some(project) = project.upgrade() {
128                project.update(cx, |project, cx| {
129                    project.disconnected_from_host(cx);
130                    project.close(cx);
131                });
132            }
133        }
134        self.maintain_connection.take();
135    }
136
137    async fn maintain_connection(
138        this: WeakEntity<Self>,
139        client: Arc<Client>,
140        cx: &mut AsyncApp,
141    ) -> Result<()> {
142        let mut client_status = client.status();
143        loop {
144            let _ = client_status.try_recv();
145
146            let is_connected = client_status.borrow().is_connected();
147            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
148            if !is_connected || client_status.next().await.is_some() {
149                log::info!("detected client disconnection");
150
151                // Wait for client to re-establish a connection to the server.
152                {
153                    let mut reconnection_timeout =
154                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
155                    let client_reconnection = async {
156                        let mut remaining_attempts = 3;
157                        while remaining_attempts > 0 {
158                            if client_status.borrow().is_connected() {
159                                log::info!("client reconnected, attempting to rejoin projects");
160
161                                let Some(this) = this.upgrade() else { break };
162                                match this.update(cx, |this, cx| this.reconnected(cx)) {
163                                    Ok(task) => {
164                                        if task.await.log_err().is_some() {
165                                            return true;
166                                        } else {
167                                            remaining_attempts -= 1;
168                                        }
169                                    }
170                                    Err(_app_dropped) => return false,
171                                }
172                            } else if client_status.borrow().is_signed_out() {
173                                return false;
174                            }
175
176                            log::info!(
177                                "waiting for client status change, remaining attempts {}",
178                                remaining_attempts
179                            );
180                            client_status.next().await;
181                        }
182                        false
183                    }
184                    .fuse();
185                    futures::pin_mut!(client_reconnection);
186
187                    futures::select_biased! {
188                        reconnected = client_reconnection => {
189                            if reconnected {
190                                log::info!("successfully reconnected");
191                                // If we successfully joined the room, go back around the loop
192                                // waiting for future connection status changes.
193                                continue;
194                            }
195                        }
196                        _ = reconnection_timeout => {
197                            log::info!("rejoin project reconnection timeout expired");
198                        }
199                    }
200                }
201
202                break;
203            }
204        }
205
206        // The client failed to re-establish a connection to the server
207        // or an error occurred while trying to re-join the room. Either way
208        // we leave the room and return an error.
209        if let Some(this) = this.upgrade() {
210            log::info!("reconnection failed, disconnecting projects");
211            this.update(cx, |this, cx| this.connection_lost(cx))?;
212        }
213
214        Ok(())
215    }
216}