1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It is called with the server and a boxed envelope of any message type and
/// returns a boxed future resolving to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
35
/// RPC server: routes incoming messages to registered handlers and keeps the
/// shared `Store` behind a lock.
pub struct Server {
    /// RPC peer used to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// Shared state, accessed through `state()` (read) and `state_mut()` (write).
    store: RwLock<Store>,
    /// Application state; handlers use its `db` for users, channels and messages.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::Sender<()>>,
}
43
/// Abstraction over task spawning, used by `Server::handle_connection` to run
/// background message handlers.
pub trait Executor {
    /// Starts `future` without handing back a handle for awaiting its completion.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
47
/// `Executor` implementation that spawns futures with `async_std::task::spawn`.
pub struct RealExecutor;
49
/// Number of channel messages requested from the database per page; a page shorter
/// than this is reported to the client as `done`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on the byte length (`String::len`) of a trimmed channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_request_handler(Server::ping)
69 .add_request_handler(Server::register_project)
70 .add_message_handler(Server::unregister_project)
71 .add_request_handler(Server::share_project)
72 .add_message_handler(Server::unshare_project)
73 .add_request_handler(Server::join_project)
74 .add_message_handler(Server::leave_project)
75 .add_request_handler(Server::register_worktree)
76 .add_message_handler(Server::unregister_worktree)
77 .add_request_handler(Server::share_worktree)
78 .add_request_handler(Server::update_worktree)
79 .add_message_handler(Server::update_diagnostic_summary)
80 .add_message_handler(Server::disk_based_diagnostics_updating)
81 .add_message_handler(Server::disk_based_diagnostics_updated)
82 .add_request_handler(Server::get_definition)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::get_channels)
96 .add_request_handler(Server::get_users)
97 .add_request_handler(Server::join_channel)
98 .add_message_handler(Server::leave_channel)
99 .add_request_handler(Server::send_channel_message)
100 .add_request_handler(Server::get_channel_messages);
101
102 Arc::new(server)
103 }
104
105 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
106 where
107 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
108 Fut: 'static + Send + Future<Output = tide::Result<()>>,
109 M: EnvelopedMessage,
110 {
111 let prev_handler = self.handlers.insert(
112 TypeId::of::<M>(),
113 Box::new(move |server, envelope| {
114 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
115 (handler)(server, *envelope).boxed()
116 }),
117 );
118 if prev_handler.is_some() {
119 panic!("registered a handler for the same message twice");
120 }
121 self
122 }
123
124 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
125 where
126 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
127 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
128 M: RequestMessage,
129 {
130 self.add_message_handler(move |server, envelope| {
131 let receipt = envelope.receipt();
132 let response = (handler)(server.clone(), envelope);
133 async move {
134 match response.await {
135 Ok(response) => {
136 server.peer.respond(receipt, response)?;
137 Ok(())
138 }
139 Err(error) => {
140 server.peer.respond_with_error(
141 receipt,
142 proto::Error {
143 message: error.to_string(),
144 },
145 )?;
146 Err(error)
147 }
148 }
149 }
150 })
151 }
152
/// Returns a future that drives one client connection until it closes and then
/// signs the connection out.
///
/// The connection is added to the peer and to the store, contacts are refreshed
/// for `user_id`, and incoming messages are dispatched to the registered handlers.
/// `addr` is only used in log output. If `send_connection_id` is provided, the
/// assigned connection id is sent on it. Messages flagged as background are
/// spawned on `executor`; all others are awaited before the next message is read.
pub fn handle_connection<E: Executor>(
    self: &Arc<Self>,
    connection: Connection,
    addr: String,
    user_id: UserId,
    mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    executor: E,
) -> impl Future<Output = ()> {
    let mut this = self.clone();
    async move {
        let (connection_id, handle_io, mut incoming_rx) =
            this.peer.add_connection(connection).await;

        // Report the assigned id to the caller; a failed send is deliberately ignored.
        if let Some(send_connection_id) = send_connection_id.as_mut() {
            let _ = send_connection_id.send(connection_id).await;
        }

        this.state_mut().add_connection(connection_id, user_id);
        // A contacts-update failure is logged but does not abort the connection.
        if let Err(err) = this.update_contacts_for_users(&[user_id]) {
            log::error!("error updating contacts for {:?}: {}", user_id, err);
        }

        let handle_io = handle_io.fuse();
        futures::pin_mut!(handle_io);
        loop {
            let next_message = incoming_rx.next().fuse();
            futures::pin_mut!(next_message);
            // The I/O future is listed first, so it takes priority when both are ready.
            futures::select_biased! {
                result = handle_io => {
                    if let Err(err) = result {
                        log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                    }
                    break;
                }
                message = next_message => {
                    if let Some(message) = message {
                        let start_time = Instant::now();
                        let type_name = message.payload_type_name();
                        log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                        if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                            let notifications = this.notifications.clone();
                            let is_background = message.is_background();
                            let handle_message = (handler)(this.clone(), message);
                            // Wrap the handler so its outcome is logged and the optional
                            // notification is sent once it finishes.
                            let handle_message = async move {
                                if let Err(err) = handle_message.await {
                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                } else {
                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                }
                                if let Some(mut notifications) = notifications {
                                    let _ = notifications.send(()).await;
                                }
                            };
                            // Background messages run detached; foreground ones are
                            // awaited before the next message is read.
                            if is_background {
                                executor.spawn_detached(handle_message);
                            } else {
                                handle_message.await;
                            }
                        } else {
                            log::warn!("unhandled message: {}", type_name);
                        }
                    } else {
                        // The incoming stream ended.
                        log::info!("rpc connection closed {:?}", addr);
                        break;
                    }
                }
            }
        }

        if let Err(err) = this.sign_out(connection_id).await {
            log::error!("error signing out connection {:?} - {:?}", addr, err);
        }
    }
}
227
228 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
229 self.peer.disconnect(connection_id);
230 let removed_connection = self.state_mut().remove_connection(connection_id)?;
231
232 for (project_id, project) in removed_connection.hosted_projects {
233 if let Some(share) = project.share {
234 broadcast(
235 connection_id,
236 share.guests.keys().copied().collect(),
237 |conn_id| {
238 self.peer
239 .send(conn_id, proto::UnshareProject { project_id })
240 },
241 )?;
242 }
243 }
244
245 for (project_id, peer_ids) in removed_connection.guest_project_ids {
246 broadcast(connection_id, peer_ids, |conn_id| {
247 self.peer.send(
248 conn_id,
249 proto::RemoveProjectCollaborator {
250 project_id,
251 peer_id: connection_id.0,
252 },
253 )
254 })?;
255 }
256
257 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
258 Ok(())
259 }
260
261 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
262 Ok(proto::Ack {})
263 }
264
265 async fn register_project(
266 mut self: Arc<Server>,
267 request: TypedEnvelope<proto::RegisterProject>,
268 ) -> tide::Result<proto::RegisterProjectResponse> {
269 let project_id = {
270 let mut state = self.state_mut();
271 let user_id = state.user_id_for_connection(request.sender_id)?;
272 state.register_project(request.sender_id, user_id)
273 };
274 Ok(proto::RegisterProjectResponse { project_id })
275 }
276
277 async fn unregister_project(
278 mut self: Arc<Server>,
279 request: TypedEnvelope<proto::UnregisterProject>,
280 ) -> tide::Result<()> {
281 let project = self
282 .state_mut()
283 .unregister_project(request.payload.project_id, request.sender_id)?;
284 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
285 Ok(())
286 }
287
288 async fn share_project(
289 mut self: Arc<Server>,
290 request: TypedEnvelope<proto::ShareProject>,
291 ) -> tide::Result<proto::Ack> {
292 self.state_mut()
293 .share_project(request.payload.project_id, request.sender_id);
294 Ok(proto::Ack {})
295 }
296
297 async fn unshare_project(
298 mut self: Arc<Server>,
299 request: TypedEnvelope<proto::UnshareProject>,
300 ) -> tide::Result<()> {
301 let project_id = request.payload.project_id;
302 let project = self
303 .state_mut()
304 .unshare_project(project_id, request.sender_id)?;
305
306 broadcast(request.sender_id, project.connection_ids, |conn_id| {
307 self.peer
308 .send(conn_id, proto::UnshareProject { project_id })
309 })?;
310 self.update_contacts_for_users(&project.authorized_user_ids)?;
311 Ok(())
312 }
313
314 async fn join_project(
315 mut self: Arc<Server>,
316 request: TypedEnvelope<proto::JoinProject>,
317 ) -> tide::Result<proto::JoinProjectResponse> {
318 let project_id = request.payload.project_id;
319
320 let user_id = self.state().user_id_for_connection(request.sender_id)?;
321 let (response, connection_ids, contact_user_ids) = self
322 .state_mut()
323 .join_project(request.sender_id, user_id, project_id)
324 .and_then(|joined| {
325 let share = joined.project.share()?;
326 let peer_count = share.guests.len();
327 let mut collaborators = Vec::with_capacity(peer_count);
328 collaborators.push(proto::Collaborator {
329 peer_id: joined.project.host_connection_id.0,
330 replica_id: 0,
331 user_id: joined.project.host_user_id.to_proto(),
332 });
333 let worktrees = joined
334 .project
335 .worktrees
336 .iter()
337 .filter_map(|(id, worktree)| {
338 worktree.share.as_ref().map(|share| proto::Worktree {
339 id: *id,
340 root_name: worktree.root_name.clone(),
341 entries: share.entries.values().cloned().collect(),
342 diagnostic_summaries: share
343 .diagnostic_summaries
344 .values()
345 .cloned()
346 .collect(),
347 weak: worktree.weak,
348 next_update_id: share.next_update_id as u64,
349 })
350 })
351 .collect();
352 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
353 if *peer_conn_id != request.sender_id {
354 collaborators.push(proto::Collaborator {
355 peer_id: peer_conn_id.0,
356 replica_id: *peer_replica_id as u32,
357 user_id: peer_user_id.to_proto(),
358 });
359 }
360 }
361 let response = proto::JoinProjectResponse {
362 worktrees,
363 replica_id: joined.replica_id as u32,
364 collaborators,
365 };
366 let connection_ids = joined.project.connection_ids();
367 let contact_user_ids = joined.project.authorized_user_ids();
368 Ok((response, connection_ids, contact_user_ids))
369 })?;
370
371 broadcast(request.sender_id, connection_ids, |conn_id| {
372 self.peer.send(
373 conn_id,
374 proto::AddProjectCollaborator {
375 project_id,
376 collaborator: Some(proto::Collaborator {
377 peer_id: request.sender_id.0,
378 replica_id: response.replica_id,
379 user_id: user_id.to_proto(),
380 }),
381 },
382 )
383 })?;
384 self.update_contacts_for_users(&contact_user_ids)?;
385 Ok(response)
386 }
387
388 async fn leave_project(
389 mut self: Arc<Server>,
390 request: TypedEnvelope<proto::LeaveProject>,
391 ) -> tide::Result<()> {
392 let sender_id = request.sender_id;
393 let project_id = request.payload.project_id;
394 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
395
396 broadcast(sender_id, worktree.connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::RemoveProjectCollaborator {
400 project_id,
401 peer_id: sender_id.0,
402 },
403 )
404 })?;
405 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
406
407 Ok(())
408 }
409
410 async fn register_worktree(
411 mut self: Arc<Server>,
412 request: TypedEnvelope<proto::RegisterWorktree>,
413 ) -> tide::Result<proto::Ack> {
414 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
415
416 let mut contact_user_ids = HashSet::default();
417 contact_user_ids.insert(host_user_id);
418 for github_login in request.payload.authorized_logins {
419 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
420 contact_user_ids.insert(contact_user_id);
421 }
422
423 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
424 self.state_mut().register_worktree(
425 request.payload.project_id,
426 request.payload.worktree_id,
427 request.sender_id,
428 Worktree {
429 authorized_user_ids: contact_user_ids.clone(),
430 root_name: request.payload.root_name,
431 share: None,
432 weak: false,
433 },
434 )?;
435 self.update_contacts_for_users(&contact_user_ids)?;
436 Ok(proto::Ack {})
437 }
438
439 async fn unregister_worktree(
440 mut self: Arc<Server>,
441 request: TypedEnvelope<proto::UnregisterWorktree>,
442 ) -> tide::Result<()> {
443 let project_id = request.payload.project_id;
444 let worktree_id = request.payload.worktree_id;
445 let (worktree, guest_connection_ids) =
446 self.state_mut()
447 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
448 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
449 self.peer.send(
450 conn_id,
451 proto::UnregisterWorktree {
452 project_id,
453 worktree_id,
454 },
455 )
456 })?;
457 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
458 Ok(())
459 }
460
461 async fn share_worktree(
462 mut self: Arc<Server>,
463 mut request: TypedEnvelope<proto::ShareWorktree>,
464 ) -> tide::Result<proto::Ack> {
465 let worktree = request
466 .payload
467 .worktree
468 .as_mut()
469 .ok_or_else(|| anyhow!("missing worktree"))?;
470 let entries = worktree
471 .entries
472 .iter()
473 .map(|entry| (entry.id, entry.clone()))
474 .collect();
475 let diagnostic_summaries = worktree
476 .diagnostic_summaries
477 .iter()
478 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
479 .collect();
480
481 let shared_worktree = self.state_mut().share_worktree(
482 request.payload.project_id,
483 worktree.id,
484 request.sender_id,
485 entries,
486 diagnostic_summaries,
487 worktree.next_update_id,
488 )?;
489
490 broadcast(
491 request.sender_id,
492 shared_worktree.connection_ids,
493 |connection_id| {
494 self.peer
495 .forward_send(request.sender_id, connection_id, request.payload.clone())
496 },
497 )?;
498 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
499
500 Ok(proto::Ack {})
501 }
502
503 async fn update_worktree(
504 mut self: Arc<Server>,
505 request: TypedEnvelope<proto::UpdateWorktree>,
506 ) -> tide::Result<proto::Ack> {
507 let connection_ids = self.state_mut().update_worktree(
508 request.sender_id,
509 request.payload.project_id,
510 request.payload.worktree_id,
511 request.payload.id,
512 &request.payload.removed_entries,
513 &request.payload.updated_entries,
514 )?;
515
516 broadcast(request.sender_id, connection_ids, |connection_id| {
517 self.peer
518 .forward_send(request.sender_id, connection_id, request.payload.clone())
519 })?;
520
521 Ok(proto::Ack {})
522 }
523
524 async fn update_diagnostic_summary(
525 mut self: Arc<Server>,
526 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
527 ) -> tide::Result<()> {
528 let summary = request
529 .payload
530 .summary
531 .clone()
532 .ok_or_else(|| anyhow!("invalid summary"))?;
533 let receiver_ids = self.state_mut().update_diagnostic_summary(
534 request.payload.project_id,
535 request.payload.worktree_id,
536 request.sender_id,
537 summary,
538 )?;
539
540 broadcast(request.sender_id, receiver_ids, |connection_id| {
541 self.peer
542 .forward_send(request.sender_id, connection_id, request.payload.clone())
543 })?;
544 Ok(())
545 }
546
547 async fn disk_based_diagnostics_updating(
548 self: Arc<Server>,
549 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
550 ) -> tide::Result<()> {
551 let receiver_ids = self
552 .state()
553 .project_connection_ids(request.payload.project_id, request.sender_id)?;
554 broadcast(request.sender_id, receiver_ids, |connection_id| {
555 self.peer
556 .forward_send(request.sender_id, connection_id, request.payload.clone())
557 })?;
558 Ok(())
559 }
560
561 async fn disk_based_diagnostics_updated(
562 self: Arc<Server>,
563 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
564 ) -> tide::Result<()> {
565 let receiver_ids = self
566 .state()
567 .project_connection_ids(request.payload.project_id, request.sender_id)?;
568 broadcast(request.sender_id, receiver_ids, |connection_id| {
569 self.peer
570 .forward_send(request.sender_id, connection_id, request.payload.clone())
571 })?;
572 Ok(())
573 }
574
575 async fn get_definition(
576 self: Arc<Server>,
577 request: TypedEnvelope<proto::GetDefinition>,
578 ) -> tide::Result<proto::GetDefinitionResponse> {
579 let host_connection_id = self
580 .state()
581 .read_project(request.payload.project_id, request.sender_id)?
582 .host_connection_id;
583 Ok(self
584 .peer
585 .forward_request(request.sender_id, host_connection_id, request.payload)
586 .await?)
587 }
588
589 async fn open_buffer(
590 self: Arc<Server>,
591 request: TypedEnvelope<proto::OpenBuffer>,
592 ) -> tide::Result<proto::OpenBufferResponse> {
593 let host_connection_id = self
594 .state()
595 .read_project(request.payload.project_id, request.sender_id)?
596 .host_connection_id;
597 Ok(self
598 .peer
599 .forward_request(request.sender_id, host_connection_id, request.payload)
600 .await?)
601 }
602
603 async fn close_buffer(
604 self: Arc<Server>,
605 request: TypedEnvelope<proto::CloseBuffer>,
606 ) -> tide::Result<()> {
607 let host_connection_id = self
608 .state()
609 .read_project(request.payload.project_id, request.sender_id)?
610 .host_connection_id;
611 self.peer
612 .forward_send(request.sender_id, host_connection_id, request.payload)?;
613 Ok(())
614 }
615
616 async fn save_buffer(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::SaveBuffer>,
619 ) -> tide::Result<proto::BufferSaved> {
620 let host;
621 let mut guests;
622 {
623 let state = self.state();
624 let project = state.read_project(request.payload.project_id, request.sender_id)?;
625 host = project.host_connection_id;
626 guests = project.guest_connection_ids()
627 }
628
629 let response = self
630 .peer
631 .forward_request(request.sender_id, host, request.payload.clone())
632 .await?;
633
634 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
635 broadcast(host, guests, |conn_id| {
636 self.peer.forward_send(host, conn_id, response.clone())
637 })?;
638
639 Ok(response)
640 }
641
642 async fn format_buffers(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::FormatBuffers>,
645 ) -> tide::Result<proto::FormatBuffersResponse> {
646 let host = self
647 .state()
648 .read_project(request.payload.project_id, request.sender_id)?
649 .host_connection_id;
650 Ok(self
651 .peer
652 .forward_request(request.sender_id, host, request.payload.clone())
653 .await?)
654 }
655
656 async fn get_completions(
657 self: Arc<Server>,
658 request: TypedEnvelope<proto::GetCompletions>,
659 ) -> tide::Result<proto::GetCompletionsResponse> {
660 let host = self
661 .state()
662 .read_project(request.payload.project_id, request.sender_id)?
663 .host_connection_id;
664 Ok(self
665 .peer
666 .forward_request(request.sender_id, host, request.payload.clone())
667 .await?)
668 }
669
670 async fn apply_additional_edits_for_completion(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
673 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
674 let host = self
675 .state()
676 .read_project(request.payload.project_id, request.sender_id)?
677 .host_connection_id;
678 Ok(self
679 .peer
680 .forward_request(request.sender_id, host, request.payload.clone())
681 .await?)
682 }
683
684 async fn get_code_actions(
685 self: Arc<Server>,
686 request: TypedEnvelope<proto::GetCodeActions>,
687 ) -> tide::Result<proto::GetCodeActionsResponse> {
688 let host = self
689 .state()
690 .read_project(request.payload.project_id, request.sender_id)?
691 .host_connection_id;
692 Ok(self
693 .peer
694 .forward_request(request.sender_id, host, request.payload.clone())
695 .await?)
696 }
697
698 async fn apply_code_action(
699 self: Arc<Server>,
700 request: TypedEnvelope<proto::ApplyCodeAction>,
701 ) -> tide::Result<proto::ApplyCodeActionResponse> {
702 let host = self
703 .state()
704 .read_project(request.payload.project_id, request.sender_id)?
705 .host_connection_id;
706 Ok(self
707 .peer
708 .forward_request(request.sender_id, host, request.payload.clone())
709 .await?)
710 }
711
712 async fn update_buffer(
713 self: Arc<Server>,
714 request: TypedEnvelope<proto::UpdateBuffer>,
715 ) -> tide::Result<proto::Ack> {
716 let receiver_ids = self
717 .state()
718 .project_connection_ids(request.payload.project_id, request.sender_id)?;
719 broadcast(request.sender_id, receiver_ids, |connection_id| {
720 self.peer
721 .forward_send(request.sender_id, connection_id, request.payload.clone())
722 })?;
723 Ok(proto::Ack {})
724 }
725
726 async fn update_buffer_file(
727 self: Arc<Server>,
728 request: TypedEnvelope<proto::UpdateBufferFile>,
729 ) -> tide::Result<()> {
730 let receiver_ids = self
731 .state()
732 .project_connection_ids(request.payload.project_id, request.sender_id)?;
733 broadcast(request.sender_id, receiver_ids, |connection_id| {
734 self.peer
735 .forward_send(request.sender_id, connection_id, request.payload.clone())
736 })?;
737 Ok(())
738 }
739
740 async fn buffer_reloaded(
741 self: Arc<Server>,
742 request: TypedEnvelope<proto::BufferReloaded>,
743 ) -> tide::Result<()> {
744 let receiver_ids = self
745 .state()
746 .project_connection_ids(request.payload.project_id, request.sender_id)?;
747 broadcast(request.sender_id, receiver_ids, |connection_id| {
748 self.peer
749 .forward_send(request.sender_id, connection_id, request.payload.clone())
750 })?;
751 Ok(())
752 }
753
754 async fn buffer_saved(
755 self: Arc<Server>,
756 request: TypedEnvelope<proto::BufferSaved>,
757 ) -> tide::Result<()> {
758 let receiver_ids = self
759 .state()
760 .project_connection_ids(request.payload.project_id, request.sender_id)?;
761 broadcast(request.sender_id, receiver_ids, |connection_id| {
762 self.peer
763 .forward_send(request.sender_id, connection_id, request.payload.clone())
764 })?;
765 Ok(())
766 }
767
768 async fn get_channels(
769 self: Arc<Server>,
770 request: TypedEnvelope<proto::GetChannels>,
771 ) -> tide::Result<proto::GetChannelsResponse> {
772 let user_id = self.state().user_id_for_connection(request.sender_id)?;
773 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
774 Ok(proto::GetChannelsResponse {
775 channels: channels
776 .into_iter()
777 .map(|chan| proto::Channel {
778 id: chan.id.to_proto(),
779 name: chan.name,
780 })
781 .collect(),
782 })
783 }
784
785 async fn get_users(
786 self: Arc<Server>,
787 request: TypedEnvelope<proto::GetUsers>,
788 ) -> tide::Result<proto::GetUsersResponse> {
789 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
790 let users = self
791 .app_state
792 .db
793 .get_users_by_ids(user_ids)
794 .await?
795 .into_iter()
796 .map(|user| proto::User {
797 id: user.id.to_proto(),
798 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
799 github_login: user.github_login,
800 })
801 .collect();
802 Ok(proto::GetUsersResponse { users })
803 }
804
805 fn update_contacts_for_users<'a>(
806 self: &Arc<Server>,
807 user_ids: impl IntoIterator<Item = &'a UserId>,
808 ) -> anyhow::Result<()> {
809 let mut result = Ok(());
810 let state = self.state();
811 for user_id in user_ids {
812 let contacts = state.contacts_for_user(*user_id);
813 for connection_id in state.connection_ids_for_user(*user_id) {
814 if let Err(error) = self.peer.send(
815 connection_id,
816 proto::UpdateContacts {
817 contacts: contacts.clone(),
818 },
819 ) {
820 result = Err(error);
821 }
822 }
823 }
824 result
825 }
826
827 async fn join_channel(
828 mut self: Arc<Self>,
829 request: TypedEnvelope<proto::JoinChannel>,
830 ) -> tide::Result<proto::JoinChannelResponse> {
831 let user_id = self.state().user_id_for_connection(request.sender_id)?;
832 let channel_id = ChannelId::from_proto(request.payload.channel_id);
833 if !self
834 .app_state
835 .db
836 .can_user_access_channel(user_id, channel_id)
837 .await?
838 {
839 Err(anyhow!("access denied"))?;
840 }
841
842 self.state_mut().join_channel(request.sender_id, channel_id);
843 let messages = self
844 .app_state
845 .db
846 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
847 .await?
848 .into_iter()
849 .map(|msg| proto::ChannelMessage {
850 id: msg.id.to_proto(),
851 body: msg.body,
852 timestamp: msg.sent_at.unix_timestamp() as u64,
853 sender_id: msg.sender_id.to_proto(),
854 nonce: Some(msg.nonce.as_u128().into()),
855 })
856 .collect::<Vec<_>>();
857 Ok(proto::JoinChannelResponse {
858 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
859 messages,
860 })
861 }
862
863 async fn leave_channel(
864 mut self: Arc<Self>,
865 request: TypedEnvelope<proto::LeaveChannel>,
866 ) -> tide::Result<()> {
867 let user_id = self.state().user_id_for_connection(request.sender_id)?;
868 let channel_id = ChannelId::from_proto(request.payload.channel_id);
869 if !self
870 .app_state
871 .db
872 .can_user_access_channel(user_id, channel_id)
873 .await?
874 {
875 Err(anyhow!("access denied"))?;
876 }
877
878 self.state_mut()
879 .leave_channel(request.sender_id, channel_id);
880
881 Ok(())
882 }
883
884 async fn send_channel_message(
885 self: Arc<Self>,
886 request: TypedEnvelope<proto::SendChannelMessage>,
887 ) -> tide::Result<proto::SendChannelMessageResponse> {
888 let channel_id = ChannelId::from_proto(request.payload.channel_id);
889 let user_id;
890 let connection_ids;
891 {
892 let state = self.state();
893 user_id = state.user_id_for_connection(request.sender_id)?;
894 connection_ids = state.channel_connection_ids(channel_id)?;
895 }
896
897 // Validate the message body.
898 let body = request.payload.body.trim().to_string();
899 if body.len() > MAX_MESSAGE_LEN {
900 return Err(anyhow!("message is too long"))?;
901 }
902 if body.is_empty() {
903 return Err(anyhow!("message can't be blank"))?;
904 }
905
906 let timestamp = OffsetDateTime::now_utc();
907 let nonce = request
908 .payload
909 .nonce
910 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
911
912 let message_id = self
913 .app_state
914 .db
915 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
916 .await?
917 .to_proto();
918 let message = proto::ChannelMessage {
919 sender_id: user_id.to_proto(),
920 id: message_id,
921 body,
922 timestamp: timestamp.unix_timestamp() as u64,
923 nonce: Some(nonce),
924 };
925 broadcast(request.sender_id, connection_ids, |conn_id| {
926 self.peer.send(
927 conn_id,
928 proto::ChannelMessageSent {
929 channel_id: channel_id.to_proto(),
930 message: Some(message.clone()),
931 },
932 )
933 })?;
934 Ok(proto::SendChannelMessageResponse {
935 message: Some(message),
936 })
937 }
938
939 async fn get_channel_messages(
940 self: Arc<Self>,
941 request: TypedEnvelope<proto::GetChannelMessages>,
942 ) -> tide::Result<proto::GetChannelMessagesResponse> {
943 let user_id = self.state().user_id_for_connection(request.sender_id)?;
944 let channel_id = ChannelId::from_proto(request.payload.channel_id);
945 if !self
946 .app_state
947 .db
948 .can_user_access_channel(user_id, channel_id)
949 .await?
950 {
951 Err(anyhow!("access denied"))?;
952 }
953
954 let messages = self
955 .app_state
956 .db
957 .get_channel_messages(
958 channel_id,
959 MESSAGE_COUNT_PER_PAGE,
960 Some(MessageId::from_proto(request.payload.before_message_id)),
961 )
962 .await?
963 .into_iter()
964 .map(|msg| proto::ChannelMessage {
965 id: msg.id.to_proto(),
966 body: msg.body,
967 timestamp: msg.sent_at.unix_timestamp() as u64,
968 sender_id: msg.sender_id.to_proto(),
969 nonce: Some(msg.nonce.as_u128().into()),
970 })
971 .collect::<Vec<_>>();
972
973 Ok(proto::GetChannelMessagesResponse {
974 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
975 messages,
976 })
977 }
978
979 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
980 self.store.read()
981 }
982
983 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
984 self.store.write()
985 }
986}
987
988impl Executor for RealExecutor {
989 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
990 task::spawn(future);
991 }
992}
993
994fn broadcast<F>(
995 sender_id: ConnectionId,
996 receiver_ids: Vec<ConnectionId>,
997 mut f: F,
998) -> anyhow::Result<()>
999where
1000 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1001{
1002 let mut result = Ok(());
1003 for receiver_id in receiver_ids {
1004 if receiver_id != sender_id {
1005 if let Err(error) = f(receiver_id) {
1006 if result.is_ok() {
1007 result = Err(error);
1008 }
1009 }
1010 }
1011 }
1012 result
1013}
1014
/// Mounts the `/rpc` endpoint on `app`.
///
/// A GET request is accepted only if it asks for a WebSocket upgrade and its
/// `X-Zed-Protocol-Version` header equals `rpc::PROTOCOL_VERSION`; otherwise the
/// handler answers with `UpgradeRequired`. After authentication the handler
/// replies with `SwitchingProtocols` and spawns a task that serves the upgraded
/// stream via `Server::handle_connection`.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // GUID appended to the client's key when computing `Sec-Websocket-Accept`
            // (the fixed value defined by the WebSocket protocol, RFC 6455).
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which never matches.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept token: base64 of SHA-1 over the client's key followed by the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // The connection is served on its own task so the handshake response
            // below can be returned immediately.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                            RealExecutor,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1070
