1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
/// Abstraction over "spawn this future and do not wait for it".
///
/// `Server::handle_connection` uses it to run each message handler.
pub trait Executor {
    /// Starts `future` without returning any handle to it.
    fn spawn_detached<F>(&self, future: F)
    where
        F: 'static + Send + Future<Output = ()>;
}
47
/// [`Executor`] implementation handed to `handle_connection` by `add_routes`.
pub struct RealExecutor;
49
/// Number of channel messages requested from the database per page; a page
/// shorter than this marks the response as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`str::len`) of a trimmed chat message body.
const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_request_handler(Server::ping)
69 .add_request_handler(Server::register_project)
70 .add_message_handler(Server::unregister_project)
71 .add_request_handler(Server::share_project)
72 .add_message_handler(Server::unshare_project)
73 .add_request_handler(Server::join_project)
74 .add_message_handler(Server::leave_project)
75 .add_request_handler(Server::register_worktree)
76 .add_message_handler(Server::unregister_worktree)
77 .add_request_handler(Server::share_worktree)
78 .add_message_handler(Server::update_worktree)
79 .add_message_handler(Server::update_diagnostic_summary)
80 .add_message_handler(Server::disk_based_diagnostics_updating)
81 .add_message_handler(Server::disk_based_diagnostics_updated)
82 .add_request_handler(Server::get_definition)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::get_channels)
96 .add_request_handler(Server::get_users)
97 .add_request_handler(Server::join_channel)
98 .add_message_handler(Server::leave_channel)
99 .add_request_handler(Server::send_channel_message)
100 .add_request_handler(Server::get_channel_messages);
101
102 Arc::new(server)
103 }
104
105 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
106 where
107 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
108 Fut: 'static + Send + Future<Output = tide::Result<()>>,
109 M: EnvelopedMessage,
110 {
111 let prev_handler = self.handlers.insert(
112 TypeId::of::<M>(),
113 Box::new(move |server, envelope| {
114 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
115 (handler)(server, *envelope).boxed()
116 }),
117 );
118 if prev_handler.is_some() {
119 panic!("registered a handler for the same message twice");
120 }
121 self
122 }
123
124 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
125 where
126 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
127 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
128 M: RequestMessage,
129 {
130 self.add_message_handler(move |server, envelope| {
131 let receipt = envelope.receipt();
132 let response = (handler)(server.clone(), envelope);
133 async move {
134 match response.await {
135 Ok(response) => {
136 server.peer.respond(receipt, response)?;
137 Ok(())
138 }
139 Err(error) => {
140 server.peer.respond_with_error(
141 receipt,
142 proto::Error {
143 message: error.to_string(),
144 },
145 )?;
146 Err(error)
147 }
148 }
149 }
150 })
151 }
152
153 pub fn handle_connection<E: Executor>(
154 self: &Arc<Self>,
155 connection: Connection,
156 addr: String,
157 user_id: UserId,
158 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
159 executor: E,
160 ) -> impl Future<Output = ()> {
161 let mut this = self.clone();
162 async move {
163 let (connection_id, handle_io, mut incoming_rx) =
164 this.peer.add_connection(connection).await;
165
166 if let Some(send_connection_id) = send_connection_id.as_mut() {
167 let _ = send_connection_id.send(connection_id).await;
168 }
169
170 this.state_mut().add_connection(connection_id, user_id);
171 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
172 log::error!("error updating contacts for {:?}: {}", user_id, err);
173 }
174
175 let handle_io = handle_io.fuse();
176 futures::pin_mut!(handle_io);
177 loop {
178 let next_message = incoming_rx.next().fuse();
179 futures::pin_mut!(next_message);
180 futures::select_biased! {
181 result = handle_io => {
182 if let Err(err) = result {
183 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
184 }
185 break;
186 }
187 message = next_message => {
188 if let Some(message) = message {
189 let start_time = Instant::now();
190 let type_name = message.payload_type_name();
191 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
192 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
193 let handle_message = (handler)(this.clone(), message);
194 executor.spawn_detached(async move {
195 if let Err(err) = handle_message.await {
196 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
197 } else {
198 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
199 }
200 });
201 if let Some(mut notifications) = this.notifications.clone() {
202 let _ = notifications.send(()).await;
203 }
204 } else {
205 log::warn!("unhandled message: {}", type_name);
206 }
207 } else {
208 log::info!("rpc connection closed {:?}", addr);
209 break;
210 }
211 }
212 }
213 }
214
215 if let Err(err) = this.sign_out(connection_id).await {
216 log::error!("error signing out connection {:?} - {:?}", addr, err);
217 }
218 }
219 }
220
221 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
222 self.peer.disconnect(connection_id);
223 let removed_connection = self.state_mut().remove_connection(connection_id)?;
224
225 for (project_id, project) in removed_connection.hosted_projects {
226 if let Some(share) = project.share {
227 broadcast(
228 connection_id,
229 share.guests.keys().copied().collect(),
230 |conn_id| {
231 self.peer
232 .send(conn_id, proto::UnshareProject { project_id })
233 },
234 )?;
235 }
236 }
237
238 for (project_id, peer_ids) in removed_connection.guest_project_ids {
239 broadcast(connection_id, peer_ids, |conn_id| {
240 self.peer.send(
241 conn_id,
242 proto::RemoveProjectCollaborator {
243 project_id,
244 peer_id: connection_id.0,
245 },
246 )
247 })?;
248 }
249
250 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
251 Ok(())
252 }
253
254 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
255 Ok(proto::Ack {})
256 }
257
258 async fn register_project(
259 mut self: Arc<Server>,
260 request: TypedEnvelope<proto::RegisterProject>,
261 ) -> tide::Result<proto::RegisterProjectResponse> {
262 let project_id = {
263 let mut state = self.state_mut();
264 let user_id = state.user_id_for_connection(request.sender_id)?;
265 state.register_project(request.sender_id, user_id)
266 };
267 Ok(proto::RegisterProjectResponse { project_id })
268 }
269
270 async fn unregister_project(
271 mut self: Arc<Server>,
272 request: TypedEnvelope<proto::UnregisterProject>,
273 ) -> tide::Result<()> {
274 let project = self
275 .state_mut()
276 .unregister_project(request.payload.project_id, request.sender_id)?;
277 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
278 Ok(())
279 }
280
281 async fn share_project(
282 mut self: Arc<Server>,
283 request: TypedEnvelope<proto::ShareProject>,
284 ) -> tide::Result<proto::Ack> {
285 self.state_mut()
286 .share_project(request.payload.project_id, request.sender_id);
287 Ok(proto::Ack {})
288 }
289
290 async fn unshare_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::UnshareProject>,
293 ) -> tide::Result<()> {
294 let project_id = request.payload.project_id;
295 let project = self
296 .state_mut()
297 .unshare_project(project_id, request.sender_id)?;
298
299 broadcast(request.sender_id, project.connection_ids, |conn_id| {
300 self.peer
301 .send(conn_id, proto::UnshareProject { project_id })
302 })?;
303 self.update_contacts_for_users(&project.authorized_user_ids)?;
304 Ok(())
305 }
306
307 async fn join_project(
308 mut self: Arc<Server>,
309 request: TypedEnvelope<proto::JoinProject>,
310 ) -> tide::Result<proto::JoinProjectResponse> {
311 let project_id = request.payload.project_id;
312
313 let user_id = self.state().user_id_for_connection(request.sender_id)?;
314 let (response, connection_ids, contact_user_ids) = self
315 .state_mut()
316 .join_project(request.sender_id, user_id, project_id)
317 .and_then(|joined| {
318 let share = joined.project.share()?;
319 let peer_count = share.guests.len();
320 let mut collaborators = Vec::with_capacity(peer_count);
321 collaborators.push(proto::Collaborator {
322 peer_id: joined.project.host_connection_id.0,
323 replica_id: 0,
324 user_id: joined.project.host_user_id.to_proto(),
325 });
326 let worktrees = joined
327 .project
328 .worktrees
329 .iter()
330 .filter_map(|(id, worktree)| {
331 worktree.share.as_ref().map(|share| proto::Worktree {
332 id: *id,
333 root_name: worktree.root_name.clone(),
334 entries: share.entries.values().cloned().collect(),
335 diagnostic_summaries: share
336 .diagnostic_summaries
337 .values()
338 .cloned()
339 .collect(),
340 weak: worktree.weak,
341 })
342 })
343 .collect();
344 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
345 if *peer_conn_id != request.sender_id {
346 collaborators.push(proto::Collaborator {
347 peer_id: peer_conn_id.0,
348 replica_id: *peer_replica_id as u32,
349 user_id: peer_user_id.to_proto(),
350 });
351 }
352 }
353 let response = proto::JoinProjectResponse {
354 worktrees,
355 replica_id: joined.replica_id as u32,
356 collaborators,
357 };
358 let connection_ids = joined.project.connection_ids();
359 let contact_user_ids = joined.project.authorized_user_ids();
360 Ok((response, connection_ids, contact_user_ids))
361 })?;
362
363 broadcast(request.sender_id, connection_ids, |conn_id| {
364 self.peer.send(
365 conn_id,
366 proto::AddProjectCollaborator {
367 project_id,
368 collaborator: Some(proto::Collaborator {
369 peer_id: request.sender_id.0,
370 replica_id: response.replica_id,
371 user_id: user_id.to_proto(),
372 }),
373 },
374 )
375 })?;
376 self.update_contacts_for_users(&contact_user_ids)?;
377 Ok(response)
378 }
379
380 async fn leave_project(
381 mut self: Arc<Server>,
382 request: TypedEnvelope<proto::LeaveProject>,
383 ) -> tide::Result<()> {
384 let sender_id = request.sender_id;
385 let project_id = request.payload.project_id;
386 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
387
388 broadcast(sender_id, worktree.connection_ids, |conn_id| {
389 self.peer.send(
390 conn_id,
391 proto::RemoveProjectCollaborator {
392 project_id,
393 peer_id: sender_id.0,
394 },
395 )
396 })?;
397 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
398
399 Ok(())
400 }
401
402 async fn register_worktree(
403 mut self: Arc<Server>,
404 request: TypedEnvelope<proto::RegisterWorktree>,
405 ) -> tide::Result<proto::Ack> {
406 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
407
408 let mut contact_user_ids = HashSet::default();
409 contact_user_ids.insert(host_user_id);
410 for github_login in request.payload.authorized_logins {
411 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
412 contact_user_ids.insert(contact_user_id);
413 }
414
415 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
416 self.state_mut().register_worktree(
417 request.payload.project_id,
418 request.payload.worktree_id,
419 request.sender_id,
420 Worktree {
421 authorized_user_ids: contact_user_ids.clone(),
422 root_name: request.payload.root_name,
423 share: None,
424 weak: false,
425 },
426 )?;
427 self.update_contacts_for_users(&contact_user_ids)?;
428 Ok(proto::Ack {})
429 }
430
431 async fn unregister_worktree(
432 mut self: Arc<Server>,
433 request: TypedEnvelope<proto::UnregisterWorktree>,
434 ) -> tide::Result<()> {
435 let project_id = request.payload.project_id;
436 let worktree_id = request.payload.worktree_id;
437 let (worktree, guest_connection_ids) =
438 self.state_mut()
439 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
440 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
441 self.peer.send(
442 conn_id,
443 proto::UnregisterWorktree {
444 project_id,
445 worktree_id,
446 },
447 )
448 })?;
449 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
450 Ok(())
451 }
452
453 async fn share_worktree(
454 mut self: Arc<Server>,
455 mut request: TypedEnvelope<proto::ShareWorktree>,
456 ) -> tide::Result<proto::Ack> {
457 let worktree = request
458 .payload
459 .worktree
460 .as_mut()
461 .ok_or_else(|| anyhow!("missing worktree"))?;
462 let entries = worktree
463 .entries
464 .iter()
465 .map(|entry| (entry.id, entry.clone()))
466 .collect();
467 let diagnostic_summaries = worktree
468 .diagnostic_summaries
469 .iter()
470 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
471 .collect();
472
473 let shared_worktree = self.state_mut().share_worktree(
474 request.payload.project_id,
475 worktree.id,
476 request.sender_id,
477 entries,
478 diagnostic_summaries,
479 )?;
480
481 broadcast(
482 request.sender_id,
483 shared_worktree.connection_ids,
484 |connection_id| {
485 self.peer
486 .forward_send(request.sender_id, connection_id, request.payload.clone())
487 },
488 )?;
489 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
490
491 Ok(proto::Ack {})
492 }
493
494 async fn update_worktree(
495 mut self: Arc<Server>,
496 request: TypedEnvelope<proto::UpdateWorktree>,
497 ) -> tide::Result<()> {
498 let connection_ids = self.state_mut().update_worktree(
499 request.sender_id,
500 request.payload.project_id,
501 request.payload.worktree_id,
502 &request.payload.removed_entries,
503 &request.payload.updated_entries,
504 )?;
505
506 broadcast(request.sender_id, connection_ids, |connection_id| {
507 self.peer
508 .forward_send(request.sender_id, connection_id, request.payload.clone())
509 })?;
510
511 Ok(())
512 }
513
514 async fn update_diagnostic_summary(
515 mut self: Arc<Server>,
516 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
517 ) -> tide::Result<()> {
518 let summary = request
519 .payload
520 .summary
521 .clone()
522 .ok_or_else(|| anyhow!("invalid summary"))?;
523 let receiver_ids = self.state_mut().update_diagnostic_summary(
524 request.payload.project_id,
525 request.payload.worktree_id,
526 request.sender_id,
527 summary,
528 )?;
529
530 broadcast(request.sender_id, receiver_ids, |connection_id| {
531 self.peer
532 .forward_send(request.sender_id, connection_id, request.payload.clone())
533 })?;
534 Ok(())
535 }
536
537 async fn disk_based_diagnostics_updating(
538 self: Arc<Server>,
539 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
540 ) -> tide::Result<()> {
541 let receiver_ids = self
542 .state()
543 .project_connection_ids(request.payload.project_id, request.sender_id)?;
544 broadcast(request.sender_id, receiver_ids, |connection_id| {
545 self.peer
546 .forward_send(request.sender_id, connection_id, request.payload.clone())
547 })?;
548 Ok(())
549 }
550
551 async fn disk_based_diagnostics_updated(
552 self: Arc<Server>,
553 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
554 ) -> tide::Result<()> {
555 let receiver_ids = self
556 .state()
557 .project_connection_ids(request.payload.project_id, request.sender_id)?;
558 broadcast(request.sender_id, receiver_ids, |connection_id| {
559 self.peer
560 .forward_send(request.sender_id, connection_id, request.payload.clone())
561 })?;
562 Ok(())
563 }
564
565 async fn get_definition(
566 self: Arc<Server>,
567 request: TypedEnvelope<proto::GetDefinition>,
568 ) -> tide::Result<proto::GetDefinitionResponse> {
569 let host_connection_id = self
570 .state()
571 .read_project(request.payload.project_id, request.sender_id)?
572 .host_connection_id;
573 Ok(self
574 .peer
575 .forward_request(request.sender_id, host_connection_id, request.payload)
576 .await?)
577 }
578
579 async fn open_buffer(
580 self: Arc<Server>,
581 request: TypedEnvelope<proto::OpenBuffer>,
582 ) -> tide::Result<proto::OpenBufferResponse> {
583 let host_connection_id = self
584 .state()
585 .read_project(request.payload.project_id, request.sender_id)?
586 .host_connection_id;
587 Ok(self
588 .peer
589 .forward_request(request.sender_id, host_connection_id, request.payload)
590 .await?)
591 }
592
593 async fn close_buffer(
594 self: Arc<Server>,
595 request: TypedEnvelope<proto::CloseBuffer>,
596 ) -> tide::Result<()> {
597 let host_connection_id = self
598 .state()
599 .read_project(request.payload.project_id, request.sender_id)?
600 .host_connection_id;
601 self.peer
602 .forward_send(request.sender_id, host_connection_id, request.payload)?;
603 Ok(())
604 }
605
606 async fn save_buffer(
607 self: Arc<Server>,
608 request: TypedEnvelope<proto::SaveBuffer>,
609 ) -> tide::Result<proto::BufferSaved> {
610 let host;
611 let mut guests;
612 {
613 let state = self.state();
614 let project = state.read_project(request.payload.project_id, request.sender_id)?;
615 host = project.host_connection_id;
616 guests = project.guest_connection_ids()
617 }
618
619 let response = self
620 .peer
621 .forward_request(request.sender_id, host, request.payload.clone())
622 .await?;
623
624 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
625 broadcast(host, guests, |conn_id| {
626 self.peer.forward_send(host, conn_id, response.clone())
627 })?;
628
629 Ok(response)
630 }
631
632 async fn format_buffers(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::FormatBuffers>,
635 ) -> tide::Result<proto::FormatBuffersResponse> {
636 let host = self
637 .state()
638 .read_project(request.payload.project_id, request.sender_id)?
639 .host_connection_id;
640 Ok(self
641 .peer
642 .forward_request(request.sender_id, host, request.payload.clone())
643 .await?)
644 }
645
646 async fn get_completions(
647 self: Arc<Server>,
648 request: TypedEnvelope<proto::GetCompletions>,
649 ) -> tide::Result<proto::GetCompletionsResponse> {
650 let host = self
651 .state()
652 .read_project(request.payload.project_id, request.sender_id)?
653 .host_connection_id;
654 Ok(self
655 .peer
656 .forward_request(request.sender_id, host, request.payload.clone())
657 .await?)
658 }
659
660 async fn apply_additional_edits_for_completion(
661 self: Arc<Server>,
662 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
663 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
664 let host = self
665 .state()
666 .read_project(request.payload.project_id, request.sender_id)?
667 .host_connection_id;
668 Ok(self
669 .peer
670 .forward_request(request.sender_id, host, request.payload.clone())
671 .await?)
672 }
673
674 async fn get_code_actions(
675 self: Arc<Server>,
676 request: TypedEnvelope<proto::GetCodeActions>,
677 ) -> tide::Result<proto::GetCodeActionsResponse> {
678 let host = self
679 .state()
680 .read_project(request.payload.project_id, request.sender_id)?
681 .host_connection_id;
682 Ok(self
683 .peer
684 .forward_request(request.sender_id, host, request.payload.clone())
685 .await?)
686 }
687
688 async fn apply_code_action(
689 self: Arc<Server>,
690 request: TypedEnvelope<proto::ApplyCodeAction>,
691 ) -> tide::Result<proto::ApplyCodeActionResponse> {
692 let host = self
693 .state()
694 .read_project(request.payload.project_id, request.sender_id)?
695 .host_connection_id;
696 Ok(self
697 .peer
698 .forward_request(request.sender_id, host, request.payload.clone())
699 .await?)
700 }
701
702 async fn update_buffer(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::UpdateBuffer>,
705 ) -> tide::Result<proto::Ack> {
706 let receiver_ids = self
707 .state()
708 .project_connection_ids(request.payload.project_id, request.sender_id)?;
709 broadcast(request.sender_id, receiver_ids, |connection_id| {
710 self.peer
711 .forward_send(request.sender_id, connection_id, request.payload.clone())
712 })?;
713 Ok(proto::Ack {})
714 }
715
716 async fn update_buffer_file(
717 self: Arc<Server>,
718 request: TypedEnvelope<proto::UpdateBufferFile>,
719 ) -> tide::Result<()> {
720 let receiver_ids = self
721 .state()
722 .project_connection_ids(request.payload.project_id, request.sender_id)?;
723 broadcast(request.sender_id, receiver_ids, |connection_id| {
724 self.peer
725 .forward_send(request.sender_id, connection_id, request.payload.clone())
726 })?;
727 Ok(())
728 }
729
730 async fn buffer_reloaded(
731 self: Arc<Server>,
732 request: TypedEnvelope<proto::BufferReloaded>,
733 ) -> tide::Result<()> {
734 let receiver_ids = self
735 .state()
736 .project_connection_ids(request.payload.project_id, request.sender_id)?;
737 broadcast(request.sender_id, receiver_ids, |connection_id| {
738 self.peer
739 .forward_send(request.sender_id, connection_id, request.payload.clone())
740 })?;
741 Ok(())
742 }
743
744 async fn buffer_saved(
745 self: Arc<Server>,
746 request: TypedEnvelope<proto::BufferSaved>,
747 ) -> tide::Result<()> {
748 let receiver_ids = self
749 .state()
750 .project_connection_ids(request.payload.project_id, request.sender_id)?;
751 broadcast(request.sender_id, receiver_ids, |connection_id| {
752 self.peer
753 .forward_send(request.sender_id, connection_id, request.payload.clone())
754 })?;
755 Ok(())
756 }
757
758 async fn get_channels(
759 self: Arc<Server>,
760 request: TypedEnvelope<proto::GetChannels>,
761 ) -> tide::Result<proto::GetChannelsResponse> {
762 let user_id = self.state().user_id_for_connection(request.sender_id)?;
763 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
764 Ok(proto::GetChannelsResponse {
765 channels: channels
766 .into_iter()
767 .map(|chan| proto::Channel {
768 id: chan.id.to_proto(),
769 name: chan.name,
770 })
771 .collect(),
772 })
773 }
774
775 async fn get_users(
776 self: Arc<Server>,
777 request: TypedEnvelope<proto::GetUsers>,
778 ) -> tide::Result<proto::GetUsersResponse> {
779 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
780 let users = self
781 .app_state
782 .db
783 .get_users_by_ids(user_ids)
784 .await?
785 .into_iter()
786 .map(|user| proto::User {
787 id: user.id.to_proto(),
788 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
789 github_login: user.github_login,
790 })
791 .collect();
792 Ok(proto::GetUsersResponse { users })
793 }
794
795 fn update_contacts_for_users<'a>(
796 self: &Arc<Server>,
797 user_ids: impl IntoIterator<Item = &'a UserId>,
798 ) -> anyhow::Result<()> {
799 let mut result = Ok(());
800 let state = self.state();
801 for user_id in user_ids {
802 let contacts = state.contacts_for_user(*user_id);
803 for connection_id in state.connection_ids_for_user(*user_id) {
804 if let Err(error) = self.peer.send(
805 connection_id,
806 proto::UpdateContacts {
807 contacts: contacts.clone(),
808 },
809 ) {
810 result = Err(error);
811 }
812 }
813 }
814 result
815 }
816
817 async fn join_channel(
818 mut self: Arc<Self>,
819 request: TypedEnvelope<proto::JoinChannel>,
820 ) -> tide::Result<proto::JoinChannelResponse> {
821 let user_id = self.state().user_id_for_connection(request.sender_id)?;
822 let channel_id = ChannelId::from_proto(request.payload.channel_id);
823 if !self
824 .app_state
825 .db
826 .can_user_access_channel(user_id, channel_id)
827 .await?
828 {
829 Err(anyhow!("access denied"))?;
830 }
831
832 self.state_mut().join_channel(request.sender_id, channel_id);
833 let messages = self
834 .app_state
835 .db
836 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
837 .await?
838 .into_iter()
839 .map(|msg| proto::ChannelMessage {
840 id: msg.id.to_proto(),
841 body: msg.body,
842 timestamp: msg.sent_at.unix_timestamp() as u64,
843 sender_id: msg.sender_id.to_proto(),
844 nonce: Some(msg.nonce.as_u128().into()),
845 })
846 .collect::<Vec<_>>();
847 Ok(proto::JoinChannelResponse {
848 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
849 messages,
850 })
851 }
852
853 async fn leave_channel(
854 mut self: Arc<Self>,
855 request: TypedEnvelope<proto::LeaveChannel>,
856 ) -> tide::Result<()> {
857 let user_id = self.state().user_id_for_connection(request.sender_id)?;
858 let channel_id = ChannelId::from_proto(request.payload.channel_id);
859 if !self
860 .app_state
861 .db
862 .can_user_access_channel(user_id, channel_id)
863 .await?
864 {
865 Err(anyhow!("access denied"))?;
866 }
867
868 self.state_mut()
869 .leave_channel(request.sender_id, channel_id);
870
871 Ok(())
872 }
873
874 async fn send_channel_message(
875 self: Arc<Self>,
876 request: TypedEnvelope<proto::SendChannelMessage>,
877 ) -> tide::Result<proto::SendChannelMessageResponse> {
878 let channel_id = ChannelId::from_proto(request.payload.channel_id);
879 let user_id;
880 let connection_ids;
881 {
882 let state = self.state();
883 user_id = state.user_id_for_connection(request.sender_id)?;
884 connection_ids = state.channel_connection_ids(channel_id)?;
885 }
886
887 // Validate the message body.
888 let body = request.payload.body.trim().to_string();
889 if body.len() > MAX_MESSAGE_LEN {
890 return Err(anyhow!("message is too long"))?;
891 }
892 if body.is_empty() {
893 return Err(anyhow!("message can't be blank"))?;
894 }
895
896 let timestamp = OffsetDateTime::now_utc();
897 let nonce = request
898 .payload
899 .nonce
900 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
901
902 let message_id = self
903 .app_state
904 .db
905 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
906 .await?
907 .to_proto();
908 let message = proto::ChannelMessage {
909 sender_id: user_id.to_proto(),
910 id: message_id,
911 body,
912 timestamp: timestamp.unix_timestamp() as u64,
913 nonce: Some(nonce),
914 };
915 broadcast(request.sender_id, connection_ids, |conn_id| {
916 self.peer.send(
917 conn_id,
918 proto::ChannelMessageSent {
919 channel_id: channel_id.to_proto(),
920 message: Some(message.clone()),
921 },
922 )
923 })?;
924 Ok(proto::SendChannelMessageResponse {
925 message: Some(message),
926 })
927 }
928
929 async fn get_channel_messages(
930 self: Arc<Self>,
931 request: TypedEnvelope<proto::GetChannelMessages>,
932 ) -> tide::Result<proto::GetChannelMessagesResponse> {
933 let user_id = self.state().user_id_for_connection(request.sender_id)?;
934 let channel_id = ChannelId::from_proto(request.payload.channel_id);
935 if !self
936 .app_state
937 .db
938 .can_user_access_channel(user_id, channel_id)
939 .await?
940 {
941 Err(anyhow!("access denied"))?;
942 }
943
944 let messages = self
945 .app_state
946 .db
947 .get_channel_messages(
948 channel_id,
949 MESSAGE_COUNT_PER_PAGE,
950 Some(MessageId::from_proto(request.payload.before_message_id)),
951 )
952 .await?
953 .into_iter()
954 .map(|msg| proto::ChannelMessage {
955 id: msg.id.to_proto(),
956 body: msg.body,
957 timestamp: msg.sent_at.unix_timestamp() as u64,
958 sender_id: msg.sender_id.to_proto(),
959 nonce: Some(msg.nonce.as_u128().into()),
960 })
961 .collect::<Vec<_>>();
962
963 Ok(proto::GetChannelMessagesResponse {
964 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
965 messages,
966 })
967 }
968
969 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
970 self.store.read()
971 }
972
973 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
974 self.store.write()
975 }
976}
977
978impl Executor for RealExecutor {
979 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
980 task::spawn(future);
981 }
982}
983
984fn broadcast<F>(
985 sender_id: ConnectionId,
986 receiver_ids: Vec<ConnectionId>,
987 mut f: F,
988) -> anyhow::Result<()>
989where
990 F: FnMut(ConnectionId) -> anyhow::Result<()>,
991{
992 let mut result = Ok(());
993 for receiver_id in receiver_ids {
994 if receiver_id != sender_id {
995 if let Err(error) = f(receiver_id) {
996 if result.is_ok() {
997 result = Err(error);
998 }
999 }
1000 }
1001 }
1002 result
1003}
1004
1005pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1006 let server = Server::new(app.state().clone(), rpc.clone(), None);
1007 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1008 let server = server.clone();
1009 async move {
1010 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1011
1012 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1013 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1014 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1015 let client_protocol_version: Option<u32> = request
1016 .header("X-Zed-Protocol-Version")
1017 .and_then(|v| v.as_str().parse().ok());
1018
1019 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1020 return Ok(Response::new(StatusCode::UpgradeRequired));
1021 }
1022
1023 let header = match request.header("Sec-Websocket-Key") {
1024 Some(h) => h.as_str(),
1025 None => return Err(anyhow!("expected sec-websocket-key"))?,
1026 };
1027
1028 let user_id = process_auth_header(&request).await?;
1029
1030 let mut response = Response::new(StatusCode::SwitchingProtocols);
1031 response.insert_header(UPGRADE, "websocket");
1032 response.insert_header(CONNECTION, "Upgrade");
1033 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1034 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1035 response.insert_header("Sec-Websocket-Version", "13");
1036
1037 let http_res: &mut tide::http::Response = response.as_mut();
1038 let upgrade_receiver = http_res.recv_upgrade().await;
1039 let addr = request.remote().unwrap_or("unknown").to_string();
1040 task::spawn(async move {
1041 if let Some(stream) = upgrade_receiver.await {
1042 server
1043 .handle_connection(
1044 Connection::new(
1045 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1046 ),
1047 addr,
1048 user_id,
1049 None,
1050 RealExecutor,
1051 )
1052 .await;
1053 }
1054 });
1055
1056 Ok(response)
1057 }
1058 });
1059}
1060
1061fn header_contains_ignore_case<T>(
1062 request: &tide::Request<T>,
1063 header_name: HeaderName,
1064 value: &str,
1065) -> bool {
1066 request
1067 .header(header_name)
1068 .map(|h| {
1069 h.as_str()
1070 .split(',')
1071 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1072 })
1073 .unwrap_or(false)
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use super::*;
1079 use crate::{
1080 auth,
1081 db::{tests::TestDb, UserId},
1082 github, AppState, Config,
1083 };
1084 use ::rpc::Peer;
1085 use gpui::{executor, ModelHandle, TestAppContext};
1086 use parking_lot::Mutex;
1087 use postage::{mpsc, watch};
1088 use rand::prelude::*;
1089 use rpc::PeerId;
1090 use serde_json::json;
1091 use sqlx::types::time::OffsetDateTime;
1092 use std::{
1093 ops::Deref,
1094 path::Path,
1095 rc::Rc,
1096 sync::{
1097 atomic::{AtomicBool, Ordering::SeqCst},
1098 Arc,
1099 },
1100 time::Duration,
1101 };
1102 use zed::{
1103 client::{
1104 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1105 EstablishConnectionError, UserStore,
1106 },
1107 editor::{
1108 self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1109 Redo, ToggleCodeActions, Undo,
1110 },
1111 fs::{FakeFs, Fs as _},
1112 language::{
1113 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1114 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1115 },
1116 lsp,
1117 project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1118 workspace::{Workspace, WorkspaceParams},
1119 };
1120
1121 #[cfg(test)]
1122 #[ctor::ctor]
1123 fn init_logger() {
1124 if std::env::var("RUST_LOG").is_ok() {
1125 env_logger::init();
1126 }
1127 }
1128
    // End-to-end sharing scenario with two clients:
    //   1. client A shares a local project rooted at "/a",
    //   2. client B joins it as a remote project,
    //   3. both sides observe each other as collaborators,
    //   4. an edit typed into B's editor shows up in A's buffer,
    //   5. dropping A's buffer closes it on the host, and dropping B's project
    //      removes B from A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Immediately after joining, B must already know about host A; remember
        // B's replica id so it can be compared with what A observes.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // A learns about B asynchronously, so wait until B appears with the
        // expected replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host project is expected to already have "b.txt" open at this point,
        // before client A explicitly opens it below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1267
    // Verifies the unshare/re-share cycle: after host A unshares, guest B's
    // project becomes read-only and A's worktree is no longer shared; after A
    // shares again, B can join the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer must succeed while the project is shared.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest finds out asynchronously; wait until its project flips to read-only.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Same client (B) and same project id as before, but a fresh remote project handle.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1364
    // Three-client scenario (host A, guests B and C) checking that:
    //   * concurrent edits from both guests and the host converge to the same text,
    //   * a save requested by a guest writes the file on the host's file system and
    //     clears the dirty flag on every replica,
    //   * renames/creations on the host's file system show up in the guests'
    //     worktrees and in the `file()` path of already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until the host has received both guest edits before appending its own.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first but only awaited after the host's edit; the
        // assertion below expects the written file to include that edit.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        // Guest C did not take part in the save, so it is polled rather than asserted immediately.
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
        // when interpreting the change event it will mistakenly think that the file has been
        // deleted (because its path has changed) and will subsequently fail to detect the rename.
        worktree_a.flush_fs_events(&cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Host and both guests must end up with the same set of worktree paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1550
    // Checks that a guest's buffer never reports a conflict across an
    // edit -> save -> edit sequence: it is dirty after each edit, clean after the
    // save has been observed (the file's mtime changed), and `has_conflict()` stays
    // false throughout.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): ".zed.toml" also lists "user_c", but this test never creates that client.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the pre-save mtime so the save can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        // Wait until the guest's view of the file reflects the save (mtime differs
        // from the one captured before editing).
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // A further edit after the save makes the buffer dirty again, still without a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1643
    // Checks that when a file changes on the host's file system while a guest has
    // it open (unmodified), the guest's buffer picks up the new contents and ends
    // up neither dirty nor in conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): ".zed.toml" also lists "user_c", but this test never creates that client.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // The handle is bound (not discarded) so the guest's worktree stays alive
        // for the remainder of the test.
        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file directly on the (fake) file system, bypassing any buffer.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest's buffer should reload to the new text and remain clean.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1725
    // Races a guest's `open_buffer` against edits made by the host to the same
    // buffer, and checks that the guest's buffer still converges to the host's
    // final text.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B
        // (spawned on the background executor and deliberately not awaited yet).
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Whatever interleaving occurred, the guest must end up with the host's text.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1807
    // Checks that a guest who drops its project while an `open_buffer` request is
    // still pending is nevertheless removed from the host's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        // Drop the project inside `update`, then discard the still-pending open task.
        cx_b.update(|_| drop(project_b));
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1885
    // Checks that when a guest's connection is dropped (rather than the guest
    // leaving explicitly), the host sees the guest disappear from its collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        // The handle is kept in `_project_b` so the guest's project is not dropped
        // before the disconnect below.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Drop client B's connection and ensure client A observes client B leaving the worktree.
        client_b.disconnect(&cx_b.to_async()).unwrap();
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1956
1957 #[gpui::test(iterations = 10)]
1958 async fn test_collaborating_with_diagnostics(
1959 mut cx_a: TestAppContext,
1960 mut cx_b: TestAppContext,
1961 ) {
1962 cx_a.foreground().forbid_parking();
1963 let mut lang_registry = Arc::new(LanguageRegistry::new());
1964 let fs = Arc::new(FakeFs::new(cx_a.background()));
1965
1966 // Set up a fake language server.
1967 let (language_server_config, mut fake_language_server) =
1968 LanguageServerConfig::fake(&cx_a).await;
1969 Arc::get_mut(&mut lang_registry)
1970 .unwrap()
1971 .add(Arc::new(Language::new(
1972 LanguageConfig {
1973 name: "Rust".to_string(),
1974 path_suffixes: vec!["rs".to_string()],
1975 language_server: Some(language_server_config),
1976 ..Default::default()
1977 },
1978 Some(tree_sitter_rust::language()),
1979 )));
1980
1981 // Connect to a server as 2 clients.
1982 let mut server = TestServer::start(cx_a.foreground()).await;
1983 let client_a = server.create_client(&mut cx_a, "user_a").await;
1984 let client_b = server.create_client(&mut cx_b, "user_b").await;
1985
1986 // Share a project as client A
1987 fs.insert_tree(
1988 "/a",
1989 json!({
1990 ".zed.toml": r#"collaborators = ["user_b"]"#,
1991 "a.rs": "let one = two",
1992 "other.rs": "",
1993 }),
1994 )
1995 .await;
1996 let project_a = cx_a.update(|cx| {
1997 Project::local(
1998 client_a.clone(),
1999 client_a.user_store.clone(),
2000 lang_registry.clone(),
2001 fs.clone(),
2002 cx,
2003 )
2004 });
2005 let (worktree_a, _) = project_a
2006 .update(&mut cx_a, |p, cx| {
2007 p.find_or_create_local_worktree("/a", false, cx)
2008 })
2009 .await
2010 .unwrap();
2011 worktree_a
2012 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2013 .await;
2014 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2015 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2016 project_a
2017 .update(&mut cx_a, |p, cx| p.share(cx))
2018 .await
2019 .unwrap();
2020
2021 // Cause the language server to start.
2022 let _ = cx_a
2023 .background()
2024 .spawn(project_a.update(&mut cx_a, |project, cx| {
2025 project.open_buffer(
2026 ProjectPath {
2027 worktree_id,
2028 path: Path::new("other.rs").into(),
2029 },
2030 cx,
2031 )
2032 }))
2033 .await
2034 .unwrap();
2035
2036 // Simulate a language server reporting errors for a file.
2037 fake_language_server
2038 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2039 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2040 version: None,
2041 diagnostics: vec![lsp::Diagnostic {
2042 severity: Some(lsp::DiagnosticSeverity::ERROR),
2043 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2044 message: "message 1".to_string(),
2045 ..Default::default()
2046 }],
2047 })
2048 .await;
2049
2050 // Wait for server to see the diagnostics update.
2051 server
2052 .condition(|store| {
2053 let worktree = store
2054 .project(project_id)
2055 .unwrap()
2056 .worktrees
2057 .get(&worktree_id.to_proto())
2058 .unwrap();
2059
2060 !worktree
2061 .share
2062 .as_ref()
2063 .unwrap()
2064 .diagnostic_summaries
2065 .is_empty()
2066 })
2067 .await;
2068
2069 // Join the worktree as client B.
2070 let project_b = Project::remote(
2071 project_id,
2072 client_b.clone(),
2073 client_b.user_store.clone(),
2074 lang_registry.clone(),
2075 fs.clone(),
2076 &mut cx_b.to_async(),
2077 )
2078 .await
2079 .unwrap();
2080
2081 project_b.read_with(&cx_b, |project, cx| {
2082 assert_eq!(
2083 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2084 &[(
2085 ProjectPath {
2086 worktree_id,
2087 path: Arc::from(Path::new("a.rs")),
2088 },
2089 DiagnosticSummary {
2090 error_count: 1,
2091 warning_count: 0,
2092 ..Default::default()
2093 },
2094 )]
2095 )
2096 });
2097
2098 // Simulate a language server reporting more errors for a file.
2099 fake_language_server
2100 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2101 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2102 version: None,
2103 diagnostics: vec![
2104 lsp::Diagnostic {
2105 severity: Some(lsp::DiagnosticSeverity::ERROR),
2106 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2107 message: "message 1".to_string(),
2108 ..Default::default()
2109 },
2110 lsp::Diagnostic {
2111 severity: Some(lsp::DiagnosticSeverity::WARNING),
2112 range: lsp::Range::new(
2113 lsp::Position::new(0, 10),
2114 lsp::Position::new(0, 13),
2115 ),
2116 message: "message 2".to_string(),
2117 ..Default::default()
2118 },
2119 ],
2120 })
2121 .await;
2122
2123 // Client b gets the updated summaries
2124 project_b
2125 .condition(&cx_b, |project, cx| {
2126 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2127 == &[(
2128 ProjectPath {
2129 worktree_id,
2130 path: Arc::from(Path::new("a.rs")),
2131 },
2132 DiagnosticSummary {
2133 error_count: 1,
2134 warning_count: 1,
2135 ..Default::default()
2136 },
2137 )]
2138 })
2139 .await;
2140
2141 // Open the file with the errors on client B. They should be present.
2142 let buffer_b = cx_b
2143 .background()
2144 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2145 .await
2146 .unwrap();
2147
2148 buffer_b.read_with(&cx_b, |buffer, _| {
2149 assert_eq!(
2150 buffer
2151 .snapshot()
2152 .diagnostics_in_range::<_, Point>(0..buffer.len())
2153 .map(|entry| entry)
2154 .collect::<Vec<_>>(),
2155 &[
2156 DiagnosticEntry {
2157 range: Point::new(0, 4)..Point::new(0, 7),
2158 diagnostic: Diagnostic {
2159 group_id: 0,
2160 message: "message 1".to_string(),
2161 severity: lsp::DiagnosticSeverity::ERROR,
2162 is_primary: true,
2163 ..Default::default()
2164 }
2165 },
2166 DiagnosticEntry {
2167 range: Point::new(0, 10)..Point::new(0, 13),
2168 diagnostic: Diagnostic {
2169 group_id: 1,
2170 severity: lsp::DiagnosticSeverity::WARNING,
2171 message: "message 2".to_string(),
2172 is_primary: true,
2173 ..Default::default()
2174 }
2175 }
2176 ]
2177 );
2178 });
2179 }
2180
// Checks that a guest can drive completions through the host's language server.
//
// Client A (the host) shares a project whose Rust language is backed by a fake language
// server that advertises "." as a completion trigger character. Client B (the guest)
// opens `main.rs`, types ".", confirms the first completion it is offered, and the test
// then verifies that the additional text edit carried by the resolved completion shows
// up in the guest's buffer and that host and guest end with identical text.
#[gpui::test(iterations = 10)]
async fn test_collaborating_with_completion(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
) {
    // NOTE(review): presumably makes the test fail instead of blocking when the
    // foreground executor runs out of work; confirm against gpui's executor.
    cx_a.foreground().forbid_parking();
    let mut lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));

    // Set up a fake language server that declares "." as a completion trigger.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake_with_capabilities(
            lsp::ServerCapabilities {
                completion_provider: Some(lsp::CompletionOptions {
                    trigger_characters: Some(vec![".".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            },
            &cx_a,
        )
        .await;
    // The registry has not been cloned yet, so `Arc::get_mut` can hand out exclusive
    // access for registering the language.
    Arc::get_mut(&mut lang_registry)
        .unwrap()
        .add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a project as client A
    fs.insert_tree(
        "/a",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "main.rs": "fn main() { a }",
            "other.rs": "",
        }),
    )
    .await;
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/a", false, cx)
        })
        .await
        .unwrap();
    // Wait for the initial scan of the worktree before sharing it.
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
    project_a
        .update(&mut cx_a, |p, cx| p.share(cx))
        .await
        .unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open a file in an editor as the guest.
    let buffer_b = project_b
        .update(&mut cx_b, |p, cx| {
            p.open_buffer((worktree_id, "main.rs"), cx)
        })
        .await
        .unwrap();
    let (window_b, _) = cx_b.add_window(|_| EmptyView);
    let editor_b = cx_b.add_view(window_b, |cx| {
        Editor::for_buffer(
            cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
            Arc::new(|cx| EditorSettings::test(cx)),
            Some(project_b.clone()),
            cx,
        )
    });

    // Type a completion trigger character as the guest.
    // Offset 13 is immediately after the `a` in "fn main() { a }", so once "." is typed
    // the cursor sits at LSP position (0, 14), which the handler below asserts.
    editor_b.update(&mut cx_b, |editor, cx| {
        editor.select_ranges([13..13], None, cx);
        editor.handle_input(&Input(".".into()), cx);
        cx.focus(&editor_b);
    });

    // Receive a completion request as the host's language server.
    // Return some completions from the host's language server.
    fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
        assert_eq!(
            params.text_document_position.text_document.uri,
            lsp::Url::from_file_path("/a/main.rs").unwrap(),
        );
        assert_eq!(
            params.text_document_position.position,
            lsp::Position::new(0, 14),
        );

        // Both items are snippet-formatted insertions at the cursor position.
        Some(lsp::CompletionResponse::Array(vec![
            lsp::CompletionItem {
                label: "first_method(…)".into(),
                detail: Some("fn(&mut self, B) -> C".into()),
                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                    new_text: "first_method($1)".to_string(),
                    range: lsp::Range::new(
                        lsp::Position::new(0, 14),
                        lsp::Position::new(0, 14),
                    ),
                })),
                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                ..Default::default()
            },
            lsp::CompletionItem {
                label: "second_method(…)".into(),
                detail: Some("fn(&mut self, C) -> D<E>".into()),
                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                    new_text: "second_method()".to_string(),
                    range: lsp::Range::new(
                        lsp::Position::new(0, 14),
                        lsp::Position::new(0, 14),
                    ),
                })),
                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                ..Default::default()
            },
        ]))
    });

    // Open the buffer on the host and wait until the guest's "." has arrived there.
    let buffer_a = project_a
        .update(&mut cx_a, |p, cx| {
            p.open_buffer((worktree_id, "main.rs"), cx)
        })
        .await
        .unwrap();
    buffer_a
        .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
        .await;

    // Confirm a completion on the guest.
    // The `$1` snippet placeholder does not appear in the inserted text: the guest's
    // editor is asserted to contain `first_method()`.
    editor_b.next_notification(&cx_b).await;
    editor_b.update(&mut cx_b, |editor, cx| {
        assert!(editor.context_menu_visible());
        editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
        assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
    });

    // Return a resolved completion from the host's language server.
    // The resolved completion has an additional text edit.
    fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
        assert_eq!(params.label, "first_method(…)");
        lsp::CompletionItem {
            label: "first_method(…)".into(),
            detail: Some("fn(&mut self, B) -> C".into()),
            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                new_text: "first_method($1)".to_string(),
                range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
            })),
            additional_text_edits: Some(vec![lsp::TextEdit {
                new_text: "use d::SomeTrait;\n".to_string(),
                range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
            }]),
            insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
            ..Default::default()
        }
    });

    // The host first observes the guest's confirmed completion text.
    buffer_a
        .condition(&cx_a, |buffer, _| {
            buffer.text() == "fn main() { a.first_method() }"
        })
        .await;

    // The additional edit is applied.
    buffer_b
        .condition(&cx_b, |buffer, _| {
            buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
        })
        .await;
    // Host and guest must end up with identical text.
    assert_eq!(
        buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
        buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
    );
}
2389
2390 #[gpui::test(iterations = 10)]
2391 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2392 cx_a.foreground().forbid_parking();
2393 let mut lang_registry = Arc::new(LanguageRegistry::new());
2394 let fs = Arc::new(FakeFs::new(cx_a.background()));
2395
2396 // Set up a fake language server.
2397 let (language_server_config, mut fake_language_server) =
2398 LanguageServerConfig::fake(&cx_a).await;
2399 Arc::get_mut(&mut lang_registry)
2400 .unwrap()
2401 .add(Arc::new(Language::new(
2402 LanguageConfig {
2403 name: "Rust".to_string(),
2404 path_suffixes: vec!["rs".to_string()],
2405 language_server: Some(language_server_config),
2406 ..Default::default()
2407 },
2408 Some(tree_sitter_rust::language()),
2409 )));
2410
2411 // Connect to a server as 2 clients.
2412 let mut server = TestServer::start(cx_a.foreground()).await;
2413 let client_a = server.create_client(&mut cx_a, "user_a").await;
2414 let client_b = server.create_client(&mut cx_b, "user_b").await;
2415
2416 // Share a project as client A
2417 fs.insert_tree(
2418 "/a",
2419 json!({
2420 ".zed.toml": r#"collaborators = ["user_b"]"#,
2421 "a.rs": "let one = two",
2422 }),
2423 )
2424 .await;
2425 let project_a = cx_a.update(|cx| {
2426 Project::local(
2427 client_a.clone(),
2428 client_a.user_store.clone(),
2429 lang_registry.clone(),
2430 fs.clone(),
2431 cx,
2432 )
2433 });
2434 let (worktree_a, _) = project_a
2435 .update(&mut cx_a, |p, cx| {
2436 p.find_or_create_local_worktree("/a", false, cx)
2437 })
2438 .await
2439 .unwrap();
2440 worktree_a
2441 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2442 .await;
2443 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2444 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2445 project_a
2446 .update(&mut cx_a, |p, cx| p.share(cx))
2447 .await
2448 .unwrap();
2449
2450 // Join the worktree as client B.
2451 let project_b = Project::remote(
2452 project_id,
2453 client_b.clone(),
2454 client_b.user_store.clone(),
2455 lang_registry.clone(),
2456 fs.clone(),
2457 &mut cx_b.to_async(),
2458 )
2459 .await
2460 .unwrap();
2461
2462 let buffer_b = cx_b
2463 .background()
2464 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2465 .await
2466 .unwrap();
2467
2468 let format = project_b.update(&mut cx_b, |project, cx| {
2469 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2470 });
2471
2472 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2473 Some(vec![
2474 lsp::TextEdit {
2475 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2476 new_text: "h".to_string(),
2477 },
2478 lsp::TextEdit {
2479 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2480 new_text: "y".to_string(),
2481 },
2482 ])
2483 });
2484
2485 format.await.unwrap();
2486 assert_eq!(
2487 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2488 "let honey = two"
2489 );
2490 }
2491
// Checks go-to-definition from a guest when the target lies outside the shared worktree.
//
// The host shares only `/root-1`, but the fake language server answers definition
// requests with locations in `/root-2/b.rs`. The test asserts that the guest's project
// reports two worktrees while a definition is held, that two lookups resolving to the
// same file yield the same target buffer, and that the worktree count drops back to one
// once the definitions are dropped.
#[gpui::test(iterations = 10)]
async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();
    let mut lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));
    // `/root-1` is the directory that will be shared with the guest.
    fs.insert_tree(
        "/root-1",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.rs": "const ONE: usize = b::TWO + b::THREE;",
        }),
    )
    .await;
    // `/root-2` is never added as a worktree by the test itself; it only appears as
    // the target of the definitions returned by the fake language server.
    fs.insert_tree(
        "/root-2",
        json!({
            "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
        }),
    )
    .await;

    // Set up a fake language server.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake(&cx_a).await;
    Arc::get_mut(&mut lang_registry)
        .unwrap()
        .add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a project as client A
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/root-1", false, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
    project_a
        .update(&mut cx_a, |p, cx| p.share(cx))
        .await
        .unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the file on client B.
    let buffer_b = cx_b
        .background()
        .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
        .await
        .unwrap();

    // Request the definition of a symbol as the guest.
    // Offset 23 falls inside `TWO` in "const ONE: usize = b::TWO + b::THREE;".
    let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
    // The reply points at `TWO` on line 0 of `/root-2/b.rs` (columns 6..9).
    fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
        Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
            lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
            lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
        )))
    });

    let definitions_1 = definitions_1.await.unwrap();
    cx_b.read(|cx| {
        assert_eq!(definitions_1.len(), 1);
        // The guest's project now has a second worktree for the definition's file.
        assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
        let target_buffer = definitions_1[0].target_buffer.read(cx);
        assert_eq!(
            target_buffer.text(),
            "const TWO: usize = 2;\nconst THREE: usize = 3;"
        );
        assert_eq!(
            definitions_1[0].target_range.to_point(target_buffer),
            Point::new(0, 6)..Point::new(0, 9)
        );
    });

    // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
    // the previous call to `definition`.
    // Offset 33 falls inside `THREE`; the reply points at `THREE` on line 1 (columns 6..11).
    let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
    fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
        Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
            lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
            lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
        )))
    });

    let definitions_2 = definitions_2.await.unwrap();
    cx_b.read(|cx| {
        assert_eq!(definitions_2.len(), 1);
        // Still two worktrees: the second lookup did not add another one.
        assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
        let target_buffer = definitions_2[0].target_buffer.read(cx);
        assert_eq!(
            target_buffer.text(),
            "const TWO: usize = 2;\nconst THREE: usize = 3;"
        );
        assert_eq!(
            definitions_2[0].target_range.to_point(target_buffer),
            Point::new(1, 6)..Point::new(1, 11)
        );
    });
    // Both lookups must resolve to the same buffer handle.
    assert_eq!(
        definitions_1[0].target_buffer,
        definitions_2[0].target_buffer
    );

    // Dropping the definitions releases the handles they hold; the test then waits for
    // the guest's project to be back to a single worktree.
    cx_b.update(|_| {
        drop(definitions_1);
        drop(definitions_2);
    });
    project_b
        .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
        .await;
}
2639
// Checks that opening a buffer and resolving a definition that points into that same
// buffer, while both requests are in flight, produce the same buffer handle.
//
// The order in which the guest issues the two requests is chosen by `rng`, so the ten
// iterations can cover both orderings. Neither request is awaited until after the fake
// language server's definition handler has been installed.
#[gpui::test(iterations = 10)]
async fn test_open_buffer_while_getting_definition_pointing_to_it(
    mut cx_a: TestAppContext,
    mut cx_b: TestAppContext,
    mut rng: StdRng,
) {
    cx_a.foreground().forbid_parking();
    let mut lang_registry = Arc::new(LanguageRegistry::new());
    let fs = Arc::new(FakeFs::new(cx_a.background()));
    fs.insert_tree(
        "/root",
        json!({
            ".zed.toml": r#"collaborators = ["user_b"]"#,
            "a.rs": "const ONE: usize = b::TWO;",
            "b.rs": "const TWO: usize = 2",
        }),
    )
    .await;

    // Set up a fake language server.
    let (language_server_config, mut fake_language_server) =
        LanguageServerConfig::fake(&cx_a).await;
    Arc::get_mut(&mut lang_registry)
        .unwrap()
        .add(Arc::new(Language::new(
            LanguageConfig {
                name: "Rust".to_string(),
                path_suffixes: vec!["rs".to_string()],
                language_server: Some(language_server_config),
                ..Default::default()
            },
            Some(tree_sitter_rust::language()),
        )));

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Share a project as client A
    let project_a = cx_a.update(|cx| {
        Project::local(
            client_a.clone(),
            client_a.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            cx,
        )
    });
    let (worktree_a, _) = project_a
        .update(&mut cx_a, |p, cx| {
            p.find_or_create_local_worktree("/root", false, cx)
        })
        .await
        .unwrap();
    worktree_a
        .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
        .await;
    let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
    let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
    project_a
        .update(&mut cx_a, |p, cx| p.share(cx))
        .await
        .unwrap();

    // Join the worktree as client B.
    let project_b = Project::remote(
        project_id,
        client_b.clone(),
        client_b.user_store.clone(),
        lang_registry.clone(),
        fs.clone(),
        &mut cx_b.to_async(),
    )
    .await
    .unwrap();

    // Open the file that contains the reference (`a.rs`) on the guest.
    let buffer_b1 = cx_b
        .background()
        .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
        .await
        .unwrap();

    // Start both operations without awaiting either, in a random order: a definition
    // lookup at offset 23 of `a.rs` (inside `TWO`) and an explicit open of `b.rs`.
    let definitions;
    let buffer_b2;
    if rng.gen() {
        definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
        buffer_b2 =
            project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
    } else {
        buffer_b2 =
            project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
        definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
    }

    // The host's language server resolves the definition to `TWO` in `/root/b.rs`.
    fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
        Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
            lsp::Url::from_file_path("/root/b.rs").unwrap(),
            lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
        )))
    });

    // Whichever request was issued first, the definition's target must be the very
    // buffer returned by the explicit `open_buffer` call.
    let buffer_b2 = buffer_b2.await.unwrap();
    let definitions = definitions.await.unwrap();
    assert_eq!(definitions.len(), 1);
    assert_eq!(definitions[0].target_buffer, buffer_b2);
}
2747
2748 #[gpui::test(iterations = 10)]
2749 async fn test_collaborating_with_code_actions(
2750 mut cx_a: TestAppContext,
2751 mut cx_b: TestAppContext,
2752 ) {
2753 cx_a.foreground().forbid_parking();
2754 let mut lang_registry = Arc::new(LanguageRegistry::new());
2755 let fs = Arc::new(FakeFs::new(cx_a.background()));
2756 let mut path_openers_b = Vec::new();
2757 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2758
2759 // Set up a fake language server.
2760 let (language_server_config, mut fake_language_server) =
2761 LanguageServerConfig::fake_with_capabilities(
2762 lsp::ServerCapabilities {
2763 ..Default::default()
2764 },
2765 &cx_a,
2766 )
2767 .await;
2768 Arc::get_mut(&mut lang_registry)
2769 .unwrap()
2770 .add(Arc::new(Language::new(
2771 LanguageConfig {
2772 name: "Rust".to_string(),
2773 path_suffixes: vec!["rs".to_string()],
2774 language_server: Some(language_server_config),
2775 ..Default::default()
2776 },
2777 Some(tree_sitter_rust::language()),
2778 )));
2779
2780 // Connect to a server as 2 clients.
2781 let mut server = TestServer::start(cx_a.foreground()).await;
2782 let client_a = server.create_client(&mut cx_a, "user_a").await;
2783 let client_b = server.create_client(&mut cx_b, "user_b").await;
2784
2785 // Share a project as client A
2786 fs.insert_tree(
2787 "/a",
2788 json!({
2789 ".zed.toml": r#"collaborators = ["user_b"]"#,
2790 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2791 "other.rs": "pub fn foo() -> usize { 4 }",
2792 }),
2793 )
2794 .await;
2795 let project_a = cx_a.update(|cx| {
2796 Project::local(
2797 client_a.clone(),
2798 client_a.user_store.clone(),
2799 lang_registry.clone(),
2800 fs.clone(),
2801 cx,
2802 )
2803 });
2804 let (worktree_a, _) = project_a
2805 .update(&mut cx_a, |p, cx| {
2806 p.find_or_create_local_worktree("/a", false, cx)
2807 })
2808 .await
2809 .unwrap();
2810 worktree_a
2811 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2812 .await;
2813 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2814 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2815 project_a
2816 .update(&mut cx_a, |p, cx| p.share(cx))
2817 .await
2818 .unwrap();
2819
2820 // Join the worktree as client B.
2821 let project_b = Project::remote(
2822 project_id,
2823 client_b.clone(),
2824 client_b.user_store.clone(),
2825 lang_registry.clone(),
2826 fs.clone(),
2827 &mut cx_b.to_async(),
2828 )
2829 .await
2830 .unwrap();
2831 let mut params = cx_b.update(WorkspaceParams::test);
2832 params.languages = lang_registry.clone();
2833 params.client = client_b.client.clone();
2834 params.user_store = client_b.user_store.clone();
2835 params.project = project_b;
2836 params.path_openers = path_openers_b.into();
2837
2838 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2839 let editor_b = workspace_b
2840 .update(&mut cx_b, |workspace, cx| {
2841 workspace.open_path((worktree_id, "main.rs").into(), cx)
2842 })
2843 .await
2844 .unwrap()
2845 .downcast::<Editor>()
2846 .unwrap();
2847 fake_language_server
2848 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2849 assert_eq!(
2850 params.text_document.uri,
2851 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2852 );
2853 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2854 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2855 None
2856 })
2857 .next()
2858 .await;
2859
2860 // Move cursor to a location that contains code actions.
2861 editor_b.update(&mut cx_b, |editor, cx| {
2862 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2863 cx.focus(&editor_b);
2864 });
2865 fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2866 assert_eq!(
2867 params.text_document.uri,
2868 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2869 );
2870 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2871 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2872
2873 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2874 lsp::CodeAction {
2875 title: "Inline into all callers".to_string(),
2876 edit: Some(lsp::WorkspaceEdit {
2877 changes: Some(
2878 [
2879 (
2880 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2881 vec![lsp::TextEdit::new(
2882 lsp::Range::new(
2883 lsp::Position::new(1, 22),
2884 lsp::Position::new(1, 34),
2885 ),
2886 "4".to_string(),
2887 )],
2888 ),
2889 (
2890 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2891 vec![lsp::TextEdit::new(
2892 lsp::Range::new(
2893 lsp::Position::new(0, 0),
2894 lsp::Position::new(0, 27),
2895 ),
2896 "".to_string(),
2897 )],
2898 ),
2899 ]
2900 .into_iter()
2901 .collect(),
2902 ),
2903 ..Default::default()
2904 }),
2905 data: Some(json!({
2906 "codeActionParams": {
2907 "range": {
2908 "start": {"line": 1, "column": 31},
2909 "end": {"line": 1, "column": 31},
2910 }
2911 }
2912 })),
2913 ..Default::default()
2914 },
2915 )])
2916 });
2917
2918 // Toggle code actions and wait for them to display.
2919 editor_b.update(&mut cx_b, |editor, cx| {
2920 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2921 });
2922 editor_b
2923 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2924 .await;
2925
2926 // Confirming the code action will trigger a resolve request.
2927 let confirm_action = workspace_b
2928 .update(&mut cx_b, |workspace, cx| {
2929 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2930 })
2931 .unwrap();
2932 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2933 lsp::CodeAction {
2934 title: "Inline into all callers".to_string(),
2935 edit: Some(lsp::WorkspaceEdit {
2936 changes: Some(
2937 [
2938 (
2939 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2940 vec![lsp::TextEdit::new(
2941 lsp::Range::new(
2942 lsp::Position::new(1, 22),
2943 lsp::Position::new(1, 34),
2944 ),
2945 "4".to_string(),
2946 )],
2947 ),
2948 (
2949 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2950 vec![lsp::TextEdit::new(
2951 lsp::Range::new(
2952 lsp::Position::new(0, 0),
2953 lsp::Position::new(0, 27),
2954 ),
2955 "".to_string(),
2956 )],
2957 ),
2958 ]
2959 .into_iter()
2960 .collect(),
2961 ),
2962 ..Default::default()
2963 }),
2964 ..Default::default()
2965 }
2966 });
2967
2968 // After the action is confirmed, an editor containing both modified files is opened.
2969 confirm_action.await.unwrap();
2970 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2971 workspace
2972 .active_item(cx)
2973 .unwrap()
2974 .downcast::<Editor>()
2975 .unwrap()
2976 });
2977 code_action_editor.update(&mut cx_b, |editor, cx| {
2978 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2979 editor.undo(&Undo, cx);
2980 assert_eq!(
2981 editor.text(cx),
2982 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2983 );
2984 editor.redo(&Redo, cx);
2985 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2986 });
2987 }
2988
// End-to-end check of channel chat between two clients.
//
// Covers: listing the channels available to each user, loading a message that already
// exists in the database, sending messages from client A and receiving them on client
// B, and the server tracking which connections are joined to the channel as the
// clients drop their channel handles.
#[gpui::test(iterations = 10)]
async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
    cx_a.foreground().forbid_parking();

    // Connect to a server as 2 clients.
    let mut server = TestServer::start(cx_a.foreground()).await;
    let client_a = server.create_client(&mut cx_a, "user_a").await;
    let client_b = server.create_client(&mut cx_b, "user_b").await;

    // Create an org that includes these 2 users.
    let db = &server.app_state.db;
    let org_id = db.create_org("Test Org", "test-org").await.unwrap();
    db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();

    // Create a channel that includes all the users.
    let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
    db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
        .await
        .unwrap();
    db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
        .await
        .unwrap();
    // Seed the channel with one message from user_b, written straight to the database.
    // NOTE(review): the trailing `1` is presumably a nonce; confirm against the db API.
    db.create_channel_message(
        channel_id,
        client_b.current_user_id(&cx_b),
        "hello A, it's B.",
        OffsetDateTime::now_utc(),
        1,
    )
    .await
    .unwrap();

    // Client A waits for its channel list to load and should see exactly one channel.
    let channels_a = cx_a
        .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
    channels_a
        .condition(&mut cx_a, |list, _| list.available_channels().is_some())
        .await;
    channels_a.read_with(&cx_a, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });
    // A freshly obtained channel has no messages yet; the seeded message shows up
    // only after waiting.
    // NOTE(review): the third element of each `channel_messages` tuple looks like an
    // "is pending" flag: it is `true` only for messages the local user has just sent.
    // Confirm against the `channel_messages` helper.
    let channel_a = channels_a.update(&mut cx_a, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
    channel_a
        .condition(&cx_a, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client B goes through the same steps and must observe the same state.
    let channels_b = cx_b
        .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
    channels_b
        .condition(&mut cx_b, |list, _| list.available_channels().is_some())
        .await;
    channels_b.read_with(&cx_b, |list, _| {
        assert_eq!(
            list.available_channels().unwrap(),
            &[ChannelDetails {
                id: channel_id.to_proto(),
                name: "test-channel".to_string()
            }]
        )
    });

    let channel_b = channels_b.update(&mut cx_b, |this, cx| {
        this.get_channel(channel_id.to_proto(), cx).unwrap()
    });
    channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
        })
        .await;

    // Client A sends two messages back to back. The first send is detached; the second
    // task is returned from the closure and awaited. Both messages are visible in A's
    // local channel state immediately, before either send has been awaited.
    channel_a
        .update(&mut cx_a, |channel, cx| {
            channel
                .send_message("oh, hi B.".to_string(), cx)
                .unwrap()
                .detach();
            let task = channel.send_message("sup".to_string(), cx).unwrap();
            assert_eq!(
                channel_messages(channel),
                &[
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), true),
                    ("user_a".to_string(), "sup".to_string(), true)
                ]
            );
            task
        })
        .await
        .unwrap();

    // Client B eventually receives both messages, in order, with the flag `false`.
    channel_b
        .condition(&cx_b, |channel, _| {
            channel_messages(channel)
                == [
                    ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                    ("user_a".to_string(), "oh, hi B.".to_string(), false),
                    ("user_a".to_string(), "sup".to_string(), false),
                ]
        })
        .await;

    // Both connections are registered on the channel in the server's state.
    assert_eq!(
        server
            .state()
            .await
            .channel(channel_id)
            .unwrap()
            .connection_ids
            .len(),
        2
    );
    // Dropping B's channel handle leaves one connection on the channel.
    cx_b.update(|_| drop(channel_b));
    server
        .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
        .await;

    // Dropping A's handle as well removes the channel from the server's state.
    cx_a.update(|_| drop(channel_a));
    server
        .condition(|state| state.channel(channel_id).is_none())
        .await;
}
3128
3129 #[gpui::test(iterations = 10)]
3130 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3131 cx_a.foreground().forbid_parking();
3132
3133 let mut server = TestServer::start(cx_a.foreground()).await;
3134 let client_a = server.create_client(&mut cx_a, "user_a").await;
3135
3136 let db = &server.app_state.db;
3137 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3138 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3139 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3140 .await
3141 .unwrap();
3142 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3143 .await
3144 .unwrap();
3145
3146 let channels_a = cx_a
3147 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3148 channels_a
3149 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3150 .await;
3151 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3152 this.get_channel(channel_id.to_proto(), cx).unwrap()
3153 });
3154
3155 // Messages aren't allowed to be too long.
3156 channel_a
3157 .update(&mut cx_a, |channel, cx| {
3158 let long_body = "this is long.\n".repeat(1024);
3159 channel.send_message(long_body, cx).unwrap()
3160 })
3161 .await
3162 .unwrap_err();
3163
3164 // Messages aren't allowed to be blank.
3165 channel_a.update(&mut cx_a, |channel, cx| {
3166 channel.send_message(String::new(), cx).unwrap_err()
3167 });
3168
3169 // Leading and trailing whitespace are trimmed.
3170 channel_a
3171 .update(&mut cx_a, |channel, cx| {
3172 channel
3173 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3174 .unwrap()
3175 })
3176 .await
3177 .unwrap();
3178 assert_eq!(
3179 db.get_channel_messages(channel_id, 10, None)
3180 .await
3181 .unwrap()
3182 .iter()
3183 .map(|m| &m.body)
3184 .collect::<Vec<_>>(),
3185 &["surrounded by whitespace"]
3186 );
3187 }
3188
3189 #[gpui::test(iterations = 10)]
3190 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3191 cx_a.foreground().forbid_parking();
3192
3193 // Connect to a server as 2 clients.
3194 let mut server = TestServer::start(cx_a.foreground()).await;
3195 let client_a = server.create_client(&mut cx_a, "user_a").await;
3196 let client_b = server.create_client(&mut cx_b, "user_b").await;
3197 let mut status_b = client_b.status();
3198
3199 // Create an org that includes these 2 users.
3200 let db = &server.app_state.db;
3201 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3202 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3203 .await
3204 .unwrap();
3205 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3206 .await
3207 .unwrap();
3208
3209 // Create a channel that includes all the users.
3210 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3211 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3212 .await
3213 .unwrap();
3214 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3215 .await
3216 .unwrap();
3217 db.create_channel_message(
3218 channel_id,
3219 client_b.current_user_id(&cx_b),
3220 "hello A, it's B.",
3221 OffsetDateTime::now_utc(),
3222 2,
3223 )
3224 .await
3225 .unwrap();
3226
3227 let channels_a = cx_a
3228 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3229 channels_a
3230 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3231 .await;
3232
3233 channels_a.read_with(&cx_a, |list, _| {
3234 assert_eq!(
3235 list.available_channels().unwrap(),
3236 &[ChannelDetails {
3237 id: channel_id.to_proto(),
3238 name: "test-channel".to_string()
3239 }]
3240 )
3241 });
3242 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3243 this.get_channel(channel_id.to_proto(), cx).unwrap()
3244 });
3245 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3246 channel_a
3247 .condition(&cx_a, |channel, _| {
3248 channel_messages(channel)
3249 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3250 })
3251 .await;
3252
3253 let channels_b = cx_b
3254 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3255 channels_b
3256 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3257 .await;
3258 channels_b.read_with(&cx_b, |list, _| {
3259 assert_eq!(
3260 list.available_channels().unwrap(),
3261 &[ChannelDetails {
3262 id: channel_id.to_proto(),
3263 name: "test-channel".to_string()
3264 }]
3265 )
3266 });
3267
3268 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3269 this.get_channel(channel_id.to_proto(), cx).unwrap()
3270 });
3271 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3272 channel_b
3273 .condition(&cx_b, |channel, _| {
3274 channel_messages(channel)
3275 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3276 })
3277 .await;
3278
3279 // Disconnect client B, ensuring we can still access its cached channel data.
3280 server.forbid_connections();
3281 server.disconnect_client(client_b.current_user_id(&cx_b));
3282 while !matches!(
3283 status_b.next().await,
3284 Some(client::Status::ReconnectionError { .. })
3285 ) {}
3286
3287 channels_b.read_with(&cx_b, |channels, _| {
3288 assert_eq!(
3289 channels.available_channels().unwrap(),
3290 [ChannelDetails {
3291 id: channel_id.to_proto(),
3292 name: "test-channel".to_string()
3293 }]
3294 )
3295 });
3296 channel_b.read_with(&cx_b, |channel, _| {
3297 assert_eq!(
3298 channel_messages(channel),
3299 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3300 )
3301 });
3302
3303 // Send a message from client B while it is disconnected.
3304 channel_b
3305 .update(&mut cx_b, |channel, cx| {
3306 let task = channel
3307 .send_message("can you see this?".to_string(), cx)
3308 .unwrap();
3309 assert_eq!(
3310 channel_messages(channel),
3311 &[
3312 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3313 ("user_b".to_string(), "can you see this?".to_string(), true)
3314 ]
3315 );
3316 task
3317 })
3318 .await
3319 .unwrap_err();
3320
3321 // Send a message from client A while B is disconnected.
3322 channel_a
3323 .update(&mut cx_a, |channel, cx| {
3324 channel
3325 .send_message("oh, hi B.".to_string(), cx)
3326 .unwrap()
3327 .detach();
3328 let task = channel.send_message("sup".to_string(), cx).unwrap();
3329 assert_eq!(
3330 channel_messages(channel),
3331 &[
3332 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3333 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3334 ("user_a".to_string(), "sup".to_string(), true)
3335 ]
3336 );
3337 task
3338 })
3339 .await
3340 .unwrap();
3341
3342 // Give client B a chance to reconnect.
3343 server.allow_connections();
3344 cx_b.foreground().advance_clock(Duration::from_secs(10));
3345
3346 // Verify that B sees the new messages upon reconnection, as well as the message client B
3347 // sent while offline.
3348 channel_b
3349 .condition(&cx_b, |channel, _| {
3350 channel_messages(channel)
3351 == [
3352 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3353 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3354 ("user_a".to_string(), "sup".to_string(), false),
3355 ("user_b".to_string(), "can you see this?".to_string(), false),
3356 ]
3357 })
3358 .await;
3359
3360 // Ensure client A and B can communicate normally after reconnection.
3361 channel_a
3362 .update(&mut cx_a, |channel, cx| {
3363 channel.send_message("you online?".to_string(), cx).unwrap()
3364 })
3365 .await
3366 .unwrap();
3367 channel_b
3368 .condition(&cx_b, |channel, _| {
3369 channel_messages(channel)
3370 == [
3371 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3372 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3373 ("user_a".to_string(), "sup".to_string(), false),
3374 ("user_b".to_string(), "can you see this?".to_string(), false),
3375 ("user_a".to_string(), "you online?".to_string(), false),
3376 ]
3377 })
3378 .await;
3379
3380 channel_b
3381 .update(&mut cx_b, |channel, cx| {
3382 channel.send_message("yep".to_string(), cx).unwrap()
3383 })
3384 .await
3385 .unwrap();
3386 channel_a
3387 .condition(&cx_a, |channel, _| {
3388 channel_messages(channel)
3389 == [
3390 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3391 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3392 ("user_a".to_string(), "sup".to_string(), false),
3393 ("user_b".to_string(), "can you see this?".to_string(), false),
3394 ("user_a".to_string(), "you online?".to_string(), false),
3395 ("user_b".to_string(), "yep".to_string(), false),
3396 ]
3397 })
3398 .await;
3399 }
3400
3401 #[gpui::test(iterations = 10)]
3402 async fn test_contacts(
3403 mut cx_a: TestAppContext,
3404 mut cx_b: TestAppContext,
3405 mut cx_c: TestAppContext,
3406 ) {
3407 cx_a.foreground().forbid_parking();
3408 let lang_registry = Arc::new(LanguageRegistry::new());
3409 let fs = Arc::new(FakeFs::new(cx_a.background()));
3410
3411 // Connect to a server as 3 clients.
3412 let mut server = TestServer::start(cx_a.foreground()).await;
3413 let client_a = server.create_client(&mut cx_a, "user_a").await;
3414 let client_b = server.create_client(&mut cx_b, "user_b").await;
3415 let client_c = server.create_client(&mut cx_c, "user_c").await;
3416
3417 // Share a worktree as client A.
3418 fs.insert_tree(
3419 "/a",
3420 json!({
3421 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3422 }),
3423 )
3424 .await;
3425
3426 let project_a = cx_a.update(|cx| {
3427 Project::local(
3428 client_a.clone(),
3429 client_a.user_store.clone(),
3430 lang_registry.clone(),
3431 fs.clone(),
3432 cx,
3433 )
3434 });
3435 let (worktree_a, _) = project_a
3436 .update(&mut cx_a, |p, cx| {
3437 p.find_or_create_local_worktree("/a", false, cx)
3438 })
3439 .await
3440 .unwrap();
3441 worktree_a
3442 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3443 .await;
3444
3445 client_a
3446 .user_store
3447 .condition(&cx_a, |user_store, _| {
3448 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3449 })
3450 .await;
3451 client_b
3452 .user_store
3453 .condition(&cx_b, |user_store, _| {
3454 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3455 })
3456 .await;
3457 client_c
3458 .user_store
3459 .condition(&cx_c, |user_store, _| {
3460 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3461 })
3462 .await;
3463
3464 let project_id = project_a
3465 .update(&mut cx_a, |project, _| project.next_remote_id())
3466 .await;
3467 project_a
3468 .update(&mut cx_a, |project, cx| project.share(cx))
3469 .await
3470 .unwrap();
3471
3472 let _project_b = Project::remote(
3473 project_id,
3474 client_b.clone(),
3475 client_b.user_store.clone(),
3476 lang_registry.clone(),
3477 fs.clone(),
3478 &mut cx_b.to_async(),
3479 )
3480 .await
3481 .unwrap();
3482
3483 client_a
3484 .user_store
3485 .condition(&cx_a, |user_store, _| {
3486 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3487 })
3488 .await;
3489 client_b
3490 .user_store
3491 .condition(&cx_b, |user_store, _| {
3492 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3493 })
3494 .await;
3495 client_c
3496 .user_store
3497 .condition(&cx_c, |user_store, _| {
3498 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3499 })
3500 .await;
3501
3502 project_a
3503 .condition(&cx_a, |project, _| {
3504 project.collaborators().contains_key(&client_b.peer_id)
3505 })
3506 .await;
3507
3508 cx_a.update(move |_| drop(project_a));
3509 client_a
3510 .user_store
3511 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3512 .await;
3513 client_b
3514 .user_store
3515 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3516 .await;
3517 client_c
3518 .user_store
3519 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3520 .await;
3521
3522 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3523 user_store
3524 .contacts()
3525 .iter()
3526 .map(|contact| {
3527 let worktrees = contact
3528 .projects
3529 .iter()
3530 .map(|p| {
3531 (
3532 p.worktree_root_names[0].as_str(),
3533 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3534 )
3535 })
3536 .collect();
3537 (contact.user.github_login.as_str(), worktrees)
3538 })
3539 .collect()
3540 }
3541 }
3542
3543 struct TestServer {
3544 peer: Arc<Peer>,
3545 app_state: Arc<AppState>,
3546 server: Arc<Server>,
3547 foreground: Rc<executor::Foreground>,
3548 notifications: mpsc::Receiver<()>,
3549 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3550 forbid_connections: Arc<AtomicBool>,
3551 _test_db: TestDb,
3552 }
3553
3554 impl TestServer {
3555 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3556 let test_db = TestDb::new();
3557 let app_state = Self::build_app_state(&test_db).await;
3558 let peer = Peer::new();
3559 let notifications = mpsc::channel(128);
3560 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3561 Self {
3562 peer,
3563 app_state,
3564 server,
3565 foreground,
3566 notifications: notifications.1,
3567 connection_killers: Default::default(),
3568 forbid_connections: Default::default(),
3569 _test_db: test_db,
3570 }
3571 }
3572
3573 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3574 let http = FakeHttpClient::with_404_response();
3575 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3576 let client_name = name.to_string();
3577 let mut client = Client::new(http.clone());
3578 let server = self.server.clone();
3579 let connection_killers = self.connection_killers.clone();
3580 let forbid_connections = self.forbid_connections.clone();
3581 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3582
3583 Arc::get_mut(&mut client)
3584 .unwrap()
3585 .override_authenticate(move |cx| {
3586 cx.spawn(|_| async move {
3587 let access_token = "the-token".to_string();
3588 Ok(Credentials {
3589 user_id: user_id.0 as u64,
3590 access_token,
3591 })
3592 })
3593 })
3594 .override_establish_connection(move |credentials, cx| {
3595 assert_eq!(credentials.user_id, user_id.0 as u64);
3596 assert_eq!(credentials.access_token, "the-token");
3597
3598 let server = server.clone();
3599 let connection_killers = connection_killers.clone();
3600 let forbid_connections = forbid_connections.clone();
3601 let client_name = client_name.clone();
3602 let connection_id_tx = connection_id_tx.clone();
3603 cx.spawn(move |cx| async move {
3604 if forbid_connections.load(SeqCst) {
3605 Err(EstablishConnectionError::other(anyhow!(
3606 "server is forbidding connections"
3607 )))
3608 } else {
3609 let (client_conn, server_conn, kill_conn) =
3610 Connection::in_memory(cx.background());
3611 connection_killers.lock().insert(user_id, kill_conn);
3612 cx.background()
3613 .spawn(server.handle_connection(
3614 server_conn,
3615 client_name,
3616 user_id,
3617 Some(connection_id_tx),
3618 cx.background(),
3619 ))
3620 .detach();
3621 Ok(client_conn)
3622 }
3623 })
3624 });
3625
3626 client
3627 .authenticate_and_connect(&cx.to_async())
3628 .await
3629 .unwrap();
3630
3631 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3632 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3633 let mut authed_user =
3634 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3635 while authed_user.next().await.unwrap().is_none() {}
3636
3637 TestClient {
3638 client,
3639 peer_id,
3640 user_store,
3641 }
3642 }
3643
3644 fn disconnect_client(&self, user_id: UserId) {
3645 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3646 let _ = kill_conn.try_send(Some(()));
3647 }
3648 }
3649
3650 fn forbid_connections(&self) {
3651 self.forbid_connections.store(true, SeqCst);
3652 }
3653
3654 fn allow_connections(&self) {
3655 self.forbid_connections.store(false, SeqCst);
3656 }
3657
3658 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3659 let mut config = Config::default();
3660 config.session_secret = "a".repeat(32);
3661 config.database_url = test_db.url.clone();
3662 let github_client = github::AppClient::test();
3663 Arc::new(AppState {
3664 db: test_db.db().clone(),
3665 handlebars: Default::default(),
3666 auth_client: auth::build_client("", ""),
3667 repo_client: github::RepoClient::test(&github_client),
3668 github_client,
3669 config,
3670 })
3671 }
3672
3673 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3674 self.server.store.read()
3675 }
3676
3677 async fn condition<F>(&mut self, mut predicate: F)
3678 where
3679 F: FnMut(&Store) -> bool,
3680 {
3681 async_std::future::timeout(Duration::from_millis(500), async {
3682 while !(predicate)(&*self.server.store.read()) {
3683 self.foreground.start_waiting();
3684 self.notifications.next().await;
3685 self.foreground.finish_waiting();
3686 }
3687 })
3688 .await
3689 .expect("condition timed out");
3690 }
3691 }
3692
3693 impl Drop for TestServer {
3694 fn drop(&mut self) {
3695 self.peer.reset();
3696 }
3697 }
3698
3699 struct TestClient {
3700 client: Arc<Client>,
3701 pub peer_id: PeerId,
3702 pub user_store: ModelHandle<UserStore>,
3703 }
3704
3705 impl Deref for TestClient {
3706 type Target = Arc<Client>;
3707
3708 fn deref(&self) -> &Self::Target {
3709 &self.client
3710 }
3711 }
3712
3713 impl TestClient {
3714 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3715 UserId::from_proto(
3716 self.user_store
3717 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3718 )
3719 }
3720 }
3721
3722 impl Executor for Arc<gpui::executor::Background> {
3723 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
3724 self.spawn(future).detach();
3725 }
3726 }
3727
3728 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3729 channel
3730 .messages()
3731 .cursor::<()>()
3732 .map(|m| {
3733 (
3734 m.sender.github_login.clone(),
3735 m.body.clone(),
3736 m.is_pending(),
3737 )
3738 })
3739 .collect()
3740 }
3741
/// A view with no state that renders an empty element.
struct EmptyView;
3743
3744 impl gpui::Entity for EmptyView {
3745 type Event = ();
3746 }
3747
3748 impl gpui::View for EmptyView {
3749 fn ui_name() -> &'static str {
3750 "empty view"
3751 }
3752
3753 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3754 gpui::Element::boxed(gpui::elements::Empty)
3755 }
3756 }
3757}