1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
11use rpc::{
12 proto::{self, ChannelEdge, ChannelPermission, ChannelRole, ChannelVisibility},
13 TypedEnvelope,
14};
15use serde_derive::{Deserialize, Serialize};
16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
17use util::ResultExt;
18
19pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
20 let channel_store =
21 cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
22 cx.set_global(channel_store);
23}
24
/// How long the store waits after losing its connection (when a reconnect is
/// expected) before it disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
28
/// Client-side model holding the channels known to the current user, along
/// with invitations, participants, and handles to opened buffers and chats.
pub struct ChannelStore {
    /// Index of the known channels and the paths that lead to them.
    channel_index: ChannelIndex,
    /// Pending channel invitations, kept sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants of each channel, as last reported in an `UpdateChannels` message.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which an admin permission was received.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have a membership request
    /// (invite, removal, or role change) in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender that queues incoming `UpdateChannels` messages for the
    /// `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Subscription returned when registering the `UpdateChannels` handler.
    // NOTE(review): presumably dropping this unregisters the handler — confirm.
    _rpc_subscription: Subscription,
    /// Task that reacts to changes of the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a
    /// disconnection; it is discarded again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
45
/// A channel paired with a [`ChannelPath`].
pub type ChannelData = (Channel, ChannelPath);
47
/// A channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    pub visibility: proto::ChannelVisibility,
    /// Set while the channel's notes have changes the user has not seen.
    // NOTE(review): the tuple is presumably (epoch, version) — confirm against `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// Set while the channel has a message the user has not seen.
    pub unseen_message_id: Option<u64>,
}
56
57impl Channel {
58 pub fn link(&self) -> String {
59 RELEASE_CHANNEL.link_prefix().to_owned()
60 + "channel/"
61 + &self.slug()
62 + "-"
63 + &self.id.to_string()
64 }
65
66 pub fn slug(&self) -> String {
67 let slug: String = self
68 .name
69 .chars()
70 .map(|c| if c.is_alphanumeric() { c } else { '-' })
71 .collect();
72
73 slug.trim_matches(|c| c == '-').to_string()
74 }
75}
76
/// A sequence of channel ids; the last id is the channel the path refers to
/// and the one before it, if any, is that channel's parent.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
79
/// One member entry of a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// How the user relates to the channel (member, ancestor member, or invitee).
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
85impl ChannelMembership {
86 pub fn sort_key(&self) -> MembershipSortKey {
87 MembershipSortKey {
88 role_order: match self.role {
89 proto::ChannelRole::Admin => 0,
90 proto::ChannelRole::Member => 1,
91 proto::ChannelRole::Banned => 2,
92 proto::ChannelRole::Guest => 3,
93 },
94 kind_order: match self.kind {
95 proto::channel_member::Kind::Member => 0,
96 proto::channel_member::Kind::AncestorMember => 1,
97 proto::channel_member::Kind::Invitee => 2,
98 },
99 username_order: self.user.github_login.as_str(),
100 }
101 }
102}
103
/// Ordering key for channel memberships. The derived comparison is
/// lexicographic over the fields in declaration order, so the field order
/// below is significant.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role; lower sorts first.
    role_order: u8,
    /// Rank of the membership kind; lower sorts first.
    kind_order: u8,
    /// Login name used as the final tie-breaker.
    username_order: &'a str,
}
110
/// Events emitted by the [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the newly created channel has been
    /// added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
