1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased message handler as stored in `Server`'s `handlers` map.
///
/// It receives the server and a boxed envelope of any message type and returns a
/// boxed future that resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server state shared by every connection.
pub struct Server {
    /// Peer used to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// In-memory store, guarded by a read/write lock (see `state` / `state_mut`).
    store: RwLock<Store>,
    /// Application state; the handlers in this file use it for database access.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` each time an incoming message has been
    /// dispatched to a handler.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Abstraction over how `Server::handle_connection` runs message handlers.
pub trait Executor {
    /// Starts `future` without handing back any handle to await its completion.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
47
/// `Executor` whose implementation in this file spawns futures with `async_std::task::spawn`.
pub struct RealExecutor;
49
/// Number of channel messages requested from the database per page; a page shorter
/// than this is reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`String::len`) of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_request_handler(Server::ping)
69 .add_request_handler(Server::register_project)
70 .add_message_handler(Server::unregister_project)
71 .add_request_handler(Server::share_project)
72 .add_message_handler(Server::unshare_project)
73 .add_request_handler(Server::join_project)
74 .add_message_handler(Server::leave_project)
75 .add_request_handler(Server::register_worktree)
76 .add_message_handler(Server::unregister_worktree)
77 .add_request_handler(Server::share_worktree)
78 .add_message_handler(Server::update_worktree)
79 .add_message_handler(Server::update_diagnostic_summary)
80 .add_message_handler(Server::disk_based_diagnostics_updating)
81 .add_message_handler(Server::disk_based_diagnostics_updated)
82 .add_request_handler(Server::get_definition)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::get_channels)
96 .add_request_handler(Server::get_users)
97 .add_request_handler(Server::join_channel)
98 .add_message_handler(Server::leave_channel)
99 .add_request_handler(Server::send_channel_message)
100 .add_request_handler(Server::get_channel_messages);
101
102 Arc::new(server)
103 }
104
105 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
106 where
107 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
108 Fut: 'static + Send + Future<Output = tide::Result<()>>,
109 M: EnvelopedMessage,
110 {
111 let prev_handler = self.handlers.insert(
112 TypeId::of::<M>(),
113 Box::new(move |server, envelope| {
114 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
115 (handler)(server, *envelope).boxed()
116 }),
117 );
118 if prev_handler.is_some() {
119 panic!("registered a handler for the same message twice");
120 }
121 self
122 }
123
124 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
125 where
126 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
127 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
128 M: RequestMessage,
129 {
130 self.add_message_handler(move |server, envelope| {
131 let receipt = envelope.receipt();
132 let response = (handler)(server.clone(), envelope);
133 async move {
134 match response.await {
135 Ok(response) => {
136 server.peer.respond(receipt, response)?;
137 Ok(())
138 }
139 Err(error) => {
140 server.peer.respond_with_error(
141 receipt,
142 proto::Error {
143 message: error.to_string(),
144 },
145 )?;
146 Err(error)
147 }
148 }
149 }
150 })
151 }
152
    /// Registers `connection` with the peer and drives it until it closes.
    ///
    /// The returned future adds the connection to the store under `user_id`, pushes a
    /// contacts update for that user, and then loops: each incoming message is handed
    /// to the handler registered for its payload type, and the handler's future is
    /// spawned on `executor`. When the I/O future completes or the incoming stream
    /// ends, the connection is signed out.
    ///
    /// If `send_connection_id` is provided, the assigned `ConnectionId` is sent on it
    /// (a failed send is ignored). `addr` is only used in log messages.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // Own a clone of the `Arc` so the returned future does not borrow `self`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            // A failed contacts update is logged but does not abort the connection.
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in order, so completion of the
                // I/O future takes priority over a pending incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let handle_message = (handler)(this.clone(), message);
                                // Run the handler detached so this loop can keep receiving.
                                executor.spawn_detached(async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                });
                                // Signal that a message was dispatched; a failed send is ignored.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
220
221 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
222 self.peer.disconnect(connection_id);
223 let removed_connection = self.state_mut().remove_connection(connection_id)?;
224
225 for (project_id, project) in removed_connection.hosted_projects {
226 if let Some(share) = project.share {
227 broadcast(
228 connection_id,
229 share.guests.keys().copied().collect(),
230 |conn_id| {
231 self.peer
232 .send(conn_id, proto::UnshareProject { project_id })
233 },
234 )?;
235 }
236 }
237
238 for (project_id, peer_ids) in removed_connection.guest_project_ids {
239 broadcast(connection_id, peer_ids, |conn_id| {
240 self.peer.send(
241 conn_id,
242 proto::RemoveProjectCollaborator {
243 project_id,
244 peer_id: connection_id.0,
245 },
246 )
247 })?;
248 }
249
250 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
251 Ok(())
252 }
253
    /// Answers a `Ping` with an `Ack`; neither the server nor the request is inspected.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