1071fn header_contains_ignore_case<T>(
1072 request: &tide::Request<T>,
1073 header_name: HeaderName,
1074 value: &str,
1075) -> bool {
1076 request
1077 .header(header_name)
1078 .map(|h| {
1079 h.as_str()
1080 .split(',')
1081 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1082 })
1083 .unwrap_or(false)
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088 use super::*;
1089 use crate::{
1090 auth,
1091 db::{tests::TestDb, UserId},
1092 github, AppState, Config,
1093 };
1094 use ::rpc::Peer;
1095 use collections::BTreeMap;
1096 use futures::channel::mpsc::UnboundedReceiver;
1097 use gpui::{executor, ModelHandle, TestAppContext};
1098 use parking_lot::Mutex;
1099 use postage::{mpsc, watch};
1100 use rand::prelude::*;
1101 use rpc::PeerId;
1102 use serde_json::json;
1103 use sqlx::types::time::OffsetDateTime;
1104 use std::{
1105 cell::{Cell, RefCell},
1106 env,
1107 ops::Deref,
1108 path::Path,
1109 rc::Rc,
1110 sync::{
1111 atomic::{AtomicBool, Ordering::SeqCst},
1112 Arc,
1113 },
1114 time::Duration,
1115 };
1116 use zed::{
1117 client::{
1118 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1119 EstablishConnectionError, UserStore,
1120 },
1121 editor::{
1122 self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1123 Redo, ToggleCodeActions, Undo,
1124 },
1125 fs::{FakeFs, Fs as _},
1126 language::{
1127 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1128 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1129 },
1130 lsp::{self, FakeLanguageServer},
1131 project::{DiagnosticSummary, Project, ProjectPath},
1132 workspace::{Workspace, WorkspaceParams},
1133 };
1134
1135 #[cfg(test)]
1136 #[ctor::ctor]
1137 fn init_logger() {
1138 if std::env::var("RUST_LOG").is_ok() {
1139 env_logger::init();
1140 }
1141 }
1142
1143 #[gpui::test(iterations = 10)]
1144 async fn test_share_project(
1145 mut cx_a: TestAppContext,
1146 mut cx_b: TestAppContext,
1147 last_iteration: bool,
1148 ) {
1149 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1150 let lang_registry = Arc::new(LanguageRegistry::new());
1151 let fs = Arc::new(FakeFs::new(cx_a.background()));
1152 cx_a.foreground().forbid_parking();
1153
1154 // Connect to a server as 2 clients.
1155 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1156 let client_a = server.create_client(&mut cx_a, "user_a").await;
1157 let client_b = server.create_client(&mut cx_b, "user_b").await;
1158
1159 // Share a project as client A
1160 fs.insert_tree(
1161 "/a",
1162 json!({
1163 ".zed.toml": r#"collaborators = ["user_b"]"#,
1164 "a.txt": "a-contents",
1165 "b.txt": "b-contents",
1166 }),
1167 )
1168 .await;
1169 let project_a = cx_a.update(|cx| {
1170 Project::local(
1171 client_a.clone(),
1172 client_a.user_store.clone(),
1173 lang_registry.clone(),
1174 fs.clone(),
1175 cx,
1176 )
1177 });
1178 let (worktree_a, _) = project_a
1179 .update(&mut cx_a, |p, cx| {
1180 p.find_or_create_local_worktree("/a", false, cx)
1181 })
1182 .await
1183 .unwrap();
1184 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1185 worktree_a
1186 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1187 .await;
1188 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1189 project_a
1190 .update(&mut cx_a, |p, cx| p.share(cx))
1191 .await
1192 .unwrap();
1193
1194 // Join that project as client B
1195 let project_b = Project::remote(
1196 project_id,
1197 client_b.clone(),
1198 client_b.user_store.clone(),
1199 lang_registry.clone(),
1200 fs.clone(),
1201 &mut cx_b.to_async(),
1202 )
1203 .await
1204 .unwrap();
1205
1206 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1207 assert_eq!(
1208 project
1209 .collaborators()
1210 .get(&client_a.peer_id)
1211 .unwrap()
1212 .user
1213 .github_login,
1214 "user_a"
1215 );
1216 project.replica_id()
1217 });
1218 project_a
1219 .condition(&cx_a, |tree, _| {
1220 tree.collaborators()
1221 .get(&client_b.peer_id)
1222 .map_or(false, |collaborator| {
1223 collaborator.replica_id == replica_id_b
1224 && collaborator.user.github_login == "user_b"
1225 })
1226 })
1227 .await;
1228
1229 // Open the same file as client B and client A.
1230 let buffer_b = project_b
1231 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1232 .await
1233 .unwrap();
1234 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1235 buffer_b.read_with(&cx_b, |buf, cx| {
1236 assert_eq!(buf.read(cx).text(), "b-contents")
1237 });
1238 project_a.read_with(&cx_a, |project, cx| {
1239 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1240 });
1241 let buffer_a = project_a
1242 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1243 .await
1244 .unwrap();
1245
1246 let editor_b = cx_b.add_view(window_b, |cx| {
1247 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1248 });
1249
1250 // TODO
1251 // // Create a selection set as client B and see that selection set as client A.
1252 // buffer_a
1253 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1254 // .await;
1255
1256 // Edit the buffer as client B and see that edit as client A.
1257 editor_b.update(&mut cx_b, |editor, cx| {
1258 editor.handle_input(&Input("ok, ".into()), cx)
1259 });
1260 buffer_a
1261 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1262 .await;
1263
1264 // TODO
1265 // // Remove the selection set as client B, see those selections disappear as client A.
1266 cx_b.update(move |_| drop(editor_b));
1267 // buffer_a
1268 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1269 // .await;
1270
1271 // Close the buffer as client A, see that the buffer is closed.
1272 cx_a.update(move |_| drop(buffer_a));
1273 project_a
1274 .condition(&cx_a, |project, cx| {
1275 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1276 })
1277 .await;
1278
1279 // Dropping the client B's project removes client B from client A's collaborators.
1280 cx_b.update(move |_| drop(project_b));
1281 project_a
1282 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1283 .await;
1284 }
1285
1286 #[gpui::test(iterations = 10)]
1287 async fn test_unshare_project(
1288 mut cx_a: TestAppContext,
1289 mut cx_b: TestAppContext,
1290 last_iteration: bool,
1291 ) {
1292 let lang_registry = Arc::new(LanguageRegistry::new());
1293 let fs = Arc::new(FakeFs::new(cx_a.background()));
1294 cx_a.foreground().forbid_parking();
1295
1296 // Connect to a server as 2 clients.
1297 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1298 let client_a = server.create_client(&mut cx_a, "user_a").await;
1299 let client_b = server.create_client(&mut cx_b, "user_b").await;
1300
1301 // Share a project as client A
1302 fs.insert_tree(
1303 "/a",
1304 json!({
1305 ".zed.toml": r#"collaborators = ["user_b"]"#,
1306 "a.txt": "a-contents",
1307 "b.txt": "b-contents",
1308 }),
1309 )
1310 .await;
1311 let project_a = cx_a.update(|cx| {
1312 Project::local(
1313 client_a.clone(),
1314 client_a.user_store.clone(),
1315 lang_registry.clone(),
1316 fs.clone(),
1317 cx,
1318 )
1319 });
1320 let (worktree_a, _) = project_a
1321 .update(&mut cx_a, |p, cx| {
1322 p.find_or_create_local_worktree("/a", false, cx)
1323 })
1324 .await
1325 .unwrap();
1326 worktree_a
1327 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1328 .await;
1329 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1330 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1331 project_a
1332 .update(&mut cx_a, |p, cx| p.share(cx))
1333 .await
1334 .unwrap();
1335 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1336
1337 // Join that project as client B
1338 let project_b = Project::remote(
1339 project_id,
1340 client_b.clone(),
1341 client_b.user_store.clone(),
1342 lang_registry.clone(),
1343 fs.clone(),
1344 &mut cx_b.to_async(),
1345 )
1346 .await
1347 .unwrap();
1348 project_b
1349 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1350 .await
1351 .unwrap();
1352
1353 // Unshare the project as client A
1354 project_a
1355 .update(&mut cx_a, |project, cx| project.unshare(cx))
1356 .await
1357 .unwrap();
1358 project_b
1359 .condition(&mut cx_b, |project, _| project.is_read_only())
1360 .await;
1361 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1362 drop(project_b);
1363
1364 // Share the project again and ensure guests can still join.
1365 project_a
1366 .update(&mut cx_a, |project, cx| project.share(cx))
1367 .await
1368 .unwrap();
1369 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1370
1371 let project_c = Project::remote(
1372 project_id,
1373 client_b.clone(),
1374 client_b.user_store.clone(),
1375 lang_registry.clone(),
1376 fs.clone(),
1377 &mut cx_b.to_async(),
1378 )
1379 .await
1380 .unwrap();
1381 project_c
1382 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1383 .await
1384 .unwrap();
1385 }
1386
1387 #[gpui::test(iterations = 10)]
1388 async fn test_propagate_saves_and_fs_changes(
1389 mut cx_a: TestAppContext,
1390 mut cx_b: TestAppContext,
1391 mut cx_c: TestAppContext,
1392 last_iteration: bool,
1393 ) {
1394 let lang_registry = Arc::new(LanguageRegistry::new());
1395 let fs = Arc::new(FakeFs::new(cx_a.background()));
1396 cx_a.foreground().forbid_parking();
1397
1398 // Connect to a server as 3 clients.
1399 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1400 let client_a = server.create_client(&mut cx_a, "user_a").await;
1401 let client_b = server.create_client(&mut cx_b, "user_b").await;
1402 let client_c = server.create_client(&mut cx_c, "user_c").await;
1403
1404 // Share a worktree as client A.
1405 fs.insert_tree(
1406 "/a",
1407 json!({
1408 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1409 "file1": "",
1410 "file2": ""
1411 }),
1412 )
1413 .await;
1414 let project_a = cx_a.update(|cx| {
1415 Project::local(
1416 client_a.clone(),
1417 client_a.user_store.clone(),
1418 lang_registry.clone(),
1419 fs.clone(),
1420 cx,
1421 )
1422 });
1423 let (worktree_a, _) = project_a
1424 .update(&mut cx_a, |p, cx| {
1425 p.find_or_create_local_worktree("/a", false, cx)
1426 })
1427 .await
1428 .unwrap();
1429 worktree_a
1430 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1431 .await;
1432 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1433 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1434 project_a
1435 .update(&mut cx_a, |p, cx| p.share(cx))
1436 .await
1437 .unwrap();
1438
1439 // Join that worktree as clients B and C.
1440 let project_b = Project::remote(
1441 project_id,
1442 client_b.clone(),
1443 client_b.user_store.clone(),
1444 lang_registry.clone(),
1445 fs.clone(),
1446 &mut cx_b.to_async(),
1447 )
1448 .await
1449 .unwrap();
1450 let project_c = Project::remote(
1451 project_id,
1452 client_c.clone(),
1453 client_c.user_store.clone(),
1454 lang_registry.clone(),
1455 fs.clone(),
1456 &mut cx_c.to_async(),
1457 )
1458 .await
1459 .unwrap();
1460 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1461 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1462
1463 // Open and edit a buffer as both guests B and C.
1464 let buffer_b = project_b
1465 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1466 .await
1467 .unwrap();
1468 let buffer_c = project_c
1469 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1470 .await
1471 .unwrap();
1472 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1473 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1474
1475 // Open and edit that buffer as the host.
1476 let buffer_a = project_a
1477 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1478 .await
1479 .unwrap();
1480
1481 buffer_a
1482 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1483 .await;
1484 buffer_a.update(&mut cx_a, |buf, cx| {
1485 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1486 });
1487
1488 // Wait for edits to propagate
1489 buffer_a
1490 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1491 .await;
1492 buffer_b
1493 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1494 .await;
1495 buffer_c
1496 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1497 .await;
1498
1499 // Edit the buffer as the host and concurrently save as guest B.
1500 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1501 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1502 save_b.await.unwrap();
1503 assert_eq!(
1504 fs.load("/a/file1".as_ref()).await.unwrap(),
1505 "hi-a, i-am-c, i-am-b, i-am-a"
1506 );
1507 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1508 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1509 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1510
1511 // Make changes on host's file system, see those changes on guest worktrees.
1512 fs.rename(
1513 "/a/file1".as_ref(),
1514 "/a/file1-renamed".as_ref(),
1515 Default::default(),
1516 )
1517 .await
1518 .unwrap();
1519
1520 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1521 .await
1522 .unwrap();
1523 fs.insert_file(Path::new("/a/file4"), "4".into())
1524 .await
1525 .unwrap();
1526
1527 worktree_a
1528 .condition(&cx_a, |tree, _| {
1529 tree.paths()
1530 .map(|p| p.to_string_lossy())
1531 .collect::<Vec<_>>()
1532 == [".zed.toml", "file1-renamed", "file3", "file4"]
1533 })
1534 .await;
1535 worktree_b
1536 .condition(&cx_b, |tree, _| {
1537 tree.paths()
1538 .map(|p| p.to_string_lossy())
1539 .collect::<Vec<_>>()
1540 == [".zed.toml", "file1-renamed", "file3", "file4"]
1541 })
1542 .await;
1543 worktree_c
1544 .condition(&cx_c, |tree, _| {
1545 tree.paths()
1546 .map(|p| p.to_string_lossy())
1547 .collect::<Vec<_>>()
1548 == [".zed.toml", "file1-renamed", "file3", "file4"]
1549 })
1550 .await;
1551
1552 // Ensure buffer files are updated as well.
1553 buffer_a
1554 .condition(&cx_a, |buf, _| {
1555 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1556 })
1557 .await;
1558 buffer_b
1559 .condition(&cx_b, |buf, _| {
1560 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1561 })
1562 .await;
1563 buffer_c
1564 .condition(&cx_c, |buf, _| {
1565 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1566 })
1567 .await;
1568 }
1569
1570 #[gpui::test(iterations = 10)]
1571 async fn test_buffer_conflict_after_save(
1572 mut cx_a: TestAppContext,
1573 mut cx_b: TestAppContext,
1574 last_iteration: bool,
1575 ) {
1576 cx_a.foreground().forbid_parking();
1577 let lang_registry = Arc::new(LanguageRegistry::new());
1578 let fs = Arc::new(FakeFs::new(cx_a.background()));
1579
1580 // Connect to a server as 2 clients.
1581 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1582 let client_a = server.create_client(&mut cx_a, "user_a").await;
1583 let client_b = server.create_client(&mut cx_b, "user_b").await;
1584
1585 // Share a project as client A
1586 fs.insert_tree(
1587 "/dir",
1588 json!({
1589 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1590 "a.txt": "a-contents",
1591 }),
1592 )
1593 .await;
1594
1595 let project_a = cx_a.update(|cx| {
1596 Project::local(
1597 client_a.clone(),
1598 client_a.user_store.clone(),
1599 lang_registry.clone(),
1600 fs.clone(),
1601 cx,
1602 )
1603 });
1604 let (worktree_a, _) = project_a
1605 .update(&mut cx_a, |p, cx| {
1606 p.find_or_create_local_worktree("/dir", false, cx)
1607 })
1608 .await
1609 .unwrap();
1610 worktree_a
1611 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1612 .await;
1613 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1614 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1615 project_a
1616 .update(&mut cx_a, |p, cx| p.share(cx))
1617 .await
1618 .unwrap();
1619
1620 // Join that project as client B
1621 let project_b = Project::remote(
1622 project_id,
1623 client_b.clone(),
1624 client_b.user_store.clone(),
1625 lang_registry.clone(),
1626 fs.clone(),
1627 &mut cx_b.to_async(),
1628 )
1629 .await
1630 .unwrap();
1631
1632 // Open a buffer as client B
1633 let buffer_b = project_b
1634 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1635 .await
1636 .unwrap();
1637
1638 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1639 buffer_b.read_with(&cx_b, |buf, _| {
1640 assert!(buf.is_dirty());
1641 assert!(!buf.has_conflict());
1642 });
1643
1644 buffer_b
1645 .update(&mut cx_b, |buf, cx| buf.save(cx))
1646 .await
1647 .unwrap();
1648 buffer_b
1649 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1650 .await;
1651 buffer_b.read_with(&cx_b, |buf, _| {
1652 assert!(!buf.has_conflict());
1653 });
1654
1655 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1656 buffer_b.read_with(&cx_b, |buf, _| {
1657 assert!(buf.is_dirty());
1658 assert!(!buf.has_conflict());
1659 });
1660 }
1661
1662 #[gpui::test(iterations = 10)]
1663 async fn test_buffer_reloading(
1664 mut cx_a: TestAppContext,
1665 mut cx_b: TestAppContext,
1666 last_iteration: bool,
1667 ) {
1668 cx_a.foreground().forbid_parking();
1669 let lang_registry = Arc::new(LanguageRegistry::new());
1670 let fs = Arc::new(FakeFs::new(cx_a.background()));
1671
1672 // Connect to a server as 2 clients.
1673 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1674 let client_a = server.create_client(&mut cx_a, "user_a").await;
1675 let client_b = server.create_client(&mut cx_b, "user_b").await;
1676
1677 // Share a project as client A
1678 fs.insert_tree(
1679 "/dir",
1680 json!({
1681 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1682 "a.txt": "a-contents",
1683 }),
1684 )
1685 .await;
1686
1687 let project_a = cx_a.update(|cx| {
1688 Project::local(
1689 client_a.clone(),
1690 client_a.user_store.clone(),
1691 lang_registry.clone(),
1692 fs.clone(),
1693 cx,
1694 )
1695 });
1696 let (worktree_a, _) = project_a
1697 .update(&mut cx_a, |p, cx| {
1698 p.find_or_create_local_worktree("/dir", false, cx)
1699 })
1700 .await
1701 .unwrap();
1702 worktree_a
1703 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1704 .await;
1705 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1706 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1707 project_a
1708 .update(&mut cx_a, |p, cx| p.share(cx))
1709 .await
1710 .unwrap();
1711
1712 // Join that project as client B
1713 let project_b = Project::remote(
1714 project_id,
1715 client_b.clone(),
1716 client_b.user_store.clone(),
1717 lang_registry.clone(),
1718 fs.clone(),
1719 &mut cx_b.to_async(),
1720 )
1721 .await
1722 .unwrap();
1723 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1724
1725 // Open a buffer as client B
1726 let buffer_b = project_b
1727 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1728 .await
1729 .unwrap();
1730 buffer_b.read_with(&cx_b, |buf, _| {
1731 assert!(!buf.is_dirty());
1732 assert!(!buf.has_conflict());
1733 });
1734
1735 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1736 .await
1737 .unwrap();
1738 buffer_b
1739 .condition(&cx_b, |buf, _| {
1740 buf.text() == "new contents" && !buf.is_dirty()
1741 })
1742 .await;
1743 buffer_b.read_with(&cx_b, |buf, _| {
1744 assert!(!buf.has_conflict());
1745 });
1746 }
1747
1748 #[gpui::test(iterations = 10)]
1749 async fn test_editing_while_guest_opens_buffer(
1750 mut cx_a: TestAppContext,
1751 mut cx_b: TestAppContext,
1752 last_iteration: bool,
1753 ) {
1754 cx_a.foreground().forbid_parking();
1755 let lang_registry = Arc::new(LanguageRegistry::new());
1756 let fs = Arc::new(FakeFs::new(cx_a.background()));
1757
1758 // Connect to a server as 2 clients.
1759 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1760 let client_a = server.create_client(&mut cx_a, "user_a").await;
1761 let client_b = server.create_client(&mut cx_b, "user_b").await;
1762
1763 // Share a project as client A
1764 fs.insert_tree(
1765 "/dir",
1766 json!({
1767 ".zed.toml": r#"collaborators = ["user_b"]"#,
1768 "a.txt": "a-contents",
1769 }),
1770 )
1771 .await;
1772 let project_a = cx_a.update(|cx| {
1773 Project::local(
1774 client_a.clone(),
1775 client_a.user_store.clone(),
1776 lang_registry.clone(),
1777 fs.clone(),
1778 cx,
1779 )
1780 });
1781 let (worktree_a, _) = project_a
1782 .update(&mut cx_a, |p, cx| {
1783 p.find_or_create_local_worktree("/dir", false, cx)
1784 })
1785 .await
1786 .unwrap();
1787 worktree_a
1788 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1789 .await;
1790 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1791 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1792 project_a
1793 .update(&mut cx_a, |p, cx| p.share(cx))
1794 .await
1795 .unwrap();
1796
1797 // Join that project as client B
1798 let project_b = Project::remote(
1799 project_id,
1800 client_b.clone(),
1801 client_b.user_store.clone(),
1802 lang_registry.clone(),
1803 fs.clone(),
1804 &mut cx_b.to_async(),
1805 )
1806 .await
1807 .unwrap();
1808
1809 // Open a buffer as client A
1810 let buffer_a = project_a
1811 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1812 .await
1813 .unwrap();
1814
1815 // Start opening the same buffer as client B
1816 let buffer_b = cx_b
1817 .background()
1818 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1819
1820 // Edit the buffer as client A while client B is still opening it.
1821 cx_b.background().simulate_random_delay().await;
1822 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1823 cx_b.background().simulate_random_delay().await;
1824 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1825
1826 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1827 let buffer_b = buffer_b.await.unwrap();
1828 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1829 }
1830
1831 #[gpui::test(iterations = 10)]
1832 async fn test_leaving_worktree_while_opening_buffer(
1833 mut cx_a: TestAppContext,
1834 mut cx_b: TestAppContext,
1835 last_iteration: bool,
1836 ) {
1837 cx_a.foreground().forbid_parking();
1838 let lang_registry = Arc::new(LanguageRegistry::new());
1839 let fs = Arc::new(FakeFs::new(cx_a.background()));
1840
1841 // Connect to a server as 2 clients.
1842 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1843 let client_a = server.create_client(&mut cx_a, "user_a").await;
1844 let client_b = server.create_client(&mut cx_b, "user_b").await;
1845
1846 // Share a project as client A
1847 fs.insert_tree(
1848 "/dir",
1849 json!({
1850 ".zed.toml": r#"collaborators = ["user_b"]"#,
1851 "a.txt": "a-contents",
1852 }),
1853 )
1854 .await;
1855 let project_a = cx_a.update(|cx| {
1856 Project::local(
1857 client_a.clone(),
1858 client_a.user_store.clone(),
1859 lang_registry.clone(),
1860 fs.clone(),
1861 cx,
1862 )
1863 });
1864 let (worktree_a, _) = project_a
1865 .update(&mut cx_a, |p, cx| {
1866 p.find_or_create_local_worktree("/dir", false, cx)
1867 })
1868 .await
1869 .unwrap();
1870 worktree_a
1871 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1872 .await;
1873 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1874 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1875 project_a
1876 .update(&mut cx_a, |p, cx| p.share(cx))
1877 .await
1878 .unwrap();
1879
1880 // Join that project as client B
1881 let project_b = Project::remote(
1882 project_id,
1883 client_b.clone(),
1884 client_b.user_store.clone(),
1885 lang_registry.clone(),
1886 fs.clone(),
1887 &mut cx_b.to_async(),
1888 )
1889 .await
1890 .unwrap();
1891
1892 // See that a guest has joined as client A.
1893 project_a
1894 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1895 .await;
1896
1897 // Begin opening a buffer as client B, but leave the project before the open completes.
1898 let buffer_b = cx_b
1899 .background()
1900 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1901 cx_b.update(|_| drop(project_b));
1902 drop(buffer_b);
1903
1904 // See that the guest has left.
1905 project_a
1906 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1907 .await;
1908 }
1909
1910 #[gpui::test(iterations = 10)]
1911 async fn test_peer_disconnection(
1912 mut cx_a: TestAppContext,
1913 mut cx_b: TestAppContext,
1914 last_iteration: bool,
1915 ) {
1916 cx_a.foreground().forbid_parking();
1917 let lang_registry = Arc::new(LanguageRegistry::new());
1918 let fs = Arc::new(FakeFs::new(cx_a.background()));
1919
1920 // Connect to a server as 2 clients.
1921 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1922 let client_a = server.create_client(&mut cx_a, "user_a").await;
1923 let client_b = server.create_client(&mut cx_b, "user_b").await;
1924
1925 // Share a project as client A
1926 fs.insert_tree(
1927 "/a",
1928 json!({
1929 ".zed.toml": r#"collaborators = ["user_b"]"#,
1930 "a.txt": "a-contents",
1931 "b.txt": "b-contents",
1932 }),
1933 )
1934 .await;
1935 let project_a = cx_a.update(|cx| {
1936 Project::local(
1937 client_a.clone(),
1938 client_a.user_store.clone(),
1939 lang_registry.clone(),
1940 fs.clone(),
1941 cx,
1942 )
1943 });
1944 let (worktree_a, _) = project_a
1945 .update(&mut cx_a, |p, cx| {
1946 p.find_or_create_local_worktree("/a", false, cx)
1947 })
1948 .await
1949 .unwrap();
1950 worktree_a
1951 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1952 .await;
1953 let project_id = project_a
1954 .update(&mut cx_a, |project, _| project.next_remote_id())
1955 .await;
1956 project_a
1957 .update(&mut cx_a, |project, cx| project.share(cx))
1958 .await
1959 .unwrap();
1960
1961 // Join that project as client B
1962 let _project_b = Project::remote(
1963 project_id,
1964 client_b.clone(),
1965 client_b.user_store.clone(),
1966 lang_registry.clone(),
1967 fs.clone(),
1968 &mut cx_b.to_async(),
1969 )
1970 .await
1971 .unwrap();
1972
1973 // See that a guest has joined as client A.
1974 project_a
1975 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1976 .await;
1977
1978 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1979 client_b.disconnect(&cx_b.to_async()).unwrap();
1980 project_a
1981 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1982 .await;
1983 }
1984
1985 #[gpui::test(iterations = 10)]
1986 async fn test_collaborating_with_diagnostics(
1987 mut cx_a: TestAppContext,
1988 mut cx_b: TestAppContext,
1989 last_iteration: bool,
1990 ) {
1991 cx_a.foreground().forbid_parking();
1992 let mut lang_registry = Arc::new(LanguageRegistry::new());
1993 let fs = Arc::new(FakeFs::new(cx_a.background()));
1994
1995 // Set up a fake language server.
1996 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1997 Arc::get_mut(&mut lang_registry)
1998 .unwrap()
1999 .add(Arc::new(Language::new(
2000 LanguageConfig {
2001 name: "Rust".to_string(),
2002 path_suffixes: vec!["rs".to_string()],
2003 language_server: Some(language_server_config),
2004 ..Default::default()
2005 },
2006 Some(tree_sitter_rust::language()),
2007 )));
2008
2009 // Connect to a server as 2 clients.
2010 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2011 let client_a = server.create_client(&mut cx_a, "user_a").await;
2012 let client_b = server.create_client(&mut cx_b, "user_b").await;
2013
2014 // Share a project as client A
2015 fs.insert_tree(
2016 "/a",
2017 json!({
2018 ".zed.toml": r#"collaborators = ["user_b"]"#,
2019 "a.rs": "let one = two",
2020 "other.rs": "",
2021 }),
2022 )
2023 .await;
2024 let project_a = cx_a.update(|cx| {
2025 Project::local(
2026 client_a.clone(),
2027 client_a.user_store.clone(),
2028 lang_registry.clone(),
2029 fs.clone(),
2030 cx,
2031 )
2032 });
2033 let (worktree_a, _) = project_a
2034 .update(&mut cx_a, |p, cx| {
2035 p.find_or_create_local_worktree("/a", false, cx)
2036 })
2037 .await
2038 .unwrap();
2039 worktree_a
2040 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2041 .await;
2042 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2043 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2044 project_a
2045 .update(&mut cx_a, |p, cx| p.share(cx))
2046 .await
2047 .unwrap();
2048
2049 // Cause the language server to start.
2050 let _ = cx_a
2051 .background()
2052 .spawn(project_a.update(&mut cx_a, |project, cx| {
2053 project.open_buffer(
2054 ProjectPath {
2055 worktree_id,
2056 path: Path::new("other.rs").into(),
2057 },
2058 cx,
2059 )
2060 }))
2061 .await
2062 .unwrap();
2063
2064 // Simulate a language server reporting errors for a file.
2065 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2066 fake_language_server
2067 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2068 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2069 version: None,
2070 diagnostics: vec![lsp::Diagnostic {
2071 severity: Some(lsp::DiagnosticSeverity::ERROR),
2072 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2073 message: "message 1".to_string(),
2074 ..Default::default()
2075 }],
2076 })
2077 .await;
2078
2079 // Wait for server to see the diagnostics update.
2080 server
2081 .condition(|store| {
2082 let worktree = store
2083 .project(project_id)
2084 .unwrap()
2085 .worktrees
2086 .get(&worktree_id.to_proto())
2087 .unwrap();
2088
2089 !worktree
2090 .share
2091 .as_ref()
2092 .unwrap()
2093 .diagnostic_summaries
2094 .is_empty()
2095 })
2096 .await;
2097
2098 // Join the worktree as client B.
2099 let project_b = Project::remote(
2100 project_id,
2101 client_b.clone(),
2102 client_b.user_store.clone(),
2103 lang_registry.clone(),
2104 fs.clone(),
2105 &mut cx_b.to_async(),
2106 )
2107 .await
2108 .unwrap();
2109
2110 project_b.read_with(&cx_b, |project, cx| {
2111 assert_eq!(
2112 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2113 &[(
2114 ProjectPath {
2115 worktree_id,
2116 path: Arc::from(Path::new("a.rs")),
2117 },
2118 DiagnosticSummary {
2119 error_count: 1,
2120 warning_count: 0,
2121 ..Default::default()
2122 },
2123 )]
2124 )
2125 });
2126
2127 // Simulate a language server reporting more errors for a file.
2128 fake_language_server
2129 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2130 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2131 version: None,
2132 diagnostics: vec![
2133 lsp::Diagnostic {
2134 severity: Some(lsp::DiagnosticSeverity::ERROR),
2135 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2136 message: "message 1".to_string(),
2137 ..Default::default()
2138 },
2139 lsp::Diagnostic {
2140 severity: Some(lsp::DiagnosticSeverity::WARNING),
2141 range: lsp::Range::new(
2142 lsp::Position::new(0, 10),
2143 lsp::Position::new(0, 13),
2144 ),
2145 message: "message 2".to_string(),
2146 ..Default::default()
2147 },
2148 ],
2149 })
2150 .await;
2151
2152 // Client b gets the updated summaries
2153 project_b
2154 .condition(&cx_b, |project, cx| {
2155 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2156 == &[(
2157 ProjectPath {
2158 worktree_id,
2159 path: Arc::from(Path::new("a.rs")),
2160 },
2161 DiagnosticSummary {
2162 error_count: 1,
2163 warning_count: 1,
2164 ..Default::default()
2165 },
2166 )]
2167 })
2168 .await;
2169
2170 // Open the file with the errors on client B. They should be present.
2171 let buffer_b = cx_b
2172 .background()
2173 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2174 .await
2175 .unwrap();
2176
2177 buffer_b.read_with(&cx_b, |buffer, _| {
2178 assert_eq!(
2179 buffer
2180 .snapshot()
2181 .diagnostics_in_range::<_, Point>(0..buffer.len())
2182 .map(|entry| entry)
2183 .collect::<Vec<_>>(),
2184 &[
2185 DiagnosticEntry {
2186 range: Point::new(0, 4)..Point::new(0, 7),
2187 diagnostic: Diagnostic {
2188 group_id: 0,
2189 message: "message 1".to_string(),
2190 severity: lsp::DiagnosticSeverity::ERROR,
2191 is_primary: true,
2192 ..Default::default()
2193 }
2194 },
2195 DiagnosticEntry {
2196 range: Point::new(0, 10)..Point::new(0, 13),
2197 diagnostic: Diagnostic {
2198 group_id: 1,
2199 severity: lsp::DiagnosticSeverity::WARNING,
2200 message: "message 2".to_string(),
2201 is_primary: true,
2202 ..Default::default()
2203 }
2204 }
2205 ]
2206 );
2207 });
2208 }
2209
2210 #[gpui::test(iterations = 10)]
2211 async fn test_collaborating_with_completion(
2212 mut cx_a: TestAppContext,
2213 mut cx_b: TestAppContext,
2214 last_iteration: bool,
2215 ) {
2216 cx_a.foreground().forbid_parking();
2217 let mut lang_registry = Arc::new(LanguageRegistry::new());
2218 let fs = Arc::new(FakeFs::new(cx_a.background()));
2219
2220 // Set up a fake language server.
2221 let (language_server_config, mut fake_language_servers) =
2222 LanguageServerConfig::fake_with_capabilities(lsp::ServerCapabilities {
2223 completion_provider: Some(lsp::CompletionOptions {
2224 trigger_characters: Some(vec![".".to_string()]),
2225 ..Default::default()
2226 }),
2227 ..Default::default()
2228 });
2229 Arc::get_mut(&mut lang_registry)
2230 .unwrap()
2231 .add(Arc::new(Language::new(
2232 LanguageConfig {
2233 name: "Rust".to_string(),
2234 path_suffixes: vec!["rs".to_string()],
2235 language_server: Some(language_server_config),
2236 ..Default::default()
2237 },
2238 Some(tree_sitter_rust::language()),
2239 )));
2240
2241 // Connect to a server as 2 clients.
2242 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2243 let client_a = server.create_client(&mut cx_a, "user_a").await;
2244 let client_b = server.create_client(&mut cx_b, "user_b").await;
2245
2246 // Share a project as client A
2247 fs.insert_tree(
2248 "/a",
2249 json!({
2250 ".zed.toml": r#"collaborators = ["user_b"]"#,
2251 "main.rs": "fn main() { a }",
2252 "other.rs": "",
2253 }),
2254 )
2255 .await;
2256 let project_a = cx_a.update(|cx| {
2257 Project::local(
2258 client_a.clone(),
2259 client_a.user_store.clone(),
2260 lang_registry.clone(),
2261 fs.clone(),
2262 cx,
2263 )
2264 });
2265 let (worktree_a, _) = project_a
2266 .update(&mut cx_a, |p, cx| {
2267 p.find_or_create_local_worktree("/a", false, cx)
2268 })
2269 .await
2270 .unwrap();
2271 worktree_a
2272 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2273 .await;
2274 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2275 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2276 project_a
2277 .update(&mut cx_a, |p, cx| p.share(cx))
2278 .await
2279 .unwrap();
2280
2281 // Join the worktree as client B.
2282 let project_b = Project::remote(
2283 project_id,
2284 client_b.clone(),
2285 client_b.user_store.clone(),
2286 lang_registry.clone(),
2287 fs.clone(),
2288 &mut cx_b.to_async(),
2289 )
2290 .await
2291 .unwrap();
2292
2293 // Open a file in an editor as the guest.
2294 let buffer_b = project_b
2295 .update(&mut cx_b, |p, cx| {
2296 p.open_buffer((worktree_id, "main.rs"), cx)
2297 })
2298 .await
2299 .unwrap();
2300 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2301 let editor_b = cx_b.add_view(window_b, |cx| {
2302 Editor::for_buffer(
2303 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2304 Arc::new(|cx| EditorSettings::test(cx)),
2305 Some(project_b.clone()),
2306 cx,
2307 )
2308 });
2309
2310 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2311 buffer_b
2312 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2313 .await;
2314
2315 // Type a completion trigger character as the guest.
2316 editor_b.update(&mut cx_b, |editor, cx| {
2317 editor.select_ranges([13..13], None, cx);
2318 editor.handle_input(&Input(".".into()), cx);
2319 cx.focus(&editor_b);
2320 });
2321
2322 // Receive a completion request as the host's language server.
2323 // Return some completions from the host's language server.
2324 cx_a.foreground().start_waiting();
2325 fake_language_server
2326 .handle_request::<lsp::request::Completion, _>(|params| {
2327 assert_eq!(
2328 params.text_document_position.text_document.uri,
2329 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2330 );
2331 assert_eq!(
2332 params.text_document_position.position,
2333 lsp::Position::new(0, 14),
2334 );
2335
2336 Some(lsp::CompletionResponse::Array(vec![
2337 lsp::CompletionItem {
2338 label: "first_method(…)".into(),
2339 detail: Some("fn(&mut self, B) -> C".into()),
2340 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2341 new_text: "first_method($1)".to_string(),
2342 range: lsp::Range::new(
2343 lsp::Position::new(0, 14),
2344 lsp::Position::new(0, 14),
2345 ),
2346 })),
2347 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2348 ..Default::default()
2349 },
2350 lsp::CompletionItem {
2351 label: "second_method(…)".into(),
2352 detail: Some("fn(&mut self, C) -> D<E>".into()),
2353 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2354 new_text: "second_method()".to_string(),
2355 range: lsp::Range::new(
2356 lsp::Position::new(0, 14),
2357 lsp::Position::new(0, 14),
2358 ),
2359 })),
2360 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2361 ..Default::default()
2362 },
2363 ]))
2364 })
2365 .next()
2366 .await
2367 .unwrap();
2368 cx_a.foreground().finish_waiting();
2369
2370 // Open the buffer on the host.
2371 let buffer_a = project_a
2372 .update(&mut cx_a, |p, cx| {
2373 p.open_buffer((worktree_id, "main.rs"), cx)
2374 })
2375 .await
2376 .unwrap();
2377 buffer_a
2378 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2379 .await;
2380
2381 // Confirm a completion on the guest.
2382 editor_b
2383 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2384 .await;
2385 editor_b.update(&mut cx_b, |editor, cx| {
2386 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2387 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2388 });
2389
2390 // Return a resolved completion from the host's language server.
2391 // The resolved completion has an additional text edit.
2392 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2393 assert_eq!(params.label, "first_method(…)");
2394 lsp::CompletionItem {
2395 label: "first_method(…)".into(),
2396 detail: Some("fn(&mut self, B) -> C".into()),
2397 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2398 new_text: "first_method($1)".to_string(),
2399 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2400 })),
2401 additional_text_edits: Some(vec![lsp::TextEdit {
2402 new_text: "use d::SomeTrait;\n".to_string(),
2403 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2404 }]),
2405 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2406 ..Default::default()
2407 }
2408 });
2409
2410 // The additional edit is applied.
2411 buffer_a
2412 .condition(&cx_a, |buffer, _| {
2413 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2414 })
2415 .await;
2416 buffer_b
2417 .condition(&cx_b, |buffer, _| {
2418 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2419 })
2420 .await;
2421 }
2422
2423 #[gpui::test(iterations = 10)]
2424 async fn test_formatting_buffer(
2425 mut cx_a: TestAppContext,
2426 mut cx_b: TestAppContext,
2427 last_iteration: bool,
2428 ) {
2429 cx_a.foreground().forbid_parking();
2430 let mut lang_registry = Arc::new(LanguageRegistry::new());
2431 let fs = Arc::new(FakeFs::new(cx_a.background()));
2432
2433 // Set up a fake language server.
2434 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2435 Arc::get_mut(&mut lang_registry)
2436 .unwrap()
2437 .add(Arc::new(Language::new(
2438 LanguageConfig {
2439 name: "Rust".to_string(),
2440 path_suffixes: vec!["rs".to_string()],
2441 language_server: Some(language_server_config),
2442 ..Default::default()
2443 },
2444 Some(tree_sitter_rust::language()),
2445 )));
2446
2447 // Connect to a server as 2 clients.
2448 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2449 let client_a = server.create_client(&mut cx_a, "user_a").await;
2450 let client_b = server.create_client(&mut cx_b, "user_b").await;
2451
2452 // Share a project as client A
2453 fs.insert_tree(
2454 "/a",
2455 json!({
2456 ".zed.toml": r#"collaborators = ["user_b"]"#,
2457 "a.rs": "let one = two",
2458 }),
2459 )
2460 .await;
2461 let project_a = cx_a.update(|cx| {
2462 Project::local(
2463 client_a.clone(),
2464 client_a.user_store.clone(),
2465 lang_registry.clone(),
2466 fs.clone(),
2467 cx,
2468 )
2469 });
2470 let (worktree_a, _) = project_a
2471 .update(&mut cx_a, |p, cx| {
2472 p.find_or_create_local_worktree("/a", false, cx)
2473 })
2474 .await
2475 .unwrap();
2476 worktree_a
2477 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2478 .await;
2479 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2480 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2481 project_a
2482 .update(&mut cx_a, |p, cx| p.share(cx))
2483 .await
2484 .unwrap();
2485
2486 // Join the worktree as client B.
2487 let project_b = Project::remote(
2488 project_id,
2489 client_b.clone(),
2490 client_b.user_store.clone(),
2491 lang_registry.clone(),
2492 fs.clone(),
2493 &mut cx_b.to_async(),
2494 )
2495 .await
2496 .unwrap();
2497
2498 let buffer_b = cx_b
2499 .background()
2500 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2501 .await
2502 .unwrap();
2503
2504 let format = project_b.update(&mut cx_b, |project, cx| {
2505 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2506 });
2507
2508 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2509 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2510 Some(vec![
2511 lsp::TextEdit {
2512 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2513 new_text: "h".to_string(),
2514 },
2515 lsp::TextEdit {
2516 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2517 new_text: "y".to_string(),
2518 },
2519 ])
2520 });
2521
2522 format.await.unwrap();
2523 assert_eq!(
2524 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2525 "let honey = two"
2526 );
2527 }
2528
2529 #[gpui::test(iterations = 10)]
2530 async fn test_definition(
2531 mut cx_a: TestAppContext,
2532 mut cx_b: TestAppContext,
2533 last_iteration: bool,
2534 ) {
2535 cx_a.foreground().forbid_parking();
2536 let mut lang_registry = Arc::new(LanguageRegistry::new());
2537 let fs = Arc::new(FakeFs::new(cx_a.background()));
2538 fs.insert_tree(
2539 "/root-1",
2540 json!({
2541 ".zed.toml": r#"collaborators = ["user_b"]"#,
2542 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2543 }),
2544 )
2545 .await;
2546 fs.insert_tree(
2547 "/root-2",
2548 json!({
2549 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2550 }),
2551 )
2552 .await;
2553
2554 // Set up a fake language server.
2555 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2556 Arc::get_mut(&mut lang_registry)
2557 .unwrap()
2558 .add(Arc::new(Language::new(
2559 LanguageConfig {
2560 name: "Rust".to_string(),
2561 path_suffixes: vec!["rs".to_string()],
2562 language_server: Some(language_server_config),
2563 ..Default::default()
2564 },
2565 Some(tree_sitter_rust::language()),
2566 )));
2567
2568 // Connect to a server as 2 clients.
2569 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2570 let client_a = server.create_client(&mut cx_a, "user_a").await;
2571 let client_b = server.create_client(&mut cx_b, "user_b").await;
2572
2573 // Share a project as client A
2574 let project_a = cx_a.update(|cx| {
2575 Project::local(
2576 client_a.clone(),
2577 client_a.user_store.clone(),
2578 lang_registry.clone(),
2579 fs.clone(),
2580 cx,
2581 )
2582 });
2583 let (worktree_a, _) = project_a
2584 .update(&mut cx_a, |p, cx| {
2585 p.find_or_create_local_worktree("/root-1", false, cx)
2586 })
2587 .await
2588 .unwrap();
2589 worktree_a
2590 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2591 .await;
2592 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2593 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2594 project_a
2595 .update(&mut cx_a, |p, cx| p.share(cx))
2596 .await
2597 .unwrap();
2598
2599 // Join the worktree as client B.
2600 let project_b = Project::remote(
2601 project_id,
2602 client_b.clone(),
2603 client_b.user_store.clone(),
2604 lang_registry.clone(),
2605 fs.clone(),
2606 &mut cx_b.to_async(),
2607 )
2608 .await
2609 .unwrap();
2610
2611 // Open the file on client B.
2612 let buffer_b = cx_b
2613 .background()
2614 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2615 .await
2616 .unwrap();
2617
2618 // Request the definition of a symbol as the guest.
2619 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2620
2621 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2622 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2623 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2624 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2625 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2626 )))
2627 });
2628
2629 let definitions_1 = definitions_1.await.unwrap();
2630 cx_b.read(|cx| {
2631 assert_eq!(definitions_1.len(), 1);
2632 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2633 let target_buffer = definitions_1[0].target_buffer.read(cx);
2634 assert_eq!(
2635 target_buffer.text(),
2636 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2637 );
2638 assert_eq!(
2639 definitions_1[0].target_range.to_point(target_buffer),
2640 Point::new(0, 6)..Point::new(0, 9)
2641 );
2642 });
2643
2644 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2645 // the previous call to `definition`.
2646 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2647 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2648 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2649 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2650 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2651 )))
2652 });
2653
2654 let definitions_2 = definitions_2.await.unwrap();
2655 cx_b.read(|cx| {
2656 assert_eq!(definitions_2.len(), 1);
2657 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2658 let target_buffer = definitions_2[0].target_buffer.read(cx);
2659 assert_eq!(
2660 target_buffer.text(),
2661 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2662 );
2663 assert_eq!(
2664 definitions_2[0].target_range.to_point(target_buffer),
2665 Point::new(1, 6)..Point::new(1, 11)
2666 );
2667 });
2668 assert_eq!(
2669 definitions_1[0].target_buffer,
2670 definitions_2[0].target_buffer
2671 );
2672
2673 cx_b.update(|_| {
2674 drop(definitions_1);
2675 drop(definitions_2);
2676 });
2677 project_b
2678 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2679 .await;
2680 }
2681
2682 #[gpui::test(iterations = 10)]
2683 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2684 mut cx_a: TestAppContext,
2685 mut cx_b: TestAppContext,
2686 mut rng: StdRng,
2687 last_iteration: bool,
2688 ) {
2689 cx_a.foreground().forbid_parking();
2690 let mut lang_registry = Arc::new(LanguageRegistry::new());
2691 let fs = Arc::new(FakeFs::new(cx_a.background()));
2692 fs.insert_tree(
2693 "/root",
2694 json!({
2695 ".zed.toml": r#"collaborators = ["user_b"]"#,
2696 "a.rs": "const ONE: usize = b::TWO;",
2697 "b.rs": "const TWO: usize = 2",
2698 }),
2699 )
2700 .await;
2701
2702 // Set up a fake language server.
2703 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2704
2705 Arc::get_mut(&mut lang_registry)
2706 .unwrap()
2707 .add(Arc::new(Language::new(
2708 LanguageConfig {
2709 name: "Rust".to_string(),
2710 path_suffixes: vec!["rs".to_string()],
2711 language_server: Some(language_server_config),
2712 ..Default::default()
2713 },
2714 Some(tree_sitter_rust::language()),
2715 )));
2716
2717 // Connect to a server as 2 clients.
2718 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2719 let client_a = server.create_client(&mut cx_a, "user_a").await;
2720 let client_b = server.create_client(&mut cx_b, "user_b").await;
2721
2722 // Share a project as client A
2723 let project_a = cx_a.update(|cx| {
2724 Project::local(
2725 client_a.clone(),
2726 client_a.user_store.clone(),
2727 lang_registry.clone(),
2728 fs.clone(),
2729 cx,
2730 )
2731 });
2732
2733 let (worktree_a, _) = project_a
2734 .update(&mut cx_a, |p, cx| {
2735 p.find_or_create_local_worktree("/root", false, cx)
2736 })
2737 .await
2738 .unwrap();
2739 worktree_a
2740 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2741 .await;
2742 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2743 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2744 project_a
2745 .update(&mut cx_a, |p, cx| p.share(cx))
2746 .await
2747 .unwrap();
2748
2749 // Join the worktree as client B.
2750 let project_b = Project::remote(
2751 project_id,
2752 client_b.clone(),
2753 client_b.user_store.clone(),
2754 lang_registry.clone(),
2755 fs.clone(),
2756 &mut cx_b.to_async(),
2757 )
2758 .await
2759 .unwrap();
2760
2761 let buffer_b1 = cx_b
2762 .background()
2763 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2764 .await
2765 .unwrap();
2766
2767 let definitions;
2768 let buffer_b2;
2769 if rng.gen() {
2770 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2771 buffer_b2 =
2772 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2773 } else {
2774 buffer_b2 =
2775 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2776 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2777 }
2778
2779 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2780 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2781 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2782 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2783 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2784 )))
2785 });
2786
2787 let buffer_b2 = buffer_b2.await.unwrap();
2788 let definitions = definitions.await.unwrap();
2789 assert_eq!(definitions.len(), 1);
2790 assert_eq!(definitions[0].target_buffer, buffer_b2);
2791 }
2792
2793 #[gpui::test(iterations = 10)]
2794 async fn test_collaborating_with_code_actions(
2795 mut cx_a: TestAppContext,
2796 mut cx_b: TestAppContext,
2797 last_iteration: bool,
2798 ) {
2799 cx_a.foreground().forbid_parking();
2800 let mut lang_registry = Arc::new(LanguageRegistry::new());
2801 let fs = Arc::new(FakeFs::new(cx_a.background()));
2802 let mut path_openers_b = Vec::new();
2803 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2804
2805 // Set up a fake language server.
2806 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2807 Arc::get_mut(&mut lang_registry)
2808 .unwrap()
2809 .add(Arc::new(Language::new(
2810 LanguageConfig {
2811 name: "Rust".to_string(),
2812 path_suffixes: vec!["rs".to_string()],
2813 language_server: Some(language_server_config),
2814 ..Default::default()
2815 },
2816 Some(tree_sitter_rust::language()),
2817 )));
2818
2819 // Connect to a server as 2 clients.
2820 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2821 let client_a = server.create_client(&mut cx_a, "user_a").await;
2822 let client_b = server.create_client(&mut cx_b, "user_b").await;
2823
2824 // Share a project as client A
2825 fs.insert_tree(
2826 "/a",
2827 json!({
2828 ".zed.toml": r#"collaborators = ["user_b"]"#,
2829 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2830 "other.rs": "pub fn foo() -> usize { 4 }",
2831 }),
2832 )
2833 .await;
2834 let project_a = cx_a.update(|cx| {
2835 Project::local(
2836 client_a.clone(),
2837 client_a.user_store.clone(),
2838 lang_registry.clone(),
2839 fs.clone(),
2840 cx,
2841 )
2842 });
2843 let (worktree_a, _) = project_a
2844 .update(&mut cx_a, |p, cx| {
2845 p.find_or_create_local_worktree("/a", false, cx)
2846 })
2847 .await
2848 .unwrap();
2849 worktree_a
2850 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2851 .await;
2852 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2853 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2854 project_a
2855 .update(&mut cx_a, |p, cx| p.share(cx))
2856 .await
2857 .unwrap();
2858
2859 // Join the worktree as client B.
2860 let project_b = Project::remote(
2861 project_id,
2862 client_b.clone(),
2863 client_b.user_store.clone(),
2864 lang_registry.clone(),
2865 fs.clone(),
2866 &mut cx_b.to_async(),
2867 )
2868 .await
2869 .unwrap();
2870 let mut params = cx_b.update(WorkspaceParams::test);
2871 params.languages = lang_registry.clone();
2872 params.client = client_b.client.clone();
2873 params.user_store = client_b.user_store.clone();
2874 params.project = project_b;
2875 params.path_openers = path_openers_b.into();
2876
2877 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2878 let editor_b = workspace_b
2879 .update(&mut cx_b, |workspace, cx| {
2880 workspace.open_path((worktree_id, "main.rs").into(), cx)
2881 })
2882 .await
2883 .unwrap()
2884 .downcast::<Editor>()
2885 .unwrap();
2886
2887 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2888 fake_language_server
2889 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2890 assert_eq!(
2891 params.text_document.uri,
2892 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2893 );
2894 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2895 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2896 None
2897 })
2898 .next()
2899 .await;
2900
2901 // Move cursor to a location that contains code actions.
2902 editor_b.update(&mut cx_b, |editor, cx| {
2903 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2904 cx.focus(&editor_b);
2905 });
2906
2907 fake_language_server
2908 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2909 assert_eq!(
2910 params.text_document.uri,
2911 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2912 );
2913 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2914 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2915
2916 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2917 lsp::CodeAction {
2918 title: "Inline into all callers".to_string(),
2919 edit: Some(lsp::WorkspaceEdit {
2920 changes: Some(
2921 [
2922 (
2923 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2924 vec![lsp::TextEdit::new(
2925 lsp::Range::new(
2926 lsp::Position::new(1, 22),
2927 lsp::Position::new(1, 34),
2928 ),
2929 "4".to_string(),
2930 )],
2931 ),
2932 (
2933 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2934 vec![lsp::TextEdit::new(
2935 lsp::Range::new(
2936 lsp::Position::new(0, 0),
2937 lsp::Position::new(0, 27),
2938 ),
2939 "".to_string(),
2940 )],
2941 ),
2942 ]
2943 .into_iter()
2944 .collect(),
2945 ),
2946 ..Default::default()
2947 }),
2948 data: Some(json!({
2949 "codeActionParams": {
2950 "range": {
2951 "start": {"line": 1, "column": 31},
2952 "end": {"line": 1, "column": 31},
2953 }
2954 }
2955 })),
2956 ..Default::default()
2957 },
2958 )])
2959 })
2960 .next()
2961 .await;
2962
2963 // Toggle code actions and wait for them to display.
2964 editor_b.update(&mut cx_b, |editor, cx| {
2965 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2966 });
2967 editor_b
2968 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2969 .await;
2970
2971 fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2972
2973 // Confirming the code action will trigger a resolve request.
2974 let confirm_action = workspace_b
2975 .update(&mut cx_b, |workspace, cx| {
2976 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2977 })
2978 .unwrap();
2979 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2980 lsp::CodeAction {
2981 title: "Inline into all callers".to_string(),
2982 edit: Some(lsp::WorkspaceEdit {
2983 changes: Some(
2984 [
2985 (
2986 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2987 vec![lsp::TextEdit::new(
2988 lsp::Range::new(
2989 lsp::Position::new(1, 22),
2990 lsp::Position::new(1, 34),
2991 ),
2992 "4".to_string(),
2993 )],
2994 ),
2995 (
2996 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2997 vec![lsp::TextEdit::new(
2998 lsp::Range::new(
2999 lsp::Position::new(0, 0),
3000 lsp::Position::new(0, 27),
3001 ),
3002 "".to_string(),
3003 )],
3004 ),
3005 ]
3006 .into_iter()
3007 .collect(),
3008 ),
3009 ..Default::default()
3010 }),
3011 ..Default::default()
3012 }
3013 });
3014
3015 // After the action is confirmed, an editor containing both modified files is opened.
3016 confirm_action.await.unwrap();
3017 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3018 workspace
3019 .active_item(cx)
3020 .unwrap()
3021 .downcast::<Editor>()
3022 .unwrap()
3023 });
3024 code_action_editor.update(&mut cx_b, |editor, cx| {
3025 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3026 editor.undo(&Undo, cx);
3027 assert_eq!(
3028 editor.text(cx),
3029 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3030 );
3031 editor.redo(&Redo, cx);
3032 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3033 });
3034 }
3035
3036 #[gpui::test(iterations = 10)]
3037 async fn test_basic_chat(
3038 mut cx_a: TestAppContext,
3039 mut cx_b: TestAppContext,
3040 last_iteration: bool,
3041 ) {
3042 cx_a.foreground().forbid_parking();
3043
3044 // Connect to a server as 2 clients.
3045 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3046 let client_a = server.create_client(&mut cx_a, "user_a").await;
3047 let client_b = server.create_client(&mut cx_b, "user_b").await;
3048
3049 // Create an org that includes these 2 users.
3050 let db = &server.app_state.db;
3051 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3052 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3053 .await
3054 .unwrap();
3055 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3056 .await
3057 .unwrap();
3058
3059 // Create a channel that includes all the users.
3060 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3061 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3062 .await
3063 .unwrap();
3064 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3065 .await
3066 .unwrap();
3067 db.create_channel_message(
3068 channel_id,
3069 client_b.current_user_id(&cx_b),
3070 "hello A, it's B.",
3071 OffsetDateTime::now_utc(),
3072 1,
3073 )
3074 .await
3075 .unwrap();
3076
3077 let channels_a = cx_a
3078 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3079 channels_a
3080 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3081 .await;
3082 channels_a.read_with(&cx_a, |list, _| {
3083 assert_eq!(
3084 list.available_channels().unwrap(),
3085 &[ChannelDetails {
3086 id: channel_id.to_proto(),
3087 name: "test-channel".to_string()
3088 }]
3089 )
3090 });
3091 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3092 this.get_channel(channel_id.to_proto(), cx).unwrap()
3093 });
3094 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3095 channel_a
3096 .condition(&cx_a, |channel, _| {
3097 channel_messages(channel)
3098 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3099 })
3100 .await;
3101
3102 let channels_b = cx_b
3103 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3104 channels_b
3105 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3106 .await;
3107 channels_b.read_with(&cx_b, |list, _| {
3108 assert_eq!(
3109 list.available_channels().unwrap(),
3110 &[ChannelDetails {
3111 id: channel_id.to_proto(),
3112 name: "test-channel".to_string()
3113 }]
3114 )
3115 });
3116
3117 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3118 this.get_channel(channel_id.to_proto(), cx).unwrap()
3119 });
3120 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3121 channel_b
3122 .condition(&cx_b, |channel, _| {
3123 channel_messages(channel)
3124 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3125 })
3126 .await;
3127
3128 channel_a
3129 .update(&mut cx_a, |channel, cx| {
3130 channel
3131 .send_message("oh, hi B.".to_string(), cx)
3132 .unwrap()
3133 .detach();
3134 let task = channel.send_message("sup".to_string(), cx).unwrap();
3135 assert_eq!(
3136 channel_messages(channel),
3137 &[
3138 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3139 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3140 ("user_a".to_string(), "sup".to_string(), true)
3141 ]
3142 );
3143 task
3144 })
3145 .await
3146 .unwrap();
3147
3148 channel_b
3149 .condition(&cx_b, |channel, _| {
3150 channel_messages(channel)
3151 == [
3152 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3153 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3154 ("user_a".to_string(), "sup".to_string(), false),
3155 ]
3156 })
3157 .await;
3158
3159 assert_eq!(
3160 server
3161 .state()
3162 .await
3163 .channel(channel_id)
3164 .unwrap()
3165 .connection_ids
3166 .len(),
3167 2
3168 );
3169 cx_b.update(|_| drop(channel_b));
3170 server
3171 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3172 .await;
3173
3174 cx_a.update(|_| drop(channel_a));
3175 server
3176 .condition(|state| state.channel(channel_id).is_none())
3177 .await;
3178 }
3179
3180 #[gpui::test(iterations = 10)]
3181 async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3182 cx_a.foreground().forbid_parking();
3183
3184 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3185 let client_a = server.create_client(&mut cx_a, "user_a").await;
3186
3187 let db = &server.app_state.db;
3188 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3189 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3190 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3191 .await
3192 .unwrap();
3193 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3194 .await
3195 .unwrap();
3196
3197 let channels_a = cx_a
3198 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3199 channels_a
3200 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3201 .await;
3202 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3203 this.get_channel(channel_id.to_proto(), cx).unwrap()
3204 });
3205
3206 // Messages aren't allowed to be too long.
3207 channel_a
3208 .update(&mut cx_a, |channel, cx| {
3209 let long_body = "this is long.\n".repeat(1024);
3210 channel.send_message(long_body, cx).unwrap()
3211 })
3212 .await
3213 .unwrap_err();
3214
3215 // Messages aren't allowed to be blank.
3216 channel_a.update(&mut cx_a, |channel, cx| {
3217 channel.send_message(String::new(), cx).unwrap_err()
3218 });
3219
3220 // Leading and trailing whitespace are trimmed.
3221 channel_a
3222 .update(&mut cx_a, |channel, cx| {
3223 channel
3224 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3225 .unwrap()
3226 })
3227 .await
3228 .unwrap();
3229 assert_eq!(
3230 db.get_channel_messages(channel_id, 10, None)
3231 .await
3232 .unwrap()
3233 .iter()
3234 .map(|m| &m.body)
3235 .collect::<Vec<_>>(),
3236 &["surrounded by whitespace"]
3237 );
3238 }
3239
3240 #[gpui::test(iterations = 10)]
3241 async fn test_chat_reconnection(
3242 mut cx_a: TestAppContext,
3243 mut cx_b: TestAppContext,
3244 last_iteration: bool,
3245 ) {
3246 cx_a.foreground().forbid_parking();
3247
3248 // Connect to a server as 2 clients.
3249 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3250 let client_a = server.create_client(&mut cx_a, "user_a").await;
3251 let client_b = server.create_client(&mut cx_b, "user_b").await;
3252 let mut status_b = client_b.status();
3253
3254 // Create an org that includes these 2 users.
3255 let db = &server.app_state.db;
3256 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3257 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3258 .await
3259 .unwrap();
3260 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3261 .await
3262 .unwrap();
3263
3264 // Create a channel that includes all the users.
3265 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3266 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3267 .await
3268 .unwrap();
3269 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3270 .await
3271 .unwrap();
3272 db.create_channel_message(
3273 channel_id,
3274 client_b.current_user_id(&cx_b),
3275 "hello A, it's B.",
3276 OffsetDateTime::now_utc(),
3277 2,
3278 )
3279 .await
3280 .unwrap();
3281
3282 let channels_a = cx_a
3283 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3284 channels_a
3285 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3286 .await;
3287
3288 channels_a.read_with(&cx_a, |list, _| {
3289 assert_eq!(
3290 list.available_channels().unwrap(),
3291 &[ChannelDetails {
3292 id: channel_id.to_proto(),
3293 name: "test-channel".to_string()
3294 }]
3295 )
3296 });
3297 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3298 this.get_channel(channel_id.to_proto(), cx).unwrap()
3299 });
3300 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3301 channel_a
3302 .condition(&cx_a, |channel, _| {
3303 channel_messages(channel)
3304 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3305 })
3306 .await;
3307
3308 let channels_b = cx_b
3309 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3310 channels_b
3311 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3312 .await;
3313 channels_b.read_with(&cx_b, |list, _| {
3314 assert_eq!(
3315 list.available_channels().unwrap(),
3316 &[ChannelDetails {
3317 id: channel_id.to_proto(),
3318 name: "test-channel".to_string()
3319 }]
3320 )
3321 });
3322
3323 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3324 this.get_channel(channel_id.to_proto(), cx).unwrap()
3325 });
3326 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3327 channel_b
3328 .condition(&cx_b, |channel, _| {
3329 channel_messages(channel)
3330 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3331 })
3332 .await;
3333
3334 // Disconnect client B, ensuring we can still access its cached channel data.
3335 server.forbid_connections();
3336 server.disconnect_client(client_b.current_user_id(&cx_b));
3337 while !matches!(
3338 status_b.next().await,
3339 Some(client::Status::ReconnectionError { .. })
3340 ) {}
3341
3342 channels_b.read_with(&cx_b, |channels, _| {
3343 assert_eq!(
3344 channels.available_channels().unwrap(),
3345 [ChannelDetails {
3346 id: channel_id.to_proto(),
3347 name: "test-channel".to_string()
3348 }]
3349 )
3350 });
3351 channel_b.read_with(&cx_b, |channel, _| {
3352 assert_eq!(
3353 channel_messages(channel),
3354 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3355 )
3356 });
3357
3358 // Send a message from client B while it is disconnected.
3359 channel_b
3360 .update(&mut cx_b, |channel, cx| {
3361 let task = channel
3362 .send_message("can you see this?".to_string(), cx)
3363 .unwrap();
3364 assert_eq!(
3365 channel_messages(channel),
3366 &[
3367 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3368 ("user_b".to_string(), "can you see this?".to_string(), true)
3369 ]
3370 );
3371 task
3372 })
3373 .await
3374 .unwrap_err();
3375
3376 // Send a message from client A while B is disconnected.
3377 channel_a
3378 .update(&mut cx_a, |channel, cx| {
3379 channel
3380 .send_message("oh, hi B.".to_string(), cx)
3381 .unwrap()
3382 .detach();
3383 let task = channel.send_message("sup".to_string(), cx).unwrap();
3384 assert_eq!(
3385 channel_messages(channel),
3386 &[
3387 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3388 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3389 ("user_a".to_string(), "sup".to_string(), true)
3390 ]
3391 );
3392 task
3393 })
3394 .await
3395 .unwrap();
3396
3397 // Give client B a chance to reconnect.
3398 server.allow_connections();
3399 cx_b.foreground().advance_clock(Duration::from_secs(10));
3400
3401 // Verify that B sees the new messages upon reconnection, as well as the message client B
3402 // sent while offline.
3403 channel_b
3404 .condition(&cx_b, |channel, _| {
3405 channel_messages(channel)
3406 == [
3407 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3408 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3409 ("user_a".to_string(), "sup".to_string(), false),
3410 ("user_b".to_string(), "can you see this?".to_string(), false),
3411 ]
3412 })
3413 .await;
3414
3415 // Ensure client A and B can communicate normally after reconnection.
3416 channel_a
3417 .update(&mut cx_a, |channel, cx| {
3418 channel.send_message("you online?".to_string(), cx).unwrap()
3419 })
3420 .await
3421 .unwrap();
3422 channel_b
3423 .condition(&cx_b, |channel, _| {
3424 channel_messages(channel)
3425 == [
3426 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3427 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3428 ("user_a".to_string(), "sup".to_string(), false),
3429 ("user_b".to_string(), "can you see this?".to_string(), false),
3430 ("user_a".to_string(), "you online?".to_string(), false),
3431 ]
3432 })
3433 .await;
3434
3435 channel_b
3436 .update(&mut cx_b, |channel, cx| {
3437 channel.send_message("yep".to_string(), cx).unwrap()
3438 })
3439 .await
3440 .unwrap();
3441 channel_a
3442 .condition(&cx_a, |channel, _| {
3443 channel_messages(channel)
3444 == [
3445 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3446 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3447 ("user_a".to_string(), "sup".to_string(), false),
3448 ("user_b".to_string(), "can you see this?".to_string(), false),
3449 ("user_a".to_string(), "you online?".to_string(), false),
3450 ("user_b".to_string(), "yep".to_string(), false),
3451 ]
3452 })
3453 .await;
3454 }
3455
3456 #[gpui::test(iterations = 10)]
3457 async fn test_contacts(
3458 mut cx_a: TestAppContext,
3459 mut cx_b: TestAppContext,
3460 mut cx_c: TestAppContext,
3461 last_iteration: bool,
3462 ) {
3463 cx_a.foreground().forbid_parking();
3464 let lang_registry = Arc::new(LanguageRegistry::new());
3465 let fs = Arc::new(FakeFs::new(cx_a.background()));
3466
3467 // Connect to a server as 3 clients.
3468 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3469 let client_a = server.create_client(&mut cx_a, "user_a").await;
3470 let client_b = server.create_client(&mut cx_b, "user_b").await;
3471 let client_c = server.create_client(&mut cx_c, "user_c").await;
3472
3473 // Share a worktree as client A.
3474 fs.insert_tree(
3475 "/a",
3476 json!({
3477 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3478 }),
3479 )
3480 .await;
3481
3482 let project_a = cx_a.update(|cx| {
3483 Project::local(
3484 client_a.clone(),
3485 client_a.user_store.clone(),
3486 lang_registry.clone(),
3487 fs.clone(),
3488 cx,
3489 )
3490 });
3491 let (worktree_a, _) = project_a
3492 .update(&mut cx_a, |p, cx| {
3493 p.find_or_create_local_worktree("/a", false, cx)
3494 })
3495 .await
3496 .unwrap();
3497 worktree_a
3498 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3499 .await;
3500
3501 client_a
3502 .user_store
3503 .condition(&cx_a, |user_store, _| {
3504 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3505 })
3506 .await;
3507 client_b
3508 .user_store
3509 .condition(&cx_b, |user_store, _| {
3510 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3511 })
3512 .await;
3513 client_c
3514 .user_store
3515 .condition(&cx_c, |user_store, _| {
3516 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3517 })
3518 .await;
3519
3520 let project_id = project_a
3521 .update(&mut cx_a, |project, _| project.next_remote_id())
3522 .await;
3523 project_a
3524 .update(&mut cx_a, |project, cx| project.share(cx))
3525 .await
3526 .unwrap();
3527
3528 let _project_b = Project::remote(
3529 project_id,
3530 client_b.clone(),
3531 client_b.user_store.clone(),
3532 lang_registry.clone(),
3533 fs.clone(),
3534 &mut cx_b.to_async(),
3535 )
3536 .await
3537 .unwrap();
3538
3539 client_a
3540 .user_store
3541 .condition(&cx_a, |user_store, _| {
3542 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3543 })
3544 .await;
3545 client_b
3546 .user_store
3547 .condition(&cx_b, |user_store, _| {
3548 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3549 })
3550 .await;
3551 client_c
3552 .user_store
3553 .condition(&cx_c, |user_store, _| {
3554 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3555 })
3556 .await;
3557
3558 project_a
3559 .condition(&cx_a, |project, _| {
3560 project.collaborators().contains_key(&client_b.peer_id)
3561 })
3562 .await;
3563
3564 cx_a.update(move |_| drop(project_a));
3565 client_a
3566 .user_store
3567 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3568 .await;
3569 client_b
3570 .user_store
3571 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3572 .await;
3573 client_c
3574 .user_store
3575 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3576 .await;
3577
3578 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3579 user_store
3580 .contacts()
3581 .iter()
3582 .map(|contact| {
3583 let worktrees = contact
3584 .projects
3585 .iter()
3586 .map(|p| {
3587 (
3588 p.worktree_root_names[0].as_str(),
3589 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3590 )
3591 })
3592 .collect();
3593 (contact.user.github_login.as_str(), worktrees)
3594 })
3595 .collect()
3596 }
3597 }
3598
3599 #[gpui::test(iterations = 100)]
3600 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3601 cx.foreground().forbid_parking();
3602 let max_peers = env::var("MAX_PEERS")
3603 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3604 .unwrap_or(5);
3605 let max_operations = env::var("OPERATIONS")
3606 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3607 .unwrap_or(10);
3608
3609 let rng = Rc::new(RefCell::new(rng));
3610
3611 let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3612 let guest_lang_registry = Arc::new(LanguageRegistry::new());
3613
3614 // Set up a fake language server.
3615 let (language_server_config, fake_language_servers) = LanguageServerConfig::fake();
3616 Arc::get_mut(&mut host_lang_registry)
3617 .unwrap()
3618 .add(Arc::new(Language::new(
3619 LanguageConfig {
3620 name: "Rust".to_string(),
3621 path_suffixes: vec!["rs".to_string()],
3622 language_server: Some(language_server_config),
3623 ..Default::default()
3624 },
3625 Some(tree_sitter_rust::language()),
3626 )));
3627
3628 let fs = Arc::new(FakeFs::new(cx.background()));
3629 fs.insert_tree(
3630 "/_collab",
3631 json!({
3632 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3633 }),
3634 )
3635 .await;
3636
3637 let operations = Rc::new(Cell::new(0));
3638 let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3639 let mut clients = Vec::new();
3640
3641 let mut next_entity_id = 100000;
3642 let mut host_cx = TestAppContext::new(
3643 cx.foreground_platform(),
3644 cx.platform(),
3645 cx.foreground(),
3646 cx.background(),
3647 cx.font_cache(),
3648 next_entity_id,
3649 );
3650 let host = server.create_client(&mut host_cx, "host").await;
3651 let host_project = host_cx.update(|cx| {
3652 Project::local(
3653 host.client.clone(),
3654 host.user_store.clone(),
3655 host_lang_registry.clone(),
3656 fs.clone(),
3657 cx,
3658 )
3659 });
3660 let host_project_id = host_project
3661 .update(&mut host_cx, |p, _| p.next_remote_id())
3662 .await;
3663
3664 let (collab_worktree, _) = host_project
3665 .update(&mut host_cx, |project, cx| {
3666 project.find_or_create_local_worktree("/_collab", false, cx)
3667 })
3668 .await
3669 .unwrap();
3670 collab_worktree
3671 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3672 .await;
3673 host_project
3674 .update(&mut host_cx, |project, cx| project.share(cx))
3675 .await
3676 .unwrap();
3677
3678 clients.push(cx.foreground().spawn(host.simulate_host(
3679 host_project.clone(),
3680 fake_language_servers,
3681 operations.clone(),
3682 max_operations,
3683 rng.clone(),
3684 host_cx.clone(),
3685 )));
3686
3687 while operations.get() < max_operations {
3688 cx.background().simulate_random_delay().await;
3689 if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3690 operations.set(operations.get() + 1);
3691
3692 let guest_id = clients.len();
3693 log::info!("Adding guest {}", guest_id);
3694 next_entity_id += 100000;
3695 let mut guest_cx = TestAppContext::new(
3696 cx.foreground_platform(),
3697 cx.platform(),
3698 cx.foreground(),
3699 cx.background(),
3700 cx.font_cache(),
3701 next_entity_id,
3702 );
3703 let guest = server
3704 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3705 .await;
3706 let guest_project = Project::remote(
3707 host_project_id,
3708 guest.client.clone(),
3709 guest.user_store.clone(),
3710 guest_lang_registry.clone(),
3711 fs.clone(),
3712 &mut guest_cx.to_async(),
3713 )
3714 .await
3715 .unwrap();
3716 clients.push(cx.foreground().spawn(guest.simulate_guest(
3717 guest_id,
3718 guest_project,
3719 operations.clone(),
3720 max_operations,
3721 rng.clone(),
3722 guest_cx,
3723 )));
3724
3725 log::info!("Guest {} added", guest_id);
3726 }
3727 }
3728
3729 let clients = futures::future::join_all(clients).await;
3730 cx.foreground().run_until_parked();
3731
3732 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3733 project
3734 .worktrees(cx)
3735 .map(|worktree| {
3736 let snapshot = worktree.read(cx).snapshot();
3737 (snapshot.id(), snapshot)
3738 })
3739 .collect::<BTreeMap<_, _>>()
3740 });
3741
3742 for (guest_client, guest_cx) in clients.iter().skip(1) {
3743 let guest_id = guest_client.client.id();
3744 let worktree_snapshots =
3745 guest_client
3746 .project
3747 .as_ref()
3748 .unwrap()
3749 .read_with(guest_cx, |project, cx| {
3750 project
3751 .worktrees(cx)
3752 .map(|worktree| {
3753 let snapshot = worktree.read(cx).snapshot();
3754 (snapshot.id(), snapshot)
3755 })
3756 .collect::<BTreeMap<_, _>>()
3757 });
3758
3759 assert_eq!(
3760 worktree_snapshots.keys().collect::<Vec<_>>(),
3761 host_worktree_snapshots.keys().collect::<Vec<_>>(),
3762 "guest {} has different worktrees than the host",
3763 guest_id
3764 );
3765 for (id, host_snapshot) in &host_worktree_snapshots {
3766 let guest_snapshot = &worktree_snapshots[id];
3767 assert_eq!(
3768 guest_snapshot.root_name(),
3769 host_snapshot.root_name(),
3770 "guest {} has different root name than the host for worktree {}",
3771 guest_id,
3772 id
3773 );
3774 assert_eq!(
3775 guest_snapshot.entries(false).collect::<Vec<_>>(),
3776 host_snapshot.entries(false).collect::<Vec<_>>(),
3777 "guest {} has different snapshot than the host for worktree {}",
3778 guest_id,
3779 id
3780 );
3781 }
3782
3783 guest_client
3784 .project
3785 .as_ref()
3786 .unwrap()
3787 .read_with(guest_cx, |project, _| {
3788 assert!(
3789 !project.has_buffered_operations(),
3790 "guest {} has buffered operations ",
3791 guest_id,
3792 );
3793 });
3794
3795 for guest_buffer in &guest_client.buffers {
3796 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3797 let host_buffer = host_project.read_with(&host_cx, |project, _| {
3798 project
3799 .shared_buffer(guest_client.peer_id, buffer_id)
3800 .expect(&format!(
3801 "host doest not have buffer for guest:{}, peer:{}, id:{}",
3802 guest_id, guest_client.peer_id, buffer_id
3803 ))
3804 });
3805 assert_eq!(
3806 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3807 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3808 "guest {} buffer {} differs from the host's buffer",
3809 guest_id,
3810 buffer_id,
3811 );
3812 }
3813 }
3814 }
3815
3816 struct TestServer {
3817 peer: Arc<Peer>,
3818 app_state: Arc<AppState>,
3819 server: Arc<Server>,
3820 foreground: Rc<executor::Foreground>,
3821 notifications: mpsc::Receiver<()>,
3822 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3823 forbid_connections: Arc<AtomicBool>,
3824 _test_db: TestDb,
3825 }
3826
3827 impl TestServer {
3828 async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3829 let mut test_db = TestDb::new();
3830 test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3831 let app_state = Self::build_app_state(&test_db).await;
3832 let peer = Peer::new();
3833 let notifications = mpsc::channel(128);
3834 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3835 Self {
3836 peer,
3837 app_state,
3838 server,
3839 foreground,
3840 notifications: notifications.1,
3841 connection_killers: Default::default(),
3842 forbid_connections: Default::default(),
3843 _test_db: test_db,
3844 }
3845 }
3846
3847 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3848 let http = FakeHttpClient::with_404_response();
3849 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3850 let client_name = name.to_string();
3851 let mut client = Client::new(http.clone());
3852 let server = self.server.clone();
3853 let connection_killers = self.connection_killers.clone();
3854 let forbid_connections = self.forbid_connections.clone();
3855 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3856
3857 Arc::get_mut(&mut client)
3858 .unwrap()
3859 .override_authenticate(move |cx| {
3860 cx.spawn(|_| async move {
3861 let access_token = "the-token".to_string();
3862 Ok(Credentials {
3863 user_id: user_id.0 as u64,
3864 access_token,
3865 })
3866 })
3867 })
3868 .override_establish_connection(move |credentials, cx| {
3869 assert_eq!(credentials.user_id, user_id.0 as u64);
3870 assert_eq!(credentials.access_token, "the-token");
3871
3872 let server = server.clone();
3873 let connection_killers = connection_killers.clone();
3874 let forbid_connections = forbid_connections.clone();
3875 let client_name = client_name.clone();
3876 let connection_id_tx = connection_id_tx.clone();
3877 cx.spawn(move |cx| async move {
3878 if forbid_connections.load(SeqCst) {
3879 Err(EstablishConnectionError::other(anyhow!(
3880 "server is forbidding connections"
3881 )))
3882 } else {
3883 let (client_conn, server_conn, kill_conn) =
3884 Connection::in_memory(cx.background());
3885 connection_killers.lock().insert(user_id, kill_conn);
3886 cx.background()
3887 .spawn(server.handle_connection(
3888 server_conn,
3889 client_name,
3890 user_id,
3891 Some(connection_id_tx),
3892 cx.background(),
3893 ))
3894 .detach();
3895 Ok(client_conn)
3896 }
3897 })
3898 });
3899
3900 client
3901 .authenticate_and_connect(&cx.to_async())
3902 .await
3903 .unwrap();
3904
3905 Channel::init(&client);
3906 Project::init(&client);
3907
3908 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3909 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3910 let mut authed_user =
3911 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3912 while authed_user.next().await.unwrap().is_none() {}
3913
3914 TestClient {
3915 client,
3916 peer_id,
3917 user_store,
3918 project: Default::default(),
3919 buffers: Default::default(),
3920 }
3921 }
3922
3923 fn disconnect_client(&self, user_id: UserId) {
3924 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3925 let _ = kill_conn.try_send(Some(()));
3926 }
3927 }
3928
3929 fn forbid_connections(&self) {
3930 self.forbid_connections.store(true, SeqCst);
3931 }
3932
3933 fn allow_connections(&self) {
3934 self.forbid_connections.store(false, SeqCst);
3935 }
3936
3937 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3938 let mut config = Config::default();
3939 config.session_secret = "a".repeat(32);
3940 config.database_url = test_db.url.clone();
3941 let github_client = github::AppClient::test();
3942 Arc::new(AppState {
3943 db: test_db.db().clone(),
3944 handlebars: Default::default(),
3945 auth_client: auth::build_client("", ""),
3946 repo_client: github::RepoClient::test(&github_client),
3947 github_client,
3948 config,
3949 })
3950 }
3951
3952 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3953 self.server.store.read()
3954 }
3955
3956 async fn condition<F>(&mut self, mut predicate: F)
3957 where
3958 F: FnMut(&Store) -> bool,
3959 {
3960 async_std::future::timeout(Duration::from_millis(500), async {
3961 while !(predicate)(&*self.server.store.read()) {
3962 self.foreground.start_waiting();
3963 self.notifications.next().await;
3964 self.foreground.finish_waiting();
3965 }
3966 })
3967 .await
3968 .expect("condition timed out");
3969 }
3970 }
3971
3972 impl Drop for TestServer {
3973 fn drop(&mut self) {
3974 self.peer.reset();
3975 }
3976 }
3977
3978 struct TestClient {
3979 client: Arc<Client>,
3980 pub peer_id: PeerId,
3981 pub user_store: ModelHandle<UserStore>,
3982 project: Option<ModelHandle<Project>>,
3983 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3984 }
3985
3986 impl Deref for TestClient {
3987 type Target = Arc<Client>;
3988
3989 fn deref(&self) -> &Self::Target {
3990 &self.client
3991 }
3992 }
3993
3994 impl TestClient {
3995 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3996 UserId::from_proto(
3997 self.user_store
3998 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3999 )
4000 }
4001
4002 async fn simulate_host(
4003 mut self,
4004 project: ModelHandle<Project>,
4005 fake_language_servers: UnboundedReceiver<FakeLanguageServer>,
4006 operations: Rc<Cell<usize>>,
4007 max_operations: usize,
4008 rng: Rc<RefCell<StdRng>>,
4009 mut cx: TestAppContext,
4010 ) -> (Self, TestAppContext) {
4011 let fs = project.read_with(&cx, |project, _| project.fs().clone());
4012 let mut files: Vec<PathBuf> = Default::default();
4013 while operations.get() < max_operations {
4014 operations.set(operations.get() + 1);
4015
4016 let distribution = rng.borrow_mut().gen_range(0..100);
4017 match distribution {
4018 0..=20 if !files.is_empty() => {
4019 let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4020 while let Some(parent_path) = path.parent() {
4021 path = parent_path;
4022 if rng.borrow_mut().gen() {
4023 break;
4024 }
4025 }
4026
4027 log::info!("Host: find/create local worktree {:?}", path);
4028 project
4029 .update(&mut cx, |project, cx| {
4030 project.find_or_create_local_worktree(path, false, cx)
4031 })
4032 .await
4033 .unwrap();
4034 }
4035 10..=80 if !files.is_empty() => {
4036 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4037 let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4038 let (worktree, path) = project
4039 .update(&mut cx, |project, cx| {
4040 project.find_or_create_local_worktree(file, false, cx)
4041 })
4042 .await
4043 .unwrap();
4044 let project_path =
4045 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4046 log::info!("Host: opening path {:?}", project_path);
4047 let buffer = project
4048 .update(&mut cx, |project, cx| {
4049 project.open_buffer(project_path, cx)
4050 })
4051 .await
4052 .unwrap();
4053 self.buffers.insert(buffer.clone());
4054 buffer
4055 } else {
4056 self.buffers
4057 .iter()
4058 .choose(&mut *rng.borrow_mut())
4059 .unwrap()
4060 .clone()
4061 };
4062
4063 if rng.borrow_mut().gen_bool(0.1) {
4064 cx.update(|cx| {
4065 log::info!(
4066 "Host: dropping buffer {:?}",
4067 buffer.read(cx).file().unwrap().full_path(cx)
4068 );
4069 self.buffers.remove(&buffer);
4070 drop(buffer);
4071 });
4072 } else {
4073 buffer.update(&mut cx, |buffer, cx| {
4074 log::info!(
4075 "Host: updating buffer {:?}",
4076 buffer.file().unwrap().full_path(cx)
4077 );
4078 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4079 });
4080 }
4081 }
4082 _ => loop {
4083 let path_component_count = rng.borrow_mut().gen_range(1..=5);
4084 let mut path = PathBuf::new();
4085 path.push("/");
4086 for _ in 0..path_component_count {
4087 let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4088 path.push(std::str::from_utf8(&[letter]).unwrap());
4089 }
4090 path.set_extension("rs");
4091 let parent_path = path.parent().unwrap();
4092
4093 log::info!("Host: creating file {:?}", path);
4094 if fs.create_dir(&parent_path).await.is_ok()
4095 && fs.create_file(&path, Default::default()).await.is_ok()
4096 {
4097 files.push(path);
4098 break;
4099 } else {
4100 log::info!("Host: cannot create file");
4101 }
4102 },
4103 }
4104
4105 cx.background().simulate_random_delay().await;
4106 }
4107
4108 self.project = Some(project);
4109 (self, cx)
4110 }
4111
4112 pub async fn simulate_guest(
4113 mut self,
4114 guest_id: usize,
4115 project: ModelHandle<Project>,
4116 operations: Rc<Cell<usize>>,
4117 max_operations: usize,
4118 rng: Rc<RefCell<StdRng>>,
4119 mut cx: TestAppContext,
4120 ) -> (Self, TestAppContext) {
4121 while operations.get() < max_operations {
4122 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4123 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4124 project
4125 .worktrees(&cx)
4126 .filter(|worktree| {
4127 worktree.read(cx).entries(false).any(|e| e.is_file())
4128 })
4129 .choose(&mut *rng.borrow_mut())
4130 }) {
4131 worktree
4132 } else {
4133 cx.background().simulate_random_delay().await;
4134 continue;
4135 };
4136
4137 operations.set(operations.get() + 1);
4138 let project_path = worktree.read_with(&cx, |worktree, _| {
4139 let entry = worktree
4140 .entries(false)
4141 .filter(|e| e.is_file())
4142 .choose(&mut *rng.borrow_mut())
4143 .unwrap();
4144 (worktree.id(), entry.path.clone())
4145 });
4146 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4147 let buffer = project
4148 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4149 .await
4150 .unwrap();
4151 self.buffers.insert(buffer.clone());
4152 buffer
4153 } else {
4154 self.buffers
4155 .iter()
4156 .choose(&mut *rng.borrow_mut())
4157 .unwrap()
4158 .clone()
4159 };
4160
4161 if rng.borrow_mut().gen_bool(0.1) {
4162 cx.update(|cx| {
4163 log::info!(
4164 "Guest {}: dropping buffer {:?}",
4165 guest_id,
4166 buffer.read(cx).file().unwrap().full_path(cx)
4167 );
4168 self.buffers.remove(&buffer);
4169 drop(buffer);
4170 });
4171 } else {
4172 buffer.update(&mut cx, |buffer, cx| {
4173 log::info!(
4174 "Guest {}: updating buffer {:?}",
4175 guest_id,
4176 buffer.file().unwrap().full_path(cx)
4177 );
4178 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4179 });
4180 }
4181
4182 cx.background().simulate_random_delay().await;
4183 }
4184
4185 self.project = Some(project);
4186 (self, cx)
4187 }
4188 }
4189
4190 impl Executor for Arc<gpui::executor::Background> {
4191 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4192 self.spawn(future).detach();
4193 }
4194 }
4195
4196 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4197 channel
4198 .messages()
4199 .cursor::<()>()
4200 .map(|m| {
4201 (
4202 m.sender.github_login.clone(),
4203 m.body.clone(),
4204 m.is_pending(),
4205 )
4206 })
4207 .collect()
4208 }
4209
/// A view with no state; its `gpui::View` impl renders an empty element.
struct EmptyView;
4211
4212 impl gpui::Entity for EmptyView {
4213 type Event = ();
4214 }
4215
4216 impl gpui::View for EmptyView {
4217 fn ui_name() -> &'static str {
4218 "empty view"
4219 }
4220
4221 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4222 gpui::Element::boxed(gpui::elements::Empty)
4223 }
4224 }
4225}