115
/// Makes `ChannelStore` a gpui entity whose events are [`ChannelEvent`]s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
119
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// model may already have been dropped.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
124
125impl ChannelStore {
126 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
127 cx.global::<ModelHandle<Self>>().clone()
128 }
129
130 pub fn new(
131 client: Arc<Client>,
132 user_store: ModelHandle<UserStore>,
133 cx: &mut ModelContext<Self>,
134 ) -> Self {
135 let rpc_subscription =
136 client.add_message_handler(cx.handle(), Self::handle_update_channels);
137
138 let mut connection_status = client.status();
139 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
140 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
141 while let Some(status) = connection_status.next().await {
142 let this = this.upgrade(&cx)?;
143 match status {
144 client::Status::Connected { .. } => {
145 this.update(&mut cx, |this, cx| this.handle_connect(cx))
146 .await
147 .log_err()?;
148 }
149 client::Status::SignedOut | client::Status::UpgradeRequired => {
150 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
151 }
152 _ => {
153 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
154 }
155 }
156 if status.is_connected() {
157 } else {
158 }
159 }
160 Some(())
161 });
162
163 Self {
164 channel_invitations: Vec::default(),
165 channel_index: ChannelIndex::default(),
166 channel_participants: Default::default(),
167 channels_with_admin_privileges: Default::default(),
168 outgoing_invites: Default::default(),
169 opened_buffers: Default::default(),
170 opened_chats: Default::default(),
171 update_channels_tx,
172 client,
173 user_store,
174 _rpc_subscription: rpc_subscription,
175 _watch_connection_status: watch_connection_status,
176 disconnect_channel_buffers_task: None,
177 _update_channels: cx.spawn_weak(|this, mut cx| async move {
178 while let Some(update_channels) = update_channels_rx.next().await {
179 if let Some(this) = this.upgrade(&cx) {
180 let update_task = this.update(&mut cx, |this, cx| {
181 this.update_channels(update_channels, cx)
182 });
183 if let Some(update_task) = update_task {
184 update_task.await.log_err();
185 }
186 }
187 }
188 }),
189 }
190 }
191
192 pub fn client(&self) -> Arc<Client> {
193 self.client.clone()
194 }
195
196 pub fn has_children(&self, channel_id: ChannelId) -> bool {
197 self.channel_index.iter().any(|path| {
198 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
199 path.len() > ix + 1
200 } else {
201 false
202 }
203 })
204 }
205
206 /// Returns the number of unique channels in the store
207 pub fn channel_count(&self) -> usize {
208 self.channel_index.by_id().len()
209 }
210
211 /// Returns the index of a channel ID in the list of unique channels
212 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
213 self.channel_index
214 .by_id()
215 .keys()
216 .position(|id| *id == channel_id)
217 }
218
219 /// Returns an iterator over all unique channels
220 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
221 self.channel_index.by_id().values()
222 }
223
224 /// Iterate over all entries in the channel DAG
225 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
226 self.channel_index.iter().map(move |path| {
227 let id = path.last().unwrap();
228 let channel = self.channel_for_id(*id).unwrap();
229 (path.len() - 1, channel)
230 })
231 }
232
233 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
234 let path = self.channel_index.get(ix)?;
235 let id = path.last().unwrap();
236 let channel = self.channel_for_id(*id).unwrap();
237
238 Some((channel, path))
239 }
240
241 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
242 self.channel_index.by_id().values().nth(ix)
243 }
244
245 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
246 &self.channel_invitations
247 }
248
249 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
250 self.channel_index.by_id().get(&channel_id)
251 }
252
253 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
254 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
255 if let OpenedModelHandle::Open(buffer) = buffer {
256 return buffer.upgrade(cx).is_some();
257 }
258 }
259 false
260 }
261
262 pub fn open_channel_buffer(
263 &mut self,
264 channel_id: ChannelId,
265 cx: &mut ModelContext<Self>,
266 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
267 let client = self.client.clone();
268 let user_store = self.user_store.clone();
269 self.open_channel_resource(
270 channel_id,
271 |this| &mut this.opened_buffers,
272 |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
273 cx,
274 )
275 }
276
277 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
278 self.channel_index
279 .by_id()
280 .get(&channel_id)
281 .map(|channel| channel.unseen_note_version.is_some())
282 }
283
284 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
285 self.channel_index
286 .by_id()
287 .get(&channel_id)
288 .map(|channel| channel.unseen_message_id.is_some())
289 }
290
291 pub fn notes_changed(
292 &mut self,
293 channel_id: ChannelId,
294 epoch: u64,
295 version: &clock::Global,
296 cx: &mut ModelContext<Self>,
297 ) {
298 self.channel_index.note_changed(channel_id, epoch, version);
299 cx.notify();
300 }
301
302 pub fn new_message(
303 &mut self,
304 channel_id: ChannelId,
305 message_id: u64,
306 cx: &mut ModelContext<Self>,
307 ) {
308 self.channel_index.new_message(channel_id, message_id);
309 cx.notify();
310 }
311
312 pub fn acknowledge_message_id(
313 &mut self,
314 channel_id: ChannelId,
315 message_id: u64,
316 cx: &mut ModelContext<Self>,
317 ) {
318 self.channel_index
319 .acknowledge_message_id(channel_id, message_id);
320 cx.notify();
321 }
322
323 pub fn acknowledge_notes_version(
324 &mut self,
325 channel_id: ChannelId,
326 epoch: u64,
327 version: &clock::Global,
328 cx: &mut ModelContext<Self>,
329 ) {
330 self.channel_index
331 .acknowledge_note_version(channel_id, epoch, version);
332 cx.notify();
333 }
334
335 pub fn open_channel_chat(
336 &mut self,
337 channel_id: ChannelId,
338 cx: &mut ModelContext<Self>,
339 ) -> Task<Result<ModelHandle<ChannelChat>>> {
340 let client = self.client.clone();
341 let user_store = self.user_store.clone();
342 let this = cx.handle();
343 self.open_channel_resource(
344 channel_id,
345 |this| &mut this.opened_chats,
346 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
347 cx,
348 )
349 }
350
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which of the store's maps tracks the resource, and
    /// `load` performs the actual opening once the channel has been looked up.
    /// The returned task fails when the channel id is unknown or when `load`
    /// fails.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped: discard the stale entry
                            // and retry, which takes the vacant branch.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight; share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // The error is wrapped in an `Arc` so the shared
                            // task's output can be cloned for every waiter.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with
                    // a weak handle on success, or drop the entry on failure
                    // so that a later call can try again.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain error for the caller.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