257
258 async fn register_project(
259 mut self: Arc<Server>,
260 request: TypedEnvelope<proto::RegisterProject>,
261 ) -> tide::Result<proto::RegisterProjectResponse> {
262 let project_id = {
263 let mut state = self.state_mut();
264 let user_id = state.user_id_for_connection(request.sender_id)?;
265 state.register_project(request.sender_id, user_id)
266 };
267 Ok(proto::RegisterProjectResponse { project_id })
268 }
269
270 async fn unregister_project(
271 mut self: Arc<Server>,
272 request: TypedEnvelope<proto::UnregisterProject>,
273 ) -> tide::Result<()> {
274 let project = self
275 .state_mut()
276 .unregister_project(request.payload.project_id, request.sender_id)?;
277 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
278 Ok(())
279 }
280
281 async fn share_project(
282 mut self: Arc<Server>,
283 request: TypedEnvelope<proto::ShareProject>,
284 ) -> tide::Result<proto::Ack> {
285 self.state_mut()
286 .share_project(request.payload.project_id, request.sender_id);
287 Ok(proto::Ack {})
288 }
289
290 async fn unshare_project(
291 mut self: Arc<Server>,
292 request: TypedEnvelope<proto::UnshareProject>,
293 ) -> tide::Result<()> {
294 let project_id = request.payload.project_id;
295 let project = self
296 .state_mut()
297 .unshare_project(project_id, request.sender_id)?;
298
299 broadcast(request.sender_id, project.connection_ids, |conn_id| {
300 self.peer
301 .send(conn_id, proto::UnshareProject { project_id })
302 })?;
303 self.update_contacts_for_users(&project.authorized_user_ids)?;
304 Ok(())
305 }
306
307 async fn join_project(
308 mut self: Arc<Server>,
309 request: TypedEnvelope<proto::JoinProject>,
310 ) -> tide::Result<proto::JoinProjectResponse> {
311 let project_id = request.payload.project_id;
312
313 let user_id = self.state().user_id_for_connection(request.sender_id)?;
314 let (response, connection_ids, contact_user_ids) = self
315 .state_mut()
316 .join_project(request.sender_id, user_id, project_id)
317 .and_then(|joined| {
318 let share = joined.project.share()?;
319 let peer_count = share.guests.len();
320 let mut collaborators = Vec::with_capacity(peer_count);
321 collaborators.push(proto::Collaborator {
322 peer_id: joined.project.host_connection_id.0,
323 replica_id: 0,
324 user_id: joined.project.host_user_id.to_proto(),
325 });
326 let worktrees = joined
327 .project
328 .worktrees
329 .iter()
330 .filter_map(|(id, worktree)| {
331 worktree.share.as_ref().map(|share| proto::Worktree {
332 id: *id,
333 root_name: worktree.root_name.clone(),
334 entries: share.entries.values().cloned().collect(),
335 diagnostic_summaries: share
336 .diagnostic_summaries
337 .values()
338 .cloned()
339 .collect(),
340 weak: worktree.weak,
341 })
342 })
343 .collect();
344 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
345 if *peer_conn_id != request.sender_id {
346 collaborators.push(proto::Collaborator {
347 peer_id: peer_conn_id.0,
348 replica_id: *peer_replica_id as u32,
349 user_id: peer_user_id.to_proto(),
350 });
351 }
352 }
353 let response = proto::JoinProjectResponse {
354 worktrees,
355 replica_id: joined.replica_id as u32,
356 collaborators,
357 };
358 let connection_ids = joined.project.connection_ids();
359 let contact_user_ids = joined.project.authorized_user_ids();
360 Ok((response, connection_ids, contact_user_ids))
361 })?;
362
363 broadcast(request.sender_id, connection_ids, |conn_id| {
364 self.peer.send(
365 conn_id,
366 proto::AddProjectCollaborator {
367 project_id,
368 collaborator: Some(proto::Collaborator {
369 peer_id: request.sender_id.0,
370 replica_id: response.replica_id,
371 user_id: user_id.to_proto(),
372 }),
373 },
374 )
375 })?;
376 self.update_contacts_for_users(&contact_user_ids)?;
377 Ok(response)
378 }
379
380 async fn leave_project(
381 mut self: Arc<Server>,
382 request: TypedEnvelope<proto::LeaveProject>,
383 ) -> tide::Result<()> {
384 let sender_id = request.sender_id;
385 let project_id = request.payload.project_id;
386 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
387
388 broadcast(sender_id, worktree.connection_ids, |conn_id| {
389 self.peer.send(
390 conn_id,
391 proto::RemoveProjectCollaborator {
392 project_id,
393 peer_id: sender_id.0,
394 },
395 )
396 })?;
397 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
398
399 Ok(())
400 }
401
402 async fn register_worktree(
403 mut self: Arc<Server>,
404 request: TypedEnvelope<proto::RegisterWorktree>,
405 ) -> tide::Result<proto::Ack> {
406 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
407
408 let mut contact_user_ids = HashSet::default();
409 contact_user_ids.insert(host_user_id);
410 for github_login in request.payload.authorized_logins {
411 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
412 contact_user_ids.insert(contact_user_id);
413 }
414
415 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
416 self.state_mut().register_worktree(
417 request.payload.project_id,
418 request.payload.worktree_id,
419 request.sender_id,
420 Worktree {
421 authorized_user_ids: contact_user_ids.clone(),
422 root_name: request.payload.root_name,
423 share: None,
424 weak: false,
425 },
426 )?;
427 self.update_contacts_for_users(&contact_user_ids)?;
428 Ok(proto::Ack {})
429 }
430
431 async fn unregister_worktree(
432 mut self: Arc<Server>,
433 request: TypedEnvelope<proto::UnregisterWorktree>,
434 ) -> tide::Result<()> {
435 let project_id = request.payload.project_id;
436 let worktree_id = request.payload.worktree_id;
437 let (worktree, guest_connection_ids) =
438 self.state_mut()
439 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
440 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
441 self.peer.send(
442 conn_id,
443 proto::UnregisterWorktree {
444 project_id,
445 worktree_id,
446 },
447 )
448 })?;
449 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
450 Ok(())
451 }
452
453 async fn share_worktree(
454 mut self: Arc<Server>,
455 mut request: TypedEnvelope<proto::ShareWorktree>,
456 ) -> tide::Result<proto::Ack> {
457 let worktree = request
458 .payload
459 .worktree
460 .as_mut()
461 .ok_or_else(|| anyhow!("missing worktree"))?;
462 let entries = worktree
463 .entries
464 .iter()
465 .map(|entry| (entry.id, entry.clone()))
466 .collect();
467 let diagnostic_summaries = worktree
468 .diagnostic_summaries
469 .iter()
470 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
471 .collect();
472
473 let shared_worktree = self.state_mut().share_worktree(
474 request.payload.project_id,
475 worktree.id,
476 request.sender_id,
477 entries,
478 diagnostic_summaries,
479 )?;
480
481 broadcast(
482 request.sender_id,
483 shared_worktree.connection_ids,
484 |connection_id| {
485 self.peer
486 .forward_send(request.sender_id, connection_id, request.payload.clone())
487 },
488 )?;
489 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
490
491 Ok(proto::Ack {})
492 }
493
494 async fn update_worktree(
495 mut self: Arc<Server>,
496 request: TypedEnvelope<proto::UpdateWorktree>,
497 ) -> tide::Result<()> {
498 let connection_ids = self.state_mut().update_worktree(
499 request.sender_id,
500 request.payload.project_id,
501 request.payload.worktree_id,
502 &request.payload.removed_entries,
503 &request.payload.updated_entries,
504 )?;
505
506 broadcast(request.sender_id, connection_ids, |connection_id| {
507 self.peer
508 .forward_send(request.sender_id, connection_id, request.payload.clone())
509 })?;
510
511 Ok(())
512 }
513
514 async fn update_diagnostic_summary(
515 mut self: Arc<Server>,
516 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
517 ) -> tide::Result<()> {
518 let summary = request
519 .payload
520 .summary
521 .clone()
522 .ok_or_else(|| anyhow!("invalid summary"))?;
523 let receiver_ids = self.state_mut().update_diagnostic_summary(
524 request.payload.project_id,
525 request.payload.worktree_id,
526 request.sender_id,
527 summary,
528 )?;
529
530 broadcast(request.sender_id, receiver_ids, |connection_id| {
531 self.peer
532 .forward_send(request.sender_id, connection_id, request.payload.clone())
533 })?;
534 Ok(())
535 }
536
537 async fn disk_based_diagnostics_updating(
538 self: Arc<Server>,
539 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
540 ) -> tide::Result<()> {
541 let receiver_ids = self
542 .state()
543 .project_connection_ids(request.payload.project_id, request.sender_id)?;
544 broadcast(request.sender_id, receiver_ids, |connection_id| {
545 self.peer
546 .forward_send(request.sender_id, connection_id, request.payload.clone())
547 })?;
548 Ok(())
549 }
550
551 async fn disk_based_diagnostics_updated(
552 self: Arc<Server>,
553 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
554 ) -> tide::Result<()> {
555 let receiver_ids = self
556 .state()
557 .project_connection_ids(request.payload.project_id, request.sender_id)?;
558 broadcast(request.sender_id, receiver_ids, |connection_id| {
559 self.peer
560 .forward_send(request.sender_id, connection_id, request.payload.clone())
561 })?;
562 Ok(())
563 }
564
565 async fn get_definition(
566 self: Arc<Server>,
567 request: TypedEnvelope<proto::GetDefinition>,
568 ) -> tide::Result<proto::GetDefinitionResponse> {
569 let host_connection_id = self
570 .state()
571 .read_project(request.payload.project_id, request.sender_id)?
572 .host_connection_id;
573 Ok(self
574 .peer
575 .forward_request(request.sender_id, host_connection_id, request.payload)
576 .await?)
577 }
578
579 async fn open_buffer(
580 self: Arc<Server>,
581 request: TypedEnvelope<proto::OpenBuffer>,
582 ) -> tide::Result<proto::OpenBufferResponse> {
583 let host_connection_id = self
584 .state()
585 .read_project(request.payload.project_id, request.sender_id)?
586 .host_connection_id;
587 Ok(self
588 .peer
589 .forward_request(request.sender_id, host_connection_id, request.payload)
590 .await?)
591 }
592
593 async fn close_buffer(
594 self: Arc<Server>,
595 request: TypedEnvelope<proto::CloseBuffer>,
596 ) -> tide::Result<()> {
597 let host_connection_id = self
598 .state()
599 .read_project(request.payload.project_id, request.sender_id)?
600 .host_connection_id;
601 self.peer
602 .forward_send(request.sender_id, host_connection_id, request.payload)?;
603 Ok(())
604 }
605
606 async fn save_buffer(
607 self: Arc<Server>,
608 request: TypedEnvelope<proto::SaveBuffer>,
609 ) -> tide::Result<proto::BufferSaved> {
610 let host;
611 let mut guests;
612 {
613 let state = self.state();
614 let project = state.read_project(request.payload.project_id, request.sender_id)?;
615 host = project.host_connection_id;
616 guests = project.guest_connection_ids()
617 }
618
619 let response = self
620 .peer
621 .forward_request(request.sender_id, host, request.payload.clone())
622 .await?;
623
624 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
625 broadcast(host, guests, |conn_id| {
626 self.peer.forward_send(host, conn_id, response.clone())
627 })?;
628
629 Ok(response)
630 }
631
632 async fn format_buffers(
633 self: Arc<Server>,
634 request: TypedEnvelope<proto::FormatBuffers>,
635 ) -> tide::Result<proto::FormatBuffersResponse> {
636 let host = self
637 .state()
638 .read_project(request.payload.project_id, request.sender_id)?
639 .host_connection_id;
640 Ok(self
641 .peer
642 .forward_request(request.sender_id, host, request.payload.clone())
643 .await?)
644 }
645
646 async fn get_completions(
647 self: Arc<Server>,
648 request: TypedEnvelope<proto::GetCompletions>,
649 ) -> tide::Result<proto::GetCompletionsResponse> {
650 let host = self
651 .state()
652 .read_project(request.payload.project_id, request.sender_id)?
653 .host_connection_id;
654 Ok(self
655 .peer
656 .forward_request(request.sender_id, host, request.payload.clone())
657 .await?)
658 }
659
660 async fn apply_additional_edits_for_completion(
661 self: Arc<Server>,
662 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
663 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
664 let host = self
665 .state()
666 .read_project(request.payload.project_id, request.sender_id)?
667 .host_connection_id;
668 Ok(self
669 .peer
670 .forward_request(request.sender_id, host, request.payload.clone())
671 .await?)
672 }
673
674 async fn get_code_actions(
675 self: Arc<Server>,
676 request: TypedEnvelope<proto::GetCodeActions>,
677 ) -> tide::Result<proto::GetCodeActionsResponse> {
678 let host = self
679 .state()
680 .read_project(request.payload.project_id, request.sender_id)?
681 .host_connection_id;
682 Ok(self
683 .peer
684 .forward_request(request.sender_id, host, request.payload.clone())
685 .await?)
686 }
687
688 async fn apply_code_action(
689 self: Arc<Server>,
690 request: TypedEnvelope<proto::ApplyCodeAction>,
691 ) -> tide::Result<proto::ApplyCodeActionResponse> {
692 let host = self
693 .state()
694 .read_project(request.payload.project_id, request.sender_id)?
695 .host_connection_id;
696 Ok(self
697 .peer
698 .forward_request(request.sender_id, host, request.payload.clone())
699 .await?)
700 }
701
702 async fn update_buffer(
703 self: Arc<Server>,
704 request: TypedEnvelope<proto::UpdateBuffer>,
705 ) -> tide::Result<proto::Ack> {
706 let receiver_ids = self
707 .state()
708 .project_connection_ids(request.payload.project_id, request.sender_id)?;
709 broadcast(request.sender_id, receiver_ids, |connection_id| {
710 self.peer
711 .forward_send(request.sender_id, connection_id, request.payload.clone())
712 })?;
713 Ok(proto::Ack {})
714 }
715
716 async fn update_buffer_file(
717 self: Arc<Server>,
718 request: TypedEnvelope<proto::UpdateBufferFile>,
719 ) -> tide::Result<()> {
720 let receiver_ids = self
721 .state()
722 .project_connection_ids(request.payload.project_id, request.sender_id)?;
723 broadcast(request.sender_id, receiver_ids, |connection_id| {
724 self.peer
725 .forward_send(request.sender_id, connection_id, request.payload.clone())
726 })?;
727 Ok(())
728 }
729
730 async fn buffer_reloaded(
731 self: Arc<Server>,
732 request: TypedEnvelope<proto::BufferReloaded>,
733 ) -> tide::Result<()> {
734 let receiver_ids = self
735 .state()
736 .project_connection_ids(request.payload.project_id, request.sender_id)?;
737 broadcast(request.sender_id, receiver_ids, |connection_id| {
738 self.peer
739 .forward_send(request.sender_id, connection_id, request.payload.clone())
740 })?;
741 Ok(())
742 }
743
744 async fn buffer_saved(
745 self: Arc<Server>,
746 request: TypedEnvelope<proto::BufferSaved>,
747 ) -> tide::Result<()> {
748 let receiver_ids = self
749 .state()
750 .project_connection_ids(request.payload.project_id, request.sender_id)?;
751 broadcast(request.sender_id, receiver_ids, |connection_id| {
752 self.peer
753 .forward_send(request.sender_id, connection_id, request.payload.clone())
754 })?;
755 Ok(())
756 }
757
758 async fn get_channels(
759 self: Arc<Server>,
760 request: TypedEnvelope<proto::GetChannels>,
761 ) -> tide::Result<proto::GetChannelsResponse> {
762 let user_id = self.state().user_id_for_connection(request.sender_id)?;
763 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
764 Ok(proto::GetChannelsResponse {
765 channels: channels
766 .into_iter()
767 .map(|chan| proto::Channel {
768 id: chan.id.to_proto(),
769 name: chan.name,
770 })
771 .collect(),
772 })
773 }
774
775 async fn get_users(
776 self: Arc<Server>,
777 request: TypedEnvelope<proto::GetUsers>,
778 ) -> tide::Result<proto::GetUsersResponse> {
779 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
780 let users = self
781 .app_state
782 .db
783 .get_users_by_ids(user_ids)
784 .await?
785 .into_iter()
786 .map(|user| proto::User {
787 id: user.id.to_proto(),
788 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
789 github_login: user.github_login,
790 })
791 .collect();
792 Ok(proto::GetUsersResponse { users })
793 }
794
795 fn update_contacts_for_users<'a>(
796 self: &Arc<Server>,
797 user_ids: impl IntoIterator<Item = &'a UserId>,
798 ) -> anyhow::Result<()> {
799 let mut result = Ok(());
800 let state = self.state();
801 for user_id in user_ids {
802 let contacts = state.contacts_for_user(*user_id);
803 for connection_id in state.connection_ids_for_user(*user_id) {
804 if let Err(error) = self.peer.send(
805 connection_id,
806 proto::UpdateContacts {
807 contacts: contacts.clone(),
808 },
809 ) {
810 result = Err(error);
811 }
812 }
813 }
814 result
815 }
816
817 async fn join_channel(
818 mut self: Arc<Self>,
819 request: TypedEnvelope<proto::JoinChannel>,
820 ) -> tide::Result<proto::JoinChannelResponse> {
821 let user_id = self.state().user_id_for_connection(request.sender_id)?;
822 let channel_id = ChannelId::from_proto(request.payload.channel_id);
823 if !self
824 .app_state
825 .db
826 .can_user_access_channel(user_id, channel_id)
827 .await?
828 {
829 Err(anyhow!("access denied"))?;
830 }
831
832 self.state_mut().join_channel(request.sender_id, channel_id);
833 let messages = self
834 .app_state
835 .db
836 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
837 .await?
838 .into_iter()
839 .map(|msg| proto::ChannelMessage {
840 id: msg.id.to_proto(),
841 body: msg.body,
842 timestamp: msg.sent_at.unix_timestamp() as u64,
843 sender_id: msg.sender_id.to_proto(),
844 nonce: Some(msg.nonce.as_u128().into()),
845 })
846 .collect::<Vec<_>>();
847 Ok(proto::JoinChannelResponse {
848 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
849 messages,
850 })
851 }
852
853 async fn leave_channel(
854 mut self: Arc<Self>,
855 request: TypedEnvelope<proto::LeaveChannel>,
856 ) -> tide::Result<()> {
857 let user_id = self.state().user_id_for_connection(request.sender_id)?;
858 let channel_id = ChannelId::from_proto(request.payload.channel_id);
859 if !self
860 .app_state
861 .db
862 .can_user_access_channel(user_id, channel_id)
863 .await?
864 {
865 Err(anyhow!("access denied"))?;
866 }
867
868 self.state_mut()
869 .leave_channel(request.sender_id, channel_id);
870
871 Ok(())
872 }
873
874 async fn send_channel_message(
875 self: Arc<Self>,
876 request: TypedEnvelope<proto::SendChannelMessage>,
877 ) -> tide::Result<proto::SendChannelMessageResponse> {
878 let channel_id = ChannelId::from_proto(request.payload.channel_id);
879 let user_id;
880 let connection_ids;
881 {
882 let state = self.state();
883 user_id = state.user_id_for_connection(request.sender_id)?;
884 connection_ids = state.channel_connection_ids(channel_id)?;
885 }
886
887 // Validate the message body.
888 let body = request.payload.body.trim().to_string();
889 if body.len() > MAX_MESSAGE_LEN {
890 return Err(anyhow!("message is too long"))?;
891 }
892 if body.is_empty() {
893 return Err(anyhow!("message can't be blank"))?;
894 }
895
896 let timestamp = OffsetDateTime::now_utc();
897 let nonce = request
898 .payload
899 .nonce
900 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
901
902 let message_id = self
903 .app_state
904 .db
905 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
906 .await?
907 .to_proto();
908 let message = proto::ChannelMessage {
909 sender_id: user_id.to_proto(),
910 id: message_id,
911 body,
912 timestamp: timestamp.unix_timestamp() as u64,
913 nonce: Some(nonce),
914 };
915 broadcast(request.sender_id, connection_ids, |conn_id| {
916 self.peer.send(
917 conn_id,
918 proto::ChannelMessageSent {
919 channel_id: channel_id.to_proto(),
920 message: Some(message.clone()),
921 },
922 )
923 })?;
924 Ok(proto::SendChannelMessageResponse {
925 message: Some(message),
926 })
927 }
928
929 async fn get_channel_messages(
930 self: Arc<Self>,
931 request: TypedEnvelope<proto::GetChannelMessages>,
932 ) -> tide::Result<proto::GetChannelMessagesResponse> {
933 let user_id = self.state().user_id_for_connection(request.sender_id)?;
934 let channel_id = ChannelId::from_proto(request.payload.channel_id);
935 if !self
936 .app_state
937 .db
938 .can_user_access_channel(user_id, channel_id)
939 .await?
940 {
941 Err(anyhow!("access denied"))?;
942 }
943
944 let messages = self
945 .app_state
946 .db
947 .get_channel_messages(
948 channel_id,
949 MESSAGE_COUNT_PER_PAGE,
950 Some(MessageId::from_proto(request.payload.before_message_id)),
951 )
952 .await?
953 .into_iter()
954 .map(|msg| proto::ChannelMessage {
955 id: msg.id.to_proto(),
956 body: msg.body,
957 timestamp: msg.sent_at.unix_timestamp() as u64,
958 sender_id: msg.sender_id.to_proto(),
959 nonce: Some(msg.nonce.as_u128().into()),
960 })
961 .collect::<Vec<_>>();
962
963 Ok(proto::GetChannelMessagesResponse {
964 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
965 messages,
966 })
967 }
968
    /// Acquires a read lock on the store; the lock is held until the returned
    /// guard is dropped.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
