connection_manager.rs

  1use super::Project;
  2use anyhow::Result;
  3use client::Client;
  4use collections::{HashMap, HashSet};
  5use futures::{FutureExt, StreamExt};
  6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, Global, Task, WeakEntity};
  7use postage::stream::Stream;
  8use rpc::proto;
  9use std::{sync::Arc, time::Duration};
 10use util::{ResultExt, TryFutureExt};
 11
 12impl Global for GlobalManager {}
 13struct GlobalManager(Entity<Manager>);
 14
 15pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 16
 17pub struct Manager {
 18    client: Arc<Client>,
 19    maintain_connection: Option<Task<Option<()>>>,
 20    projects: HashSet<WeakEntity<Project>>,
 21}
 22
 23pub fn init(client: Arc<Client>, cx: &mut App) {
 24    let manager = cx.new(|_| Manager {
 25        client,
 26        maintain_connection: None,
 27        projects: HashSet::default(),
 28    });
 29    cx.set_global(GlobalManager(manager));
 30}
 31
 32impl Manager {
 33    pub fn global(cx: &App) -> Entity<Manager> {
 34        cx.global::<GlobalManager>().0.clone()
 35    }
 36
 37    pub fn maintain_project_connection(
 38        &mut self,
 39        project: &Entity<Project>,
 40        cx: &mut Context<Self>,
 41    ) {
 42        let manager = cx.weak_entity();
 43        project.update(cx, |_, cx| {
 44            let manager = manager.clone();
 45            cx.on_release(move |project, cx| {
 46                manager
 47                    .update(cx, |manager, cx| {
 48                        manager.projects.retain(|p| {
 49                            if let Some(p) = p.upgrade() {
 50                                p.read(cx).remote_id() != project.remote_id()
 51                            } else {
 52                                false
 53                            }
 54                        });
 55                        if manager.projects.is_empty() {
 56                            manager.maintain_connection.take();
 57                        }
 58                    })
 59                    .ok();
 60            })
 61            .detach();
 62        });
 63
 64        self.projects.insert(project.downgrade());
 65        if self.maintain_connection.is_none() {
 66            self.maintain_connection = Some(cx.spawn({
 67                let client = self.client.clone();
 68                move |_, cx| Self::maintain_connection(manager, client.clone(), cx).log_err()
 69            }));
 70        }
 71    }
 72
 73    fn reconnected(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 74        let mut projects = HashMap::default();
 75
 76        let request = self.client.request_envelope(proto::RejoinRemoteProjects {
 77            rejoined_projects: self
 78                .projects
 79                .iter()
 80                .filter_map(|project| {
 81                    if let Some(handle) = project.upgrade() {
 82                        let project = handle.read(cx);
 83                        let project_id = project.remote_id()?;
 84                        projects.insert(project_id, handle.clone());
 85                        Some(proto::RejoinProject {
 86                            id: project_id,
 87                            worktrees: project
 88                                .worktrees(cx)
 89                                .map(|worktree| {
 90                                    let worktree = worktree.read(cx);
 91                                    proto::RejoinWorktree {
 92                                        id: worktree.id().to_proto(),
 93                                        scan_id: worktree.completed_scan_id() as u64,
 94                                    }
 95                                })
 96                                .collect(),
 97                        })
 98                    } else {
 99                        None
100                    }
101                })
102                .collect(),
103        });
104
105        cx.spawn(|this, mut cx| async move {
106            let response = request.await?;
107            let message_id = response.message_id;
108
109            this.update(&mut cx, |_, cx| {
110                for rejoined_project in response.payload.rejoined_projects {
111                    if let Some(project) = projects.get(&rejoined_project.id) {
112                        project.update(cx, |project, cx| {
113                            project.rejoined(rejoined_project, message_id, cx).log_err();
114                        });
115                    }
116                }
117            })
118        })
119    }
120
121    fn connection_lost(&mut self, cx: &mut Context<Self>) {
122        for project in self.projects.drain() {
123            if let Some(project) = project.upgrade() {
124                project.update(cx, |project, cx| {
125                    project.disconnected_from_host(cx);
126                    project.close(cx);
127                });
128            }
129        }
130        self.maintain_connection.take();
131    }
132
133    async fn maintain_connection(
134        this: WeakEntity<Self>,
135        client: Arc<Client>,
136        mut cx: AsyncApp,
137    ) -> Result<()> {
138        let mut client_status = client.status();
139        loop {
140            let _ = client_status.try_recv();
141
142            let is_connected = client_status.borrow().is_connected();
143            // Even if we're initially connected, any future change of the status means we momentarily disconnected.
144            if !is_connected || client_status.next().await.is_some() {
145                log::info!("detected client disconnection");
146
147                // Wait for client to re-establish a connection to the server.
148                {
149                    let mut reconnection_timeout =
150                        cx.background_executor().timer(RECONNECT_TIMEOUT).fuse();
151                    let client_reconnection = async {
152                        let mut remaining_attempts = 3;
153                        while remaining_attempts > 0 {
154                            if client_status.borrow().is_connected() {
155                                log::info!("client reconnected, attempting to rejoin projects");
156
157                                let Some(this) = this.upgrade() else { break };
158                                match this.update(&mut cx, |this, cx| this.reconnected(cx)) {
159                                    Ok(task) => {
160                                        if task.await.log_err().is_some() {
161                                            return true;
162                                        } else {
163                                            remaining_attempts -= 1;
164                                        }
165                                    }
166                                    Err(_app_dropped) => return false,
167                                }
168                            } else if client_status.borrow().is_signed_out() {
169                                return false;
170                            }
171
172                            log::info!(
173                                "waiting for client status change, remaining attempts {}",
174                                remaining_attempts
175                            );
176                            client_status.next().await;
177                        }
178                        false
179                    }
180                    .fuse();
181                    futures::pin_mut!(client_reconnection);
182
183                    futures::select_biased! {
184                        reconnected = client_reconnection => {
185                            if reconnected {
186                                log::info!("successfully reconnected");
187                                // If we successfully joined the room, go back around the loop
188                                // waiting for future connection status changes.
189                                continue;
190                            }
191                        }
192                        _ = reconnection_timeout => {
193                            log::info!("rejoin project reconnection timeout expired");
194                        }
195                    }
196                }
197
198                break;
199            }
200        }
201
202        // The client failed to re-establish a connection to the server
203        // or an error occurred while trying to re-join the room. Either way
204        // we leave the room and return an error.
205        if let Some(this) = this.upgrade() {
206            log::info!("reconnection failed, disconnecting projects");
207            this.update(&mut cx, |this, cx| this.connection_lost(cx))?;
208        }
209
210        Ok(())
211    }
212}