421
422 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
423 self.channel_index.iter().any(|path| {
424 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
425 path[..=ix]
426 .iter()
427 .any(|id| self.channels_with_admin_privileges.contains(id))
428 } else {
429 false
430 }
431 })
432 }
433
434 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
435 self.channel_participants
436 .get(&channel_id)
437 .map_or(&[], |v| v.as_slice())
438 }
439
440 pub fn create_channel(
441 &self,
442 name: &str,
443 parent_id: Option<ChannelId>,
444 cx: &mut ModelContext<Self>,
445 ) -> Task<Result<ChannelId>> {
446 let client = self.client.clone();
447 let name = name.trim_start_matches("#").to_owned();
448 cx.spawn(|this, mut cx| async move {
449 let response = client
450 .request(proto::CreateChannel { name, parent_id })
451 .await?;
452
453 let channel = response
454 .channel
455 .ok_or_else(|| anyhow!("missing channel in response"))?;
456 let channel_id = channel.id;
457
458 let parent_edge = if let Some(parent_id) = parent_id {
459 vec![ChannelEdge {
460 channel_id: channel.id,
461 parent_id,
462 }]
463 } else {
464 vec![]
465 };
466
467 this.update(&mut cx, |this, cx| {
468 let task = this.update_channels(
469 proto::UpdateChannels {
470 channels: vec![channel],
471 insert_edge: parent_edge,
472 channel_permissions: vec![ChannelPermission {
473 channel_id,
474 role: ChannelRole::Admin.into(),
475 }],
476 ..Default::default()
477 },
478 cx,
479 );
480 assert!(task.is_none());
481
482 // This event is emitted because the collab panel wants to clear the pending edit state
483 // before this frame is rendered. But we can't guarantee that the collab panel's future
484 // will resolve before this flush_effects finishes. Synchronously emitting this event
485 // ensures that the collab panel will observe this creation before the frame completes
486 cx.emit(ChannelEvent::ChannelCreated(channel_id));
487 });
488
489 Ok(channel_id)
490 })
491 }
492
493 pub fn link_channel(
494 &mut self,
495 channel_id: ChannelId,
496 to: ChannelId,
497 cx: &mut ModelContext<Self>,
498 ) -> Task<Result<()>> {
499 let client = self.client.clone();
500 cx.spawn(|_, _| async move {
501 let _ = client
502 .request(proto::LinkChannel { channel_id, to })
503 .await?;
504
505 Ok(())
506 })
507 }
508
509 pub fn unlink_channel(
510 &mut self,
511 channel_id: ChannelId,
512 from: ChannelId,
513 cx: &mut ModelContext<Self>,
514 ) -> Task<Result<()>> {
515 let client = self.client.clone();
516 cx.spawn(|_, _| async move {
517 let _ = client
518 .request(proto::UnlinkChannel { channel_id, from })
519 .await?;
520
521 Ok(())
522 })
523 }
524
525 pub fn move_channel(
526 &mut self,
527 channel_id: ChannelId,
528 from: ChannelId,
529 to: ChannelId,
530 cx: &mut ModelContext<Self>,
531 ) -> Task<Result<()>> {
532 let client = self.client.clone();
533 cx.spawn(|_, _| async move {
534 let _ = client
535 .request(proto::MoveChannel {
536 channel_id,
537 from,
538 to,
539 })
540 .await?;
541
542 Ok(())
543 })
544 }
545
546 pub fn set_channel_visibility(
547 &mut self,
548 channel_id: ChannelId,
549 visibility: ChannelVisibility,
550 cx: &mut ModelContext<Self>,
551 ) -> Task<Result<()>> {
552 let client = self.client.clone();
553 cx.spawn(|_, _| async move {
554 let _ = client
555 .request(proto::SetChannelVisibility {
556 channel_id,
557 visibility: visibility.into(),
558 })
559 .await?;
560
561 Ok(())
562 })
563 }
564
565 pub fn invite_member(
566 &mut self,
567 channel_id: ChannelId,
568 user_id: UserId,
569 role: proto::ChannelRole,
570 cx: &mut ModelContext<Self>,
571 ) -> Task<Result<()>> {
572 if !self.outgoing_invites.insert((channel_id, user_id)) {
573 return Task::ready(Err(anyhow!("invite request already in progress")));
574 }
575
576 cx.notify();
577 let client = self.client.clone();
578 cx.spawn(|this, mut cx| async move {
579 let result = client
580 .request(proto::InviteChannelMember {
581 channel_id,
582 user_id,
583 role: role.into(),
584 })
585 .await;
586
587 this.update(&mut cx, |this, cx| {
588 this.outgoing_invites.remove(&(channel_id, user_id));
589 cx.notify();
590 });
591
592 result?;
593
594 Ok(())
595 })
596 }
597
598 pub fn remove_member(
599 &mut self,
600 channel_id: ChannelId,
601 user_id: u64,
602 cx: &mut ModelContext<Self>,
603 ) -> Task<Result<()>> {
604 if !self.outgoing_invites.insert((channel_id, user_id)) {
605 return Task::ready(Err(anyhow!("invite request already in progress")));
606 }
607
608 cx.notify();
609 let client = self.client.clone();
610 cx.spawn(|this, mut cx| async move {
611 let result = client
612 .request(proto::RemoveChannelMember {
613 channel_id,
614 user_id,
615 })
616 .await;
617
618 this.update(&mut cx, |this, cx| {
619 this.outgoing_invites.remove(&(channel_id, user_id));
620 cx.notify();
621 });
622 result?;
623 Ok(())
624 })
625 }
626
627 pub fn set_member_role(
628 &mut self,
629 channel_id: ChannelId,
630 user_id: UserId,
631 role: proto::ChannelRole,
632 cx: &mut ModelContext<Self>,
633 ) -> Task<Result<()>> {
634 if !self.outgoing_invites.insert((channel_id, user_id)) {
635 return Task::ready(Err(anyhow!("member request already in progress")));
636 }
637
638 cx.notify();
639 let client = self.client.clone();
640 cx.spawn(|this, mut cx| async move {
641 let result = client
642 .request(proto::SetChannelMemberRole {
643 channel_id,
644 user_id,
645 role: role.into(),
646 })
647 .await;
648
649 this.update(&mut cx, |this, cx| {
650 this.outgoing_invites.remove(&(channel_id, user_id));
651 cx.notify();
652 });
653
654 result?;
655 Ok(())
656 })
657 }
658
659 pub fn rename(
660 &mut self,
661 channel_id: ChannelId,
662 new_name: &str,
663 cx: &mut ModelContext<Self>,
664 ) -> Task<Result<()>> {
665 let client = self.client.clone();
666 let name = new_name.to_string();
667 cx.spawn(|this, mut cx| async move {
668 let channel = client
669 .request(proto::RenameChannel { channel_id, name })
670 .await?
671 .channel
672 .ok_or_else(|| anyhow!("missing channel in response"))?;
673 this.update(&mut cx, |this, cx| {
674 let task = this.update_channels(
675 proto::UpdateChannels {
676 channels: vec![channel],
677 ..Default::default()
678 },
679 cx,
680 );
681 assert!(task.is_none());
682
683 // This event is emitted because the collab panel wants to clear the pending edit state
684 // before this frame is rendered. But we can't guarantee that the collab panel's future
685 // will resolve before this flush_effects finishes. Synchronously emitting this event
686 // ensures that the collab panel will observe this creation before the frame complete
687 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
688 });
689 Ok(())
690 })
691 }
692
693 pub fn respond_to_channel_invite(
694 &mut self,
695 channel_id: ChannelId,
696 accept: bool,
697 ) -> impl Future<Output = Result<()>> {
698 let client = self.client.clone();
699 async move {
700 client
701 .request(proto::RespondToChannelInvite { channel_id, accept })
702 .await?;
703 Ok(())
704 }
705 }
706
707 pub fn get_channel_member_details(
708 &self,
709 channel_id: ChannelId,
710 cx: &mut ModelContext<Self>,
711 ) -> Task<Result<Vec<ChannelMembership>>> {
712 let client = self.client.clone();
713 let user_store = self.user_store.downgrade();
714 cx.spawn(|_, mut cx| async move {
715 let response = client
716 .request(proto::GetChannelMembers { channel_id })
717 .await?;
718
719 let user_ids = response.members.iter().map(|m| m.user_id).collect();
720 let user_store = user_store
721 .upgrade(&cx)
722 .ok_or_else(|| anyhow!("user store dropped"))?;
723 let users = user_store
724 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
725 .await?;
726
727 Ok(users
728 .into_iter()
729 .zip(response.members)
730 .filter_map(|(user, member)| {
731 Some(ChannelMembership {
732 user,
733 role: member.role(),
734 kind: member.kind(),
735 })
736 })
737 .collect())
738 })
739 }
740
741 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
742 let client = self.client.clone();
743 async move {
744 client.request(proto::DeleteChannel { channel_id }).await?;
745 Ok(())
746 }
747 }
748
749 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
750 false
751 }
752
753 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
754 self.outgoing_invites.contains(&(channel_id, user_id))
755 }
756
757 async fn handle_update_channels(
758 this: ModelHandle<Self>,
759 message: TypedEnvelope<proto::UpdateChannels>,
760 _: Arc<Client>,
761 mut cx: AsyncAppContext,
762 ) -> Result<()> {
763 this.update(&mut cx, |this, _| {
764 this.update_channels_tx
765 .unbounded_send(message.payload)
766 .unwrap();
767 });
768 Ok(())
769 }
770
    /// Restores open chats and channel buffers after the client connects.
    ///
    /// Discards any pending buffer-disconnect task, asks every open chat to
    /// rejoin, and sends the versions of all open channel buffers to the
    /// server. Buffers present in the response are synchronized in both
    /// directions; the remaining ones are disconnected and forgotten.
    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Discard the task that would have disconnected the buffers.
        self.disconnect_channel_buffers_task.take();

        for chat in self.opened_chats.values() {
            if let OpenedModelHandle::Open(chat) = chat {
                if let Some(chat) = chat.upgrade(cx) {
                    chat.update(cx, |chat, cx| {
                        chat.rejoin(cx);
                    });
                }
            }
        }

        // Collect the local version of every open buffer that is still alive.
        let mut buffer_versions = Vec::new();
        for buffer in self.opened_buffers.values() {
            if let OpenedModelHandle::Open(buffer) = buffer {
                if let Some(buffer) = buffer.upgrade(cx) {
                    let channel_buffer = buffer.read(cx);
                    let buffer = channel_buffer.buffer().read(cx);
                    buffer_versions.push(proto::ChannelBufferVersion {
                        channel_id: channel_buffer.channel().id,
                        epoch: channel_buffer.epoch(),
                        version: language::proto::serialize_version(&buffer.version()),
                    });
                }
            }
        }

        // Nothing to rejoin.
        if buffer_versions.is_empty() {
            return Task::ready(Ok(()));
        }

        let response = self.client.request(proto::RejoinChannelBuffers {
            buffers: buffer_versions,
        });

        cx.spawn(|this, mut cx| async move {
            let mut response = response.await?;

            this.update(&mut cx, |this, cx| {
                // Keep only the buffers that were rejoined successfully.
                this.opened_buffers.retain(|_, buffer| match buffer {
                    OpenedModelHandle::Open(channel_buffer) => {
                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
                            return false;
                        };

                        channel_buffer.update(cx, |channel_buffer, cx| {
                            let channel_id = channel_buffer.channel().id;
                            if let Some(remote_buffer) = response
                                .buffers
                                .iter_mut()
                                .find(|buffer| buffer.channel_id == channel_id)
                            {
                                let channel_id = channel_buffer.channel().id;
                                let remote_version =
                                    language::proto::deserialize_version(&remote_buffer.version);

                                channel_buffer.replace_collaborators(
                                    mem::take(&mut remote_buffer.collaborators),
                                    cx,
                                );

                                // Serialize local operations relative to the
                                // remote version, then apply the operations
                                // received from the server.
                                let operations = channel_buffer
                                    .buffer()
                                    .update(cx, |buffer, cx| {
                                        let outgoing_operations =
                                            buffer.serialize_ops(Some(remote_version), cx);
                                        let incoming_operations =
                                            mem::take(&mut remote_buffer.operations)
                                                .into_iter()
                                                .map(language::proto::deserialize_operation)
                                                .collect::<Result<Vec<_>>>()?;
                                        buffer.apply_ops(incoming_operations, cx)?;
                                        anyhow::Ok(outgoing_operations)
                                    })
                                    .log_err();

                                if let Some(operations) = operations {
                                    // Send the outgoing operations to the
                                    // server in chunks, in the background;
                                    // send errors are ignored.
                                    let client = this.client.clone();
                                    cx.background()
                                        .spawn(async move {
                                            let operations = operations.await;
                                            for chunk in
                                                language::proto::split_operations(operations)
                                            {
                                                client
                                                    .send(proto::UpdateChannelBuffer {
                                                        channel_id,
                                                        operations: chunk,
                                                    })
                                                    .ok();
                                            }
                                        })
                                        .detach();
                                    return true;
                                }
                            }

                            // Either the server did not return this buffer or
                            // synchronizing it failed: disconnect and drop it.
                            channel_buffer.disconnect(cx);
                            false
                        })
                    }
                    // Buffers that are still loading are left alone.
                    OpenedModelHandle::Loading(_) => true,
                });
            });
            anyhow::Ok(())
        })
    }