972
    /// Acquires a write lock on the store; the lock is held until the returned
    /// guard is dropped.
    ///
    /// The receiver is `&mut Arc<Self>` even though the lock itself only needs shared
    /// access to the store.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
976}
977
/// Runs detached futures on the async-std task runtime.
impl Executor for RealExecutor {
    /// Spawns `future` with `task::spawn`; the returned handle is not kept.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
        task::spawn(future);
    }
}
983
984fn broadcast<F>(
985 sender_id: ConnectionId,
986 receiver_ids: Vec<ConnectionId>,
987 mut f: F,
988) -> anyhow::Result<()>
989where
990 F: FnMut(ConnectionId) -> anyhow::Result<()>,
991{
992 let mut result = Ok(());
993 for receiver_id in receiver_ids {
994 if receiver_id != sender_id {
995 if let Err(error) = f(receiver_id) {
996 if result.is_ok() {
997 result = Err(error);
998 }
999 }
1000 }
1001 }
1002 result
1003}
1004
/// Mounts the `/rpc` WebSocket endpoint on `app`.
///
/// A single `Server` is created up front and shared by every request. Each GET to
/// `/rpc` must ask for a WebSocket upgrade and carry an `X-Zed-Protocol-Version`
/// header equal to `rpc::PROTOCOL_VERSION`; otherwise it is answered with
/// `UpgradeRequired`. Requests that pass are authenticated via `process_auth_header`,
/// answered with the WebSocket handshake, and the upgraded stream is handed to
/// `Server::handle_connection` on a spawned task.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID that is appended to the client's key when computing the
            // `Sec-Websocket-Accept` value below.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which never
            // matches the server's version.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client's key followed by the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // Serve the connection on its own task so the handshake response can be
            // returned right away.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                            RealExecutor,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1060
