1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use collections::{hash_map::Entry, HashMap, HashSet};
4use feature_flags::FeatureFlagAppExt;
5use futures::{channel::mpsc, Future, StreamExt};
6use gpui::{
7 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
8 WeakModel,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
16pub type UserId = u64;
17
18#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
19pub struct ChannelId(pub u64);
20
21impl std::fmt::Display for ChannelId {
22 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 self.0.fmt(f)
24 }
25}
26
27#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
28pub struct ProjectId(pub u64);
29
30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
31pub struct DevServerId(pub u64);
32
33#[derive(
34 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
35)]
36pub struct DevServerProjectId(pub u64);
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub struct ParticipantIndex(pub u32);
40
41#[derive(Default, Debug)]
42pub struct User {
43 pub id: UserId,
44 pub github_login: String,
45 pub avatar_uri: SharedUri,
46}
47
48#[derive(Clone, Debug, PartialEq, Eq)]
49pub struct Collaborator {
50 pub peer_id: proto::PeerId,
51 pub replica_id: ReplicaId,
52 pub user_id: UserId,
53}
54
55impl PartialOrd for User {
56 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
57 Some(self.cmp(other))
58 }
59}
60
61impl Ord for User {
62 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
63 self.github_login.cmp(&other.github_login)
64 }
65}
66
67impl PartialEq for User {
68 fn eq(&self, other: &Self) -> bool {
69 self.id == other.id && self.github_login == other.github_login
70 }
71}
72
73impl Eq for User {}
74
75#[derive(Debug, PartialEq)]
76pub struct Contact {
77 pub user: Arc<User>,
78 pub online: bool,
79 pub busy: bool,
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq)]
83pub enum ContactRequestStatus {
84 None,
85 RequestSent,
86 RequestReceived,
87 RequestAccepted,
88}
89
90pub struct UserStore {
91 users: HashMap<u64, Arc<User>>,
92 by_github_login: HashMap<String, u64>,
93 participant_indices: HashMap<u64, ParticipantIndex>,
94 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
95 current_plan: Option<proto::Plan>,
96 current_user: watch::Receiver<Option<Arc<User>>>,
97 contacts: Vec<Arc<Contact>>,
98 incoming_contact_requests: Vec<Arc<User>>,
99 outgoing_contact_requests: Vec<Arc<User>>,
100 pending_contact_requests: HashMap<u64, usize>,
101 invite_info: Option<InviteInfo>,
102 client: Weak<Client>,
103 _maintain_contacts: Task<()>,
104 _maintain_current_user: Task<Result<()>>,
105 weak_self: WeakModel<Self>,
106}
107
108#[derive(Clone)]
109pub struct InviteInfo {
110 pub count: u32,
111 pub url: Arc<str>,
112}
113
114pub enum Event {
115 Contact {
116 user: Arc<User>,
117 kind: ContactEventKind,
118 },
119 ShowContacts,
120 ParticipantIndicesChanged,
121}
122
123#[derive(Clone, Copy)]
124pub enum ContactEventKind {
125 Requested,
126 Accepted,
127 Cancelled,
128}
129
130impl EventEmitter<Event> for UserStore {}
131
132enum UpdateContacts {
133 Update(proto::UpdateContacts),
134 Wait(postage::barrier::Sender),
135 Clear(postage::barrier::Sender),
136}
137
138impl UserStore {
139 pub fn new(client: Arc<Client>, cx: &mut ModelContext<Self>) -> Self {
140 let (mut current_user_tx, current_user_rx) = watch::channel();
141 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
142 let rpc_subscriptions = vec![
143 client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
144 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
145 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
146 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
147 ];
148 Self {
149 users: Default::default(),
150 by_github_login: Default::default(),
151 current_user: current_user_rx,
152 current_plan: None,
153 contacts: Default::default(),
154 incoming_contact_requests: Default::default(),
155 participant_indices: Default::default(),
156 outgoing_contact_requests: Default::default(),
157 invite_info: None,
158 client: Arc::downgrade(&client),
159 update_contacts_tx,
160 _maintain_contacts: cx.spawn(|this, mut cx| async move {
161 let _subscriptions = rpc_subscriptions;
162 while let Some(message) = update_contacts_rx.next().await {
163 if let Ok(task) =
164 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
165 {
166 task.log_err().await;
167 } else {
168 break;
169 }
170 }
171 }),
172 _maintain_current_user: cx.spawn(|this, mut cx| async move {
173 let mut status = client.status();
174 let weak = Arc::downgrade(&client);
175 drop(client);
176 while let Some(status) = status.next().await {
177 // if the client is dropped, the app is shutting down.
178 let Some(client) = weak.upgrade() else {
179 return Ok(());
180 };
181 match status {
182 Status::Connected { .. } => {
183 if let Some(user_id) = client.user_id() {
184 let fetch_user = if let Ok(fetch_user) = this
185 .update(&mut cx, |this, cx| {
186 this.get_user(user_id, cx).log_err()
187 }) {
188 fetch_user
189 } else {
190 break;
191 };
192 let fetch_metrics_id =
193 client.request(proto::GetPrivateUserInfo {}).log_err();
194 let (user, info) = futures::join!(fetch_user, fetch_metrics_id);
195
196 cx.update(|cx| {
197 if let Some(info) = info {
198 let disable_staff = std::env::var("ZED_DISABLE_STAFF")
199 .map_or(false, |v| v != "" && v != "0");
200 let staff = info.staff && !disable_staff;
201 cx.update_flags(staff, info.flags);
202 client.telemetry.set_authenticated_user_info(
203 Some(info.metrics_id.clone()),
204 staff,
205 )
206 }
207 })?;
208
209 current_user_tx.send(user).await.ok();
210
211 this.update(&mut cx, |_, cx| cx.notify())?;
212 }
213 }
214 Status::SignedOut => {
215 current_user_tx.send(None).await.ok();
216 this.update(&mut cx, |this, cx| {
217 cx.notify();
218 this.clear_contacts()
219 })?
220 .await;
221 }
222 Status::ConnectionLost => {
223 this.update(&mut cx, |this, cx| {
224 cx.notify();
225 this.clear_contacts()
226 })?
227 .await;
228 }
229 _ => {}
230 }
231 }
232 Ok(())
233 }),
234 pending_contact_requests: Default::default(),
235 weak_self: cx.weak_model(),
236 }
237 }
238
239 #[cfg(feature = "test-support")]
240 pub fn clear_cache(&mut self) {
241 self.users.clear();
242 self.by_github_login.clear();
243 }
244
245 async fn handle_update_invite_info(
246 this: Model<Self>,
247 message: TypedEnvelope<proto::UpdateInviteInfo>,
248 mut cx: AsyncAppContext,
249 ) -> Result<()> {
250 this.update(&mut cx, |this, cx| {
251 this.invite_info = Some(InviteInfo {
252 url: Arc::from(message.payload.url),
253 count: message.payload.count,
254 });
255 cx.notify();
256 })?;
257 Ok(())
258 }
259
260 async fn handle_show_contacts(
261 this: Model<Self>,
262 _: TypedEnvelope<proto::ShowContacts>,
263 mut cx: AsyncAppContext,
264 ) -> Result<()> {
265 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
266 Ok(())
267 }
268
269 pub fn invite_info(&self) -> Option<&InviteInfo> {
270 self.invite_info.as_ref()
271 }
272
273 async fn handle_update_contacts(
274 this: Model<Self>,
275 message: TypedEnvelope<proto::UpdateContacts>,
276 mut cx: AsyncAppContext,
277 ) -> Result<()> {
278 this.update(&mut cx, |this, _| {
279 this.update_contacts_tx
280 .unbounded_send(UpdateContacts::Update(message.payload))
281 .unwrap();
282 })?;
283 Ok(())
284 }
285
286 async fn handle_update_plan(
287 this: Model<Self>,
288 message: TypedEnvelope<proto::UpdateUserPlan>,
289 mut cx: AsyncAppContext,
290 ) -> Result<()> {
291 this.update(&mut cx, |this, cx| {
292 this.current_plan = Some(message.payload.plan());
293 cx.notify();
294 })?;
295 Ok(())
296 }
297
298 fn update_contacts(
299 &mut self,
300 message: UpdateContacts,
301 cx: &mut ModelContext<Self>,
302 ) -> Task<Result<()>> {
303 match message {
304 UpdateContacts::Wait(barrier) => {
305 drop(barrier);
306 Task::ready(Ok(()))
307 }
308 UpdateContacts::Clear(barrier) => {
309 self.contacts.clear();
310 self.incoming_contact_requests.clear();
311 self.outgoing_contact_requests.clear();
312 drop(barrier);
313 Task::ready(Ok(()))
314 }
315 UpdateContacts::Update(message) => {
316 let mut user_ids = HashSet::default();
317 for contact in &message.contacts {
318 user_ids.insert(contact.user_id);
319 }
320 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
321 user_ids.extend(message.outgoing_requests.iter());
322
323 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
324 cx.spawn(|this, mut cx| async move {
325 load_users.await?;
326
327 // Users are fetched in parallel above and cached in call to get_users
328 // No need to parallelize here
329 let mut updated_contacts = Vec::new();
330 let this = this
331 .upgrade()
332 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
333 for contact in message.contacts {
334 updated_contacts.push(Arc::new(
335 Contact::from_proto(contact, &this, &mut cx).await?,
336 ));
337 }
338
339 let mut incoming_requests = Vec::new();
340 for request in message.incoming_requests {
341 incoming_requests.push({
342 this.update(&mut cx, |this, cx| {
343 this.get_user(request.requester_id, cx)
344 })?
345 .await?
346 });
347 }
348
349 let mut outgoing_requests = Vec::new();
350 for requested_user_id in message.outgoing_requests {
351 outgoing_requests.push(
352 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
353 .await?,
354 );
355 }
356
357 let removed_contacts =
358 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
359 let removed_incoming_requests =
360 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
361 let removed_outgoing_requests =
362 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
363
364 this.update(&mut cx, |this, cx| {
365 // Remove contacts
366 this.contacts
367 .retain(|contact| !removed_contacts.contains(&contact.user.id));
368 // Update existing contacts and insert new ones
369 for updated_contact in updated_contacts {
370 match this.contacts.binary_search_by_key(
371 &&updated_contact.user.github_login,
372 |contact| &contact.user.github_login,
373 ) {
374 Ok(ix) => this.contacts[ix] = updated_contact,
375 Err(ix) => this.contacts.insert(ix, updated_contact),
376 }
377 }
378
379 // Remove incoming contact requests
380 this.incoming_contact_requests.retain(|user| {
381 if removed_incoming_requests.contains(&user.id) {
382 cx.emit(Event::Contact {
383 user: user.clone(),
384 kind: ContactEventKind::Cancelled,
385 });
386 false
387 } else {
388 true
389 }
390 });
391 // Update existing incoming requests and insert new ones
392 for user in incoming_requests {
393 match this
394 .incoming_contact_requests
395 .binary_search_by_key(&&user.github_login, |contact| {
396 &contact.github_login
397 }) {
398 Ok(ix) => this.incoming_contact_requests[ix] = user,
399 Err(ix) => this.incoming_contact_requests.insert(ix, user),
400 }
401 }
402
403 // Remove outgoing contact requests
404 this.outgoing_contact_requests
405 .retain(|user| !removed_outgoing_requests.contains(&user.id));
406 // Update existing incoming requests and insert new ones
407 for request in outgoing_requests {
408 match this
409 .outgoing_contact_requests
410 .binary_search_by_key(&&request.github_login, |contact| {
411 &contact.github_login
412 }) {
413 Ok(ix) => this.outgoing_contact_requests[ix] = request,
414 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
415 }
416 }
417
418 cx.notify();
419 })?;
420
421 Ok(())
422 })
423 }
424 }
425 }
426
427 pub fn contacts(&self) -> &[Arc<Contact>] {
428 &self.contacts
429 }
430
431 pub fn has_contact(&self, user: &Arc<User>) -> bool {
432 self.contacts
433 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
434 .is_ok()
435 }
436
437 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
438 &self.incoming_contact_requests
439 }
440
441 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
442 &self.outgoing_contact_requests
443 }
444
445 pub fn is_contact_request_pending(&self, user: &User) -> bool {
446 self.pending_contact_requests.contains_key(&user.id)
447 }
448
449 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
450 if self
451 .contacts
452 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
453 .is_ok()
454 {
455 ContactRequestStatus::RequestAccepted
456 } else if self
457 .outgoing_contact_requests
458 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
459 .is_ok()
460 {
461 ContactRequestStatus::RequestSent
462 } else if self
463 .incoming_contact_requests
464 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
465 .is_ok()
466 {
467 ContactRequestStatus::RequestReceived
468 } else {
469 ContactRequestStatus::None
470 }
471 }
472
473 pub fn request_contact(
474 &mut self,
475 responder_id: u64,
476 cx: &mut ModelContext<Self>,
477 ) -> Task<Result<()>> {
478 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
479 }
480
481 pub fn remove_contact(
482 &mut self,
483 user_id: u64,
484 cx: &mut ModelContext<Self>,
485 ) -> Task<Result<()>> {
486 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
487 }
488
489 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
490 self.incoming_contact_requests
491 .iter()
492 .any(|user| user.id == user_id)
493 }
494
495 pub fn respond_to_contact_request(
496 &mut self,
497 requester_id: u64,
498 accept: bool,
499 cx: &mut ModelContext<Self>,
500 ) -> Task<Result<()>> {
501 self.perform_contact_request(
502 requester_id,
503 proto::RespondToContactRequest {
504 requester_id,
505 response: if accept {
506 proto::ContactRequestResponse::Accept
507 } else {
508 proto::ContactRequestResponse::Decline
509 } as i32,
510 },
511 cx,
512 )
513 }
514
515 pub fn dismiss_contact_request(
516 &mut self,
517 requester_id: u64,
518 cx: &mut ModelContext<Self>,
519 ) -> Task<Result<()>> {
520 let client = self.client.upgrade();
521 cx.spawn(move |_, _| async move {
522 client
523 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
524 .request(proto::RespondToContactRequest {
525 requester_id,
526 response: proto::ContactRequestResponse::Dismiss as i32,
527 })
528 .await?;
529 Ok(())
530 })
531 }
532
533 fn perform_contact_request<T: RequestMessage>(
534 &mut self,
535 user_id: u64,
536 request: T,
537 cx: &mut ModelContext<Self>,
538 ) -> Task<Result<()>> {
539 let client = self.client.upgrade();
540 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
541 cx.notify();
542
543 cx.spawn(move |this, mut cx| async move {
544 let response = client
545 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
546 .request(request)
547 .await;
548 this.update(&mut cx, |this, cx| {
549 if let Entry::Occupied(mut request_count) =
550 this.pending_contact_requests.entry(user_id)
551 {
552 *request_count.get_mut() -= 1;
553 if *request_count.get() == 0 {
554 request_count.remove();
555 }
556 }
557 cx.notify();
558 })?;
559 response?;
560 Ok(())
561 })
562 }
563
564 pub fn clear_contacts(&mut self) -> impl Future<Output = ()> {
565 let (tx, mut rx) = postage::barrier::channel();
566 self.update_contacts_tx
567 .unbounded_send(UpdateContacts::Clear(tx))
568 .unwrap();
569 async move {
570 rx.next().await;
571 }
572 }
573
574 pub fn contact_updates_done(&mut self) -> impl Future<Output = ()> {
575 let (tx, mut rx) = postage::barrier::channel();
576 self.update_contacts_tx
577 .unbounded_send(UpdateContacts::Wait(tx))
578 .unwrap();
579 async move {
580 rx.next().await;
581 }
582 }
583
584 pub fn get_users(
585 &mut self,
586 user_ids: Vec<u64>,
587 cx: &mut ModelContext<Self>,
588 ) -> Task<Result<Vec<Arc<User>>>> {
589 let mut user_ids_to_fetch = user_ids.clone();
590 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
591
592 cx.spawn(|this, mut cx| async move {
593 if !user_ids_to_fetch.is_empty() {
594 this.update(&mut cx, |this, cx| {
595 this.load_users(
596 proto::GetUsers {
597 user_ids: user_ids_to_fetch,
598 },
599 cx,
600 )
601 })?
602 .await?;
603 }
604
605 this.update(&mut cx, |this, _| {
606 user_ids
607 .iter()
608 .map(|user_id| {
609 this.users
610 .get(user_id)
611 .cloned()
612 .ok_or_else(|| anyhow!("user {} not found", user_id))
613 })
614 .collect()
615 })?
616 })
617 }
618
619 pub fn fuzzy_search_users(
620 &mut self,
621 query: String,
622 cx: &mut ModelContext<Self>,
623 ) -> Task<Result<Vec<Arc<User>>>> {
624 self.load_users(proto::FuzzySearchUsers { query }, cx)
625 }
626
627 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
628 self.users.get(&user_id).cloned()
629 }
630
631 pub fn get_user_optimistic(
632 &mut self,
633 user_id: u64,
634 cx: &mut ModelContext<Self>,
635 ) -> Option<Arc<User>> {
636 if let Some(user) = self.users.get(&user_id).cloned() {
637 return Some(user);
638 }
639
640 self.get_user(user_id, cx).detach_and_log_err(cx);
641 None
642 }
643
644 pub fn get_user(
645 &mut self,
646 user_id: u64,
647 cx: &mut ModelContext<Self>,
648 ) -> Task<Result<Arc<User>>> {
649 if let Some(user) = self.users.get(&user_id).cloned() {
650 return Task::ready(Ok(user));
651 }
652
653 let load_users = self.get_users(vec![user_id], cx);
654 cx.spawn(move |this, mut cx| async move {
655 load_users.await?;
656 this.update(&mut cx, |this, _| {
657 this.users
658 .get(&user_id)
659 .cloned()
660 .ok_or_else(|| anyhow!("server responded with no users"))
661 })?
662 })
663 }
664
665 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
666 self.by_github_login
667 .get(github_login)
668 .and_then(|id| self.users.get(id).cloned())
669 }
670
671 pub fn current_user(&self) -> Option<Arc<User>> {
672 self.current_user.borrow().clone()
673 }
674
675 pub fn current_plan(&self) -> Option<proto::Plan> {
676 self.current_plan
677 }
678
679 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
680 self.current_user.clone()
681 }
682
683 fn load_users(
684 &mut self,
685 request: impl RequestMessage<Response = UsersResponse>,
686 cx: &mut ModelContext<Self>,
687 ) -> Task<Result<Vec<Arc<User>>>> {
688 let client = self.client.clone();
689 cx.spawn(|this, mut cx| async move {
690 if let Some(rpc) = client.upgrade() {
691 let response = rpc.request(request).await.context("error loading users")?;
692 let users = response.users;
693
694 this.update(&mut cx, |this, _| this.insert(users))
695 } else {
696 Ok(Vec::new())
697 }
698 })
699 }
700
701 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
702 let mut ret = Vec::with_capacity(users.len());
703 for user in users {
704 let user = User::new(user);
705 if let Some(old) = self.users.insert(user.id, user.clone()) {
706 if old.github_login != user.github_login {
707 self.by_github_login.remove(&old.github_login);
708 }
709 }
710 self.by_github_login
711 .insert(user.github_login.clone(), user.id);
712 ret.push(user)
713 }
714 ret
715 }
716
717 pub fn set_participant_indices(
718 &mut self,
719 participant_indices: HashMap<u64, ParticipantIndex>,
720 cx: &mut ModelContext<Self>,
721 ) {
722 if participant_indices != self.participant_indices {
723 self.participant_indices = participant_indices;
724 cx.emit(Event::ParticipantIndicesChanged);
725 }
726 }
727
728 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
729 &self.participant_indices
730 }
731
732 pub fn participant_names(
733 &self,
734 user_ids: impl Iterator<Item = u64>,
735 cx: &AppContext,
736 ) -> HashMap<u64, SharedString> {
737 let mut ret = HashMap::default();
738 let mut missing_user_ids = Vec::new();
739 for id in user_ids {
740 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
741 ret.insert(id, github_login.into());
742 } else {
743 missing_user_ids.push(id)
744 }
745 }
746 if !missing_user_ids.is_empty() {
747 let this = self.weak_self.clone();
748 cx.spawn(|mut cx| async move {
749 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
750 .await
751 })
752 .detach_and_log_err(cx);
753 }
754 ret
755 }
756}
757
758impl User {
759 fn new(message: proto::User) -> Arc<Self> {
760 Arc::new(User {
761 id: message.id,
762 github_login: message.github_login,
763 avatar_uri: message.avatar_url.into(),
764 })
765 }
766}
767
768impl Contact {
769 async fn from_proto(
770 contact: proto::Contact,
771 user_store: &Model<UserStore>,
772 cx: &mut AsyncAppContext,
773 ) -> Result<Self> {
774 let user = user_store
775 .update(cx, |user_store, cx| {
776 user_store.get_user(contact.user_id, cx)
777 })?
778 .await?;
779 Ok(Self {
780 user,
781 online: contact.online,
782 busy: contact.busy,
783 })
784 }
785}
786
787impl Collaborator {
788 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
789 Ok(Self {
790 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
791 replica_id: message.replica_id as ReplicaId,
792 user_id: message.user_id as UserId,
793 })
794 }
795}