879
880 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
881 self.channel_index.clear();
882 self.channel_invitations.clear();
883 self.channel_participants.clear();
884 self.channels_with_admin_privileges.clear();
885 self.channel_index.clear();
886 self.outgoing_invites.clear();
887 cx.notify();
888
889 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
890 cx.spawn_weak(|this, mut cx| async move {
891 if wait_for_reconnect {
892 cx.background().timer(RECONNECT_TIMEOUT).await;
893 }
894
895 if let Some(this) = this.upgrade(&cx) {
896 this.update(&mut cx, |this, cx| {
897 for (_, buffer) in this.opened_buffers.drain() {
898 if let OpenedModelHandle::Open(buffer) = buffer {
899 if let Some(buffer) = buffer.upgrade(cx) {
900 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
901 }
902 }
903 }
904 });
905 }
906 })
907 });
908 }
909
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, edges, unseen markers, and permissions are
    /// applied synchronously. When the payload lists channel participants,
    /// the returned task loads the corresponding users and stores them per
    /// channel; otherwise `None` is returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        // Invitations are kept sorted by id: an existing one only has its
        // name updated, a new one is inserted at its sorted position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        visibility: channel.visibility(),
                        name: channel.name,
                        unseen_note_version: None,
                        unseen_message_id: None,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.insert_edge.is_empty()
            || !payload.delete_edge.is_empty()
            || !payload.unseen_channel_messages.is_empty()
            || !payload.unseen_channel_buffer_changes.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                self.channel_index.delete_channels(&payload.delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
                self.channels_with_admin_privileges
                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));

                // Forget the buffers of deleted channels, disconnecting the
                // ones that are open and still alive.
                for channel_id in &payload.delete_channels {
                    let channel_id = *channel_id;
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade(cx) {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            // NOTE(review): `index` looks like a guard that finalizes the
            // index when it goes out of scope at the end of this block —
            // confirm in `channel_index`.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                index.insert(channel)
            }

            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
                index.note_changed(
                    unseen_buffer_change.channel_id,
                    unseen_buffer_change.epoch,
                    &version,
                );
            }

            for unseen_channel_message in payload.unseen_channel_messages {
                index.new_messages(
                    unseen_channel_message.channel_id,
                    unseen_channel_message.message_id,
                );
            }

            for edge in payload.insert_edge {
                index.insert_edge(edge.channel_id, edge.parent_id);
            }

            for edge in payload.delete_edge {
                index.delete_edge(edge.parent_id, edge.channel_id);
            }
        }

        // Any role other than admin revokes a recorded admin privilege.
        for permission in payload.channel_permissions {
            if permission.role() == proto::ChannelRole::Admin {
                self.channels_with_admin_privileges
                    .insert(permission.channel_id);
            } else {
                self.channels_with_admin_privileges
                    .remove(&permission.channel_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted, de-duplicated list of every participant's user id.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `users` is
                    // sorted by id — confirm against `get_users`. Ids that
                    // are not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            });
            anyhow::Ok(())
        }))
    }
1050}
1051
1052impl Deref for ChannelPath {
1053 type Target = [ChannelId];
1054
1055 fn deref(&self) -> &Self::Target {
1056 &self.0
1057 }
1058}
1059
1060impl ChannelPath {
1061 pub fn new(path: Arc<[ChannelId]>) -> Self {
1062 debug_assert!(path.len() >= 1);
1063 Self(path)
1064 }
1065
1066 pub fn parent_id(&self) -> Option<ChannelId> {
1067 self.0.len().checked_sub(2).map(|i| self.0[i])
1068 }
1069
1070 pub fn channel_id(&self) -> ChannelId {
1071 self.0[self.0.len() - 1]
1072 }
1073}
1074
1075impl From<ChannelPath> for Cow<'static, ChannelPath> {
1076 fn from(value: ChannelPath) -> Self {
1077 Cow::Owned(value)
1078 }
1079}
1080
1081impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1082 fn from(value: &'a ChannelPath) -> Self {
1083 Cow::Borrowed(value)
1084 }
1085}
1086
1087impl Default for ChannelPath {
1088 fn default() -> Self {
1089 ChannelPath(Arc::from([]))
1090 }
1091}