1061fn header_contains_ignore_case<T>(
1062 request: &tide::Request<T>,
1063 header_name: HeaderName,
1064 value: &str,
1065) -> bool {
1066 request
1067 .header(header_name)
1068 .map(|h| {
1069 h.as_str()
1070 .split(',')
1071 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1072 })
1073 .unwrap_or(false)
1074}
1075
1076#[cfg(test)]
1077mod tests {
1078 use super::*;
1079 use crate::{
1080 auth,
1081 db::{tests::TestDb, UserId},
1082 github, AppState, Config,
1083 };
1084 use ::rpc::Peer;
1085 use async_std::task;
1086 use gpui::{executor, ModelHandle, TestAppContext};
1087 use parking_lot::Mutex;
1088 use postage::{mpsc, watch};
1089 use rand::prelude::*;
1090 use rpc::PeerId;
1091 use serde_json::json;
1092 use sqlx::types::time::OffsetDateTime;
1093 use std::{
1094 ops::Deref,
1095 path::Path,
1096 rc::Rc,
1097 sync::{
1098 atomic::{AtomicBool, Ordering::SeqCst},
1099 Arc,
1100 },
1101 time::Duration,
1102 };
1103 use zed::{
1104 client::{
1105 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1106 EstablishConnectionError, UserStore,
1107 },
1108 editor::{
1109 self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1110 Redo, ToggleCodeActions, Undo,
1111 },
1112 fs::{FakeFs, Fs as _},
1113 language::{
1114 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1115 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1116 },
1117 lsp,
1118 project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1119 workspace::{Workspace, WorkspaceParams},
1120 };
1121
// Installs `env_logger` for the test binary, but only when the `RUST_LOG` environment
// variable can be read. Registered through `#[ctor::ctor]`.
#[cfg(test)]
#[ctor::ctor]
fn init_logger() {
    let logging_requested = std::env::var("RUST_LOG").is_ok();
    if logging_requested {
        env_logger::init();
    }
}
1129
1130 #[gpui::test(iterations = 10)]
1131 async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1132 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1133 let lang_registry = Arc::new(LanguageRegistry::new());
1134 let fs = Arc::new(FakeFs::new(cx_a.background()));
1135 cx_a.foreground().forbid_parking();
1136
1137 // Connect to a server as 2 clients.
1138 let mut server = TestServer::start(cx_a.foreground()).await;
1139 let client_a = server.create_client(&mut cx_a, "user_a").await;
1140 let client_b = server.create_client(&mut cx_b, "user_b").await;
1141
1142 // Share a project as client A
1143 fs.insert_tree(
1144 "/a",
1145 json!({
1146 ".zed.toml": r#"collaborators = ["user_b"]"#,
1147 "a.txt": "a-contents",
1148 "b.txt": "b-contents",
1149 }),
1150 )
1151 .await;
1152 let project_a = cx_a.update(|cx| {
1153 Project::local(
1154 client_a.clone(),
1155 client_a.user_store.clone(),
1156 lang_registry.clone(),
1157 fs.clone(),
1158 cx,
1159 )
1160 });
1161 let (worktree_a, _) = project_a
1162 .update(&mut cx_a, |p, cx| {
1163 p.find_or_create_local_worktree("/a", false, cx)
1164 })
1165 .await
1166 .unwrap();
1167 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1168 worktree_a
1169 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1170 .await;
1171 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1172 project_a
1173 .update(&mut cx_a, |p, cx| p.share(cx))
1174 .await
1175 .unwrap();
1176
1177 // Join that project as client B
1178 let project_b = Project::remote(
1179 project_id,
1180 client_b.clone(),
1181 client_b.user_store.clone(),
1182 lang_registry.clone(),
1183 fs.clone(),
1184 &mut cx_b.to_async(),
1185 )
1186 .await
1187 .unwrap();
1188
1189 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1190 assert_eq!(
1191 project
1192 .collaborators()
1193 .get(&client_a.peer_id)
1194 .unwrap()
1195 .user
1196 .github_login,
1197 "user_a"
1198 );
1199 project.replica_id()
1200 });
1201 project_a
1202 .condition(&cx_a, |tree, _| {
1203 tree.collaborators()
1204 .get(&client_b.peer_id)
1205 .map_or(false, |collaborator| {
1206 collaborator.replica_id == replica_id_b
1207 && collaborator.user.github_login == "user_b"
1208 })
1209 })
1210 .await;
1211
1212 // Open the same file as client B and client A.
1213 let buffer_b = project_b
1214 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1215 .await
1216 .unwrap();
1217 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1218 buffer_b.read_with(&cx_b, |buf, cx| {
1219 assert_eq!(buf.read(cx).text(), "b-contents")
1220 });
1221 project_a.read_with(&cx_a, |project, cx| {
1222 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1223 });
1224 let buffer_a = project_a
1225 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1226 .await
1227 .unwrap();
1228
1229 let editor_b = cx_b.add_view(window_b, |cx| {
1230 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1231 });
1232
1233 // TODO
1234 // // Create a selection set as client B and see that selection set as client A.
1235 // buffer_a
1236 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1237 // .await;
1238
1239 // Edit the buffer as client B and see that edit as client A.
1240 editor_b.update(&mut cx_b, |editor, cx| {
1241 editor.handle_input(&Input("ok, ".into()), cx)
1242 });
1243 buffer_a
1244 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1245 .await;
1246
1247 // TODO
1248 // // Remove the selection set as client B, see those selections disappear as client A.
1249 cx_b.update(move |_| drop(editor_b));
1250 // buffer_a
1251 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1252 // .await;
1253
1254 // Close the buffer as client A, see that the buffer is closed.
1255 cx_a.update(move |_| drop(buffer_a));
1256 project_a
1257 .condition(&cx_a, |project, cx| {
1258 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1259 })
1260 .await;
1261
1262 // Dropping the client B's project removes client B from client A's collaborators.
1263 cx_b.update(move |_| drop(project_b));
1264 project_a
1265 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1266 .await;
1267 }
1268
1269 #[gpui::test(iterations = 10)]
1270 async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1271 let lang_registry = Arc::new(LanguageRegistry::new());
1272 let fs = Arc::new(FakeFs::new(cx_a.background()));
1273 cx_a.foreground().forbid_parking();
1274
1275 // Connect to a server as 2 clients.
1276 let mut server = TestServer::start(cx_a.foreground()).await;
1277 let client_a = server.create_client(&mut cx_a, "user_a").await;
1278 let client_b = server.create_client(&mut cx_b, "user_b").await;
1279
1280 // Share a project as client A
1281 fs.insert_tree(
1282 "/a",
1283 json!({
1284 ".zed.toml": r#"collaborators = ["user_b"]"#,
1285 "a.txt": "a-contents",
1286 "b.txt": "b-contents",
1287 }),
1288 )
1289 .await;
1290 let project_a = cx_a.update(|cx| {
1291 Project::local(
1292 client_a.clone(),
1293 client_a.user_store.clone(),
1294 lang_registry.clone(),
1295 fs.clone(),
1296 cx,
1297 )
1298 });
1299 let (worktree_a, _) = project_a
1300 .update(&mut cx_a, |p, cx| {
1301 p.find_or_create_local_worktree("/a", false, cx)
1302 })
1303 .await
1304 .unwrap();
1305 worktree_a
1306 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1307 .await;
1308 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1309 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1310 project_a
1311 .update(&mut cx_a, |p, cx| p.share(cx))
1312 .await
1313 .unwrap();
1314 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1315
1316 // Join that project as client B
1317 let project_b = Project::remote(
1318 project_id,
1319 client_b.clone(),
1320 client_b.user_store.clone(),
1321 lang_registry.clone(),
1322 fs.clone(),
1323 &mut cx_b.to_async(),
1324 )
1325 .await
1326 .unwrap();
1327 project_b
1328 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1329 .await
1330 .unwrap();
1331
1332 // Unshare the project as client A
1333 project_a
1334 .update(&mut cx_a, |project, cx| project.unshare(cx))
1335 .await
1336 .unwrap();
1337 project_b
1338 .condition(&mut cx_b, |project, _| project.is_read_only())
1339 .await;
1340 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1341 drop(project_b);
1342
1343 // Share the project again and ensure guests can still join.
1344 project_a
1345 .update(&mut cx_a, |project, cx| project.share(cx))
1346 .await
1347 .unwrap();
1348 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1349
1350 let project_c = Project::remote(
1351 project_id,
1352 client_b.clone(),
1353 client_b.user_store.clone(),
1354 lang_registry.clone(),
1355 fs.clone(),
1356 &mut cx_b.to_async(),
1357 )
1358 .await
1359 .unwrap();
1360 project_c
1361 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1362 .await
1363 .unwrap();
1364 }
1365
1366 #[gpui::test(iterations = 10)]
1367 async fn test_propagate_saves_and_fs_changes(
1368 mut cx_a: TestAppContext,
1369 mut cx_b: TestAppContext,
1370 mut cx_c: TestAppContext,
1371 ) {
1372 let lang_registry = Arc::new(LanguageRegistry::new());
1373 let fs = Arc::new(FakeFs::new(cx_a.background()));
1374 cx_a.foreground().forbid_parking();
1375
1376 // Connect to a server as 3 clients.
1377 let mut server = TestServer::start(cx_a.foreground()).await;
1378 let client_a = server.create_client(&mut cx_a, "user_a").await;
1379 let client_b = server.create_client(&mut cx_b, "user_b").await;
1380 let client_c = server.create_client(&mut cx_c, "user_c").await;
1381
1382 // Share a worktree as client A.
1383 fs.insert_tree(
1384 "/a",
1385 json!({
1386 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1387 "file1": "",
1388 "file2": ""
1389 }),
1390 )
1391 .await;
1392 let project_a = cx_a.update(|cx| {
1393 Project::local(
1394 client_a.clone(),
1395 client_a.user_store.clone(),
1396 lang_registry.clone(),
1397 fs.clone(),
1398 cx,
1399 )
1400 });
1401 let (worktree_a, _) = project_a
1402 .update(&mut cx_a, |p, cx| {
1403 p.find_or_create_local_worktree("/a", false, cx)
1404 })
1405 .await
1406 .unwrap();
1407 worktree_a
1408 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1409 .await;
1410 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1411 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1412 project_a
1413 .update(&mut cx_a, |p, cx| p.share(cx))
1414 .await
1415 .unwrap();
1416
1417 // Join that worktree as clients B and C.
1418 let project_b = Project::remote(
1419 project_id,
1420 client_b.clone(),
1421 client_b.user_store.clone(),
1422 lang_registry.clone(),
1423 fs.clone(),
1424 &mut cx_b.to_async(),
1425 )
1426 .await
1427 .unwrap();
1428 let project_c = Project::remote(
1429 project_id,
1430 client_c.clone(),
1431 client_c.user_store.clone(),
1432 lang_registry.clone(),
1433 fs.clone(),
1434 &mut cx_c.to_async(),
1435 )
1436 .await
1437 .unwrap();
1438 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1439 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1440
1441 // Open and edit a buffer as both guests B and C.
1442 let buffer_b = project_b
1443 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1444 .await
1445 .unwrap();
1446 let buffer_c = project_c
1447 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1448 .await
1449 .unwrap();
1450 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1451 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1452
1453 // Open and edit that buffer as the host.
1454 let buffer_a = project_a
1455 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1456 .await
1457 .unwrap();
1458
1459 buffer_a
1460 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1461 .await;
1462 buffer_a.update(&mut cx_a, |buf, cx| {
1463 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1464 });
1465
1466 // Wait for edits to propagate
1467 buffer_a
1468 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1469 .await;
1470 buffer_b
1471 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1472 .await;
1473 buffer_c
1474 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1475 .await;
1476
1477 // Edit the buffer as the host and concurrently save as guest B.
1478 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1479 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1480 save_b.await.unwrap();
1481 assert_eq!(
1482 fs.load("/a/file1".as_ref()).await.unwrap(),
1483 "hi-a, i-am-c, i-am-b, i-am-a"
1484 );
1485 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1486 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1487 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1488
1489 // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
1490 // when interpreting the change event it will mistakenly think that the file has been
1491 // deleted (because its path has changed) and will subsequently fail to detect the rename.
1492 worktree_a.flush_fs_events(&cx_a).await;
1493
1494 // Make changes on host's file system, see those changes on guest worktrees.
1495 fs.rename(
1496 "/a/file1".as_ref(),
1497 "/a/file1-renamed".as_ref(),
1498 Default::default(),
1499 )
1500 .await
1501 .unwrap();
1502 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1503 .await
1504 .unwrap();
1505 fs.insert_file(Path::new("/a/file4"), "4".into())
1506 .await
1507 .unwrap();
1508
1509 worktree_a
1510 .condition(&cx_a, |tree, _| {
1511 tree.paths()
1512 .map(|p| p.to_string_lossy())
1513 .collect::<Vec<_>>()
1514 == [".zed.toml", "file1-renamed", "file3", "file4"]
1515 })
1516 .await;
1517 worktree_b
1518 .condition(&cx_b, |tree, _| {
1519 tree.paths()
1520 .map(|p| p.to_string_lossy())
1521 .collect::<Vec<_>>()
1522 == [".zed.toml", "file1-renamed", "file3", "file4"]
1523 })
1524 .await;
1525 worktree_c
1526 .condition(&cx_c, |tree, _| {
1527 tree.paths()
1528 .map(|p| p.to_string_lossy())
1529 .collect::<Vec<_>>()
1530 == [".zed.toml", "file1-renamed", "file3", "file4"]
1531 })
1532 .await;
1533
1534 // Ensure buffer files are updated as well.
1535 buffer_a
1536 .condition(&cx_a, |buf, _| {
1537 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1538 })
1539 .await;
1540 buffer_b
1541 .condition(&cx_b, |buf, _| {
1542 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1543 })
1544 .await;
1545 buffer_c
1546 .condition(&cx_c, |buf, _| {
1547 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1548 })
1549 .await;
1550 }
1551
1552 #[gpui::test(iterations = 10)]
1553 async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1554 cx_a.foreground().forbid_parking();
1555 let lang_registry = Arc::new(LanguageRegistry::new());
1556 let fs = Arc::new(FakeFs::new(cx_a.background()));
1557
1558 // Connect to a server as 2 clients.
1559 let mut server = TestServer::start(cx_a.foreground()).await;
1560 let client_a = server.create_client(&mut cx_a, "user_a").await;
1561 let client_b = server.create_client(&mut cx_b, "user_b").await;
1562
1563 // Share a project as client A
1564 fs.insert_tree(
1565 "/dir",
1566 json!({
1567 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1568 "a.txt": "a-contents",
1569 }),
1570 )
1571 .await;
1572
1573 let project_a = cx_a.update(|cx| {
1574 Project::local(
1575 client_a.clone(),
1576 client_a.user_store.clone(),
1577 lang_registry.clone(),
1578 fs.clone(),
1579 cx,
1580 )
1581 });
1582 let (worktree_a, _) = project_a
1583 .update(&mut cx_a, |p, cx| {
1584 p.find_or_create_local_worktree("/dir", false, cx)
1585 })
1586 .await
1587 .unwrap();
1588 worktree_a
1589 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1590 .await;
1591 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1592 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1593 project_a
1594 .update(&mut cx_a, |p, cx| p.share(cx))
1595 .await
1596 .unwrap();
1597
1598 // Join that project as client B
1599 let project_b = Project::remote(
1600 project_id,
1601 client_b.clone(),
1602 client_b.user_store.clone(),
1603 lang_registry.clone(),
1604 fs.clone(),
1605 &mut cx_b.to_async(),
1606 )
1607 .await
1608 .unwrap();
1609 let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1610
1611 // Open a buffer as client B
1612 let buffer_b = project_b
1613 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1614 .await
1615 .unwrap();
1616 let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1617
1618 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1619 buffer_b.read_with(&cx_b, |buf, _| {
1620 assert!(buf.is_dirty());
1621 assert!(!buf.has_conflict());
1622 });
1623
1624 buffer_b
1625 .update(&mut cx_b, |buf, cx| buf.save(cx))
1626 .await
1627 .unwrap();
1628 worktree_b
1629 .condition(&cx_b, |_, cx| {
1630 buffer_b.read(cx).file().unwrap().mtime() != mtime
1631 })
1632 .await;
1633 buffer_b.read_with(&cx_b, |buf, _| {
1634 assert!(!buf.is_dirty());
1635 assert!(!buf.has_conflict());
1636 });
1637
1638 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1639 buffer_b.read_with(&cx_b, |buf, _| {
1640 assert!(buf.is_dirty());
1641 assert!(!buf.has_conflict());
1642 });
1643 }
1644
1645 #[gpui::test(iterations = 10)]
1646 async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1647 cx_a.foreground().forbid_parking();
1648 let lang_registry = Arc::new(LanguageRegistry::new());
1649 let fs = Arc::new(FakeFs::new(cx_a.background()));
1650
1651 // Connect to a server as 2 clients.
1652 let mut server = TestServer::start(cx_a.foreground()).await;
1653 let client_a = server.create_client(&mut cx_a, "user_a").await;
1654 let client_b = server.create_client(&mut cx_b, "user_b").await;
1655
1656 // Share a project as client A
1657 fs.insert_tree(
1658 "/dir",
1659 json!({
1660 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1661 "a.txt": "a-contents",
1662 }),
1663 )
1664 .await;
1665
1666 let project_a = cx_a.update(|cx| {
1667 Project::local(
1668 client_a.clone(),
1669 client_a.user_store.clone(),
1670 lang_registry.clone(),
1671 fs.clone(),
1672 cx,
1673 )
1674 });
1675 let (worktree_a, _) = project_a
1676 .update(&mut cx_a, |p, cx| {
1677 p.find_or_create_local_worktree("/dir", false, cx)
1678 })
1679 .await
1680 .unwrap();
1681 worktree_a
1682 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1683 .await;
1684 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1685 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1686 project_a
1687 .update(&mut cx_a, |p, cx| p.share(cx))
1688 .await
1689 .unwrap();
1690
1691 // Join that project as client B
1692 let project_b = Project::remote(
1693 project_id,
1694 client_b.clone(),
1695 client_b.user_store.clone(),
1696 lang_registry.clone(),
1697 fs.clone(),
1698 &mut cx_b.to_async(),
1699 )
1700 .await
1701 .unwrap();
1702 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1703
1704 // Open a buffer as client B
1705 let buffer_b = project_b
1706 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1707 .await
1708 .unwrap();
1709 buffer_b.read_with(&cx_b, |buf, _| {
1710 assert!(!buf.is_dirty());
1711 assert!(!buf.has_conflict());
1712 });
1713
1714 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1715 .await
1716 .unwrap();
1717 buffer_b
1718 .condition(&cx_b, |buf, _| {
1719 buf.text() == "new contents" && !buf.is_dirty()
1720 })
1721 .await;
1722 buffer_b.read_with(&cx_b, |buf, _| {
1723 assert!(!buf.has_conflict());
1724 });
1725 }
1726
1727 #[gpui::test(iterations = 10)]
1728 async fn test_editing_while_guest_opens_buffer(
1729 mut cx_a: TestAppContext,
1730 mut cx_b: TestAppContext,
1731 ) {
1732 cx_a.foreground().forbid_parking();
1733 let lang_registry = Arc::new(LanguageRegistry::new());
1734 let fs = Arc::new(FakeFs::new(cx_a.background()));
1735
1736 // Connect to a server as 2 clients.
1737 let mut server = TestServer::start(cx_a.foreground()).await;
1738 let client_a = server.create_client(&mut cx_a, "user_a").await;
1739 let client_b = server.create_client(&mut cx_b, "user_b").await;
1740
1741 // Share a project as client A
1742 fs.insert_tree(
1743 "/dir",
1744 json!({
1745 ".zed.toml": r#"collaborators = ["user_b"]"#,
1746 "a.txt": "a-contents",
1747 }),
1748 )
1749 .await;
1750 let project_a = cx_a.update(|cx| {
1751 Project::local(
1752 client_a.clone(),
1753 client_a.user_store.clone(),
1754 lang_registry.clone(),
1755 fs.clone(),
1756 cx,
1757 )
1758 });
1759 let (worktree_a, _) = project_a
1760 .update(&mut cx_a, |p, cx| {
1761 p.find_or_create_local_worktree("/dir", false, cx)
1762 })
1763 .await
1764 .unwrap();
1765 worktree_a
1766 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1767 .await;
1768 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1769 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1770 project_a
1771 .update(&mut cx_a, |p, cx| p.share(cx))
1772 .await
1773 .unwrap();
1774
1775 // Join that project as client B
1776 let project_b = Project::remote(
1777 project_id,
1778 client_b.clone(),
1779 client_b.user_store.clone(),
1780 lang_registry.clone(),
1781 fs.clone(),
1782 &mut cx_b.to_async(),
1783 )
1784 .await
1785 .unwrap();
1786
1787 // Open a buffer as client A
1788 let buffer_a = project_a
1789 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1790 .await
1791 .unwrap();
1792
1793 // Start opening the same buffer as client B
1794 let buffer_b = cx_b
1795 .background()
1796 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1797
1798 // Edit the buffer as client A while client B is still opening it.
1799 cx_b.background().simulate_random_delay().await;
1800 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1801 cx_b.background().simulate_random_delay().await;
1802 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1803
1804 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1805 let buffer_b = buffer_b.await.unwrap();
1806 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1807 }
1808
1809 #[gpui::test(iterations = 10)]
1810 async fn test_leaving_worktree_while_opening_buffer(
1811 mut cx_a: TestAppContext,
1812 mut cx_b: TestAppContext,
1813 ) {
1814 cx_a.foreground().forbid_parking();
1815 let lang_registry = Arc::new(LanguageRegistry::new());
1816 let fs = Arc::new(FakeFs::new(cx_a.background()));
1817
1818 // Connect to a server as 2 clients.
1819 let mut server = TestServer::start(cx_a.foreground()).await;
1820 let client_a = server.create_client(&mut cx_a, "user_a").await;
1821 let client_b = server.create_client(&mut cx_b, "user_b").await;
1822
1823 // Share a project as client A
1824 fs.insert_tree(
1825 "/dir",
1826 json!({
1827 ".zed.toml": r#"collaborators = ["user_b"]"#,
1828 "a.txt": "a-contents",
1829 }),
1830 )
1831 .await;
1832 let project_a = cx_a.update(|cx| {
1833 Project::local(
1834 client_a.clone(),
1835 client_a.user_store.clone(),
1836 lang_registry.clone(),
1837 fs.clone(),
1838 cx,
1839 )
1840 });
1841 let (worktree_a, _) = project_a
1842 .update(&mut cx_a, |p, cx| {
1843 p.find_or_create_local_worktree("/dir", false, cx)
1844 })
1845 .await
1846 .unwrap();
1847 worktree_a
1848 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1849 .await;
1850 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1851 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1852 project_a
1853 .update(&mut cx_a, |p, cx| p.share(cx))
1854 .await
1855 .unwrap();
1856
1857 // Join that project as client B
1858 let project_b = Project::remote(
1859 project_id,
1860 client_b.clone(),
1861 client_b.user_store.clone(),
1862 lang_registry.clone(),
1863 fs.clone(),
1864 &mut cx_b.to_async(),
1865 )
1866 .await
1867 .unwrap();
1868
1869 // See that a guest has joined as client A.
1870 project_a
1871 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1872 .await;
1873
1874 // Begin opening a buffer as client B, but leave the project before the open completes.
1875 let buffer_b = cx_b
1876 .background()
1877 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1878 cx_b.update(|_| drop(project_b));
1879 drop(buffer_b);
1880
1881 // See that the guest has left.
1882 project_a
1883 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1884 .await;
1885 }
1886
1887 #[gpui::test(iterations = 10)]
1888 async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1889 cx_a.foreground().forbid_parking();
1890 let lang_registry = Arc::new(LanguageRegistry::new());
1891 let fs = Arc::new(FakeFs::new(cx_a.background()));
1892
1893 // Connect to a server as 2 clients.
1894 let mut server = TestServer::start(cx_a.foreground()).await;
1895 let client_a = server.create_client(&mut cx_a, "user_a").await;
1896 let client_b = server.create_client(&mut cx_b, "user_b").await;
1897
1898 // Share a project as client A
1899 fs.insert_tree(
1900 "/a",
1901 json!({
1902 ".zed.toml": r#"collaborators = ["user_b"]"#,
1903 "a.txt": "a-contents",
1904 "b.txt": "b-contents",
1905 }),
1906 )
1907 .await;
1908 let project_a = cx_a.update(|cx| {
1909 Project::local(
1910 client_a.clone(),
1911 client_a.user_store.clone(),
1912 lang_registry.clone(),
1913 fs.clone(),
1914 cx,
1915 )
1916 });
1917 let (worktree_a, _) = project_a
1918 .update(&mut cx_a, |p, cx| {
1919 p.find_or_create_local_worktree("/a", false, cx)
1920 })
1921 .await
1922 .unwrap();
1923 worktree_a
1924 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1925 .await;
1926 let project_id = project_a
1927 .update(&mut cx_a, |project, _| project.next_remote_id())
1928 .await;
1929 project_a
1930 .update(&mut cx_a, |project, cx| project.share(cx))
1931 .await
1932 .unwrap();
1933
1934 // Join that project as client B
1935 let _project_b = Project::remote(
1936 project_id,
1937 client_b.clone(),
1938 client_b.user_store.clone(),
1939 lang_registry.clone(),
1940 fs.clone(),
1941 &mut cx_b.to_async(),
1942 )
1943 .await
1944 .unwrap();
1945
1946 // See that a guest has joined as client A.
1947 project_a
1948 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1949 .await;
1950
1951 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1952 client_b.disconnect(&cx_b.to_async()).unwrap();
1953 project_a
1954 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1955 .await;
1956 }
1957
1958 #[gpui::test(iterations = 10)]
1959 async fn test_collaborating_with_diagnostics(
1960 mut cx_a: TestAppContext,
1961 mut cx_b: TestAppContext,
1962 ) {
1963 cx_a.foreground().forbid_parking();
1964 let mut lang_registry = Arc::new(LanguageRegistry::new());
1965 let fs = Arc::new(FakeFs::new(cx_a.background()));
1966
1967 // Set up a fake language server.
1968 let (language_server_config, mut fake_language_server) =
1969 LanguageServerConfig::fake(&cx_a).await;
1970 Arc::get_mut(&mut lang_registry)
1971 .unwrap()
1972 .add(Arc::new(Language::new(
1973 LanguageConfig {
1974 name: "Rust".to_string(),
1975 path_suffixes: vec!["rs".to_string()],
1976 language_server: Some(language_server_config),
1977 ..Default::default()
1978 },
1979 Some(tree_sitter_rust::language()),
1980 )));
1981
1982 // Connect to a server as 2 clients.
1983 let mut server = TestServer::start(cx_a.foreground()).await;
1984 let client_a = server.create_client(&mut cx_a, "user_a").await;
1985 let client_b = server.create_client(&mut cx_b, "user_b").await;
1986
1987 // Share a project as client A
1988 fs.insert_tree(
1989 "/a",
1990 json!({
1991 ".zed.toml": r#"collaborators = ["user_b"]"#,
1992 "a.rs": "let one = two",
1993 "other.rs": "",
1994 }),
1995 )
1996 .await;
1997 let project_a = cx_a.update(|cx| {
1998 Project::local(
1999 client_a.clone(),
2000 client_a.user_store.clone(),
2001 lang_registry.clone(),
2002 fs.clone(),
2003 cx,
2004 )
2005 });
2006 let (worktree_a, _) = project_a
2007 .update(&mut cx_a, |p, cx| {
2008 p.find_or_create_local_worktree("/a", false, cx)
2009 })
2010 .await
2011 .unwrap();
2012 worktree_a
2013 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2014 .await;
2015 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2016 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2017 project_a
2018 .update(&mut cx_a, |p, cx| p.share(cx))
2019 .await
2020 .unwrap();
2021
2022 // Cause the language server to start.
2023 let _ = cx_a
2024 .background()
2025 .spawn(project_a.update(&mut cx_a, |project, cx| {
2026 project.open_buffer(
2027 ProjectPath {
2028 worktree_id,
2029 path: Path::new("other.rs").into(),
2030 },
2031 cx,
2032 )
2033 }))
2034 .await
2035 .unwrap();
2036
2037 // Simulate a language server reporting errors for a file.
2038 fake_language_server
2039 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2040 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2041 version: None,
2042 diagnostics: vec![lsp::Diagnostic {
2043 severity: Some(lsp::DiagnosticSeverity::ERROR),
2044 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2045 message: "message 1".to_string(),
2046 ..Default::default()
2047 }],
2048 })
2049 .await;
2050
2051 // Wait for server to see the diagnostics update.
2052 server
2053 .condition(|store| {
2054 let worktree = store
2055 .project(project_id)
2056 .unwrap()
2057 .worktrees
2058 .get(&worktree_id.to_proto())
2059 .unwrap();
2060
2061 !worktree
2062 .share
2063 .as_ref()
2064 .unwrap()
2065 .diagnostic_summaries
2066 .is_empty()
2067 })
2068 .await;
2069
2070 // Join the worktree as client B.
2071 let project_b = Project::remote(
2072 project_id,
2073 client_b.clone(),
2074 client_b.user_store.clone(),
2075 lang_registry.clone(),
2076 fs.clone(),
2077 &mut cx_b.to_async(),
2078 )
2079 .await
2080 .unwrap();
2081
2082 project_b.read_with(&cx_b, |project, cx| {
2083 assert_eq!(
2084 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2085 &[(
2086 ProjectPath {
2087 worktree_id,
2088 path: Arc::from(Path::new("a.rs")),
2089 },
2090 DiagnosticSummary {
2091 error_count: 1,
2092 warning_count: 0,
2093 ..Default::default()
2094 },
2095 )]
2096 )
2097 });
2098
2099 // Simulate a language server reporting more errors for a file.
2100 fake_language_server
2101 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2102 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2103 version: None,
2104 diagnostics: vec![
2105 lsp::Diagnostic {
2106 severity: Some(lsp::DiagnosticSeverity::ERROR),
2107 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2108 message: "message 1".to_string(),
2109 ..Default::default()
2110 },
2111 lsp::Diagnostic {
2112 severity: Some(lsp::DiagnosticSeverity::WARNING),
2113 range: lsp::Range::new(
2114 lsp::Position::new(0, 10),
2115 lsp::Position::new(0, 13),
2116 ),
2117 message: "message 2".to_string(),
2118 ..Default::default()
2119 },
2120 ],
2121 })
2122 .await;
2123
2124 // Client b gets the updated summaries
2125 project_b
2126 .condition(&cx_b, |project, cx| {
2127 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2128 == &[(
2129 ProjectPath {
2130 worktree_id,
2131 path: Arc::from(Path::new("a.rs")),
2132 },
2133 DiagnosticSummary {
2134 error_count: 1,
2135 warning_count: 1,
2136 ..Default::default()
2137 },
2138 )]
2139 })
2140 .await;
2141
2142 // Open the file with the errors on client B. They should be present.
2143 let buffer_b = cx_b
2144 .background()
2145 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2146 .await
2147 .unwrap();
2148
2149 buffer_b.read_with(&cx_b, |buffer, _| {
2150 assert_eq!(
2151 buffer
2152 .snapshot()
2153 .diagnostics_in_range::<_, Point>(0..buffer.len())
2154 .map(|entry| entry)
2155 .collect::<Vec<_>>(),
2156 &[
2157 DiagnosticEntry {
2158 range: Point::new(0, 4)..Point::new(0, 7),
2159 diagnostic: Diagnostic {
2160 group_id: 0,
2161 message: "message 1".to_string(),
2162 severity: lsp::DiagnosticSeverity::ERROR,
2163 is_primary: true,
2164 ..Default::default()
2165 }
2166 },
2167 DiagnosticEntry {
2168 range: Point::new(0, 10)..Point::new(0, 13),
2169 diagnostic: Diagnostic {
2170 group_id: 1,
2171 severity: lsp::DiagnosticSeverity::WARNING,
2172 message: "message 2".to_string(),
2173 is_primary: true,
2174 ..Default::default()
2175 }
2176 }
2177 ]
2178 );
2179 });
2180 }
2181
2182 #[gpui::test(iterations = 10)]
2183 async fn test_collaborating_with_completion(
2184 mut cx_a: TestAppContext,
2185 mut cx_b: TestAppContext,
2186 ) {
2187 cx_a.foreground().forbid_parking();
2188 let mut lang_registry = Arc::new(LanguageRegistry::new());
2189 let fs = Arc::new(FakeFs::new(cx_a.background()));
2190
2191 // Set up a fake language server.
2192 let (language_server_config, mut fake_language_server) =
2193 LanguageServerConfig::fake_with_capabilities(
2194 lsp::ServerCapabilities {
2195 completion_provider: Some(lsp::CompletionOptions {
2196 trigger_characters: Some(vec![".".to_string()]),
2197 ..Default::default()
2198 }),
2199 ..Default::default()
2200 },
2201 &cx_a,
2202 )
2203 .await;
2204 Arc::get_mut(&mut lang_registry)
2205 .unwrap()
2206 .add(Arc::new(Language::new(
2207 LanguageConfig {
2208 name: "Rust".to_string(),
2209 path_suffixes: vec!["rs".to_string()],
2210 language_server: Some(language_server_config),
2211 ..Default::default()
2212 },
2213 Some(tree_sitter_rust::language()),
2214 )));
2215
2216 // Connect to a server as 2 clients.
2217 let mut server = TestServer::start(cx_a.foreground()).await;
2218 let client_a = server.create_client(&mut cx_a, "user_a").await;
2219 let client_b = server.create_client(&mut cx_b, "user_b").await;
2220
2221 // Share a project as client A
2222 fs.insert_tree(
2223 "/a",
2224 json!({
2225 ".zed.toml": r#"collaborators = ["user_b"]"#,
2226 "main.rs": "fn main() { a }",
2227 "other.rs": "",
2228 }),
2229 )
2230 .await;
2231 let project_a = cx_a.update(|cx| {
2232 Project::local(
2233 client_a.clone(),
2234 client_a.user_store.clone(),
2235 lang_registry.clone(),
2236 fs.clone(),
2237 cx,
2238 )
2239 });
2240 let (worktree_a, _) = project_a
2241 .update(&mut cx_a, |p, cx| {
2242 p.find_or_create_local_worktree("/a", false, cx)
2243 })
2244 .await
2245 .unwrap();
2246 worktree_a
2247 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2248 .await;
2249 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2250 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2251 project_a
2252 .update(&mut cx_a, |p, cx| p.share(cx))
2253 .await
2254 .unwrap();
2255
2256 // Join the worktree as client B.
2257 let project_b = Project::remote(
2258 project_id,
2259 client_b.clone(),
2260 client_b.user_store.clone(),
2261 lang_registry.clone(),
2262 fs.clone(),
2263 &mut cx_b.to_async(),
2264 )
2265 .await
2266 .unwrap();
2267
2268 // Open a file in an editor as the guest.
2269 let buffer_b = project_b
2270 .update(&mut cx_b, |p, cx| {
2271 p.open_buffer((worktree_id, "main.rs"), cx)
2272 })
2273 .await
2274 .unwrap();
2275 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2276 let editor_b = cx_b.add_view(window_b, |cx| {
2277 Editor::for_buffer(
2278 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2279 Arc::new(|cx| EditorSettings::test(cx)),
2280 Some(project_b.clone()),
2281 cx,
2282 )
2283 });
2284
2285 // Type a completion trigger character as the guest.
2286 editor_b.update(&mut cx_b, |editor, cx| {
2287 editor.select_ranges([13..13], None, cx);
2288 editor.handle_input(&Input(".".into()), cx);
2289 cx.focus(&editor_b);
2290 });
2291
2292 // Receive a completion request as the host's language server.
2293 // Return some completions from the host's language server.
2294 fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2295 assert_eq!(
2296 params.text_document_position.text_document.uri,
2297 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2298 );
2299 assert_eq!(
2300 params.text_document_position.position,
2301 lsp::Position::new(0, 14),
2302 );
2303
2304 Some(lsp::CompletionResponse::Array(vec![
2305 lsp::CompletionItem {
2306 label: "first_method(…)".into(),
2307 detail: Some("fn(&mut self, B) -> C".into()),
2308 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2309 new_text: "first_method($1)".to_string(),
2310 range: lsp::Range::new(
2311 lsp::Position::new(0, 14),
2312 lsp::Position::new(0, 14),
2313 ),
2314 })),
2315 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2316 ..Default::default()
2317 },
2318 lsp::CompletionItem {
2319 label: "second_method(…)".into(),
2320 detail: Some("fn(&mut self, C) -> D<E>".into()),
2321 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2322 new_text: "second_method()".to_string(),
2323 range: lsp::Range::new(
2324 lsp::Position::new(0, 14),
2325 lsp::Position::new(0, 14),
2326 ),
2327 })),
2328 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2329 ..Default::default()
2330 },
2331 ]))
2332 });
2333
2334 // Open the buffer on the host.
2335 let buffer_a = project_a
2336 .update(&mut cx_a, |p, cx| {
2337 p.open_buffer((worktree_id, "main.rs"), cx)
2338 })
2339 .await
2340 .unwrap();
2341 buffer_a
2342 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2343 .await;
2344
2345 // Confirm a completion on the guest.
2346 editor_b.next_notification(&cx_b).await;
2347 editor_b.update(&mut cx_b, |editor, cx| {
2348 assert!(editor.context_menu_visible());
2349 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2350 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2351 });
2352
2353 // Return a resolved completion from the host's language server.
2354 // The resolved completion has an additional text edit.
2355 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2356 assert_eq!(params.label, "first_method(…)");
2357 lsp::CompletionItem {
2358 label: "first_method(…)".into(),
2359 detail: Some("fn(&mut self, B) -> C".into()),
2360 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2361 new_text: "first_method($1)".to_string(),
2362 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2363 })),
2364 additional_text_edits: Some(vec![lsp::TextEdit {
2365 new_text: "use d::SomeTrait;\n".to_string(),
2366 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2367 }]),
2368 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2369 ..Default::default()
2370 }
2371 });
2372
2373 buffer_a
2374 .condition(&cx_a, |buffer, _| {
2375 buffer.text() == "fn main() { a.first_method() }"
2376 })
2377 .await;
2378
2379 // The additional edit is applied.
2380 buffer_b
2381 .condition(&cx_b, |buffer, _| {
2382 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2383 })
2384 .await;
2385 assert_eq!(
2386 buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2387 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2388 );
2389 }
2390
2391 #[gpui::test(iterations = 10)]
2392 async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2393 cx_a.foreground().forbid_parking();
2394 let mut lang_registry = Arc::new(LanguageRegistry::new());
2395 let fs = Arc::new(FakeFs::new(cx_a.background()));
2396
2397 // Set up a fake language server.
2398 let (language_server_config, mut fake_language_server) =
2399 LanguageServerConfig::fake(&cx_a).await;
2400 Arc::get_mut(&mut lang_registry)
2401 .unwrap()
2402 .add(Arc::new(Language::new(
2403 LanguageConfig {
2404 name: "Rust".to_string(),
2405 path_suffixes: vec!["rs".to_string()],
2406 language_server: Some(language_server_config),
2407 ..Default::default()
2408 },
2409 Some(tree_sitter_rust::language()),
2410 )));
2411
2412 // Connect to a server as 2 clients.
2413 let mut server = TestServer::start(cx_a.foreground()).await;
2414 let client_a = server.create_client(&mut cx_a, "user_a").await;
2415 let client_b = server.create_client(&mut cx_b, "user_b").await;
2416
2417 // Share a project as client A
2418 fs.insert_tree(
2419 "/a",
2420 json!({
2421 ".zed.toml": r#"collaborators = ["user_b"]"#,
2422 "a.rs": "let one = two",
2423 }),
2424 )
2425 .await;
2426 let project_a = cx_a.update(|cx| {
2427 Project::local(
2428 client_a.clone(),
2429 client_a.user_store.clone(),
2430 lang_registry.clone(),
2431 fs.clone(),
2432 cx,
2433 )
2434 });
2435 let (worktree_a, _) = project_a
2436 .update(&mut cx_a, |p, cx| {
2437 p.find_or_create_local_worktree("/a", false, cx)
2438 })
2439 .await
2440 .unwrap();
2441 worktree_a
2442 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2443 .await;
2444 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2445 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2446 project_a
2447 .update(&mut cx_a, |p, cx| p.share(cx))
2448 .await
2449 .unwrap();
2450
2451 // Join the worktree as client B.
2452 let project_b = Project::remote(
2453 project_id,
2454 client_b.clone(),
2455 client_b.user_store.clone(),
2456 lang_registry.clone(),
2457 fs.clone(),
2458 &mut cx_b.to_async(),
2459 )
2460 .await
2461 .unwrap();
2462
2463 let buffer_b = cx_b
2464 .background()
2465 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2466 .await
2467 .unwrap();
2468
2469 let format = project_b.update(&mut cx_b, |project, cx| {
2470 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2471 });
2472
2473 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2474 Some(vec![
2475 lsp::TextEdit {
2476 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2477 new_text: "h".to_string(),
2478 },
2479 lsp::TextEdit {
2480 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2481 new_text: "y".to_string(),
2482 },
2483 ])
2484 });
2485
2486 format.await.unwrap();
2487 assert_eq!(
2488 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2489 "let honey = two"
2490 );
2491 }
2492
2493 #[gpui::test(iterations = 10)]
2494 async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2495 cx_a.foreground().forbid_parking();
2496 let mut lang_registry = Arc::new(LanguageRegistry::new());
2497 let fs = Arc::new(FakeFs::new(cx_a.background()));
2498 fs.insert_tree(
2499 "/root-1",
2500 json!({
2501 ".zed.toml": r#"collaborators = ["user_b"]"#,
2502 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2503 }),
2504 )
2505 .await;
2506 fs.insert_tree(
2507 "/root-2",
2508 json!({
2509 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2510 }),
2511 )
2512 .await;
2513
2514 // Set up a fake language server.
2515 let (language_server_config, mut fake_language_server) =
2516 LanguageServerConfig::fake(&cx_a).await;
2517 Arc::get_mut(&mut lang_registry)
2518 .unwrap()
2519 .add(Arc::new(Language::new(
2520 LanguageConfig {
2521 name: "Rust".to_string(),
2522 path_suffixes: vec!["rs".to_string()],
2523 language_server: Some(language_server_config),
2524 ..Default::default()
2525 },
2526 Some(tree_sitter_rust::language()),
2527 )));
2528
2529 // Connect to a server as 2 clients.
2530 let mut server = TestServer::start(cx_a.foreground()).await;
2531 let client_a = server.create_client(&mut cx_a, "user_a").await;
2532 let client_b = server.create_client(&mut cx_b, "user_b").await;
2533
2534 // Share a project as client A
2535 let project_a = cx_a.update(|cx| {
2536 Project::local(
2537 client_a.clone(),
2538 client_a.user_store.clone(),
2539 lang_registry.clone(),
2540 fs.clone(),
2541 cx,
2542 )
2543 });
2544 let (worktree_a, _) = project_a
2545 .update(&mut cx_a, |p, cx| {
2546 p.find_or_create_local_worktree("/root-1", false, cx)
2547 })
2548 .await
2549 .unwrap();
2550 worktree_a
2551 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2552 .await;
2553 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2554 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2555 project_a
2556 .update(&mut cx_a, |p, cx| p.share(cx))
2557 .await
2558 .unwrap();
2559
2560 // Join the worktree as client B.
2561 let project_b = Project::remote(
2562 project_id,
2563 client_b.clone(),
2564 client_b.user_store.clone(),
2565 lang_registry.clone(),
2566 fs.clone(),
2567 &mut cx_b.to_async(),
2568 )
2569 .await
2570 .unwrap();
2571
2572 // Open the file on client B.
2573 let buffer_b = cx_b
2574 .background()
2575 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2576 .await
2577 .unwrap();
2578
2579 // Request the definition of a symbol as the guest.
2580 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2581 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2582 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2583 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2584 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2585 )))
2586 });
2587
2588 let definitions_1 = definitions_1.await.unwrap();
2589 cx_b.read(|cx| {
2590 assert_eq!(definitions_1.len(), 1);
2591 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2592 let target_buffer = definitions_1[0].target_buffer.read(cx);
2593 assert_eq!(
2594 target_buffer.text(),
2595 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2596 );
2597 assert_eq!(
2598 definitions_1[0].target_range.to_point(target_buffer),
2599 Point::new(0, 6)..Point::new(0, 9)
2600 );
2601 });
2602
2603 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2604 // the previous call to `definition`.
2605 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2606 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2607 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2608 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2609 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2610 )))
2611 });
2612
2613 let definitions_2 = definitions_2.await.unwrap();
2614 cx_b.read(|cx| {
2615 assert_eq!(definitions_2.len(), 1);
2616 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2617 let target_buffer = definitions_2[0].target_buffer.read(cx);
2618 assert_eq!(
2619 target_buffer.text(),
2620 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2621 );
2622 assert_eq!(
2623 definitions_2[0].target_range.to_point(target_buffer),
2624 Point::new(1, 6)..Point::new(1, 11)
2625 );
2626 });
2627 assert_eq!(
2628 definitions_1[0].target_buffer,
2629 definitions_2[0].target_buffer
2630 );
2631
2632 cx_b.update(|_| {
2633 drop(definitions_1);
2634 drop(definitions_2);
2635 });
2636 project_b
2637 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2638 .await;
2639 }
2640
2641 #[gpui::test(iterations = 10)]
2642 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2643 mut cx_a: TestAppContext,
2644 mut cx_b: TestAppContext,
2645 mut rng: StdRng,
2646 ) {
2647 cx_a.foreground().forbid_parking();
2648 let mut lang_registry = Arc::new(LanguageRegistry::new());
2649 let fs = Arc::new(FakeFs::new(cx_a.background()));
2650 fs.insert_tree(
2651 "/root",
2652 json!({
2653 ".zed.toml": r#"collaborators = ["user_b"]"#,
2654 "a.rs": "const ONE: usize = b::TWO;",
2655 "b.rs": "const TWO: usize = 2",
2656 }),
2657 )
2658 .await;
2659
2660 // Set up a fake language server.
2661 let (language_server_config, mut fake_language_server) =
2662 LanguageServerConfig::fake(&cx_a).await;
2663 Arc::get_mut(&mut lang_registry)
2664 .unwrap()
2665 .add(Arc::new(Language::new(
2666 LanguageConfig {
2667 name: "Rust".to_string(),
2668 path_suffixes: vec!["rs".to_string()],
2669 language_server: Some(language_server_config),
2670 ..Default::default()
2671 },
2672 Some(tree_sitter_rust::language()),
2673 )));
2674
2675 // Connect to a server as 2 clients.
2676 let mut server = TestServer::start(cx_a.foreground()).await;
2677 let client_a = server.create_client(&mut cx_a, "user_a").await;
2678 let client_b = server.create_client(&mut cx_b, "user_b").await;
2679
2680 // Share a project as client A
2681 let project_a = cx_a.update(|cx| {
2682 Project::local(
2683 client_a.clone(),
2684 client_a.user_store.clone(),
2685 lang_registry.clone(),
2686 fs.clone(),
2687 cx,
2688 )
2689 });
2690 let (worktree_a, _) = project_a
2691 .update(&mut cx_a, |p, cx| {
2692 p.find_or_create_local_worktree("/root", false, cx)
2693 })
2694 .await
2695 .unwrap();
2696 worktree_a
2697 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2698 .await;
2699 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2700 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2701 project_a
2702 .update(&mut cx_a, |p, cx| p.share(cx))
2703 .await
2704 .unwrap();
2705
2706 // Join the worktree as client B.
2707 let project_b = Project::remote(
2708 project_id,
2709 client_b.clone(),
2710 client_b.user_store.clone(),
2711 lang_registry.clone(),
2712 fs.clone(),
2713 &mut cx_b.to_async(),
2714 )
2715 .await
2716 .unwrap();
2717
2718 let buffer_b1 = cx_b
2719 .background()
2720 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2721 .await
2722 .unwrap();
2723
2724 let definitions;
2725 let buffer_b2;
2726 if rng.gen() {
2727 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2728 buffer_b2 =
2729 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2730 } else {
2731 buffer_b2 =
2732 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2733 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2734 }
2735
2736 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2737 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2738 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2739 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2740 )))
2741 });
2742
2743 let buffer_b2 = buffer_b2.await.unwrap();
2744 let definitions = definitions.await.unwrap();
2745 assert_eq!(definitions.len(), 1);
2746 assert_eq!(definitions[0].target_buffer, buffer_b2);
2747 }
2748
2749 #[gpui::test(iterations = 10)]
2750 async fn test_collaborating_with_code_actions(
2751 mut cx_a: TestAppContext,
2752 mut cx_b: TestAppContext,
2753 ) {
2754 cx_a.foreground().forbid_parking();
2755 let mut lang_registry = Arc::new(LanguageRegistry::new());
2756 let fs = Arc::new(FakeFs::new(cx_a.background()));
2757 let mut path_openers_b = Vec::new();
2758 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2759
2760 // Set up a fake language server.
2761 let (language_server_config, mut fake_language_server) =
2762 LanguageServerConfig::fake_with_capabilities(
2763 lsp::ServerCapabilities {
2764 ..Default::default()
2765 },
2766 &cx_a,
2767 )
2768 .await;
2769 Arc::get_mut(&mut lang_registry)
2770 .unwrap()
2771 .add(Arc::new(Language::new(
2772 LanguageConfig {
2773 name: "Rust".to_string(),
2774 path_suffixes: vec!["rs".to_string()],
2775 language_server: Some(language_server_config),
2776 ..Default::default()
2777 },
2778 Some(tree_sitter_rust::language()),
2779 )));
2780
2781 // Connect to a server as 2 clients.
2782 let mut server = TestServer::start(cx_a.foreground()).await;
2783 let client_a = server.create_client(&mut cx_a, "user_a").await;
2784 let client_b = server.create_client(&mut cx_b, "user_b").await;
2785
2786 // Share a project as client A
2787 fs.insert_tree(
2788 "/a",
2789 json!({
2790 ".zed.toml": r#"collaborators = ["user_b"]"#,
2791 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2792 "other.rs": "pub fn foo() -> usize { 4 }",
2793 }),
2794 )
2795 .await;
2796 let project_a = cx_a.update(|cx| {
2797 Project::local(
2798 client_a.clone(),
2799 client_a.user_store.clone(),
2800 lang_registry.clone(),
2801 fs.clone(),
2802 cx,
2803 )
2804 });
2805 let (worktree_a, _) = project_a
2806 .update(&mut cx_a, |p, cx| {
2807 p.find_or_create_local_worktree("/a", false, cx)
2808 })
2809 .await
2810 .unwrap();
2811 worktree_a
2812 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2813 .await;
2814 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2815 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2816 project_a
2817 .update(&mut cx_a, |p, cx| p.share(cx))
2818 .await
2819 .unwrap();
2820
2821 // Join the worktree as client B.
2822 let project_b = Project::remote(
2823 project_id,
2824 client_b.clone(),
2825 client_b.user_store.clone(),
2826 lang_registry.clone(),
2827 fs.clone(),
2828 &mut cx_b.to_async(),
2829 )
2830 .await
2831 .unwrap();
2832 let mut params = cx_b.update(WorkspaceParams::test);
2833 params.languages = lang_registry.clone();
2834 params.client = client_b.client.clone();
2835 params.user_store = client_b.user_store.clone();
2836 params.project = project_b;
2837 params.path_openers = path_openers_b.into();
2838
2839 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2840 let editor_b = workspace_b
2841 .update(&mut cx_b, |workspace, cx| {
2842 workspace.open_path((worktree_id, "main.rs").into(), cx)
2843 })
2844 .await
2845 .unwrap()
2846 .downcast::<Editor>()
2847 .unwrap();
2848 fake_language_server
2849 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2850 assert_eq!(
2851 params.text_document.uri,
2852 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2853 );
2854 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2855 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2856 None
2857 })
2858 .next()
2859 .await;
2860
2861 // Move cursor to a location that contains code actions.
2862 editor_b.update(&mut cx_b, |editor, cx| {
2863 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2864 cx.focus(&editor_b);
2865 });
2866 fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2867 assert_eq!(
2868 params.text_document.uri,
2869 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2870 );
2871 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2872 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2873
2874 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2875 lsp::CodeAction {
2876 title: "Inline into all callers".to_string(),
2877 edit: Some(lsp::WorkspaceEdit {
2878 changes: Some(
2879 [
2880 (
2881 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2882 vec![lsp::TextEdit::new(
2883 lsp::Range::new(
2884 lsp::Position::new(1, 22),
2885 lsp::Position::new(1, 34),
2886 ),
2887 "4".to_string(),
2888 )],
2889 ),
2890 (
2891 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2892 vec![lsp::TextEdit::new(
2893 lsp::Range::new(
2894 lsp::Position::new(0, 0),
2895 lsp::Position::new(0, 27),
2896 ),
2897 "".to_string(),
2898 )],
2899 ),
2900 ]
2901 .into_iter()
2902 .collect(),
2903 ),
2904 ..Default::default()
2905 }),
2906 data: Some(json!({
2907 "codeActionParams": {
2908 "range": {
2909 "start": {"line": 1, "column": 31},
2910 "end": {"line": 1, "column": 31},
2911 }
2912 }
2913 })),
2914 ..Default::default()
2915 },
2916 )])
2917 });
2918
2919 // Toggle code actions and wait for them to display.
2920 editor_b.update(&mut cx_b, |editor, cx| {
2921 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2922 });
2923 editor_b
2924 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2925 .await;
2926
2927 // Confirming the code action will trigger a resolve request.
2928 let confirm_action = workspace_b
2929 .update(&mut cx_b, |workspace, cx| {
2930 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2931 })
2932 .unwrap();
2933 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2934 lsp::CodeAction {
2935 title: "Inline into all callers".to_string(),
2936 edit: Some(lsp::WorkspaceEdit {
2937 changes: Some(
2938 [
2939 (
2940 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2941 vec![lsp::TextEdit::new(
2942 lsp::Range::new(
2943 lsp::Position::new(1, 22),
2944 lsp::Position::new(1, 34),
2945 ),
2946 "4".to_string(),
2947 )],
2948 ),
2949 (
2950 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2951 vec![lsp::TextEdit::new(
2952 lsp::Range::new(
2953 lsp::Position::new(0, 0),
2954 lsp::Position::new(0, 27),
2955 ),
2956 "".to_string(),
2957 )],
2958 ),
2959 ]
2960 .into_iter()
2961 .collect(),
2962 ),
2963 ..Default::default()
2964 }),
2965 ..Default::default()
2966 }
2967 });
2968
2969 // After the action is confirmed, an editor containing both modified files is opened.
2970 confirm_action.await.unwrap();
2971 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2972 workspace
2973 .active_item(cx)
2974 .unwrap()
2975 .downcast::<Editor>()
2976 .unwrap()
2977 });
2978 code_action_editor.update(&mut cx_b, |editor, cx| {
2979 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2980 editor.undo(&Undo, cx);
2981 assert_eq!(
2982 editor.text(cx),
2983 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2984 );
2985 editor.redo(&Redo, cx);
2986 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2987 });
2988 }
2989
2990 #[gpui::test(iterations = 10)]
2991 async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2992 cx_a.foreground().forbid_parking();
2993
2994 // Connect to a server as 2 clients.
2995 let mut server = TestServer::start(cx_a.foreground()).await;
2996 let client_a = server.create_client(&mut cx_a, "user_a").await;
2997 let client_b = server.create_client(&mut cx_b, "user_b").await;
2998
2999 // Create an org that includes these 2 users.
3000 let db = &server.app_state.db;
3001 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3002 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3003 .await
3004 .unwrap();
3005 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3006 .await
3007 .unwrap();
3008
3009 // Create a channel that includes all the users.
3010 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3011 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3012 .await
3013 .unwrap();
3014 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3015 .await
3016 .unwrap();
3017 db.create_channel_message(
3018 channel_id,
3019 client_b.current_user_id(&cx_b),
3020 "hello A, it's B.",
3021 OffsetDateTime::now_utc(),
3022 1,
3023 )
3024 .await
3025 .unwrap();
3026
3027 let channels_a = cx_a
3028 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3029 channels_a
3030 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3031 .await;
3032 channels_a.read_with(&cx_a, |list, _| {
3033 assert_eq!(
3034 list.available_channels().unwrap(),
3035 &[ChannelDetails {
3036 id: channel_id.to_proto(),
3037 name: "test-channel".to_string()
3038 }]
3039 )
3040 });
3041 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3042 this.get_channel(channel_id.to_proto(), cx).unwrap()
3043 });
3044 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3045 channel_a
3046 .condition(&cx_a, |channel, _| {
3047 channel_messages(channel)
3048 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3049 })
3050 .await;
3051
3052 let channels_b = cx_b
3053 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3054 channels_b
3055 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3056 .await;
3057 channels_b.read_with(&cx_b, |list, _| {
3058 assert_eq!(
3059 list.available_channels().unwrap(),
3060 &[ChannelDetails {
3061 id: channel_id.to_proto(),
3062 name: "test-channel".to_string()
3063 }]
3064 )
3065 });
3066
3067 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3068 this.get_channel(channel_id.to_proto(), cx).unwrap()
3069 });
3070 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3071 channel_b
3072 .condition(&cx_b, |channel, _| {
3073 channel_messages(channel)
3074 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3075 })
3076 .await;
3077
3078 channel_a
3079 .update(&mut cx_a, |channel, cx| {
3080 channel
3081 .send_message("oh, hi B.".to_string(), cx)
3082 .unwrap()
3083 .detach();
3084 let task = channel.send_message("sup".to_string(), cx).unwrap();
3085 assert_eq!(
3086 channel_messages(channel),
3087 &[
3088 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3089 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3090 ("user_a".to_string(), "sup".to_string(), true)
3091 ]
3092 );
3093 task
3094 })
3095 .await
3096 .unwrap();
3097
3098 channel_b
3099 .condition(&cx_b, |channel, _| {
3100 channel_messages(channel)
3101 == [
3102 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3103 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3104 ("user_a".to_string(), "sup".to_string(), false),
3105 ]
3106 })
3107 .await;
3108
3109 assert_eq!(
3110 server
3111 .state()
3112 .await
3113 .channel(channel_id)
3114 .unwrap()
3115 .connection_ids
3116 .len(),
3117 2
3118 );
3119 cx_b.update(|_| drop(channel_b));
3120 server
3121 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3122 .await;
3123
3124 cx_a.update(|_| drop(channel_a));
3125 server
3126 .condition(|state| state.channel(channel_id).is_none())
3127 .await;
3128 }
3129
3130 #[gpui::test(iterations = 10)]
3131 async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3132 cx_a.foreground().forbid_parking();
3133
3134 let mut server = TestServer::start(cx_a.foreground()).await;
3135 let client_a = server.create_client(&mut cx_a, "user_a").await;
3136
3137 let db = &server.app_state.db;
3138 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3139 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3140 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3141 .await
3142 .unwrap();
3143 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3144 .await
3145 .unwrap();
3146
3147 let channels_a = cx_a
3148 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3149 channels_a
3150 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3151 .await;
3152 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3153 this.get_channel(channel_id.to_proto(), cx).unwrap()
3154 });
3155
3156 // Messages aren't allowed to be too long.
3157 channel_a
3158 .update(&mut cx_a, |channel, cx| {
3159 let long_body = "this is long.\n".repeat(1024);
3160 channel.send_message(long_body, cx).unwrap()
3161 })
3162 .await
3163 .unwrap_err();
3164
3165 // Messages aren't allowed to be blank.
3166 channel_a.update(&mut cx_a, |channel, cx| {
3167 channel.send_message(String::new(), cx).unwrap_err()
3168 });
3169
3170 // Leading and trailing whitespace are trimmed.
3171 channel_a
3172 .update(&mut cx_a, |channel, cx| {
3173 channel
3174 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3175 .unwrap()
3176 })
3177 .await
3178 .unwrap();
3179 assert_eq!(
3180 db.get_channel_messages(channel_id, 10, None)
3181 .await
3182 .unwrap()
3183 .iter()
3184 .map(|m| &m.body)
3185 .collect::<Vec<_>>(),
3186 &["surrounded by whitespace"]
3187 );
3188 }
3189
3190 #[gpui::test(iterations = 10)]
3191 async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3192 cx_a.foreground().forbid_parking();
3193
3194 // Connect to a server as 2 clients.
3195 let mut server = TestServer::start(cx_a.foreground()).await;
3196 let client_a = server.create_client(&mut cx_a, "user_a").await;
3197 let client_b = server.create_client(&mut cx_b, "user_b").await;
3198 let mut status_b = client_b.status();
3199
3200 // Create an org that includes these 2 users.
3201 let db = &server.app_state.db;
3202 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3203 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3204 .await
3205 .unwrap();
3206 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3207 .await
3208 .unwrap();
3209
3210 // Create a channel that includes all the users.
3211 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3212 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3213 .await
3214 .unwrap();
3215 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3216 .await
3217 .unwrap();
3218 db.create_channel_message(
3219 channel_id,
3220 client_b.current_user_id(&cx_b),
3221 "hello A, it's B.",
3222 OffsetDateTime::now_utc(),
3223 2,
3224 )
3225 .await
3226 .unwrap();
3227
3228 let channels_a = cx_a
3229 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3230 channels_a
3231 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3232 .await;
3233
3234 channels_a.read_with(&cx_a, |list, _| {
3235 assert_eq!(
3236 list.available_channels().unwrap(),
3237 &[ChannelDetails {
3238 id: channel_id.to_proto(),
3239 name: "test-channel".to_string()
3240 }]
3241 )
3242 });
3243 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3244 this.get_channel(channel_id.to_proto(), cx).unwrap()
3245 });
3246 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3247 channel_a
3248 .condition(&cx_a, |channel, _| {
3249 channel_messages(channel)
3250 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3251 })
3252 .await;
3253
3254 let channels_b = cx_b
3255 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3256 channels_b
3257 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3258 .await;
3259 channels_b.read_with(&cx_b, |list, _| {
3260 assert_eq!(
3261 list.available_channels().unwrap(),
3262 &[ChannelDetails {
3263 id: channel_id.to_proto(),
3264 name: "test-channel".to_string()
3265 }]
3266 )
3267 });
3268
3269 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3270 this.get_channel(channel_id.to_proto(), cx).unwrap()
3271 });
3272 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3273 channel_b
3274 .condition(&cx_b, |channel, _| {
3275 channel_messages(channel)
3276 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3277 })
3278 .await;
3279
3280 // Disconnect client B, ensuring we can still access its cached channel data.
3281 server.forbid_connections();
3282 server.disconnect_client(client_b.current_user_id(&cx_b));
3283 while !matches!(
3284 status_b.next().await,
3285 Some(client::Status::ReconnectionError { .. })
3286 ) {}
3287
3288 channels_b.read_with(&cx_b, |channels, _| {
3289 assert_eq!(
3290 channels.available_channels().unwrap(),
3291 [ChannelDetails {
3292 id: channel_id.to_proto(),
3293 name: "test-channel".to_string()
3294 }]
3295 )
3296 });
3297 channel_b.read_with(&cx_b, |channel, _| {
3298 assert_eq!(
3299 channel_messages(channel),
3300 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3301 )
3302 });
3303
3304 // Send a message from client B while it is disconnected.
3305 channel_b
3306 .update(&mut cx_b, |channel, cx| {
3307 let task = channel
3308 .send_message("can you see this?".to_string(), cx)
3309 .unwrap();
3310 assert_eq!(
3311 channel_messages(channel),
3312 &[
3313 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3314 ("user_b".to_string(), "can you see this?".to_string(), true)
3315 ]
3316 );
3317 task
3318 })
3319 .await
3320 .unwrap_err();
3321
3322 // Send a message from client A while B is disconnected.
3323 channel_a
3324 .update(&mut cx_a, |channel, cx| {
3325 channel
3326 .send_message("oh, hi B.".to_string(), cx)
3327 .unwrap()
3328 .detach();
3329 let task = channel.send_message("sup".to_string(), cx).unwrap();
3330 assert_eq!(
3331 channel_messages(channel),
3332 &[
3333 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3334 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3335 ("user_a".to_string(), "sup".to_string(), true)
3336 ]
3337 );
3338 task
3339 })
3340 .await
3341 .unwrap();
3342
3343 // Give client B a chance to reconnect.
3344 server.allow_connections();
3345 cx_b.foreground().advance_clock(Duration::from_secs(10));
3346
3347 // Verify that B sees the new messages upon reconnection, as well as the message client B
3348 // sent while offline.
3349 channel_b
3350 .condition(&cx_b, |channel, _| {
3351 channel_messages(channel)
3352 == [
3353 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3354 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3355 ("user_a".to_string(), "sup".to_string(), false),
3356 ("user_b".to_string(), "can you see this?".to_string(), false),
3357 ]
3358 })
3359 .await;
3360
3361 // Ensure client A and B can communicate normally after reconnection.
3362 channel_a
3363 .update(&mut cx_a, |channel, cx| {
3364 channel.send_message("you online?".to_string(), cx).unwrap()
3365 })
3366 .await
3367 .unwrap();
3368 channel_b
3369 .condition(&cx_b, |channel, _| {
3370 channel_messages(channel)
3371 == [
3372 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3373 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3374 ("user_a".to_string(), "sup".to_string(), false),
3375 ("user_b".to_string(), "can you see this?".to_string(), false),
3376 ("user_a".to_string(), "you online?".to_string(), false),
3377 ]
3378 })
3379 .await;
3380
3381 channel_b
3382 .update(&mut cx_b, |channel, cx| {
3383 channel.send_message("yep".to_string(), cx).unwrap()
3384 })
3385 .await
3386 .unwrap();
3387 channel_a
3388 .condition(&cx_a, |channel, _| {
3389 channel_messages(channel)
3390 == [
3391 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3392 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3393 ("user_a".to_string(), "sup".to_string(), false),
3394 ("user_b".to_string(), "can you see this?".to_string(), false),
3395 ("user_a".to_string(), "you online?".to_string(), false),
3396 ("user_b".to_string(), "yep".to_string(), false),
3397 ]
3398 })
3399 .await;
3400 }
3401
3402 #[gpui::test(iterations = 10)]
3403 async fn test_contacts(
3404 mut cx_a: TestAppContext,
3405 mut cx_b: TestAppContext,
3406 mut cx_c: TestAppContext,
3407 ) {
3408 cx_a.foreground().forbid_parking();
3409 let lang_registry = Arc::new(LanguageRegistry::new());
3410 let fs = Arc::new(FakeFs::new(cx_a.background()));
3411
3412 // Connect to a server as 3 clients.
3413 let mut server = TestServer::start(cx_a.foreground()).await;
3414 let client_a = server.create_client(&mut cx_a, "user_a").await;
3415 let client_b = server.create_client(&mut cx_b, "user_b").await;
3416 let client_c = server.create_client(&mut cx_c, "user_c").await;
3417
3418 // Share a worktree as client A.
3419 fs.insert_tree(
3420 "/a",
3421 json!({
3422 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3423 }),
3424 )
3425 .await;
3426
3427 let project_a = cx_a.update(|cx| {
3428 Project::local(
3429 client_a.clone(),
3430 client_a.user_store.clone(),
3431 lang_registry.clone(),
3432 fs.clone(),
3433 cx,
3434 )
3435 });
3436 let (worktree_a, _) = project_a
3437 .update(&mut cx_a, |p, cx| {
3438 p.find_or_create_local_worktree("/a", false, cx)
3439 })
3440 .await
3441 .unwrap();
3442 worktree_a
3443 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3444 .await;
3445
3446 client_a
3447 .user_store
3448 .condition(&cx_a, |user_store, _| {
3449 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3450 })
3451 .await;
3452 client_b
3453 .user_store
3454 .condition(&cx_b, |user_store, _| {
3455 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3456 })
3457 .await;
3458 client_c
3459 .user_store
3460 .condition(&cx_c, |user_store, _| {
3461 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3462 })
3463 .await;
3464
3465 let project_id = project_a
3466 .update(&mut cx_a, |project, _| project.next_remote_id())
3467 .await;
3468 project_a
3469 .update(&mut cx_a, |project, cx| project.share(cx))
3470 .await
3471 .unwrap();
3472
3473 let _project_b = Project::remote(
3474 project_id,
3475 client_b.clone(),
3476 client_b.user_store.clone(),
3477 lang_registry.clone(),
3478 fs.clone(),
3479 &mut cx_b.to_async(),
3480 )
3481 .await
3482 .unwrap();
3483
3484 client_a
3485 .user_store
3486 .condition(&cx_a, |user_store, _| {
3487 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3488 })
3489 .await;
3490 client_b
3491 .user_store
3492 .condition(&cx_b, |user_store, _| {
3493 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3494 })
3495 .await;
3496 client_c
3497 .user_store
3498 .condition(&cx_c, |user_store, _| {
3499 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3500 })
3501 .await;
3502
3503 project_a
3504 .condition(&cx_a, |project, _| {
3505 project.collaborators().contains_key(&client_b.peer_id)
3506 })
3507 .await;
3508
3509 cx_a.update(move |_| drop(project_a));
3510 client_a
3511 .user_store
3512 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3513 .await;
3514 client_b
3515 .user_store
3516 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3517 .await;
3518 client_c
3519 .user_store
3520 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3521 .await;
3522
3523 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3524 user_store
3525 .contacts()
3526 .iter()
3527 .map(|contact| {
3528 let worktrees = contact
3529 .projects
3530 .iter()
3531 .map(|p| {
3532 (
3533 p.worktree_root_names[0].as_str(),
3534 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3535 )
3536 })
3537 .collect();
3538 (contact.user.github_login.as_str(), worktrees)
3539 })
3540 .collect()
3541 }
3542 }
3543
3544 struct TestServer {
3545 peer: Arc<Peer>,
3546 app_state: Arc<AppState>,
3547 server: Arc<Server>,
3548 foreground: Rc<executor::Foreground>,
3549 notifications: mpsc::Receiver<()>,
3550 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3551 forbid_connections: Arc<AtomicBool>,
3552 _test_db: TestDb,
3553 }
3554
3555 impl TestServer {
3556 async fn start(foreground: Rc<executor::Foreground>) -> Self {
3557 let test_db = TestDb::new();
3558 let app_state = Self::build_app_state(&test_db).await;
3559 let peer = Peer::new();
3560 let notifications = mpsc::channel(128);
3561 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3562 Self {
3563 peer,
3564 app_state,
3565 server,
3566 foreground,
3567 notifications: notifications.1,
3568 connection_killers: Default::default(),
3569 forbid_connections: Default::default(),
3570 _test_db: test_db,
3571 }
3572 }
3573
3574 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3575 let http = FakeHttpClient::with_404_response();
3576 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3577 let client_name = name.to_string();
3578 let mut client = Client::new(http.clone());
3579 let server = self.server.clone();
3580 let connection_killers = self.connection_killers.clone();
3581 let forbid_connections = self.forbid_connections.clone();
3582 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3583
3584 Arc::get_mut(&mut client)
3585 .unwrap()
3586 .override_authenticate(move |cx| {
3587 cx.spawn(|_| async move {
3588 let access_token = "the-token".to_string();
3589 Ok(Credentials {
3590 user_id: user_id.0 as u64,
3591 access_token,
3592 })
3593 })
3594 })
3595 .override_establish_connection(move |credentials, cx| {
3596 assert_eq!(credentials.user_id, user_id.0 as u64);
3597 assert_eq!(credentials.access_token, "the-token");
3598
3599 let server = server.clone();
3600 let connection_killers = connection_killers.clone();
3601 let forbid_connections = forbid_connections.clone();
3602 let client_name = client_name.clone();
3603 let connection_id_tx = connection_id_tx.clone();
3604 cx.spawn(move |cx| async move {
3605 if forbid_connections.load(SeqCst) {
3606 Err(EstablishConnectionError::other(anyhow!(
3607 "server is forbidding connections"
3608 )))
3609 } else {
3610 let (client_conn, server_conn, kill_conn) =
3611 Connection::in_memory(cx.background());
3612 connection_killers.lock().insert(user_id, kill_conn);
3613 cx.background()
3614 .spawn(server.handle_connection(
3615 server_conn,
3616 client_name,
3617 user_id,
3618 Some(connection_id_tx),
3619 cx.background(),
3620 ))
3621 .detach();
3622 Ok(client_conn)
3623 }
3624 })
3625 });
3626
3627 client
3628 .authenticate_and_connect(&cx.to_async())
3629 .await
3630 .unwrap();
3631
3632 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3633 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3634 let mut authed_user =
3635 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3636 while authed_user.next().await.unwrap().is_none() {}
3637
3638 TestClient {
3639 client,
3640 peer_id,
3641 user_store,
3642 }
3643 }
3644
3645 fn disconnect_client(&self, user_id: UserId) {
3646 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3647 let _ = kill_conn.try_send(Some(()));
3648 }
3649 }
3650
3651 fn forbid_connections(&self) {
3652 self.forbid_connections.store(true, SeqCst);
3653 }
3654
3655 fn allow_connections(&self) {
3656 self.forbid_connections.store(false, SeqCst);
3657 }
3658
3659 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3660 let mut config = Config::default();
3661 config.session_secret = "a".repeat(32);
3662 config.database_url = test_db.url.clone();
3663 let github_client = github::AppClient::test();
3664 Arc::new(AppState {
3665 db: test_db.db().clone(),
3666 handlebars: Default::default(),
3667 auth_client: auth::build_client("", ""),
3668 repo_client: github::RepoClient::test(&github_client),
3669 github_client,
3670 config,
3671 })
3672 }
3673
3674 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3675 self.server.store.read()
3676 }
3677
3678 async fn condition<F>(&mut self, mut predicate: F)
3679 where
3680 F: FnMut(&Store) -> bool,
3681 {
3682 async_std::future::timeout(Duration::from_millis(500), async {
3683 while !(predicate)(&*self.server.store.read()) {
3684 self.foreground.start_waiting();
3685 self.notifications.next().await;
3686 self.foreground.finish_waiting();
3687 }
3688 })
3689 .await
3690 .expect("condition timed out");
3691 }
3692 }
3693
3694 impl Drop for TestServer {
3695 fn drop(&mut self) {
3696 self.peer.reset();
3697 }
3698 }
3699
3700 struct TestClient {
3701 client: Arc<Client>,
3702 pub peer_id: PeerId,
3703 pub user_store: ModelHandle<UserStore>,
3704 }
3705
3706 impl Deref for TestClient {
3707 type Target = Arc<Client>;
3708
3709 fn deref(&self) -> &Self::Target {
3710 &self.client
3711 }
3712 }
3713
3714 impl TestClient {
3715 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3716 UserId::from_proto(
3717 self.user_store
3718 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3719 )
3720 }
3721 }
3722
3723 impl Executor for Arc<gpui::executor::Background> {
3724 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
3725 self.spawn(future).detach();
3726 }
3727 }
3728
3729 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3730 channel
3731 .messages()
3732 .cursor::<()>()
3733 .map(|m| {
3734 (
3735 m.sender.github_login.clone(),
3736 m.body.clone(),
3737 m.is_pending(),
3738 )
3739 })
3740 .collect()
3741 }
3742
/// A view with no state of its own; it renders as an empty element.
struct EmptyView;
3744
3745 impl gpui::Entity for EmptyView {
3746 type Event = ();
3747 }
3748
3749 impl gpui::View for EmptyView {
3750 fn ui_name() -> &'static str {
3751 "empty view"
3752 }
3753
3754 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3755 gpui::Element::boxed(gpui::elements::Empty)
3756 }
3757 }
3758}