1mod store;
2
3use super::{
4 auth::process_auth_header,
5 db::{ChannelId, MessageId, UserId},
6 AppState,
7};
8use anyhow::anyhow;
9use async_std::task;
10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
11use collections::{HashMap, HashSet};
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
14use postage::{mpsc, prelude::Sink as _};
15use rpc::{
16 proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
17 Connection, ConnectionId, Peer, TypedEnvelope,
18};
19use sha1::{Digest as _, Sha1};
20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
21use store::{Store, Worktree};
22use surf::StatusCode;
23use tide::log;
24use tide::{
25 http::headers::{HeaderName, CONNECTION, UPGRADE},
26 Request, Response,
27};
28use time::OffsetDateTime;
29
30type MessageHandler = Box<
31 dyn Send
32 + Sync
33 + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
34>;
35
36pub struct Server {
37 peer: Arc<Peer>,
38 store: RwLock<Store>,
39 app_state: Arc<AppState>,
40 handlers: HashMap<TypeId, MessageHandler>,
41 notifications: Option<mpsc::Sender<()>>,
42}
43
44pub trait Executor {
45 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
46}
47
48pub struct RealExecutor;
49
50const MESSAGE_COUNT_PER_PAGE: usize = 100;
51const MAX_MESSAGE_LEN: usize = 1024;
52
53impl Server {
54 pub fn new(
55 app_state: Arc<AppState>,
56 peer: Arc<Peer>,
57 notifications: Option<mpsc::Sender<()>>,
58 ) -> Arc<Self> {
59 let mut server = Self {
60 peer,
61 app_state,
62 store: Default::default(),
63 handlers: Default::default(),
64 notifications,
65 };
66
67 server
68 .add_request_handler(Server::ping)
69 .add_request_handler(Server::register_project)
70 .add_message_handler(Server::unregister_project)
71 .add_request_handler(Server::share_project)
72 .add_message_handler(Server::unshare_project)
73 .add_request_handler(Server::join_project)
74 .add_message_handler(Server::leave_project)
75 .add_request_handler(Server::register_worktree)
76 .add_message_handler(Server::unregister_worktree)
77 .add_request_handler(Server::share_worktree)
78 .add_request_handler(Server::update_worktree)
79 .add_message_handler(Server::update_diagnostic_summary)
80 .add_message_handler(Server::disk_based_diagnostics_updating)
81 .add_message_handler(Server::disk_based_diagnostics_updated)
82 .add_request_handler(Server::get_definition)
83 .add_request_handler(Server::open_buffer)
84 .add_message_handler(Server::close_buffer)
85 .add_request_handler(Server::update_buffer)
86 .add_message_handler(Server::update_buffer_file)
87 .add_message_handler(Server::buffer_reloaded)
88 .add_message_handler(Server::buffer_saved)
89 .add_request_handler(Server::save_buffer)
90 .add_request_handler(Server::format_buffers)
91 .add_request_handler(Server::get_completions)
92 .add_request_handler(Server::apply_additional_edits_for_completion)
93 .add_request_handler(Server::get_code_actions)
94 .add_request_handler(Server::apply_code_action)
95 .add_request_handler(Server::get_channels)
96 .add_request_handler(Server::get_users)
97 .add_request_handler(Server::join_channel)
98 .add_message_handler(Server::leave_channel)
99 .add_request_handler(Server::send_channel_message)
100 .add_request_handler(Server::get_channel_messages);
101
102 Arc::new(server)
103 }
104
105 fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
106 where
107 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
108 Fut: 'static + Send + Future<Output = tide::Result<()>>,
109 M: EnvelopedMessage,
110 {
111 let prev_handler = self.handlers.insert(
112 TypeId::of::<M>(),
113 Box::new(move |server, envelope| {
114 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
115 (handler)(server, *envelope).boxed()
116 }),
117 );
118 if prev_handler.is_some() {
119 panic!("registered a handler for the same message twice");
120 }
121 self
122 }
123
124 fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
125 where
126 F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
127 Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
128 M: RequestMessage,
129 {
130 self.add_message_handler(move |server, envelope| {
131 let receipt = envelope.receipt();
132 let response = (handler)(server.clone(), envelope);
133 async move {
134 match response.await {
135 Ok(response) => {
136 server.peer.respond(receipt, response)?;
137 Ok(())
138 }
139 Err(error) => {
140 server.peer.respond_with_error(
141 receipt,
142 proto::Error {
143 message: error.to_string(),
144 },
145 )?;
146 Err(error)
147 }
148 }
149 }
150 })
151 }
152
153 pub fn handle_connection<E: Executor>(
154 self: &Arc<Self>,
155 connection: Connection,
156 addr: String,
157 user_id: UserId,
158 mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
159 executor: E,
160 ) -> impl Future<Output = ()> {
161 let mut this = self.clone();
162 async move {
163 let (connection_id, handle_io, mut incoming_rx) =
164 this.peer.add_connection(connection).await;
165
166 if let Some(send_connection_id) = send_connection_id.as_mut() {
167 let _ = send_connection_id.send(connection_id).await;
168 }
169
170 this.state_mut().add_connection(connection_id, user_id);
171 if let Err(err) = this.update_contacts_for_users(&[user_id]) {
172 log::error!("error updating contacts for {:?}: {}", user_id, err);
173 }
174
175 let handle_io = handle_io.fuse();
176 futures::pin_mut!(handle_io);
177 loop {
178 let next_message = incoming_rx.next().fuse();
179 futures::pin_mut!(next_message);
180 futures::select_biased! {
181 result = handle_io => {
182 if let Err(err) = result {
183 log::error!("error handling rpc connection {:?} - {:?}", addr, err);
184 }
185 break;
186 }
187 message = next_message => {
188 if let Some(message) = message {
189 let start_time = Instant::now();
190 let type_name = message.payload_type_name();
191 log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
192 if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
193 let notifications = this.notifications.clone();
194 let is_background = message.is_background();
195 let handle_message = (handler)(this.clone(), message);
196 let handle_message = async move {
197 if let Err(err) = handle_message.await {
198 log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
199 } else {
200 log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
201 }
202 if let Some(mut notifications) = notifications {
203 let _ = notifications.send(()).await;
204 }
205 };
206 if is_background {
207 executor.spawn_detached(handle_message);
208 } else {
209 handle_message.await;
210 }
211 } else {
212 log::warn!("unhandled message: {}", type_name);
213 }
214 } else {
215 log::info!("rpc connection closed {:?}", addr);
216 break;
217 }
218 }
219 }
220 }
221
222 if let Err(err) = this.sign_out(connection_id).await {
223 log::error!("error signing out connection {:?} - {:?}", addr, err);
224 }
225 }
226 }
227
228 async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
229 self.peer.disconnect(connection_id);
230 let removed_connection = self.state_mut().remove_connection(connection_id)?;
231
232 for (project_id, project) in removed_connection.hosted_projects {
233 if let Some(share) = project.share {
234 broadcast(
235 connection_id,
236 share.guests.keys().copied().collect(),
237 |conn_id| {
238 self.peer
239 .send(conn_id, proto::UnshareProject { project_id })
240 },
241 )?;
242 }
243 }
244
245 for (project_id, peer_ids) in removed_connection.guest_project_ids {
246 broadcast(connection_id, peer_ids, |conn_id| {
247 self.peer.send(
248 conn_id,
249 proto::RemoveProjectCollaborator {
250 project_id,
251 peer_id: connection_id.0,
252 },
253 )
254 })?;
255 }
256
257 self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
258 Ok(())
259 }
260
261 async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
262 Ok(proto::Ack {})
263 }
264
265 async fn register_project(
266 mut self: Arc<Server>,
267 request: TypedEnvelope<proto::RegisterProject>,
268 ) -> tide::Result<proto::RegisterProjectResponse> {
269 let project_id = {
270 let mut state = self.state_mut();
271 let user_id = state.user_id_for_connection(request.sender_id)?;
272 state.register_project(request.sender_id, user_id)
273 };
274 Ok(proto::RegisterProjectResponse { project_id })
275 }
276
277 async fn unregister_project(
278 mut self: Arc<Server>,
279 request: TypedEnvelope<proto::UnregisterProject>,
280 ) -> tide::Result<()> {
281 let project = self
282 .state_mut()
283 .unregister_project(request.payload.project_id, request.sender_id)?;
284 self.update_contacts_for_users(project.authorized_user_ids().iter())?;
285 Ok(())
286 }
287
288 async fn share_project(
289 mut self: Arc<Server>,
290 request: TypedEnvelope<proto::ShareProject>,
291 ) -> tide::Result<proto::Ack> {
292 self.state_mut()
293 .share_project(request.payload.project_id, request.sender_id);
294 Ok(proto::Ack {})
295 }
296
297 async fn unshare_project(
298 mut self: Arc<Server>,
299 request: TypedEnvelope<proto::UnshareProject>,
300 ) -> tide::Result<()> {
301 let project_id = request.payload.project_id;
302 let project = self
303 .state_mut()
304 .unshare_project(project_id, request.sender_id)?;
305
306 broadcast(request.sender_id, project.connection_ids, |conn_id| {
307 self.peer
308 .send(conn_id, proto::UnshareProject { project_id })
309 })?;
310 self.update_contacts_for_users(&project.authorized_user_ids)?;
311 Ok(())
312 }
313
314 async fn join_project(
315 mut self: Arc<Server>,
316 request: TypedEnvelope<proto::JoinProject>,
317 ) -> tide::Result<proto::JoinProjectResponse> {
318 let project_id = request.payload.project_id;
319
320 let user_id = self.state().user_id_for_connection(request.sender_id)?;
321 let (response, connection_ids, contact_user_ids) = self
322 .state_mut()
323 .join_project(request.sender_id, user_id, project_id)
324 .and_then(|joined| {
325 let share = joined.project.share()?;
326 let peer_count = share.guests.len();
327 let mut collaborators = Vec::with_capacity(peer_count);
328 collaborators.push(proto::Collaborator {
329 peer_id: joined.project.host_connection_id.0,
330 replica_id: 0,
331 user_id: joined.project.host_user_id.to_proto(),
332 });
333 let worktrees = joined
334 .project
335 .worktrees
336 .iter()
337 .filter_map(|(id, worktree)| {
338 worktree.share.as_ref().map(|share| proto::Worktree {
339 id: *id,
340 root_name: worktree.root_name.clone(),
341 entries: share.entries.values().cloned().collect(),
342 diagnostic_summaries: share
343 .diagnostic_summaries
344 .values()
345 .cloned()
346 .collect(),
347 weak: worktree.weak,
348 next_update_id: share.next_update_id as u64,
349 })
350 })
351 .collect();
352 for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
353 if *peer_conn_id != request.sender_id {
354 collaborators.push(proto::Collaborator {
355 peer_id: peer_conn_id.0,
356 replica_id: *peer_replica_id as u32,
357 user_id: peer_user_id.to_proto(),
358 });
359 }
360 }
361 let response = proto::JoinProjectResponse {
362 worktrees,
363 replica_id: joined.replica_id as u32,
364 collaborators,
365 };
366 let connection_ids = joined.project.connection_ids();
367 let contact_user_ids = joined.project.authorized_user_ids();
368 Ok((response, connection_ids, contact_user_ids))
369 })?;
370
371 broadcast(request.sender_id, connection_ids, |conn_id| {
372 self.peer.send(
373 conn_id,
374 proto::AddProjectCollaborator {
375 project_id,
376 collaborator: Some(proto::Collaborator {
377 peer_id: request.sender_id.0,
378 replica_id: response.replica_id,
379 user_id: user_id.to_proto(),
380 }),
381 },
382 )
383 })?;
384 self.update_contacts_for_users(&contact_user_ids)?;
385 Ok(response)
386 }
387
388 async fn leave_project(
389 mut self: Arc<Server>,
390 request: TypedEnvelope<proto::LeaveProject>,
391 ) -> tide::Result<()> {
392 let sender_id = request.sender_id;
393 let project_id = request.payload.project_id;
394 let worktree = self.state_mut().leave_project(sender_id, project_id)?;
395
396 broadcast(sender_id, worktree.connection_ids, |conn_id| {
397 self.peer.send(
398 conn_id,
399 proto::RemoveProjectCollaborator {
400 project_id,
401 peer_id: sender_id.0,
402 },
403 )
404 })?;
405 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
406
407 Ok(())
408 }
409
410 async fn register_worktree(
411 mut self: Arc<Server>,
412 request: TypedEnvelope<proto::RegisterWorktree>,
413 ) -> tide::Result<proto::Ack> {
414 let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
415
416 let mut contact_user_ids = HashSet::default();
417 contact_user_ids.insert(host_user_id);
418 for github_login in request.payload.authorized_logins {
419 let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
420 contact_user_ids.insert(contact_user_id);
421 }
422
423 let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
424 self.state_mut().register_worktree(
425 request.payload.project_id,
426 request.payload.worktree_id,
427 request.sender_id,
428 Worktree {
429 authorized_user_ids: contact_user_ids.clone(),
430 root_name: request.payload.root_name,
431 share: None,
432 weak: false,
433 },
434 )?;
435 self.update_contacts_for_users(&contact_user_ids)?;
436 Ok(proto::Ack {})
437 }
438
439 async fn unregister_worktree(
440 mut self: Arc<Server>,
441 request: TypedEnvelope<proto::UnregisterWorktree>,
442 ) -> tide::Result<()> {
443 let project_id = request.payload.project_id;
444 let worktree_id = request.payload.worktree_id;
445 let (worktree, guest_connection_ids) =
446 self.state_mut()
447 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
448 broadcast(request.sender_id, guest_connection_ids, |conn_id| {
449 self.peer.send(
450 conn_id,
451 proto::UnregisterWorktree {
452 project_id,
453 worktree_id,
454 },
455 )
456 })?;
457 self.update_contacts_for_users(&worktree.authorized_user_ids)?;
458 Ok(())
459 }
460
461 async fn share_worktree(
462 mut self: Arc<Server>,
463 mut request: TypedEnvelope<proto::ShareWorktree>,
464 ) -> tide::Result<proto::Ack> {
465 let worktree = request
466 .payload
467 .worktree
468 .as_mut()
469 .ok_or_else(|| anyhow!("missing worktree"))?;
470 let entries = worktree
471 .entries
472 .iter()
473 .map(|entry| (entry.id, entry.clone()))
474 .collect();
475 let diagnostic_summaries = worktree
476 .diagnostic_summaries
477 .iter()
478 .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
479 .collect();
480
481 let shared_worktree = self.state_mut().share_worktree(
482 request.payload.project_id,
483 worktree.id,
484 request.sender_id,
485 entries,
486 diagnostic_summaries,
487 worktree.next_update_id,
488 )?;
489
490 broadcast(
491 request.sender_id,
492 shared_worktree.connection_ids,
493 |connection_id| {
494 self.peer
495 .forward_send(request.sender_id, connection_id, request.payload.clone())
496 },
497 )?;
498 self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
499
500 Ok(proto::Ack {})
501 }
502
503 async fn update_worktree(
504 mut self: Arc<Server>,
505 request: TypedEnvelope<proto::UpdateWorktree>,
506 ) -> tide::Result<proto::Ack> {
507 let connection_ids = self.state_mut().update_worktree(
508 request.sender_id,
509 request.payload.project_id,
510 request.payload.worktree_id,
511 request.payload.id,
512 &request.payload.removed_entries,
513 &request.payload.updated_entries,
514 )?;
515
516 broadcast(request.sender_id, connection_ids, |connection_id| {
517 self.peer
518 .forward_send(request.sender_id, connection_id, request.payload.clone())
519 })?;
520
521 Ok(proto::Ack {})
522 }
523
524 async fn update_diagnostic_summary(
525 mut self: Arc<Server>,
526 request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
527 ) -> tide::Result<()> {
528 let summary = request
529 .payload
530 .summary
531 .clone()
532 .ok_or_else(|| anyhow!("invalid summary"))?;
533 let receiver_ids = self.state_mut().update_diagnostic_summary(
534 request.payload.project_id,
535 request.payload.worktree_id,
536 request.sender_id,
537 summary,
538 )?;
539
540 broadcast(request.sender_id, receiver_ids, |connection_id| {
541 self.peer
542 .forward_send(request.sender_id, connection_id, request.payload.clone())
543 })?;
544 Ok(())
545 }
546
547 async fn disk_based_diagnostics_updating(
548 self: Arc<Server>,
549 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
550 ) -> tide::Result<()> {
551 let receiver_ids = self
552 .state()
553 .project_connection_ids(request.payload.project_id, request.sender_id)?;
554 broadcast(request.sender_id, receiver_ids, |connection_id| {
555 self.peer
556 .forward_send(request.sender_id, connection_id, request.payload.clone())
557 })?;
558 Ok(())
559 }
560
561 async fn disk_based_diagnostics_updated(
562 self: Arc<Server>,
563 request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
564 ) -> tide::Result<()> {
565 let receiver_ids = self
566 .state()
567 .project_connection_ids(request.payload.project_id, request.sender_id)?;
568 broadcast(request.sender_id, receiver_ids, |connection_id| {
569 self.peer
570 .forward_send(request.sender_id, connection_id, request.payload.clone())
571 })?;
572 Ok(())
573 }
574
575 async fn get_definition(
576 self: Arc<Server>,
577 request: TypedEnvelope<proto::GetDefinition>,
578 ) -> tide::Result<proto::GetDefinitionResponse> {
579 let host_connection_id = self
580 .state()
581 .read_project(request.payload.project_id, request.sender_id)?
582 .host_connection_id;
583 Ok(self
584 .peer
585 .forward_request(request.sender_id, host_connection_id, request.payload)
586 .await?)
587 }
588
589 async fn open_buffer(
590 self: Arc<Server>,
591 request: TypedEnvelope<proto::OpenBuffer>,
592 ) -> tide::Result<proto::OpenBufferResponse> {
593 let host_connection_id = self
594 .state()
595 .read_project(request.payload.project_id, request.sender_id)?
596 .host_connection_id;
597 Ok(self
598 .peer
599 .forward_request(request.sender_id, host_connection_id, request.payload)
600 .await?)
601 }
602
603 async fn close_buffer(
604 self: Arc<Server>,
605 request: TypedEnvelope<proto::CloseBuffer>,
606 ) -> tide::Result<()> {
607 let host_connection_id = self
608 .state()
609 .read_project(request.payload.project_id, request.sender_id)?
610 .host_connection_id;
611 self.peer
612 .forward_send(request.sender_id, host_connection_id, request.payload)?;
613 Ok(())
614 }
615
616 async fn save_buffer(
617 self: Arc<Server>,
618 request: TypedEnvelope<proto::SaveBuffer>,
619 ) -> tide::Result<proto::BufferSaved> {
620 let host;
621 let mut guests;
622 {
623 let state = self.state();
624 let project = state.read_project(request.payload.project_id, request.sender_id)?;
625 host = project.host_connection_id;
626 guests = project.guest_connection_ids()
627 }
628
629 let response = self
630 .peer
631 .forward_request(request.sender_id, host, request.payload.clone())
632 .await?;
633
634 guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
635 broadcast(host, guests, |conn_id| {
636 self.peer.forward_send(host, conn_id, response.clone())
637 })?;
638
639 Ok(response)
640 }
641
642 async fn format_buffers(
643 self: Arc<Server>,
644 request: TypedEnvelope<proto::FormatBuffers>,
645 ) -> tide::Result<proto::FormatBuffersResponse> {
646 let host = self
647 .state()
648 .read_project(request.payload.project_id, request.sender_id)?
649 .host_connection_id;
650 Ok(self
651 .peer
652 .forward_request(request.sender_id, host, request.payload.clone())
653 .await?)
654 }
655
656 async fn get_completions(
657 self: Arc<Server>,
658 request: TypedEnvelope<proto::GetCompletions>,
659 ) -> tide::Result<proto::GetCompletionsResponse> {
660 let host = self
661 .state()
662 .read_project(request.payload.project_id, request.sender_id)?
663 .host_connection_id;
664 Ok(self
665 .peer
666 .forward_request(request.sender_id, host, request.payload.clone())
667 .await?)
668 }
669
670 async fn apply_additional_edits_for_completion(
671 self: Arc<Server>,
672 request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
673 ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
674 let host = self
675 .state()
676 .read_project(request.payload.project_id, request.sender_id)?
677 .host_connection_id;
678 Ok(self
679 .peer
680 .forward_request(request.sender_id, host, request.payload.clone())
681 .await?)
682 }
683
684 async fn get_code_actions(
685 self: Arc<Server>,
686 request: TypedEnvelope<proto::GetCodeActions>,
687 ) -> tide::Result<proto::GetCodeActionsResponse> {
688 let host = self
689 .state()
690 .read_project(request.payload.project_id, request.sender_id)?
691 .host_connection_id;
692 Ok(self
693 .peer
694 .forward_request(request.sender_id, host, request.payload.clone())
695 .await?)
696 }
697
698 async fn apply_code_action(
699 self: Arc<Server>,
700 request: TypedEnvelope<proto::ApplyCodeAction>,
701 ) -> tide::Result<proto::ApplyCodeActionResponse> {
702 let host = self
703 .state()
704 .read_project(request.payload.project_id, request.sender_id)?
705 .host_connection_id;
706 Ok(self
707 .peer
708 .forward_request(request.sender_id, host, request.payload.clone())
709 .await?)
710 }
711
712 async fn update_buffer(
713 self: Arc<Server>,
714 request: TypedEnvelope<proto::UpdateBuffer>,
715 ) -> tide::Result<proto::Ack> {
716 let receiver_ids = self
717 .state()
718 .project_connection_ids(request.payload.project_id, request.sender_id)?;
719 broadcast(request.sender_id, receiver_ids, |connection_id| {
720 self.peer
721 .forward_send(request.sender_id, connection_id, request.payload.clone())
722 })?;
723 Ok(proto::Ack {})
724 }
725
726 async fn update_buffer_file(
727 self: Arc<Server>,
728 request: TypedEnvelope<proto::UpdateBufferFile>,
729 ) -> tide::Result<()> {
730 let receiver_ids = self
731 .state()
732 .project_connection_ids(request.payload.project_id, request.sender_id)?;
733 broadcast(request.sender_id, receiver_ids, |connection_id| {
734 self.peer
735 .forward_send(request.sender_id, connection_id, request.payload.clone())
736 })?;
737 Ok(())
738 }
739
740 async fn buffer_reloaded(
741 self: Arc<Server>,
742 request: TypedEnvelope<proto::BufferReloaded>,
743 ) -> tide::Result<()> {
744 let receiver_ids = self
745 .state()
746 .project_connection_ids(request.payload.project_id, request.sender_id)?;
747 broadcast(request.sender_id, receiver_ids, |connection_id| {
748 self.peer
749 .forward_send(request.sender_id, connection_id, request.payload.clone())
750 })?;
751 Ok(())
752 }
753
754 async fn buffer_saved(
755 self: Arc<Server>,
756 request: TypedEnvelope<proto::BufferSaved>,
757 ) -> tide::Result<()> {
758 let receiver_ids = self
759 .state()
760 .project_connection_ids(request.payload.project_id, request.sender_id)?;
761 broadcast(request.sender_id, receiver_ids, |connection_id| {
762 self.peer
763 .forward_send(request.sender_id, connection_id, request.payload.clone())
764 })?;
765 Ok(())
766 }
767
768 async fn get_channels(
769 self: Arc<Server>,
770 request: TypedEnvelope<proto::GetChannels>,
771 ) -> tide::Result<proto::GetChannelsResponse> {
772 let user_id = self.state().user_id_for_connection(request.sender_id)?;
773 let channels = self.app_state.db.get_accessible_channels(user_id).await?;
774 Ok(proto::GetChannelsResponse {
775 channels: channels
776 .into_iter()
777 .map(|chan| proto::Channel {
778 id: chan.id.to_proto(),
779 name: chan.name,
780 })
781 .collect(),
782 })
783 }
784
785 async fn get_users(
786 self: Arc<Server>,
787 request: TypedEnvelope<proto::GetUsers>,
788 ) -> tide::Result<proto::GetUsersResponse> {
789 let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
790 let users = self
791 .app_state
792 .db
793 .get_users_by_ids(user_ids)
794 .await?
795 .into_iter()
796 .map(|user| proto::User {
797 id: user.id.to_proto(),
798 avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
799 github_login: user.github_login,
800 })
801 .collect();
802 Ok(proto::GetUsersResponse { users })
803 }
804
805 fn update_contacts_for_users<'a>(
806 self: &Arc<Server>,
807 user_ids: impl IntoIterator<Item = &'a UserId>,
808 ) -> anyhow::Result<()> {
809 let mut result = Ok(());
810 let state = self.state();
811 for user_id in user_ids {
812 let contacts = state.contacts_for_user(*user_id);
813 for connection_id in state.connection_ids_for_user(*user_id) {
814 if let Err(error) = self.peer.send(
815 connection_id,
816 proto::UpdateContacts {
817 contacts: contacts.clone(),
818 },
819 ) {
820 result = Err(error);
821 }
822 }
823 }
824 result
825 }
826
827 async fn join_channel(
828 mut self: Arc<Self>,
829 request: TypedEnvelope<proto::JoinChannel>,
830 ) -> tide::Result<proto::JoinChannelResponse> {
831 let user_id = self.state().user_id_for_connection(request.sender_id)?;
832 let channel_id = ChannelId::from_proto(request.payload.channel_id);
833 if !self
834 .app_state
835 .db
836 .can_user_access_channel(user_id, channel_id)
837 .await?
838 {
839 Err(anyhow!("access denied"))?;
840 }
841
842 self.state_mut().join_channel(request.sender_id, channel_id);
843 let messages = self
844 .app_state
845 .db
846 .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
847 .await?
848 .into_iter()
849 .map(|msg| proto::ChannelMessage {
850 id: msg.id.to_proto(),
851 body: msg.body,
852 timestamp: msg.sent_at.unix_timestamp() as u64,
853 sender_id: msg.sender_id.to_proto(),
854 nonce: Some(msg.nonce.as_u128().into()),
855 })
856 .collect::<Vec<_>>();
857 Ok(proto::JoinChannelResponse {
858 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
859 messages,
860 })
861 }
862
863 async fn leave_channel(
864 mut self: Arc<Self>,
865 request: TypedEnvelope<proto::LeaveChannel>,
866 ) -> tide::Result<()> {
867 let user_id = self.state().user_id_for_connection(request.sender_id)?;
868 let channel_id = ChannelId::from_proto(request.payload.channel_id);
869 if !self
870 .app_state
871 .db
872 .can_user_access_channel(user_id, channel_id)
873 .await?
874 {
875 Err(anyhow!("access denied"))?;
876 }
877
878 self.state_mut()
879 .leave_channel(request.sender_id, channel_id);
880
881 Ok(())
882 }
883
884 async fn send_channel_message(
885 self: Arc<Self>,
886 request: TypedEnvelope<proto::SendChannelMessage>,
887 ) -> tide::Result<proto::SendChannelMessageResponse> {
888 let channel_id = ChannelId::from_proto(request.payload.channel_id);
889 let user_id;
890 let connection_ids;
891 {
892 let state = self.state();
893 user_id = state.user_id_for_connection(request.sender_id)?;
894 connection_ids = state.channel_connection_ids(channel_id)?;
895 }
896
897 // Validate the message body.
898 let body = request.payload.body.trim().to_string();
899 if body.len() > MAX_MESSAGE_LEN {
900 return Err(anyhow!("message is too long"))?;
901 }
902 if body.is_empty() {
903 return Err(anyhow!("message can't be blank"))?;
904 }
905
906 let timestamp = OffsetDateTime::now_utc();
907 let nonce = request
908 .payload
909 .nonce
910 .ok_or_else(|| anyhow!("nonce can't be blank"))?;
911
912 let message_id = self
913 .app_state
914 .db
915 .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
916 .await?
917 .to_proto();
918 let message = proto::ChannelMessage {
919 sender_id: user_id.to_proto(),
920 id: message_id,
921 body,
922 timestamp: timestamp.unix_timestamp() as u64,
923 nonce: Some(nonce),
924 };
925 broadcast(request.sender_id, connection_ids, |conn_id| {
926 self.peer.send(
927 conn_id,
928 proto::ChannelMessageSent {
929 channel_id: channel_id.to_proto(),
930 message: Some(message.clone()),
931 },
932 )
933 })?;
934 Ok(proto::SendChannelMessageResponse {
935 message: Some(message),
936 })
937 }
938
939 async fn get_channel_messages(
940 self: Arc<Self>,
941 request: TypedEnvelope<proto::GetChannelMessages>,
942 ) -> tide::Result<proto::GetChannelMessagesResponse> {
943 let user_id = self.state().user_id_for_connection(request.sender_id)?;
944 let channel_id = ChannelId::from_proto(request.payload.channel_id);
945 if !self
946 .app_state
947 .db
948 .can_user_access_channel(user_id, channel_id)
949 .await?
950 {
951 Err(anyhow!("access denied"))?;
952 }
953
954 let messages = self
955 .app_state
956 .db
957 .get_channel_messages(
958 channel_id,
959 MESSAGE_COUNT_PER_PAGE,
960 Some(MessageId::from_proto(request.payload.before_message_id)),
961 )
962 .await?
963 .into_iter()
964 .map(|msg| proto::ChannelMessage {
965 id: msg.id.to_proto(),
966 body: msg.body,
967 timestamp: msg.sent_at.unix_timestamp() as u64,
968 sender_id: msg.sender_id.to_proto(),
969 nonce: Some(msg.nonce.as_u128().into()),
970 })
971 .collect::<Vec<_>>();
972
973 Ok(proto::GetChannelMessagesResponse {
974 done: messages.len() < MESSAGE_COUNT_PER_PAGE,
975 messages,
976 })
977 }
978
979 fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
980 self.store.read()
981 }
982
983 fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
984 self.store.write()
985 }
986}
987
988impl Executor for RealExecutor {
989 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
990 task::spawn(future);
991 }
992}
993
994fn broadcast<F>(
995 sender_id: ConnectionId,
996 receiver_ids: Vec<ConnectionId>,
997 mut f: F,
998) -> anyhow::Result<()>
999where
1000 F: FnMut(ConnectionId) -> anyhow::Result<()>,
1001{
1002 let mut result = Ok(());
1003 for receiver_id in receiver_ids {
1004 if receiver_id != sender_id {
1005 if let Err(error) = f(receiver_id) {
1006 if result.is_ok() {
1007 result = Err(error);
1008 }
1009 }
1010 }
1011 }
1012 result
1013}
1014
1015pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1016 let server = Server::new(app.state().clone(), rpc.clone(), None);
1017 app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1018 let server = server.clone();
1019 async move {
1020 const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1021
1022 let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1023 let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1024 let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1025 let client_protocol_version: Option<u32> = request
1026 .header("X-Zed-Protocol-Version")
1027 .and_then(|v| v.as_str().parse().ok());
1028
1029 if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1030 return Ok(Response::new(StatusCode::UpgradeRequired));
1031 }
1032
1033 let header = match request.header("Sec-Websocket-Key") {
1034 Some(h) => h.as_str(),
1035 None => return Err(anyhow!("expected sec-websocket-key"))?,
1036 };
1037
1038 let user_id = process_auth_header(&request).await?;
1039
1040 let mut response = Response::new(StatusCode::SwitchingProtocols);
1041 response.insert_header(UPGRADE, "websocket");
1042 response.insert_header(CONNECTION, "Upgrade");
1043 let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1044 response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1045 response.insert_header("Sec-Websocket-Version", "13");
1046
1047 let http_res: &mut tide::http::Response = response.as_mut();
1048 let upgrade_receiver = http_res.recv_upgrade().await;
1049 let addr = request.remote().unwrap_or("unknown").to_string();
1050 task::spawn(async move {
1051 if let Some(stream) = upgrade_receiver.await {
1052 server
1053 .handle_connection(
1054 Connection::new(
1055 WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1056 ),
1057 addr,
1058 user_id,
1059 None,
1060 RealExecutor,
1061 )
1062 .await;
1063 }
1064 });
1065
1066 Ok(response)
1067 }
1068 });
1069}
1070
1071fn header_contains_ignore_case<T>(
1072 request: &tide::Request<T>,
1073 header_name: HeaderName,
1074 value: &str,
1075) -> bool {
1076 request
1077 .header(header_name)
1078 .map(|h| {
1079 h.as_str()
1080 .split(',')
1081 .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1082 })
1083 .unwrap_or(false)
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088 use super::*;
1089 use crate::{
1090 auth,
1091 db::{tests::TestDb, UserId},
1092 github, AppState, Config,
1093 };
1094 use ::rpc::Peer;
1095 use collections::BTreeMap;
1096 use gpui::{executor, ModelHandle, TestAppContext};
1097 use parking_lot::Mutex;
1098 use postage::{mpsc, watch};
1099 use rand::prelude::*;
1100 use rpc::PeerId;
1101 use serde_json::json;
1102 use sqlx::types::time::OffsetDateTime;
1103 use std::{
1104 cell::{Cell, RefCell},
1105 env,
1106 ops::Deref,
1107 path::Path,
1108 rc::Rc,
1109 sync::{
1110 atomic::{AtomicBool, Ordering::SeqCst},
1111 Arc,
1112 },
1113 time::Duration,
1114 };
1115 use zed::{
1116 client::{
1117 self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1118 EstablishConnectionError, UserStore,
1119 },
1120 editor::{
1121 self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1122 Redo, ToggleCodeActions, Undo,
1123 },
1124 fs::{FakeFs, Fs as _},
1125 language::{
1126 tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1127 LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1128 },
1129 lsp,
1130 project::{DiagnosticSummary, Project, ProjectPath},
1131 workspace::{Workspace, WorkspaceParams},
1132 };
1133
1134 #[cfg(test)]
1135 #[ctor::ctor]
1136 fn init_logger() {
1137 if std::env::var("RUST_LOG").is_ok() {
1138 env_logger::init();
1139 }
1140 }
1141
1142 #[gpui::test(iterations = 10)]
1143 async fn test_share_project(
1144 mut cx_a: TestAppContext,
1145 mut cx_b: TestAppContext,
1146 last_iteration: bool,
1147 ) {
1148 let (window_b, _) = cx_b.add_window(|_| EmptyView);
1149 let lang_registry = Arc::new(LanguageRegistry::new());
1150 let fs = Arc::new(FakeFs::new(cx_a.background()));
1151 cx_a.foreground().forbid_parking();
1152
1153 // Connect to a server as 2 clients.
1154 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1155 let client_a = server.create_client(&mut cx_a, "user_a").await;
1156 let client_b = server.create_client(&mut cx_b, "user_b").await;
1157
1158 // Share a project as client A
1159 fs.insert_tree(
1160 "/a",
1161 json!({
1162 ".zed.toml": r#"collaborators = ["user_b"]"#,
1163 "a.txt": "a-contents",
1164 "b.txt": "b-contents",
1165 }),
1166 )
1167 .await;
1168 let project_a = cx_a.update(|cx| {
1169 Project::local(
1170 client_a.clone(),
1171 client_a.user_store.clone(),
1172 lang_registry.clone(),
1173 fs.clone(),
1174 cx,
1175 )
1176 });
1177 let (worktree_a, _) = project_a
1178 .update(&mut cx_a, |p, cx| {
1179 p.find_or_create_local_worktree("/a", false, cx)
1180 })
1181 .await
1182 .unwrap();
1183 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1184 worktree_a
1185 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1186 .await;
1187 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1188 project_a
1189 .update(&mut cx_a, |p, cx| p.share(cx))
1190 .await
1191 .unwrap();
1192
1193 // Join that project as client B
1194 let project_b = Project::remote(
1195 project_id,
1196 client_b.clone(),
1197 client_b.user_store.clone(),
1198 lang_registry.clone(),
1199 fs.clone(),
1200 &mut cx_b.to_async(),
1201 )
1202 .await
1203 .unwrap();
1204
1205 let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1206 assert_eq!(
1207 project
1208 .collaborators()
1209 .get(&client_a.peer_id)
1210 .unwrap()
1211 .user
1212 .github_login,
1213 "user_a"
1214 );
1215 project.replica_id()
1216 });
1217 project_a
1218 .condition(&cx_a, |tree, _| {
1219 tree.collaborators()
1220 .get(&client_b.peer_id)
1221 .map_or(false, |collaborator| {
1222 collaborator.replica_id == replica_id_b
1223 && collaborator.user.github_login == "user_b"
1224 })
1225 })
1226 .await;
1227
1228 // Open the same file as client B and client A.
1229 let buffer_b = project_b
1230 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1231 .await
1232 .unwrap();
1233 let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1234 buffer_b.read_with(&cx_b, |buf, cx| {
1235 assert_eq!(buf.read(cx).text(), "b-contents")
1236 });
1237 project_a.read_with(&cx_a, |project, cx| {
1238 assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1239 });
1240 let buffer_a = project_a
1241 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1242 .await
1243 .unwrap();
1244
1245 let editor_b = cx_b.add_view(window_b, |cx| {
1246 Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1247 });
1248
1249 // TODO
1250 // // Create a selection set as client B and see that selection set as client A.
1251 // buffer_a
1252 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1253 // .await;
1254
1255 // Edit the buffer as client B and see that edit as client A.
1256 editor_b.update(&mut cx_b, |editor, cx| {
1257 editor.handle_input(&Input("ok, ".into()), cx)
1258 });
1259 buffer_a
1260 .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1261 .await;
1262
1263 // TODO
1264 // // Remove the selection set as client B, see those selections disappear as client A.
1265 cx_b.update(move |_| drop(editor_b));
1266 // buffer_a
1267 // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1268 // .await;
1269
1270 // Close the buffer as client A, see that the buffer is closed.
1271 cx_a.update(move |_| drop(buffer_a));
1272 project_a
1273 .condition(&cx_a, |project, cx| {
1274 !project.has_open_buffer((worktree_id, "b.txt"), cx)
1275 })
1276 .await;
1277
1278 // Dropping the client B's project removes client B from client A's collaborators.
1279 cx_b.update(move |_| drop(project_b));
1280 project_a
1281 .condition(&cx_a, |project, _| project.collaborators().is_empty())
1282 .await;
1283 }
1284
1285 #[gpui::test(iterations = 10)]
1286 async fn test_unshare_project(
1287 mut cx_a: TestAppContext,
1288 mut cx_b: TestAppContext,
1289 last_iteration: bool,
1290 ) {
1291 let lang_registry = Arc::new(LanguageRegistry::new());
1292 let fs = Arc::new(FakeFs::new(cx_a.background()));
1293 cx_a.foreground().forbid_parking();
1294
1295 // Connect to a server as 2 clients.
1296 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1297 let client_a = server.create_client(&mut cx_a, "user_a").await;
1298 let client_b = server.create_client(&mut cx_b, "user_b").await;
1299
1300 // Share a project as client A
1301 fs.insert_tree(
1302 "/a",
1303 json!({
1304 ".zed.toml": r#"collaborators = ["user_b"]"#,
1305 "a.txt": "a-contents",
1306 "b.txt": "b-contents",
1307 }),
1308 )
1309 .await;
1310 let project_a = cx_a.update(|cx| {
1311 Project::local(
1312 client_a.clone(),
1313 client_a.user_store.clone(),
1314 lang_registry.clone(),
1315 fs.clone(),
1316 cx,
1317 )
1318 });
1319 let (worktree_a, _) = project_a
1320 .update(&mut cx_a, |p, cx| {
1321 p.find_or_create_local_worktree("/a", false, cx)
1322 })
1323 .await
1324 .unwrap();
1325 worktree_a
1326 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1327 .await;
1328 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1329 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1330 project_a
1331 .update(&mut cx_a, |p, cx| p.share(cx))
1332 .await
1333 .unwrap();
1334 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1335
1336 // Join that project as client B
1337 let project_b = Project::remote(
1338 project_id,
1339 client_b.clone(),
1340 client_b.user_store.clone(),
1341 lang_registry.clone(),
1342 fs.clone(),
1343 &mut cx_b.to_async(),
1344 )
1345 .await
1346 .unwrap();
1347 project_b
1348 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1349 .await
1350 .unwrap();
1351
1352 // Unshare the project as client A
1353 project_a
1354 .update(&mut cx_a, |project, cx| project.unshare(cx))
1355 .await
1356 .unwrap();
1357 project_b
1358 .condition(&mut cx_b, |project, _| project.is_read_only())
1359 .await;
1360 assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1361 drop(project_b);
1362
1363 // Share the project again and ensure guests can still join.
1364 project_a
1365 .update(&mut cx_a, |project, cx| project.share(cx))
1366 .await
1367 .unwrap();
1368 assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1369
1370 let project_c = Project::remote(
1371 project_id,
1372 client_b.clone(),
1373 client_b.user_store.clone(),
1374 lang_registry.clone(),
1375 fs.clone(),
1376 &mut cx_b.to_async(),
1377 )
1378 .await
1379 .unwrap();
1380 project_c
1381 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1382 .await
1383 .unwrap();
1384 }
1385
1386 #[gpui::test(iterations = 10)]
1387 async fn test_propagate_saves_and_fs_changes(
1388 mut cx_a: TestAppContext,
1389 mut cx_b: TestAppContext,
1390 mut cx_c: TestAppContext,
1391 last_iteration: bool,
1392 ) {
1393 let lang_registry = Arc::new(LanguageRegistry::new());
1394 let fs = Arc::new(FakeFs::new(cx_a.background()));
1395 cx_a.foreground().forbid_parking();
1396
1397 // Connect to a server as 3 clients.
1398 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1399 let client_a = server.create_client(&mut cx_a, "user_a").await;
1400 let client_b = server.create_client(&mut cx_b, "user_b").await;
1401 let client_c = server.create_client(&mut cx_c, "user_c").await;
1402
1403 // Share a worktree as client A.
1404 fs.insert_tree(
1405 "/a",
1406 json!({
1407 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1408 "file1": "",
1409 "file2": ""
1410 }),
1411 )
1412 .await;
1413 let project_a = cx_a.update(|cx| {
1414 Project::local(
1415 client_a.clone(),
1416 client_a.user_store.clone(),
1417 lang_registry.clone(),
1418 fs.clone(),
1419 cx,
1420 )
1421 });
1422 let (worktree_a, _) = project_a
1423 .update(&mut cx_a, |p, cx| {
1424 p.find_or_create_local_worktree("/a", false, cx)
1425 })
1426 .await
1427 .unwrap();
1428 worktree_a
1429 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1430 .await;
1431 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1432 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1433 project_a
1434 .update(&mut cx_a, |p, cx| p.share(cx))
1435 .await
1436 .unwrap();
1437
1438 // Join that worktree as clients B and C.
1439 let project_b = Project::remote(
1440 project_id,
1441 client_b.clone(),
1442 client_b.user_store.clone(),
1443 lang_registry.clone(),
1444 fs.clone(),
1445 &mut cx_b.to_async(),
1446 )
1447 .await
1448 .unwrap();
1449 let project_c = Project::remote(
1450 project_id,
1451 client_c.clone(),
1452 client_c.user_store.clone(),
1453 lang_registry.clone(),
1454 fs.clone(),
1455 &mut cx_c.to_async(),
1456 )
1457 .await
1458 .unwrap();
1459 let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1460 let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1461
1462 // Open and edit a buffer as both guests B and C.
1463 let buffer_b = project_b
1464 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1465 .await
1466 .unwrap();
1467 let buffer_c = project_c
1468 .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1469 .await
1470 .unwrap();
1471 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1472 buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1473
1474 // Open and edit that buffer as the host.
1475 let buffer_a = project_a
1476 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1477 .await
1478 .unwrap();
1479
1480 buffer_a
1481 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1482 .await;
1483 buffer_a.update(&mut cx_a, |buf, cx| {
1484 buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1485 });
1486
1487 // Wait for edits to propagate
1488 buffer_a
1489 .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1490 .await;
1491 buffer_b
1492 .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1493 .await;
1494 buffer_c
1495 .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1496 .await;
1497
1498 // Edit the buffer as the host and concurrently save as guest B.
1499 let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1500 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1501 save_b.await.unwrap();
1502 assert_eq!(
1503 fs.load("/a/file1".as_ref()).await.unwrap(),
1504 "hi-a, i-am-c, i-am-b, i-am-a"
1505 );
1506 buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1507 buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1508 buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1509
1510 // Make changes on host's file system, see those changes on guest worktrees.
1511 fs.rename(
1512 "/a/file1".as_ref(),
1513 "/a/file1-renamed".as_ref(),
1514 Default::default(),
1515 )
1516 .await
1517 .unwrap();
1518
1519 fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1520 .await
1521 .unwrap();
1522 fs.insert_file(Path::new("/a/file4"), "4".into())
1523 .await
1524 .unwrap();
1525
1526 worktree_a
1527 .condition(&cx_a, |tree, _| {
1528 tree.paths()
1529 .map(|p| p.to_string_lossy())
1530 .collect::<Vec<_>>()
1531 == [".zed.toml", "file1-renamed", "file3", "file4"]
1532 })
1533 .await;
1534 worktree_b
1535 .condition(&cx_b, |tree, _| {
1536 tree.paths()
1537 .map(|p| p.to_string_lossy())
1538 .collect::<Vec<_>>()
1539 == [".zed.toml", "file1-renamed", "file3", "file4"]
1540 })
1541 .await;
1542 worktree_c
1543 .condition(&cx_c, |tree, _| {
1544 tree.paths()
1545 .map(|p| p.to_string_lossy())
1546 .collect::<Vec<_>>()
1547 == [".zed.toml", "file1-renamed", "file3", "file4"]
1548 })
1549 .await;
1550
1551 // Ensure buffer files are updated as well.
1552 buffer_a
1553 .condition(&cx_a, |buf, _| {
1554 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1555 })
1556 .await;
1557 buffer_b
1558 .condition(&cx_b, |buf, _| {
1559 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1560 })
1561 .await;
1562 buffer_c
1563 .condition(&cx_c, |buf, _| {
1564 buf.file().unwrap().path().to_str() == Some("file1-renamed")
1565 })
1566 .await;
1567 }
1568
1569 #[gpui::test(iterations = 10)]
1570 async fn test_buffer_conflict_after_save(
1571 mut cx_a: TestAppContext,
1572 mut cx_b: TestAppContext,
1573 last_iteration: bool,
1574 ) {
1575 cx_a.foreground().forbid_parking();
1576 let lang_registry = Arc::new(LanguageRegistry::new());
1577 let fs = Arc::new(FakeFs::new(cx_a.background()));
1578
1579 // Connect to a server as 2 clients.
1580 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1581 let client_a = server.create_client(&mut cx_a, "user_a").await;
1582 let client_b = server.create_client(&mut cx_b, "user_b").await;
1583
1584 // Share a project as client A
1585 fs.insert_tree(
1586 "/dir",
1587 json!({
1588 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1589 "a.txt": "a-contents",
1590 }),
1591 )
1592 .await;
1593
1594 let project_a = cx_a.update(|cx| {
1595 Project::local(
1596 client_a.clone(),
1597 client_a.user_store.clone(),
1598 lang_registry.clone(),
1599 fs.clone(),
1600 cx,
1601 )
1602 });
1603 let (worktree_a, _) = project_a
1604 .update(&mut cx_a, |p, cx| {
1605 p.find_or_create_local_worktree("/dir", false, cx)
1606 })
1607 .await
1608 .unwrap();
1609 worktree_a
1610 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1611 .await;
1612 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1613 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1614 project_a
1615 .update(&mut cx_a, |p, cx| p.share(cx))
1616 .await
1617 .unwrap();
1618
1619 // Join that project as client B
1620 let project_b = Project::remote(
1621 project_id,
1622 client_b.clone(),
1623 client_b.user_store.clone(),
1624 lang_registry.clone(),
1625 fs.clone(),
1626 &mut cx_b.to_async(),
1627 )
1628 .await
1629 .unwrap();
1630
1631 // Open a buffer as client B
1632 let buffer_b = project_b
1633 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1634 .await
1635 .unwrap();
1636
1637 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1638 buffer_b.read_with(&cx_b, |buf, _| {
1639 assert!(buf.is_dirty());
1640 assert!(!buf.has_conflict());
1641 });
1642
1643 buffer_b
1644 .update(&mut cx_b, |buf, cx| buf.save(cx))
1645 .await
1646 .unwrap();
1647 buffer_b
1648 .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1649 .await;
1650 buffer_b.read_with(&cx_b, |buf, _| {
1651 assert!(!buf.has_conflict());
1652 });
1653
1654 buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1655 buffer_b.read_with(&cx_b, |buf, _| {
1656 assert!(buf.is_dirty());
1657 assert!(!buf.has_conflict());
1658 });
1659 }
1660
1661 #[gpui::test(iterations = 10)]
1662 async fn test_buffer_reloading(
1663 mut cx_a: TestAppContext,
1664 mut cx_b: TestAppContext,
1665 last_iteration: bool,
1666 ) {
1667 cx_a.foreground().forbid_parking();
1668 let lang_registry = Arc::new(LanguageRegistry::new());
1669 let fs = Arc::new(FakeFs::new(cx_a.background()));
1670
1671 // Connect to a server as 2 clients.
1672 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1673 let client_a = server.create_client(&mut cx_a, "user_a").await;
1674 let client_b = server.create_client(&mut cx_b, "user_b").await;
1675
1676 // Share a project as client A
1677 fs.insert_tree(
1678 "/dir",
1679 json!({
1680 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1681 "a.txt": "a-contents",
1682 }),
1683 )
1684 .await;
1685
1686 let project_a = cx_a.update(|cx| {
1687 Project::local(
1688 client_a.clone(),
1689 client_a.user_store.clone(),
1690 lang_registry.clone(),
1691 fs.clone(),
1692 cx,
1693 )
1694 });
1695 let (worktree_a, _) = project_a
1696 .update(&mut cx_a, |p, cx| {
1697 p.find_or_create_local_worktree("/dir", false, cx)
1698 })
1699 .await
1700 .unwrap();
1701 worktree_a
1702 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1703 .await;
1704 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1705 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1706 project_a
1707 .update(&mut cx_a, |p, cx| p.share(cx))
1708 .await
1709 .unwrap();
1710
1711 // Join that project as client B
1712 let project_b = Project::remote(
1713 project_id,
1714 client_b.clone(),
1715 client_b.user_store.clone(),
1716 lang_registry.clone(),
1717 fs.clone(),
1718 &mut cx_b.to_async(),
1719 )
1720 .await
1721 .unwrap();
1722 let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1723
1724 // Open a buffer as client B
1725 let buffer_b = project_b
1726 .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1727 .await
1728 .unwrap();
1729 buffer_b.read_with(&cx_b, |buf, _| {
1730 assert!(!buf.is_dirty());
1731 assert!(!buf.has_conflict());
1732 });
1733
1734 fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1735 .await
1736 .unwrap();
1737 buffer_b
1738 .condition(&cx_b, |buf, _| {
1739 buf.text() == "new contents" && !buf.is_dirty()
1740 })
1741 .await;
1742 buffer_b.read_with(&cx_b, |buf, _| {
1743 assert!(!buf.has_conflict());
1744 });
1745 }
1746
1747 #[gpui::test(iterations = 10)]
1748 async fn test_editing_while_guest_opens_buffer(
1749 mut cx_a: TestAppContext,
1750 mut cx_b: TestAppContext,
1751 last_iteration: bool,
1752 ) {
1753 cx_a.foreground().forbid_parking();
1754 let lang_registry = Arc::new(LanguageRegistry::new());
1755 let fs = Arc::new(FakeFs::new(cx_a.background()));
1756
1757 // Connect to a server as 2 clients.
1758 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1759 let client_a = server.create_client(&mut cx_a, "user_a").await;
1760 let client_b = server.create_client(&mut cx_b, "user_b").await;
1761
1762 // Share a project as client A
1763 fs.insert_tree(
1764 "/dir",
1765 json!({
1766 ".zed.toml": r#"collaborators = ["user_b"]"#,
1767 "a.txt": "a-contents",
1768 }),
1769 )
1770 .await;
1771 let project_a = cx_a.update(|cx| {
1772 Project::local(
1773 client_a.clone(),
1774 client_a.user_store.clone(),
1775 lang_registry.clone(),
1776 fs.clone(),
1777 cx,
1778 )
1779 });
1780 let (worktree_a, _) = project_a
1781 .update(&mut cx_a, |p, cx| {
1782 p.find_or_create_local_worktree("/dir", false, cx)
1783 })
1784 .await
1785 .unwrap();
1786 worktree_a
1787 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1788 .await;
1789 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1790 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1791 project_a
1792 .update(&mut cx_a, |p, cx| p.share(cx))
1793 .await
1794 .unwrap();
1795
1796 // Join that project as client B
1797 let project_b = Project::remote(
1798 project_id,
1799 client_b.clone(),
1800 client_b.user_store.clone(),
1801 lang_registry.clone(),
1802 fs.clone(),
1803 &mut cx_b.to_async(),
1804 )
1805 .await
1806 .unwrap();
1807
1808 // Open a buffer as client A
1809 let buffer_a = project_a
1810 .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1811 .await
1812 .unwrap();
1813
1814 // Start opening the same buffer as client B
1815 let buffer_b = cx_b
1816 .background()
1817 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1818
1819 // Edit the buffer as client A while client B is still opening it.
1820 cx_b.background().simulate_random_delay().await;
1821 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1822 cx_b.background().simulate_random_delay().await;
1823 buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1824
1825 let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1826 let buffer_b = buffer_b.await.unwrap();
1827 buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1828 }
1829
1830 #[gpui::test(iterations = 10)]
1831 async fn test_leaving_worktree_while_opening_buffer(
1832 mut cx_a: TestAppContext,
1833 mut cx_b: TestAppContext,
1834 last_iteration: bool,
1835 ) {
1836 cx_a.foreground().forbid_parking();
1837 let lang_registry = Arc::new(LanguageRegistry::new());
1838 let fs = Arc::new(FakeFs::new(cx_a.background()));
1839
1840 // Connect to a server as 2 clients.
1841 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1842 let client_a = server.create_client(&mut cx_a, "user_a").await;
1843 let client_b = server.create_client(&mut cx_b, "user_b").await;
1844
1845 // Share a project as client A
1846 fs.insert_tree(
1847 "/dir",
1848 json!({
1849 ".zed.toml": r#"collaborators = ["user_b"]"#,
1850 "a.txt": "a-contents",
1851 }),
1852 )
1853 .await;
1854 let project_a = cx_a.update(|cx| {
1855 Project::local(
1856 client_a.clone(),
1857 client_a.user_store.clone(),
1858 lang_registry.clone(),
1859 fs.clone(),
1860 cx,
1861 )
1862 });
1863 let (worktree_a, _) = project_a
1864 .update(&mut cx_a, |p, cx| {
1865 p.find_or_create_local_worktree("/dir", false, cx)
1866 })
1867 .await
1868 .unwrap();
1869 worktree_a
1870 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1871 .await;
1872 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1873 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1874 project_a
1875 .update(&mut cx_a, |p, cx| p.share(cx))
1876 .await
1877 .unwrap();
1878
1879 // Join that project as client B
1880 let project_b = Project::remote(
1881 project_id,
1882 client_b.clone(),
1883 client_b.user_store.clone(),
1884 lang_registry.clone(),
1885 fs.clone(),
1886 &mut cx_b.to_async(),
1887 )
1888 .await
1889 .unwrap();
1890
1891 // See that a guest has joined as client A.
1892 project_a
1893 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1894 .await;
1895
1896 // Begin opening a buffer as client B, but leave the project before the open completes.
1897 let buffer_b = cx_b
1898 .background()
1899 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1900 cx_b.update(|_| drop(project_b));
1901 drop(buffer_b);
1902
1903 // See that the guest has left.
1904 project_a
1905 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1906 .await;
1907 }
1908
1909 #[gpui::test(iterations = 10)]
1910 async fn test_peer_disconnection(
1911 mut cx_a: TestAppContext,
1912 mut cx_b: TestAppContext,
1913 last_iteration: bool,
1914 ) {
1915 cx_a.foreground().forbid_parking();
1916 let lang_registry = Arc::new(LanguageRegistry::new());
1917 let fs = Arc::new(FakeFs::new(cx_a.background()));
1918
1919 // Connect to a server as 2 clients.
1920 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1921 let client_a = server.create_client(&mut cx_a, "user_a").await;
1922 let client_b = server.create_client(&mut cx_b, "user_b").await;
1923
1924 // Share a project as client A
1925 fs.insert_tree(
1926 "/a",
1927 json!({
1928 ".zed.toml": r#"collaborators = ["user_b"]"#,
1929 "a.txt": "a-contents",
1930 "b.txt": "b-contents",
1931 }),
1932 )
1933 .await;
1934 let project_a = cx_a.update(|cx| {
1935 Project::local(
1936 client_a.clone(),
1937 client_a.user_store.clone(),
1938 lang_registry.clone(),
1939 fs.clone(),
1940 cx,
1941 )
1942 });
1943 let (worktree_a, _) = project_a
1944 .update(&mut cx_a, |p, cx| {
1945 p.find_or_create_local_worktree("/a", false, cx)
1946 })
1947 .await
1948 .unwrap();
1949 worktree_a
1950 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1951 .await;
1952 let project_id = project_a
1953 .update(&mut cx_a, |project, _| project.next_remote_id())
1954 .await;
1955 project_a
1956 .update(&mut cx_a, |project, cx| project.share(cx))
1957 .await
1958 .unwrap();
1959
1960 // Join that project as client B
1961 let _project_b = Project::remote(
1962 project_id,
1963 client_b.clone(),
1964 client_b.user_store.clone(),
1965 lang_registry.clone(),
1966 fs.clone(),
1967 &mut cx_b.to_async(),
1968 )
1969 .await
1970 .unwrap();
1971
1972 // See that a guest has joined as client A.
1973 project_a
1974 .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1975 .await;
1976
1977 // Drop client B's connection and ensure client A observes client B leaving the worktree.
1978 client_b.disconnect(&cx_b.to_async()).unwrap();
1979 project_a
1980 .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1981 .await;
1982 }
1983
1984 #[gpui::test(iterations = 10)]
1985 async fn test_collaborating_with_diagnostics(
1986 mut cx_a: TestAppContext,
1987 mut cx_b: TestAppContext,
1988 last_iteration: bool,
1989 ) {
1990 cx_a.foreground().forbid_parking();
1991 let mut lang_registry = Arc::new(LanguageRegistry::new());
1992 let fs = Arc::new(FakeFs::new(cx_a.background()));
1993
1994 // Set up a fake language server.
1995 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1996 Arc::get_mut(&mut lang_registry)
1997 .unwrap()
1998 .add(Arc::new(Language::new(
1999 LanguageConfig {
2000 name: "Rust".to_string(),
2001 path_suffixes: vec!["rs".to_string()],
2002 language_server: Some(language_server_config),
2003 ..Default::default()
2004 },
2005 Some(tree_sitter_rust::language()),
2006 )));
2007
2008 // Connect to a server as 2 clients.
2009 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2010 let client_a = server.create_client(&mut cx_a, "user_a").await;
2011 let client_b = server.create_client(&mut cx_b, "user_b").await;
2012
2013 // Share a project as client A
2014 fs.insert_tree(
2015 "/a",
2016 json!({
2017 ".zed.toml": r#"collaborators = ["user_b"]"#,
2018 "a.rs": "let one = two",
2019 "other.rs": "",
2020 }),
2021 )
2022 .await;
2023 let project_a = cx_a.update(|cx| {
2024 Project::local(
2025 client_a.clone(),
2026 client_a.user_store.clone(),
2027 lang_registry.clone(),
2028 fs.clone(),
2029 cx,
2030 )
2031 });
2032 let (worktree_a, _) = project_a
2033 .update(&mut cx_a, |p, cx| {
2034 p.find_or_create_local_worktree("/a", false, cx)
2035 })
2036 .await
2037 .unwrap();
2038 worktree_a
2039 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2040 .await;
2041 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2042 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2043 project_a
2044 .update(&mut cx_a, |p, cx| p.share(cx))
2045 .await
2046 .unwrap();
2047
2048 // Cause the language server to start.
2049 let _ = cx_a
2050 .background()
2051 .spawn(project_a.update(&mut cx_a, |project, cx| {
2052 project.open_buffer(
2053 ProjectPath {
2054 worktree_id,
2055 path: Path::new("other.rs").into(),
2056 },
2057 cx,
2058 )
2059 }))
2060 .await
2061 .unwrap();
2062
2063 // Simulate a language server reporting errors for a file.
2064 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2065 fake_language_server
2066 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2067 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2068 version: None,
2069 diagnostics: vec![lsp::Diagnostic {
2070 severity: Some(lsp::DiagnosticSeverity::ERROR),
2071 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2072 message: "message 1".to_string(),
2073 ..Default::default()
2074 }],
2075 })
2076 .await;
2077
2078 // Wait for server to see the diagnostics update.
2079 server
2080 .condition(|store| {
2081 let worktree = store
2082 .project(project_id)
2083 .unwrap()
2084 .worktrees
2085 .get(&worktree_id.to_proto())
2086 .unwrap();
2087
2088 !worktree
2089 .share
2090 .as_ref()
2091 .unwrap()
2092 .diagnostic_summaries
2093 .is_empty()
2094 })
2095 .await;
2096
2097 // Join the worktree as client B.
2098 let project_b = Project::remote(
2099 project_id,
2100 client_b.clone(),
2101 client_b.user_store.clone(),
2102 lang_registry.clone(),
2103 fs.clone(),
2104 &mut cx_b.to_async(),
2105 )
2106 .await
2107 .unwrap();
2108
2109 project_b.read_with(&cx_b, |project, cx| {
2110 assert_eq!(
2111 project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2112 &[(
2113 ProjectPath {
2114 worktree_id,
2115 path: Arc::from(Path::new("a.rs")),
2116 },
2117 DiagnosticSummary {
2118 error_count: 1,
2119 warning_count: 0,
2120 ..Default::default()
2121 },
2122 )]
2123 )
2124 });
2125
2126 // Simulate a language server reporting more errors for a file.
2127 fake_language_server
2128 .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2129 uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2130 version: None,
2131 diagnostics: vec![
2132 lsp::Diagnostic {
2133 severity: Some(lsp::DiagnosticSeverity::ERROR),
2134 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2135 message: "message 1".to_string(),
2136 ..Default::default()
2137 },
2138 lsp::Diagnostic {
2139 severity: Some(lsp::DiagnosticSeverity::WARNING),
2140 range: lsp::Range::new(
2141 lsp::Position::new(0, 10),
2142 lsp::Position::new(0, 13),
2143 ),
2144 message: "message 2".to_string(),
2145 ..Default::default()
2146 },
2147 ],
2148 })
2149 .await;
2150
2151 // Client b gets the updated summaries
2152 project_b
2153 .condition(&cx_b, |project, cx| {
2154 project.diagnostic_summaries(cx).collect::<Vec<_>>()
2155 == &[(
2156 ProjectPath {
2157 worktree_id,
2158 path: Arc::from(Path::new("a.rs")),
2159 },
2160 DiagnosticSummary {
2161 error_count: 1,
2162 warning_count: 1,
2163 ..Default::default()
2164 },
2165 )]
2166 })
2167 .await;
2168
2169 // Open the file with the errors on client B. They should be present.
2170 let buffer_b = cx_b
2171 .background()
2172 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2173 .await
2174 .unwrap();
2175
2176 buffer_b.read_with(&cx_b, |buffer, _| {
2177 assert_eq!(
2178 buffer
2179 .snapshot()
2180 .diagnostics_in_range::<_, Point>(0..buffer.len())
2181 .map(|entry| entry)
2182 .collect::<Vec<_>>(),
2183 &[
2184 DiagnosticEntry {
2185 range: Point::new(0, 4)..Point::new(0, 7),
2186 diagnostic: Diagnostic {
2187 group_id: 0,
2188 message: "message 1".to_string(),
2189 severity: lsp::DiagnosticSeverity::ERROR,
2190 is_primary: true,
2191 ..Default::default()
2192 }
2193 },
2194 DiagnosticEntry {
2195 range: Point::new(0, 10)..Point::new(0, 13),
2196 diagnostic: Diagnostic {
2197 group_id: 1,
2198 severity: lsp::DiagnosticSeverity::WARNING,
2199 message: "message 2".to_string(),
2200 is_primary: true,
2201 ..Default::default()
2202 }
2203 }
2204 ]
2205 );
2206 });
2207 }
2208
2209 #[gpui::test(iterations = 10)]
2210 async fn test_collaborating_with_completion(
2211 mut cx_a: TestAppContext,
2212 mut cx_b: TestAppContext,
2213 last_iteration: bool,
2214 ) {
2215 cx_a.foreground().forbid_parking();
2216 let mut lang_registry = Arc::new(LanguageRegistry::new());
2217 let fs = Arc::new(FakeFs::new(cx_a.background()));
2218
2219 // Set up a fake language server.
2220 let (language_server_config, mut fake_language_servers) =
2221 LanguageServerConfig::fake_with_capabilities(lsp::ServerCapabilities {
2222 completion_provider: Some(lsp::CompletionOptions {
2223 trigger_characters: Some(vec![".".to_string()]),
2224 ..Default::default()
2225 }),
2226 ..Default::default()
2227 });
2228 Arc::get_mut(&mut lang_registry)
2229 .unwrap()
2230 .add(Arc::new(Language::new(
2231 LanguageConfig {
2232 name: "Rust".to_string(),
2233 path_suffixes: vec!["rs".to_string()],
2234 language_server: Some(language_server_config),
2235 ..Default::default()
2236 },
2237 Some(tree_sitter_rust::language()),
2238 )));
2239
2240 // Connect to a server as 2 clients.
2241 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2242 let client_a = server.create_client(&mut cx_a, "user_a").await;
2243 let client_b = server.create_client(&mut cx_b, "user_b").await;
2244
2245 // Share a project as client A
2246 fs.insert_tree(
2247 "/a",
2248 json!({
2249 ".zed.toml": r#"collaborators = ["user_b"]"#,
2250 "main.rs": "fn main() { a }",
2251 "other.rs": "",
2252 }),
2253 )
2254 .await;
2255 let project_a = cx_a.update(|cx| {
2256 Project::local(
2257 client_a.clone(),
2258 client_a.user_store.clone(),
2259 lang_registry.clone(),
2260 fs.clone(),
2261 cx,
2262 )
2263 });
2264 let (worktree_a, _) = project_a
2265 .update(&mut cx_a, |p, cx| {
2266 p.find_or_create_local_worktree("/a", false, cx)
2267 })
2268 .await
2269 .unwrap();
2270 worktree_a
2271 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2272 .await;
2273 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2274 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2275 project_a
2276 .update(&mut cx_a, |p, cx| p.share(cx))
2277 .await
2278 .unwrap();
2279
2280 // Join the worktree as client B.
2281 let project_b = Project::remote(
2282 project_id,
2283 client_b.clone(),
2284 client_b.user_store.clone(),
2285 lang_registry.clone(),
2286 fs.clone(),
2287 &mut cx_b.to_async(),
2288 )
2289 .await
2290 .unwrap();
2291
2292 // Open a file in an editor as the guest.
2293 let buffer_b = project_b
2294 .update(&mut cx_b, |p, cx| {
2295 p.open_buffer((worktree_id, "main.rs"), cx)
2296 })
2297 .await
2298 .unwrap();
2299 let (window_b, _) = cx_b.add_window(|_| EmptyView);
2300 let editor_b = cx_b.add_view(window_b, |cx| {
2301 Editor::for_buffer(
2302 cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2303 Arc::new(|cx| EditorSettings::test(cx)),
2304 Some(project_b.clone()),
2305 cx,
2306 )
2307 });
2308
2309 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2310 buffer_b
2311 .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2312 .await;
2313
2314 // Type a completion trigger character as the guest.
2315 editor_b.update(&mut cx_b, |editor, cx| {
2316 editor.select_ranges([13..13], None, cx);
2317 editor.handle_input(&Input(".".into()), cx);
2318 cx.focus(&editor_b);
2319 });
2320
2321 // Receive a completion request as the host's language server.
2322 // Return some completions from the host's language server.
2323 fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2324 assert_eq!(
2325 params.text_document_position.text_document.uri,
2326 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2327 );
2328 assert_eq!(
2329 params.text_document_position.position,
2330 lsp::Position::new(0, 14),
2331 );
2332
2333 Some(lsp::CompletionResponse::Array(vec![
2334 lsp::CompletionItem {
2335 label: "first_method(…)".into(),
2336 detail: Some("fn(&mut self, B) -> C".into()),
2337 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2338 new_text: "first_method($1)".to_string(),
2339 range: lsp::Range::new(
2340 lsp::Position::new(0, 14),
2341 lsp::Position::new(0, 14),
2342 ),
2343 })),
2344 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2345 ..Default::default()
2346 },
2347 lsp::CompletionItem {
2348 label: "second_method(…)".into(),
2349 detail: Some("fn(&mut self, C) -> D<E>".into()),
2350 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2351 new_text: "second_method()".to_string(),
2352 range: lsp::Range::new(
2353 lsp::Position::new(0, 14),
2354 lsp::Position::new(0, 14),
2355 ),
2356 })),
2357 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2358 ..Default::default()
2359 },
2360 ]))
2361 });
2362
2363 // Open the buffer on the host.
2364 let buffer_a = project_a
2365 .update(&mut cx_a, |p, cx| {
2366 p.open_buffer((worktree_id, "main.rs"), cx)
2367 })
2368 .await
2369 .unwrap();
2370 buffer_a
2371 .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2372 .await;
2373
2374 // Confirm a completion on the guest.
2375 editor_b
2376 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2377 .await;
2378 editor_b.update(&mut cx_b, |editor, cx| {
2379 editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2380 assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2381 });
2382
2383 // Return a resolved completion from the host's language server.
2384 // The resolved completion has an additional text edit.
2385 fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2386 assert_eq!(params.label, "first_method(…)");
2387 lsp::CompletionItem {
2388 label: "first_method(…)".into(),
2389 detail: Some("fn(&mut self, B) -> C".into()),
2390 text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2391 new_text: "first_method($1)".to_string(),
2392 range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2393 })),
2394 additional_text_edits: Some(vec![lsp::TextEdit {
2395 new_text: "use d::SomeTrait;\n".to_string(),
2396 range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2397 }]),
2398 insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2399 ..Default::default()
2400 }
2401 });
2402
2403 // The additional edit is applied.
2404 buffer_a
2405 .condition(&cx_a, |buffer, _| {
2406 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2407 })
2408 .await;
2409 buffer_b
2410 .condition(&cx_b, |buffer, _| {
2411 buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2412 })
2413 .await;
2414 }
2415
2416 #[gpui::test(iterations = 10)]
2417 async fn test_formatting_buffer(
2418 mut cx_a: TestAppContext,
2419 mut cx_b: TestAppContext,
2420 last_iteration: bool,
2421 ) {
2422 cx_a.foreground().forbid_parking();
2423 let mut lang_registry = Arc::new(LanguageRegistry::new());
2424 let fs = Arc::new(FakeFs::new(cx_a.background()));
2425
2426 // Set up a fake language server.
2427 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2428 Arc::get_mut(&mut lang_registry)
2429 .unwrap()
2430 .add(Arc::new(Language::new(
2431 LanguageConfig {
2432 name: "Rust".to_string(),
2433 path_suffixes: vec!["rs".to_string()],
2434 language_server: Some(language_server_config),
2435 ..Default::default()
2436 },
2437 Some(tree_sitter_rust::language()),
2438 )));
2439
2440 // Connect to a server as 2 clients.
2441 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2442 let client_a = server.create_client(&mut cx_a, "user_a").await;
2443 let client_b = server.create_client(&mut cx_b, "user_b").await;
2444
2445 // Share a project as client A
2446 fs.insert_tree(
2447 "/a",
2448 json!({
2449 ".zed.toml": r#"collaborators = ["user_b"]"#,
2450 "a.rs": "let one = two",
2451 }),
2452 )
2453 .await;
2454 let project_a = cx_a.update(|cx| {
2455 Project::local(
2456 client_a.clone(),
2457 client_a.user_store.clone(),
2458 lang_registry.clone(),
2459 fs.clone(),
2460 cx,
2461 )
2462 });
2463 let (worktree_a, _) = project_a
2464 .update(&mut cx_a, |p, cx| {
2465 p.find_or_create_local_worktree("/a", false, cx)
2466 })
2467 .await
2468 .unwrap();
2469 worktree_a
2470 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2471 .await;
2472 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2473 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2474 project_a
2475 .update(&mut cx_a, |p, cx| p.share(cx))
2476 .await
2477 .unwrap();
2478
2479 // Join the worktree as client B.
2480 let project_b = Project::remote(
2481 project_id,
2482 client_b.clone(),
2483 client_b.user_store.clone(),
2484 lang_registry.clone(),
2485 fs.clone(),
2486 &mut cx_b.to_async(),
2487 )
2488 .await
2489 .unwrap();
2490
2491 let buffer_b = cx_b
2492 .background()
2493 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2494 .await
2495 .unwrap();
2496
2497 let format = project_b.update(&mut cx_b, |project, cx| {
2498 project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2499 });
2500
2501 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2502 fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2503 Some(vec![
2504 lsp::TextEdit {
2505 range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2506 new_text: "h".to_string(),
2507 },
2508 lsp::TextEdit {
2509 range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2510 new_text: "y".to_string(),
2511 },
2512 ])
2513 });
2514
2515 format.await.unwrap();
2516 assert_eq!(
2517 buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2518 "let honey = two"
2519 );
2520 }
2521
2522 #[gpui::test(iterations = 10)]
2523 async fn test_definition(
2524 mut cx_a: TestAppContext,
2525 mut cx_b: TestAppContext,
2526 last_iteration: bool,
2527 ) {
2528 cx_a.foreground().forbid_parking();
2529 let mut lang_registry = Arc::new(LanguageRegistry::new());
2530 let fs = Arc::new(FakeFs::new(cx_a.background()));
2531 fs.insert_tree(
2532 "/root-1",
2533 json!({
2534 ".zed.toml": r#"collaborators = ["user_b"]"#,
2535 "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2536 }),
2537 )
2538 .await;
2539 fs.insert_tree(
2540 "/root-2",
2541 json!({
2542 "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2543 }),
2544 )
2545 .await;
2546
2547 // Set up a fake language server.
2548 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2549 Arc::get_mut(&mut lang_registry)
2550 .unwrap()
2551 .add(Arc::new(Language::new(
2552 LanguageConfig {
2553 name: "Rust".to_string(),
2554 path_suffixes: vec!["rs".to_string()],
2555 language_server: Some(language_server_config),
2556 ..Default::default()
2557 },
2558 Some(tree_sitter_rust::language()),
2559 )));
2560
2561 // Connect to a server as 2 clients.
2562 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2563 let client_a = server.create_client(&mut cx_a, "user_a").await;
2564 let client_b = server.create_client(&mut cx_b, "user_b").await;
2565
2566 // Share a project as client A
2567 let project_a = cx_a.update(|cx| {
2568 Project::local(
2569 client_a.clone(),
2570 client_a.user_store.clone(),
2571 lang_registry.clone(),
2572 fs.clone(),
2573 cx,
2574 )
2575 });
2576 let (worktree_a, _) = project_a
2577 .update(&mut cx_a, |p, cx| {
2578 p.find_or_create_local_worktree("/root-1", false, cx)
2579 })
2580 .await
2581 .unwrap();
2582 worktree_a
2583 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2584 .await;
2585 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2586 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2587 project_a
2588 .update(&mut cx_a, |p, cx| p.share(cx))
2589 .await
2590 .unwrap();
2591
2592 // Join the worktree as client B.
2593 let project_b = Project::remote(
2594 project_id,
2595 client_b.clone(),
2596 client_b.user_store.clone(),
2597 lang_registry.clone(),
2598 fs.clone(),
2599 &mut cx_b.to_async(),
2600 )
2601 .await
2602 .unwrap();
2603
2604 // Open the file on client B.
2605 let buffer_b = cx_b
2606 .background()
2607 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2608 .await
2609 .unwrap();
2610
2611 // Request the definition of a symbol as the guest.
2612 let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2613
2614 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2615 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2616 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2617 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2618 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2619 )))
2620 });
2621
2622 let definitions_1 = definitions_1.await.unwrap();
2623 cx_b.read(|cx| {
2624 assert_eq!(definitions_1.len(), 1);
2625 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2626 let target_buffer = definitions_1[0].target_buffer.read(cx);
2627 assert_eq!(
2628 target_buffer.text(),
2629 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2630 );
2631 assert_eq!(
2632 definitions_1[0].target_range.to_point(target_buffer),
2633 Point::new(0, 6)..Point::new(0, 9)
2634 );
2635 });
2636
2637 // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2638 // the previous call to `definition`.
2639 let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2640 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2641 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2642 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2643 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2644 )))
2645 });
2646
2647 let definitions_2 = definitions_2.await.unwrap();
2648 cx_b.read(|cx| {
2649 assert_eq!(definitions_2.len(), 1);
2650 assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2651 let target_buffer = definitions_2[0].target_buffer.read(cx);
2652 assert_eq!(
2653 target_buffer.text(),
2654 "const TWO: usize = 2;\nconst THREE: usize = 3;"
2655 );
2656 assert_eq!(
2657 definitions_2[0].target_range.to_point(target_buffer),
2658 Point::new(1, 6)..Point::new(1, 11)
2659 );
2660 });
2661 assert_eq!(
2662 definitions_1[0].target_buffer,
2663 definitions_2[0].target_buffer
2664 );
2665
2666 cx_b.update(|_| {
2667 drop(definitions_1);
2668 drop(definitions_2);
2669 });
2670 project_b
2671 .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2672 .await;
2673 }
2674
2675 #[gpui::test(iterations = 10)]
2676 async fn test_open_buffer_while_getting_definition_pointing_to_it(
2677 mut cx_a: TestAppContext,
2678 mut cx_b: TestAppContext,
2679 mut rng: StdRng,
2680 last_iteration: bool,
2681 ) {
2682 cx_a.foreground().forbid_parking();
2683 let mut lang_registry = Arc::new(LanguageRegistry::new());
2684 let fs = Arc::new(FakeFs::new(cx_a.background()));
2685 fs.insert_tree(
2686 "/root",
2687 json!({
2688 ".zed.toml": r#"collaborators = ["user_b"]"#,
2689 "a.rs": "const ONE: usize = b::TWO;",
2690 "b.rs": "const TWO: usize = 2",
2691 }),
2692 )
2693 .await;
2694
2695 // Set up a fake language server.
2696 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2697
2698 Arc::get_mut(&mut lang_registry)
2699 .unwrap()
2700 .add(Arc::new(Language::new(
2701 LanguageConfig {
2702 name: "Rust".to_string(),
2703 path_suffixes: vec!["rs".to_string()],
2704 language_server: Some(language_server_config),
2705 ..Default::default()
2706 },
2707 Some(tree_sitter_rust::language()),
2708 )));
2709
2710 // Connect to a server as 2 clients.
2711 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2712 let client_a = server.create_client(&mut cx_a, "user_a").await;
2713 let client_b = server.create_client(&mut cx_b, "user_b").await;
2714
2715 // Share a project as client A
2716 let project_a = cx_a.update(|cx| {
2717 Project::local(
2718 client_a.clone(),
2719 client_a.user_store.clone(),
2720 lang_registry.clone(),
2721 fs.clone(),
2722 cx,
2723 )
2724 });
2725
2726 let (worktree_a, _) = project_a
2727 .update(&mut cx_a, |p, cx| {
2728 p.find_or_create_local_worktree("/root", false, cx)
2729 })
2730 .await
2731 .unwrap();
2732 worktree_a
2733 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2734 .await;
2735 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2736 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2737 project_a
2738 .update(&mut cx_a, |p, cx| p.share(cx))
2739 .await
2740 .unwrap();
2741
2742 // Join the worktree as client B.
2743 let project_b = Project::remote(
2744 project_id,
2745 client_b.clone(),
2746 client_b.user_store.clone(),
2747 lang_registry.clone(),
2748 fs.clone(),
2749 &mut cx_b.to_async(),
2750 )
2751 .await
2752 .unwrap();
2753
2754 let buffer_b1 = cx_b
2755 .background()
2756 .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2757 .await
2758 .unwrap();
2759
2760 let definitions;
2761 let buffer_b2;
2762 if rng.gen() {
2763 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2764 buffer_b2 =
2765 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2766 } else {
2767 buffer_b2 =
2768 project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2769 definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2770 }
2771
2772 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2773 fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2774 Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2775 lsp::Url::from_file_path("/root/b.rs").unwrap(),
2776 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2777 )))
2778 });
2779
2780 let buffer_b2 = buffer_b2.await.unwrap();
2781 let definitions = definitions.await.unwrap();
2782 assert_eq!(definitions.len(), 1);
2783 assert_eq!(definitions[0].target_buffer, buffer_b2);
2784 }
2785
2786 #[gpui::test(iterations = 10)]
2787 async fn test_collaborating_with_code_actions(
2788 mut cx_a: TestAppContext,
2789 mut cx_b: TestAppContext,
2790 last_iteration: bool,
2791 ) {
2792 cx_a.foreground().forbid_parking();
2793 let mut lang_registry = Arc::new(LanguageRegistry::new());
2794 let fs = Arc::new(FakeFs::new(cx_a.background()));
2795 let mut path_openers_b = Vec::new();
2796 cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2797
2798 // Set up a fake language server.
2799 let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2800 Arc::get_mut(&mut lang_registry)
2801 .unwrap()
2802 .add(Arc::new(Language::new(
2803 LanguageConfig {
2804 name: "Rust".to_string(),
2805 path_suffixes: vec!["rs".to_string()],
2806 language_server: Some(language_server_config),
2807 ..Default::default()
2808 },
2809 Some(tree_sitter_rust::language()),
2810 )));
2811
2812 // Connect to a server as 2 clients.
2813 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2814 let client_a = server.create_client(&mut cx_a, "user_a").await;
2815 let client_b = server.create_client(&mut cx_b, "user_b").await;
2816
2817 // Share a project as client A
2818 fs.insert_tree(
2819 "/a",
2820 json!({
2821 ".zed.toml": r#"collaborators = ["user_b"]"#,
2822 "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2823 "other.rs": "pub fn foo() -> usize { 4 }",
2824 }),
2825 )
2826 .await;
2827 let project_a = cx_a.update(|cx| {
2828 Project::local(
2829 client_a.clone(),
2830 client_a.user_store.clone(),
2831 lang_registry.clone(),
2832 fs.clone(),
2833 cx,
2834 )
2835 });
2836 let (worktree_a, _) = project_a
2837 .update(&mut cx_a, |p, cx| {
2838 p.find_or_create_local_worktree("/a", false, cx)
2839 })
2840 .await
2841 .unwrap();
2842 worktree_a
2843 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2844 .await;
2845 let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2846 let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2847 project_a
2848 .update(&mut cx_a, |p, cx| p.share(cx))
2849 .await
2850 .unwrap();
2851
2852 // Join the worktree as client B.
2853 let project_b = Project::remote(
2854 project_id,
2855 client_b.clone(),
2856 client_b.user_store.clone(),
2857 lang_registry.clone(),
2858 fs.clone(),
2859 &mut cx_b.to_async(),
2860 )
2861 .await
2862 .unwrap();
2863 let mut params = cx_b.update(WorkspaceParams::test);
2864 params.languages = lang_registry.clone();
2865 params.client = client_b.client.clone();
2866 params.user_store = client_b.user_store.clone();
2867 params.project = project_b;
2868 params.path_openers = path_openers_b.into();
2869
2870 let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(¶ms, cx));
2871 let editor_b = workspace_b
2872 .update(&mut cx_b, |workspace, cx| {
2873 workspace.open_path((worktree_id, "main.rs").into(), cx)
2874 })
2875 .await
2876 .unwrap()
2877 .downcast::<Editor>()
2878 .unwrap();
2879
2880 let mut fake_language_server = fake_language_servers.next().await.unwrap();
2881 fake_language_server
2882 .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2883 assert_eq!(
2884 params.text_document.uri,
2885 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2886 );
2887 assert_eq!(params.range.start, lsp::Position::new(0, 0));
2888 assert_eq!(params.range.end, lsp::Position::new(0, 0));
2889 None
2890 })
2891 .next()
2892 .await;
2893
2894 // Move cursor to a location that contains code actions.
2895 editor_b.update(&mut cx_b, |editor, cx| {
2896 editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2897 cx.focus(&editor_b);
2898 });
2899 fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2900 assert_eq!(
2901 params.text_document.uri,
2902 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2903 );
2904 assert_eq!(params.range.start, lsp::Position::new(1, 31));
2905 assert_eq!(params.range.end, lsp::Position::new(1, 31));
2906
2907 Some(vec![lsp::CodeActionOrCommand::CodeAction(
2908 lsp::CodeAction {
2909 title: "Inline into all callers".to_string(),
2910 edit: Some(lsp::WorkspaceEdit {
2911 changes: Some(
2912 [
2913 (
2914 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2915 vec![lsp::TextEdit::new(
2916 lsp::Range::new(
2917 lsp::Position::new(1, 22),
2918 lsp::Position::new(1, 34),
2919 ),
2920 "4".to_string(),
2921 )],
2922 ),
2923 (
2924 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2925 vec![lsp::TextEdit::new(
2926 lsp::Range::new(
2927 lsp::Position::new(0, 0),
2928 lsp::Position::new(0, 27),
2929 ),
2930 "".to_string(),
2931 )],
2932 ),
2933 ]
2934 .into_iter()
2935 .collect(),
2936 ),
2937 ..Default::default()
2938 }),
2939 data: Some(json!({
2940 "codeActionParams": {
2941 "range": {
2942 "start": {"line": 1, "column": 31},
2943 "end": {"line": 1, "column": 31},
2944 }
2945 }
2946 })),
2947 ..Default::default()
2948 },
2949 )])
2950 });
2951
2952 // Toggle code actions and wait for them to display.
2953 editor_b.update(&mut cx_b, |editor, cx| {
2954 editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2955 });
2956 editor_b
2957 .condition(&cx_b, |editor, _| editor.context_menu_visible())
2958 .await;
2959
2960 // Confirming the code action will trigger a resolve request.
2961 let confirm_action = workspace_b
2962 .update(&mut cx_b, |workspace, cx| {
2963 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2964 })
2965 .unwrap();
2966 fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2967 lsp::CodeAction {
2968 title: "Inline into all callers".to_string(),
2969 edit: Some(lsp::WorkspaceEdit {
2970 changes: Some(
2971 [
2972 (
2973 lsp::Url::from_file_path("/a/main.rs").unwrap(),
2974 vec![lsp::TextEdit::new(
2975 lsp::Range::new(
2976 lsp::Position::new(1, 22),
2977 lsp::Position::new(1, 34),
2978 ),
2979 "4".to_string(),
2980 )],
2981 ),
2982 (
2983 lsp::Url::from_file_path("/a/other.rs").unwrap(),
2984 vec![lsp::TextEdit::new(
2985 lsp::Range::new(
2986 lsp::Position::new(0, 0),
2987 lsp::Position::new(0, 27),
2988 ),
2989 "".to_string(),
2990 )],
2991 ),
2992 ]
2993 .into_iter()
2994 .collect(),
2995 ),
2996 ..Default::default()
2997 }),
2998 ..Default::default()
2999 }
3000 });
3001
3002 // After the action is confirmed, an editor containing both modified files is opened.
3003 confirm_action.await.unwrap();
3004 let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3005 workspace
3006 .active_item(cx)
3007 .unwrap()
3008 .downcast::<Editor>()
3009 .unwrap()
3010 });
3011 code_action_editor.update(&mut cx_b, |editor, cx| {
3012 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3013 editor.undo(&Undo, cx);
3014 assert_eq!(
3015 editor.text(cx),
3016 "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3017 );
3018 editor.redo(&Redo, cx);
3019 assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3020 });
3021 }
3022
3023 #[gpui::test(iterations = 10)]
3024 async fn test_basic_chat(
3025 mut cx_a: TestAppContext,
3026 mut cx_b: TestAppContext,
3027 last_iteration: bool,
3028 ) {
3029 cx_a.foreground().forbid_parking();
3030
3031 // Connect to a server as 2 clients.
3032 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3033 let client_a = server.create_client(&mut cx_a, "user_a").await;
3034 let client_b = server.create_client(&mut cx_b, "user_b").await;
3035
3036 // Create an org that includes these 2 users.
3037 let db = &server.app_state.db;
3038 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3039 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3040 .await
3041 .unwrap();
3042 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3043 .await
3044 .unwrap();
3045
3046 // Create a channel that includes all the users.
3047 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3048 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3049 .await
3050 .unwrap();
3051 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3052 .await
3053 .unwrap();
3054 db.create_channel_message(
3055 channel_id,
3056 client_b.current_user_id(&cx_b),
3057 "hello A, it's B.",
3058 OffsetDateTime::now_utc(),
3059 1,
3060 )
3061 .await
3062 .unwrap();
3063
3064 let channels_a = cx_a
3065 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3066 channels_a
3067 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3068 .await;
3069 channels_a.read_with(&cx_a, |list, _| {
3070 assert_eq!(
3071 list.available_channels().unwrap(),
3072 &[ChannelDetails {
3073 id: channel_id.to_proto(),
3074 name: "test-channel".to_string()
3075 }]
3076 )
3077 });
3078 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3079 this.get_channel(channel_id.to_proto(), cx).unwrap()
3080 });
3081 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3082 channel_a
3083 .condition(&cx_a, |channel, _| {
3084 channel_messages(channel)
3085 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3086 })
3087 .await;
3088
3089 let channels_b = cx_b
3090 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3091 channels_b
3092 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3093 .await;
3094 channels_b.read_with(&cx_b, |list, _| {
3095 assert_eq!(
3096 list.available_channels().unwrap(),
3097 &[ChannelDetails {
3098 id: channel_id.to_proto(),
3099 name: "test-channel".to_string()
3100 }]
3101 )
3102 });
3103
3104 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3105 this.get_channel(channel_id.to_proto(), cx).unwrap()
3106 });
3107 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3108 channel_b
3109 .condition(&cx_b, |channel, _| {
3110 channel_messages(channel)
3111 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3112 })
3113 .await;
3114
3115 channel_a
3116 .update(&mut cx_a, |channel, cx| {
3117 channel
3118 .send_message("oh, hi B.".to_string(), cx)
3119 .unwrap()
3120 .detach();
3121 let task = channel.send_message("sup".to_string(), cx).unwrap();
3122 assert_eq!(
3123 channel_messages(channel),
3124 &[
3125 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3126 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3127 ("user_a".to_string(), "sup".to_string(), true)
3128 ]
3129 );
3130 task
3131 })
3132 .await
3133 .unwrap();
3134
3135 channel_b
3136 .condition(&cx_b, |channel, _| {
3137 channel_messages(channel)
3138 == [
3139 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3140 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3141 ("user_a".to_string(), "sup".to_string(), false),
3142 ]
3143 })
3144 .await;
3145
3146 assert_eq!(
3147 server
3148 .state()
3149 .await
3150 .channel(channel_id)
3151 .unwrap()
3152 .connection_ids
3153 .len(),
3154 2
3155 );
3156 cx_b.update(|_| drop(channel_b));
3157 server
3158 .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3159 .await;
3160
3161 cx_a.update(|_| drop(channel_a));
3162 server
3163 .condition(|state| state.channel(channel_id).is_none())
3164 .await;
3165 }
3166
3167 #[gpui::test(iterations = 10)]
3168 async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3169 cx_a.foreground().forbid_parking();
3170
3171 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3172 let client_a = server.create_client(&mut cx_a, "user_a").await;
3173
3174 let db = &server.app_state.db;
3175 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3176 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3177 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3178 .await
3179 .unwrap();
3180 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3181 .await
3182 .unwrap();
3183
3184 let channels_a = cx_a
3185 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3186 channels_a
3187 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3188 .await;
3189 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3190 this.get_channel(channel_id.to_proto(), cx).unwrap()
3191 });
3192
3193 // Messages aren't allowed to be too long.
3194 channel_a
3195 .update(&mut cx_a, |channel, cx| {
3196 let long_body = "this is long.\n".repeat(1024);
3197 channel.send_message(long_body, cx).unwrap()
3198 })
3199 .await
3200 .unwrap_err();
3201
3202 // Messages aren't allowed to be blank.
3203 channel_a.update(&mut cx_a, |channel, cx| {
3204 channel.send_message(String::new(), cx).unwrap_err()
3205 });
3206
3207 // Leading and trailing whitespace are trimmed.
3208 channel_a
3209 .update(&mut cx_a, |channel, cx| {
3210 channel
3211 .send_message("\n surrounded by whitespace \n".to_string(), cx)
3212 .unwrap()
3213 })
3214 .await
3215 .unwrap();
3216 assert_eq!(
3217 db.get_channel_messages(channel_id, 10, None)
3218 .await
3219 .unwrap()
3220 .iter()
3221 .map(|m| &m.body)
3222 .collect::<Vec<_>>(),
3223 &["surrounded by whitespace"]
3224 );
3225 }
3226
3227 #[gpui::test(iterations = 10)]
3228 async fn test_chat_reconnection(
3229 mut cx_a: TestAppContext,
3230 mut cx_b: TestAppContext,
3231 last_iteration: bool,
3232 ) {
3233 cx_a.foreground().forbid_parking();
3234
3235 // Connect to a server as 2 clients.
3236 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3237 let client_a = server.create_client(&mut cx_a, "user_a").await;
3238 let client_b = server.create_client(&mut cx_b, "user_b").await;
3239 let mut status_b = client_b.status();
3240
3241 // Create an org that includes these 2 users.
3242 let db = &server.app_state.db;
3243 let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3244 db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3245 .await
3246 .unwrap();
3247 db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3248 .await
3249 .unwrap();
3250
3251 // Create a channel that includes all the users.
3252 let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3253 db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3254 .await
3255 .unwrap();
3256 db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3257 .await
3258 .unwrap();
3259 db.create_channel_message(
3260 channel_id,
3261 client_b.current_user_id(&cx_b),
3262 "hello A, it's B.",
3263 OffsetDateTime::now_utc(),
3264 2,
3265 )
3266 .await
3267 .unwrap();
3268
3269 let channels_a = cx_a
3270 .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3271 channels_a
3272 .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3273 .await;
3274
3275 channels_a.read_with(&cx_a, |list, _| {
3276 assert_eq!(
3277 list.available_channels().unwrap(),
3278 &[ChannelDetails {
3279 id: channel_id.to_proto(),
3280 name: "test-channel".to_string()
3281 }]
3282 )
3283 });
3284 let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3285 this.get_channel(channel_id.to_proto(), cx).unwrap()
3286 });
3287 channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3288 channel_a
3289 .condition(&cx_a, |channel, _| {
3290 channel_messages(channel)
3291 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3292 })
3293 .await;
3294
3295 let channels_b = cx_b
3296 .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3297 channels_b
3298 .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3299 .await;
3300 channels_b.read_with(&cx_b, |list, _| {
3301 assert_eq!(
3302 list.available_channels().unwrap(),
3303 &[ChannelDetails {
3304 id: channel_id.to_proto(),
3305 name: "test-channel".to_string()
3306 }]
3307 )
3308 });
3309
3310 let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3311 this.get_channel(channel_id.to_proto(), cx).unwrap()
3312 });
3313 channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3314 channel_b
3315 .condition(&cx_b, |channel, _| {
3316 channel_messages(channel)
3317 == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3318 })
3319 .await;
3320
3321 // Disconnect client B, ensuring we can still access its cached channel data.
3322 server.forbid_connections();
3323 server.disconnect_client(client_b.current_user_id(&cx_b));
3324 while !matches!(
3325 status_b.next().await,
3326 Some(client::Status::ReconnectionError { .. })
3327 ) {}
3328
3329 channels_b.read_with(&cx_b, |channels, _| {
3330 assert_eq!(
3331 channels.available_channels().unwrap(),
3332 [ChannelDetails {
3333 id: channel_id.to_proto(),
3334 name: "test-channel".to_string()
3335 }]
3336 )
3337 });
3338 channel_b.read_with(&cx_b, |channel, _| {
3339 assert_eq!(
3340 channel_messages(channel),
3341 [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3342 )
3343 });
3344
3345 // Send a message from client B while it is disconnected.
3346 channel_b
3347 .update(&mut cx_b, |channel, cx| {
3348 let task = channel
3349 .send_message("can you see this?".to_string(), cx)
3350 .unwrap();
3351 assert_eq!(
3352 channel_messages(channel),
3353 &[
3354 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3355 ("user_b".to_string(), "can you see this?".to_string(), true)
3356 ]
3357 );
3358 task
3359 })
3360 .await
3361 .unwrap_err();
3362
3363 // Send a message from client A while B is disconnected.
3364 channel_a
3365 .update(&mut cx_a, |channel, cx| {
3366 channel
3367 .send_message("oh, hi B.".to_string(), cx)
3368 .unwrap()
3369 .detach();
3370 let task = channel.send_message("sup".to_string(), cx).unwrap();
3371 assert_eq!(
3372 channel_messages(channel),
3373 &[
3374 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3375 ("user_a".to_string(), "oh, hi B.".to_string(), true),
3376 ("user_a".to_string(), "sup".to_string(), true)
3377 ]
3378 );
3379 task
3380 })
3381 .await
3382 .unwrap();
3383
3384 // Give client B a chance to reconnect.
3385 server.allow_connections();
3386 cx_b.foreground().advance_clock(Duration::from_secs(10));
3387
3388 // Verify that B sees the new messages upon reconnection, as well as the message client B
3389 // sent while offline.
3390 channel_b
3391 .condition(&cx_b, |channel, _| {
3392 channel_messages(channel)
3393 == [
3394 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3395 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3396 ("user_a".to_string(), "sup".to_string(), false),
3397 ("user_b".to_string(), "can you see this?".to_string(), false),
3398 ]
3399 })
3400 .await;
3401
3402 // Ensure client A and B can communicate normally after reconnection.
3403 channel_a
3404 .update(&mut cx_a, |channel, cx| {
3405 channel.send_message("you online?".to_string(), cx).unwrap()
3406 })
3407 .await
3408 .unwrap();
3409 channel_b
3410 .condition(&cx_b, |channel, _| {
3411 channel_messages(channel)
3412 == [
3413 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3414 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3415 ("user_a".to_string(), "sup".to_string(), false),
3416 ("user_b".to_string(), "can you see this?".to_string(), false),
3417 ("user_a".to_string(), "you online?".to_string(), false),
3418 ]
3419 })
3420 .await;
3421
3422 channel_b
3423 .update(&mut cx_b, |channel, cx| {
3424 channel.send_message("yep".to_string(), cx).unwrap()
3425 })
3426 .await
3427 .unwrap();
3428 channel_a
3429 .condition(&cx_a, |channel, _| {
3430 channel_messages(channel)
3431 == [
3432 ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3433 ("user_a".to_string(), "oh, hi B.".to_string(), false),
3434 ("user_a".to_string(), "sup".to_string(), false),
3435 ("user_b".to_string(), "can you see this?".to_string(), false),
3436 ("user_a".to_string(), "you online?".to_string(), false),
3437 ("user_b".to_string(), "yep".to_string(), false),
3438 ]
3439 })
3440 .await;
3441 }
3442
3443 #[gpui::test(iterations = 10)]
3444 async fn test_contacts(
3445 mut cx_a: TestAppContext,
3446 mut cx_b: TestAppContext,
3447 mut cx_c: TestAppContext,
3448 last_iteration: bool,
3449 ) {
3450 cx_a.foreground().forbid_parking();
3451 let lang_registry = Arc::new(LanguageRegistry::new());
3452 let fs = Arc::new(FakeFs::new(cx_a.background()));
3453
3454 // Connect to a server as 3 clients.
3455 let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3456 let client_a = server.create_client(&mut cx_a, "user_a").await;
3457 let client_b = server.create_client(&mut cx_b, "user_b").await;
3458 let client_c = server.create_client(&mut cx_c, "user_c").await;
3459
3460 // Share a worktree as client A.
3461 fs.insert_tree(
3462 "/a",
3463 json!({
3464 ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3465 }),
3466 )
3467 .await;
3468
3469 let project_a = cx_a.update(|cx| {
3470 Project::local(
3471 client_a.clone(),
3472 client_a.user_store.clone(),
3473 lang_registry.clone(),
3474 fs.clone(),
3475 cx,
3476 )
3477 });
3478 let (worktree_a, _) = project_a
3479 .update(&mut cx_a, |p, cx| {
3480 p.find_or_create_local_worktree("/a", false, cx)
3481 })
3482 .await
3483 .unwrap();
3484 worktree_a
3485 .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3486 .await;
3487
3488 client_a
3489 .user_store
3490 .condition(&cx_a, |user_store, _| {
3491 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3492 })
3493 .await;
3494 client_b
3495 .user_store
3496 .condition(&cx_b, |user_store, _| {
3497 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3498 })
3499 .await;
3500 client_c
3501 .user_store
3502 .condition(&cx_c, |user_store, _| {
3503 contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3504 })
3505 .await;
3506
3507 let project_id = project_a
3508 .update(&mut cx_a, |project, _| project.next_remote_id())
3509 .await;
3510 project_a
3511 .update(&mut cx_a, |project, cx| project.share(cx))
3512 .await
3513 .unwrap();
3514
3515 let _project_b = Project::remote(
3516 project_id,
3517 client_b.clone(),
3518 client_b.user_store.clone(),
3519 lang_registry.clone(),
3520 fs.clone(),
3521 &mut cx_b.to_async(),
3522 )
3523 .await
3524 .unwrap();
3525
3526 client_a
3527 .user_store
3528 .condition(&cx_a, |user_store, _| {
3529 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3530 })
3531 .await;
3532 client_b
3533 .user_store
3534 .condition(&cx_b, |user_store, _| {
3535 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3536 })
3537 .await;
3538 client_c
3539 .user_store
3540 .condition(&cx_c, |user_store, _| {
3541 contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3542 })
3543 .await;
3544
3545 project_a
3546 .condition(&cx_a, |project, _| {
3547 project.collaborators().contains_key(&client_b.peer_id)
3548 })
3549 .await;
3550
3551 cx_a.update(move |_| drop(project_a));
3552 client_a
3553 .user_store
3554 .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3555 .await;
3556 client_b
3557 .user_store
3558 .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3559 .await;
3560 client_c
3561 .user_store
3562 .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3563 .await;
3564
3565 fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3566 user_store
3567 .contacts()
3568 .iter()
3569 .map(|contact| {
3570 let worktrees = contact
3571 .projects
3572 .iter()
3573 .map(|p| {
3574 (
3575 p.worktree_root_names[0].as_str(),
3576 p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3577 )
3578 })
3579 .collect();
3580 (contact.user.github_login.as_str(), worktrees)
3581 })
3582 .collect()
3583 }
3584 }
3585
3586 #[gpui::test(iterations = 100)]
3587 async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3588 cx.foreground().forbid_parking();
3589 let max_peers = env::var("MAX_PEERS")
3590 .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3591 .unwrap_or(5);
3592 let max_operations = env::var("OPERATIONS")
3593 .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3594 .unwrap_or(10);
3595
3596 let rng = Rc::new(RefCell::new(rng));
3597 let lang_registry = Arc::new(LanguageRegistry::new());
3598 let fs = Arc::new(FakeFs::new(cx.background()));
3599 fs.insert_tree(
3600 "/_collab",
3601 json!({
3602 ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3603 }),
3604 )
3605 .await;
3606
3607 let operations = Rc::new(Cell::new(0));
3608 let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3609 let mut clients = Vec::new();
3610
3611 let mut next_entity_id = 100000;
3612 let mut host_cx = TestAppContext::new(
3613 cx.foreground_platform(),
3614 cx.platform(),
3615 cx.foreground(),
3616 cx.background(),
3617 cx.font_cache(),
3618 next_entity_id,
3619 );
3620 let host = server.create_client(&mut host_cx, "host").await;
3621 let host_project = host_cx.update(|cx| {
3622 Project::local(
3623 host.client.clone(),
3624 host.user_store.clone(),
3625 lang_registry.clone(),
3626 fs.clone(),
3627 cx,
3628 )
3629 });
3630 let host_project_id = host_project
3631 .update(&mut host_cx, |p, _| p.next_remote_id())
3632 .await;
3633
3634 let (collab_worktree, _) = host_project
3635 .update(&mut host_cx, |project, cx| {
3636 project.find_or_create_local_worktree("/_collab", false, cx)
3637 })
3638 .await
3639 .unwrap();
3640 collab_worktree
3641 .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3642 .await;
3643 host_project
3644 .update(&mut host_cx, |project, cx| project.share(cx))
3645 .await
3646 .unwrap();
3647
3648 clients.push(cx.foreground().spawn(host.simulate_host(
3649 host_project.clone(),
3650 operations.clone(),
3651 max_operations,
3652 rng.clone(),
3653 host_cx.clone(),
3654 )));
3655
3656 while operations.get() < max_operations {
3657 cx.background().simulate_random_delay().await;
3658 if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3659 operations.set(operations.get() + 1);
3660
3661 let guest_id = clients.len();
3662 log::info!("Adding guest {}", guest_id);
3663 next_entity_id += 100000;
3664 let mut guest_cx = TestAppContext::new(
3665 cx.foreground_platform(),
3666 cx.platform(),
3667 cx.foreground(),
3668 cx.background(),
3669 cx.font_cache(),
3670 next_entity_id,
3671 );
3672 let guest = server
3673 .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3674 .await;
3675 let guest_project = Project::remote(
3676 host_project_id,
3677 guest.client.clone(),
3678 guest.user_store.clone(),
3679 lang_registry.clone(),
3680 fs.clone(),
3681 &mut guest_cx.to_async(),
3682 )
3683 .await
3684 .unwrap();
3685 clients.push(cx.foreground().spawn(guest.simulate_guest(
3686 guest_id,
3687 guest_project,
3688 operations.clone(),
3689 max_operations,
3690 rng.clone(),
3691 guest_cx,
3692 )));
3693
3694 log::info!("Guest {} added", guest_id);
3695 }
3696 }
3697
3698 let clients = futures::future::join_all(clients).await;
3699 cx.foreground().run_until_parked();
3700
3701 let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3702 project
3703 .worktrees(cx)
3704 .map(|worktree| {
3705 let snapshot = worktree.read(cx).snapshot();
3706 (snapshot.id(), snapshot)
3707 })
3708 .collect::<BTreeMap<_, _>>()
3709 });
3710
3711 for (guest_client, guest_cx) in clients.iter().skip(1) {
3712 let guest_id = guest_client.client.id();
3713 let worktree_snapshots =
3714 guest_client
3715 .project
3716 .as_ref()
3717 .unwrap()
3718 .read_with(guest_cx, |project, cx| {
3719 project
3720 .worktrees(cx)
3721 .map(|worktree| {
3722 let snapshot = worktree.read(cx).snapshot();
3723 (snapshot.id(), snapshot)
3724 })
3725 .collect::<BTreeMap<_, _>>()
3726 });
3727
3728 assert_eq!(
3729 worktree_snapshots.keys().collect::<Vec<_>>(),
3730 host_worktree_snapshots.keys().collect::<Vec<_>>(),
3731 "guest {} has different worktrees than the host",
3732 guest_id
3733 );
3734 for (id, host_snapshot) in &host_worktree_snapshots {
3735 let guest_snapshot = &worktree_snapshots[id];
3736 assert_eq!(
3737 guest_snapshot.root_name(),
3738 host_snapshot.root_name(),
3739 "guest {} has different root name than the host for worktree {}",
3740 guest_id,
3741 id
3742 );
3743 assert_eq!(
3744 guest_snapshot.entries(false).collect::<Vec<_>>(),
3745 host_snapshot.entries(false).collect::<Vec<_>>(),
3746 "guest {} has different snapshot than the host for worktree {}",
3747 guest_id,
3748 id
3749 );
3750 }
3751
3752 guest_client
3753 .project
3754 .as_ref()
3755 .unwrap()
3756 .read_with(guest_cx, |project, _| {
3757 assert!(
3758 !project.has_buffered_operations(),
3759 "guest {} has buffered operations ",
3760 guest_id,
3761 );
3762 });
3763
3764 for guest_buffer in &guest_client.buffers {
3765 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3766 let host_buffer = host_project.read_with(&host_cx, |project, _| {
3767 project
3768 .shared_buffer(guest_client.peer_id, buffer_id)
3769 .expect(&format!(
3770 "host doest not have buffer for guest:{}, peer:{}, id:{}",
3771 guest_id, guest_client.peer_id, buffer_id
3772 ))
3773 });
3774 assert_eq!(
3775 guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3776 host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3777 "guest {} buffer {} differs from the host's buffer",
3778 guest_id,
3779 buffer_id,
3780 );
3781 }
3782 }
3783 }
3784
3785 struct TestServer {
3786 peer: Arc<Peer>,
3787 app_state: Arc<AppState>,
3788 server: Arc<Server>,
3789 foreground: Rc<executor::Foreground>,
3790 notifications: mpsc::Receiver<()>,
3791 connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3792 forbid_connections: Arc<AtomicBool>,
3793 _test_db: TestDb,
3794 }
3795
3796 impl TestServer {
3797 async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3798 let mut test_db = TestDb::new();
3799 test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3800 let app_state = Self::build_app_state(&test_db).await;
3801 let peer = Peer::new();
3802 let notifications = mpsc::channel(128);
3803 let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3804 Self {
3805 peer,
3806 app_state,
3807 server,
3808 foreground,
3809 notifications: notifications.1,
3810 connection_killers: Default::default(),
3811 forbid_connections: Default::default(),
3812 _test_db: test_db,
3813 }
3814 }
3815
3816 async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3817 let http = FakeHttpClient::with_404_response();
3818 let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3819 let client_name = name.to_string();
3820 let mut client = Client::new(http.clone());
3821 let server = self.server.clone();
3822 let connection_killers = self.connection_killers.clone();
3823 let forbid_connections = self.forbid_connections.clone();
3824 let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3825
3826 Arc::get_mut(&mut client)
3827 .unwrap()
3828 .override_authenticate(move |cx| {
3829 cx.spawn(|_| async move {
3830 let access_token = "the-token".to_string();
3831 Ok(Credentials {
3832 user_id: user_id.0 as u64,
3833 access_token,
3834 })
3835 })
3836 })
3837 .override_establish_connection(move |credentials, cx| {
3838 assert_eq!(credentials.user_id, user_id.0 as u64);
3839 assert_eq!(credentials.access_token, "the-token");
3840
3841 let server = server.clone();
3842 let connection_killers = connection_killers.clone();
3843 let forbid_connections = forbid_connections.clone();
3844 let client_name = client_name.clone();
3845 let connection_id_tx = connection_id_tx.clone();
3846 cx.spawn(move |cx| async move {
3847 if forbid_connections.load(SeqCst) {
3848 Err(EstablishConnectionError::other(anyhow!(
3849 "server is forbidding connections"
3850 )))
3851 } else {
3852 let (client_conn, server_conn, kill_conn) =
3853 Connection::in_memory(cx.background());
3854 connection_killers.lock().insert(user_id, kill_conn);
3855 cx.background()
3856 .spawn(server.handle_connection(
3857 server_conn,
3858 client_name,
3859 user_id,
3860 Some(connection_id_tx),
3861 cx.background(),
3862 ))
3863 .detach();
3864 Ok(client_conn)
3865 }
3866 })
3867 });
3868
3869 client
3870 .authenticate_and_connect(&cx.to_async())
3871 .await
3872 .unwrap();
3873
3874 Channel::init(&client);
3875 Project::init(&client);
3876
3877 let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3878 let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3879 let mut authed_user =
3880 user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3881 while authed_user.next().await.unwrap().is_none() {}
3882
3883 TestClient {
3884 client,
3885 peer_id,
3886 user_store,
3887 project: Default::default(),
3888 buffers: Default::default(),
3889 }
3890 }
3891
3892 fn disconnect_client(&self, user_id: UserId) {
3893 if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3894 let _ = kill_conn.try_send(Some(()));
3895 }
3896 }
3897
3898 fn forbid_connections(&self) {
3899 self.forbid_connections.store(true, SeqCst);
3900 }
3901
3902 fn allow_connections(&self) {
3903 self.forbid_connections.store(false, SeqCst);
3904 }
3905
3906 async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3907 let mut config = Config::default();
3908 config.session_secret = "a".repeat(32);
3909 config.database_url = test_db.url.clone();
3910 let github_client = github::AppClient::test();
3911 Arc::new(AppState {
3912 db: test_db.db().clone(),
3913 handlebars: Default::default(),
3914 auth_client: auth::build_client("", ""),
3915 repo_client: github::RepoClient::test(&github_client),
3916 github_client,
3917 config,
3918 })
3919 }
3920
3921 async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3922 self.server.store.read()
3923 }
3924
3925 async fn condition<F>(&mut self, mut predicate: F)
3926 where
3927 F: FnMut(&Store) -> bool,
3928 {
3929 async_std::future::timeout(Duration::from_millis(500), async {
3930 while !(predicate)(&*self.server.store.read()) {
3931 self.foreground.start_waiting();
3932 self.notifications.next().await;
3933 self.foreground.finish_waiting();
3934 }
3935 })
3936 .await
3937 .expect("condition timed out");
3938 }
3939 }
3940
3941 impl Drop for TestServer {
3942 fn drop(&mut self) {
3943 self.peer.reset();
3944 }
3945 }
3946
3947 struct TestClient {
3948 client: Arc<Client>,
3949 pub peer_id: PeerId,
3950 pub user_store: ModelHandle<UserStore>,
3951 project: Option<ModelHandle<Project>>,
3952 buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3953 }
3954
3955 impl Deref for TestClient {
3956 type Target = Arc<Client>;
3957
3958 fn deref(&self) -> &Self::Target {
3959 &self.client
3960 }
3961 }
3962
3963 impl TestClient {
3964 pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3965 UserId::from_proto(
3966 self.user_store
3967 .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3968 )
3969 }
3970
3971 async fn simulate_host(
3972 mut self,
3973 project: ModelHandle<Project>,
3974 operations: Rc<Cell<usize>>,
3975 max_operations: usize,
3976 rng: Rc<RefCell<StdRng>>,
3977 mut cx: TestAppContext,
3978 ) -> (Self, TestAppContext) {
3979 let fs = project.read_with(&cx, |project, _| project.fs().clone());
3980 let mut files: Vec<PathBuf> = Default::default();
3981 while operations.get() < max_operations {
3982 operations.set(operations.get() + 1);
3983
3984 let distribution = rng.borrow_mut().gen_range(0..100);
3985 match distribution {
3986 0..=20 if !files.is_empty() => {
3987 let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3988 while let Some(parent_path) = path.parent() {
3989 path = parent_path;
3990 if rng.borrow_mut().gen() {
3991 break;
3992 }
3993 }
3994
3995 log::info!("Host: find/create local worktree {:?}", path);
3996 project
3997 .update(&mut cx, |project, cx| {
3998 project.find_or_create_local_worktree(path, false, cx)
3999 })
4000 .await
4001 .unwrap();
4002 }
4003 10..=80 if !files.is_empty() => {
4004 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4005 let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4006 let (worktree, path) = project
4007 .update(&mut cx, |project, cx| {
4008 project.find_or_create_local_worktree(file, false, cx)
4009 })
4010 .await
4011 .unwrap();
4012 let project_path =
4013 worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4014 log::info!("Host: opening path {:?}", project_path);
4015 let buffer = project
4016 .update(&mut cx, |project, cx| {
4017 project.open_buffer(project_path, cx)
4018 })
4019 .await
4020 .unwrap();
4021 self.buffers.insert(buffer.clone());
4022 buffer
4023 } else {
4024 self.buffers
4025 .iter()
4026 .choose(&mut *rng.borrow_mut())
4027 .unwrap()
4028 .clone()
4029 };
4030
4031 if rng.borrow_mut().gen_bool(0.1) {
4032 cx.update(|cx| {
4033 log::info!(
4034 "Host: dropping buffer {:?}",
4035 buffer.read(cx).file().unwrap().full_path(cx)
4036 );
4037 self.buffers.remove(&buffer);
4038 drop(buffer);
4039 });
4040 } else {
4041 buffer.update(&mut cx, |buffer, cx| {
4042 log::info!(
4043 "Host: updating buffer {:?}",
4044 buffer.file().unwrap().full_path(cx)
4045 );
4046 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4047 });
4048 }
4049 }
4050 _ => loop {
4051 let path_component_count = rng.borrow_mut().gen_range(1..=5);
4052 let mut path = PathBuf::new();
4053 path.push("/");
4054 for _ in 0..path_component_count {
4055 let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4056 path.push(std::str::from_utf8(&[letter]).unwrap());
4057 }
4058 let parent_path = path.parent().unwrap();
4059
4060 log::info!("Host: creating file {:?}", path);
4061 if fs.create_dir(&parent_path).await.is_ok()
4062 && fs.create_file(&path, Default::default()).await.is_ok()
4063 {
4064 files.push(path);
4065 break;
4066 } else {
4067 log::info!("Host: cannot create file");
4068 }
4069 },
4070 }
4071
4072 cx.background().simulate_random_delay().await;
4073 }
4074
4075 self.project = Some(project);
4076 (self, cx)
4077 }
4078
4079 pub async fn simulate_guest(
4080 mut self,
4081 guest_id: usize,
4082 project: ModelHandle<Project>,
4083 operations: Rc<Cell<usize>>,
4084 max_operations: usize,
4085 rng: Rc<RefCell<StdRng>>,
4086 mut cx: TestAppContext,
4087 ) -> (Self, TestAppContext) {
4088 while operations.get() < max_operations {
4089 let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4090 let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4091 project
4092 .worktrees(&cx)
4093 .filter(|worktree| {
4094 worktree.read(cx).entries(false).any(|e| e.is_file())
4095 })
4096 .choose(&mut *rng.borrow_mut())
4097 }) {
4098 worktree
4099 } else {
4100 cx.background().simulate_random_delay().await;
4101 continue;
4102 };
4103
4104 operations.set(operations.get() + 1);
4105 let project_path = worktree.read_with(&cx, |worktree, _| {
4106 let entry = worktree
4107 .entries(false)
4108 .filter(|e| e.is_file())
4109 .choose(&mut *rng.borrow_mut())
4110 .unwrap();
4111 (worktree.id(), entry.path.clone())
4112 });
4113 log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4114 let buffer = project
4115 .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4116 .await
4117 .unwrap();
4118 self.buffers.insert(buffer.clone());
4119 buffer
4120 } else {
4121 self.buffers
4122 .iter()
4123 .choose(&mut *rng.borrow_mut())
4124 .unwrap()
4125 .clone()
4126 };
4127
4128 if rng.borrow_mut().gen_bool(0.1) {
4129 cx.update(|cx| {
4130 log::info!(
4131 "Guest {}: dropping buffer {:?}",
4132 guest_id,
4133 buffer.read(cx).file().unwrap().full_path(cx)
4134 );
4135 self.buffers.remove(&buffer);
4136 drop(buffer);
4137 });
4138 } else {
4139 buffer.update(&mut cx, |buffer, cx| {
4140 log::info!(
4141 "Guest {}: updating buffer {:?}",
4142 guest_id,
4143 buffer.file().unwrap().full_path(cx)
4144 );
4145 buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4146 });
4147 }
4148
4149 cx.background().simulate_random_delay().await;
4150 }
4151
4152 self.project = Some(project);
4153 (self, cx)
4154 }
4155 }
4156
4157 impl Executor for Arc<gpui::executor::Background> {
4158 fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4159 self.spawn(future).detach();
4160 }
4161 }
4162
4163 fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4164 channel
4165 .messages()
4166 .cursor::<()>()
4167 .map(|m| {
4168 (
4169 m.sender.github_login.clone(),
4170 m.body.clone(),
4171 m.is_pending(),
4172 )
4173 })
4174 .collect()
4175 }
4176
4177 struct EmptyView;
4178
4179 impl gpui::Entity for EmptyView {
4180 type Event = ();
4181 }
4182
4183 impl gpui::View for EmptyView {
4184 fn ui_name() -> &'static str {
4185 "empty view"
4186 }
4187
4188 fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4189 gpui::Element::boxed(gpui::elements::Empty)
4190 }
4191 }
4192}