1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
11use rpc::{
12 proto::{self, ChannelEdge, ChannelPermission},
13 TypedEnvelope,
14};
15use serde_derive::{Deserialize, Serialize};
16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
17use util::ResultExt;
18
19pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
20 let channel_store =
21 cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
22 cx.set_global(channel_store);
23}
24
25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
27pub type ChannelId = u64;
28
29pub struct ChannelStore {
30 channel_index: ChannelIndex,
31 channel_invitations: Vec<Arc<Channel>>,
32 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
33 channels_with_admin_privileges: HashSet<ChannelId>,
34 outgoing_invites: HashSet<(ChannelId, UserId)>,
35 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
36 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
37 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
38 client: Arc<Client>,
39 user_store: ModelHandle<UserStore>,
40 _rpc_subscription: Subscription,
41 _watch_connection_status: Task<Option<()>>,
42 disconnect_channel_buffers_task: Option<Task<()>>,
43 _update_channels: Task<()>,
44}
45
46pub type ChannelData = (Channel, ChannelPath);
47
48#[derive(Clone, Debug, PartialEq)]
49pub struct Channel {
50 pub id: ChannelId,
51 pub name: String,
52 pub unseen_note_version: Option<(u64, clock::Global)>,
53 pub unseen_message_id: Option<u64>,
54}
55
56impl Channel {
57 pub fn link(&self) -> String {
58 RELEASE_CHANNEL.link_prefix().to_owned()
59 + "channel/"
60 + &self.slug()
61 + "-"
62 + &self.id.to_string()
63 }
64
65 pub fn slug(&self) -> String {
66 let slug: String = self
67 .name
68 .chars()
69 .map(|c| if c.is_alphanumeric() { c } else { '-' })
70 .collect();
71
72 slug.trim_matches(|c| c == '-').to_string()
73 }
74}
75
76#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
77pub struct ChannelPath(Arc<[ChannelId]>);
78
79pub struct ChannelMembership {
80 pub user: Arc<User>,
81 pub kind: proto::channel_member::Kind,
82 pub admin: bool,
83}
84
85pub enum ChannelEvent {
86 ChannelCreated(ChannelId),
87 ChannelRenamed(ChannelId),
88}
89
90impl Entity for ChannelStore {
91 type Event = ChannelEvent;
92}
93
94enum OpenedModelHandle<E: Entity> {
95 Open(WeakModelHandle<E>),
96 Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
97}
98
99impl ChannelStore {
100 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
101 cx.global::<ModelHandle<Self>>().clone()
102 }
103
104 pub fn new(
105 client: Arc<Client>,
106 user_store: ModelHandle<UserStore>,
107 cx: &mut ModelContext<Self>,
108 ) -> Self {
109 let rpc_subscription =
110 client.add_message_handler(cx.handle(), Self::handle_update_channels);
111
112 let mut connection_status = client.status();
113 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
114 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
115 while let Some(status) = connection_status.next().await {
116 let this = this.upgrade(&cx)?;
117 match status {
118 client::Status::Connected { .. } => {
119 this.update(&mut cx, |this, cx| this.handle_connect(cx))
120 .await
121 .log_err()?;
122 }
123 client::Status::SignedOut | client::Status::UpgradeRequired => {
124 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
125 }
126 _ => {
127 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
128 }
129 }
130 if status.is_connected() {
131 } else {
132 }
133 }
134 Some(())
135 });
136
137 Self {
138 channel_invitations: Vec::default(),
139 channel_index: ChannelIndex::default(),
140 channel_participants: Default::default(),
141 channels_with_admin_privileges: Default::default(),
142 outgoing_invites: Default::default(),
143 opened_buffers: Default::default(),
144 opened_chats: Default::default(),
145 update_channels_tx,
146 client,
147 user_store,
148 _rpc_subscription: rpc_subscription,
149 _watch_connection_status: watch_connection_status,
150 disconnect_channel_buffers_task: None,
151 _update_channels: cx.spawn_weak(|this, mut cx| async move {
152 while let Some(update_channels) = update_channels_rx.next().await {
153 if let Some(this) = this.upgrade(&cx) {
154 let update_task = this.update(&mut cx, |this, cx| {
155 this.update_channels(update_channels, cx)
156 });
157 if let Some(update_task) = update_task {
158 update_task.await.log_err();
159 }
160 }
161 }
162 }),
163 }
164 }
165
166 pub fn client(&self) -> Arc<Client> {
167 self.client.clone()
168 }
169
170 pub fn has_children(&self, channel_id: ChannelId) -> bool {
171 self.channel_index.iter().any(|path| {
172 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
173 path.len() > ix + 1
174 } else {
175 false
176 }
177 })
178 }
179
180 /// Returns the number of unique channels in the store
181 pub fn channel_count(&self) -> usize {
182 self.channel_index.by_id().len()
183 }
184
185 /// Returns the index of a channel ID in the list of unique channels
186 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
187 self.channel_index
188 .by_id()
189 .keys()
190 .position(|id| *id == channel_id)
191 }
192
193 /// Returns an iterator over all unique channels
194 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
195 self.channel_index.by_id().values()
196 }
197
198 /// Iterate over all entries in the channel DAG
199 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
200 self.channel_index.iter().map(move |path| {
201 let id = path.last().unwrap();
202 let channel = self.channel_for_id(*id).unwrap();
203 (path.len() - 1, channel)
204 })
205 }
206
207 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
208 let path = self.channel_index.get(ix)?;
209 let id = path.last().unwrap();
210 let channel = self.channel_for_id(*id).unwrap();
211
212 Some((channel, path))
213 }
214
215 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
216 self.channel_index.by_id().values().nth(ix)
217 }
218
219 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
220 &self.channel_invitations
221 }
222
223 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
224 self.channel_index.by_id().get(&channel_id)
225 }
226
227 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
228 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
229 if let OpenedModelHandle::Open(buffer) = buffer {
230 return buffer.upgrade(cx).is_some();
231 }
232 }
233 false
234 }
235
236 pub fn open_channel_buffer(
237 &mut self,
238 channel_id: ChannelId,
239 cx: &mut ModelContext<Self>,
240 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
241 let client = self.client.clone();
242 let user_store = self.user_store.clone();
243 self.open_channel_resource(
244 channel_id,
245 |this| &mut this.opened_buffers,
246 |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
247 cx,
248 )
249 }
250
251 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
252 self.channel_index
253 .by_id()
254 .get(&channel_id)
255 .map(|channel| channel.unseen_note_version.is_some())
256 }
257
258 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
259 self.channel_index
260 .by_id()
261 .get(&channel_id)
262 .map(|channel| channel.unseen_message_id.is_some())
263 }
264
265 pub fn notes_changed(
266 &mut self,
267 channel_id: ChannelId,
268 epoch: u64,
269 version: &clock::Global,
270 cx: &mut ModelContext<Self>,
271 ) {
272 self.channel_index.note_changed(channel_id, epoch, version);
273 cx.notify();
274 }
275
276 pub fn new_message(
277 &mut self,
278 channel_id: ChannelId,
279 message_id: u64,
280 cx: &mut ModelContext<Self>,
281 ) {
282 self.channel_index.new_message(channel_id, message_id);
283 cx.notify();
284 }
285
286 pub fn acknowledge_message_id(
287 &mut self,
288 channel_id: ChannelId,
289 message_id: u64,
290 cx: &mut ModelContext<Self>,
291 ) {
292 self.channel_index
293 .acknowledge_message_id(channel_id, message_id);
294 cx.notify();
295 }
296
297 pub fn acknowledge_notes_version(
298 &mut self,
299 channel_id: ChannelId,
300 epoch: u64,
301 version: &clock::Global,
302 cx: &mut ModelContext<Self>,
303 ) {
304 self.channel_index
305 .acknowledge_note_version(channel_id, epoch, version);
306 cx.notify();
307 }
308
309 pub fn open_channel_chat(
310 &mut self,
311 channel_id: ChannelId,
312 cx: &mut ModelContext<Self>,
313 ) -> Task<Result<ModelHandle<ChannelChat>>> {
314 let client = self.client.clone();
315 let user_store = self.user_store.clone();
316 let this = cx.handle();
317 self.open_channel_resource(
318 channel_id,
319 |this| &mut this.opened_chats,
320 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
321 cx,
322 )
323 }
324
325 /// Asynchronously open a given resource associated with a channel.
326 ///
327 /// Make sure that the resource is only opened once, even if this method
328 /// is called multiple times with the same channel id while the first task
329 /// is still running.
330 fn open_channel_resource<T: Entity, F, Fut>(
331 &mut self,
332 channel_id: ChannelId,
333 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
334 load: F,
335 cx: &mut ModelContext<Self>,
336 ) -> Task<Result<ModelHandle<T>>>
337 where
338 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
339 Fut: Future<Output = Result<ModelHandle<T>>>,
340 {
341 let task = loop {
342 match get_map(self).entry(channel_id) {
343 hash_map::Entry::Occupied(e) => match e.get() {
344 OpenedModelHandle::Open(model) => {
345 if let Some(model) = model.upgrade(cx) {
346 break Task::ready(Ok(model)).shared();
347 } else {
348 get_map(self).remove(&channel_id);
349 continue;
350 }
351 }
352 OpenedModelHandle::Loading(task) => {
353 break task.clone();
354 }
355 },
356 hash_map::Entry::Vacant(e) => {
357 let task = cx
358 .spawn(|this, cx| async move {
359 let channel = this.read_with(&cx, |this, _| {
360 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
361 Arc::new(anyhow!("no channel for id: {}", channel_id))
362 })
363 })?;
364
365 load(channel, cx).await.map_err(Arc::new)
366 })
367 .shared();
368
369 e.insert(OpenedModelHandle::Loading(task.clone()));
370 cx.spawn({
371 let task = task.clone();
372 |this, mut cx| async move {
373 let result = task.await;
374 this.update(&mut cx, |this, _| match result {
375 Ok(model) => {
376 get_map(this).insert(
377 channel_id,
378 OpenedModelHandle::Open(model.downgrade()),
379 );
380 }
381 Err(_) => {
382 get_map(this).remove(&channel_id);
383 }
384 });
385 }
386 })
387 .detach();
388 break task;
389 }
390 }
391 };
392 cx.foreground()
393 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
394 }
395
396 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
397 self.channel_index.iter().any(|path| {
398 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
399 path[..=ix]
400 .iter()
401 .any(|id| self.channels_with_admin_privileges.contains(id))
402 } else {
403 false
404 }
405 })
406 }
407
408 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
409 self.channel_participants
410 .get(&channel_id)
411 .map_or(&[], |v| v.as_slice())
412 }
413
414 pub fn create_channel(
415 &self,
416 name: &str,
417 parent_id: Option<ChannelId>,
418 cx: &mut ModelContext<Self>,
419 ) -> Task<Result<ChannelId>> {
420 let client = self.client.clone();
421 let name = name.trim_start_matches("#").to_owned();
422 cx.spawn(|this, mut cx| async move {
423 let response = client
424 .request(proto::CreateChannel { name, parent_id })
425 .await?;
426
427 let channel = response
428 .channel
429 .ok_or_else(|| anyhow!("missing channel in response"))?;
430 let channel_id = channel.id;
431
432 let parent_edge = if let Some(parent_id) = parent_id {
433 vec![ChannelEdge {
434 channel_id: channel.id,
435 parent_id,
436 }]
437 } else {
438 vec![]
439 };
440
441 this.update(&mut cx, |this, cx| {
442 let task = this.update_channels(
443 proto::UpdateChannels {
444 channels: vec![channel],
445 insert_edge: parent_edge,
446 channel_permissions: vec![ChannelPermission {
447 channel_id,
448 is_admin: true,
449 }],
450 ..Default::default()
451 },
452 cx,
453 );
454 assert!(task.is_none());
455
456 // This event is emitted because the collab panel wants to clear the pending edit state
457 // before this frame is rendered. But we can't guarantee that the collab panel's future
458 // will resolve before this flush_effects finishes. Synchronously emitting this event
459 // ensures that the collab panel will observe this creation before the frame completes
460 cx.emit(ChannelEvent::ChannelCreated(channel_id));
461 });
462
463 Ok(channel_id)
464 })
465 }
466
467 pub fn link_channel(
468 &mut self,
469 channel_id: ChannelId,
470 to: ChannelId,
471 cx: &mut ModelContext<Self>,
472 ) -> Task<Result<()>> {
473 let client = self.client.clone();
474 cx.spawn(|_, _| async move {
475 let _ = client
476 .request(proto::LinkChannel { channel_id, to })
477 .await?;
478
479 Ok(())
480 })
481 }
482
483 pub fn unlink_channel(
484 &mut self,
485 channel_id: ChannelId,
486 from: ChannelId,
487 cx: &mut ModelContext<Self>,
488 ) -> Task<Result<()>> {
489 let client = self.client.clone();
490 cx.spawn(|_, _| async move {
491 let _ = client
492 .request(proto::UnlinkChannel { channel_id, from })
493 .await?;
494
495 Ok(())
496 })
497 }
498
499 pub fn move_channel(
500 &mut self,
501 channel_id: ChannelId,
502 from: ChannelId,
503 to: ChannelId,
504 cx: &mut ModelContext<Self>,
505 ) -> Task<Result<()>> {
506 let client = self.client.clone();
507 cx.spawn(|_, _| async move {
508 let _ = client
509 .request(proto::MoveChannel {
510 channel_id,
511 from,
512 to,
513 })
514 .await?;
515
516 Ok(())
517 })
518 }
519
520 pub fn invite_member(
521 &mut self,
522 channel_id: ChannelId,
523 user_id: UserId,
524 admin: bool,
525 cx: &mut ModelContext<Self>,
526 ) -> Task<Result<()>> {
527 if !self.outgoing_invites.insert((channel_id, user_id)) {
528 return Task::ready(Err(anyhow!("invite request already in progress")));
529 }
530
531 cx.notify();
532 let client = self.client.clone();
533 cx.spawn(|this, mut cx| async move {
534 let result = client
535 .request(proto::InviteChannelMember {
536 channel_id,
537 user_id,
538 admin,
539 })
540 .await;
541
542 this.update(&mut cx, |this, cx| {
543 this.outgoing_invites.remove(&(channel_id, user_id));
544 cx.notify();
545 });
546
547 result?;
548
549 Ok(())
550 })
551 }
552
553 pub fn remove_member(
554 &mut self,
555 channel_id: ChannelId,
556 user_id: u64,
557 cx: &mut ModelContext<Self>,
558 ) -> Task<Result<()>> {
559 if !self.outgoing_invites.insert((channel_id, user_id)) {
560 return Task::ready(Err(anyhow!("invite request already in progress")));
561 }
562
563 cx.notify();
564 let client = self.client.clone();
565 cx.spawn(|this, mut cx| async move {
566 let result = client
567 .request(proto::RemoveChannelMember {
568 channel_id,
569 user_id,
570 })
571 .await;
572
573 this.update(&mut cx, |this, cx| {
574 this.outgoing_invites.remove(&(channel_id, user_id));
575 cx.notify();
576 });
577 result?;
578 Ok(())
579 })
580 }
581
582 pub fn set_member_admin(
583 &mut self,
584 channel_id: ChannelId,
585 user_id: UserId,
586 admin: bool,
587 cx: &mut ModelContext<Self>,
588 ) -> Task<Result<()>> {
589 if !self.outgoing_invites.insert((channel_id, user_id)) {
590 return Task::ready(Err(anyhow!("member request already in progress")));
591 }
592
593 cx.notify();
594 let client = self.client.clone();
595 cx.spawn(|this, mut cx| async move {
596 let result = client
597 .request(proto::SetChannelMemberAdmin {
598 channel_id,
599 user_id,
600 admin,
601 })
602 .await;
603
604 this.update(&mut cx, |this, cx| {
605 this.outgoing_invites.remove(&(channel_id, user_id));
606 cx.notify();
607 });
608
609 result?;
610 Ok(())
611 })
612 }
613
614 pub fn rename(
615 &mut self,
616 channel_id: ChannelId,
617 new_name: &str,
618 cx: &mut ModelContext<Self>,
619 ) -> Task<Result<()>> {
620 let client = self.client.clone();
621 let name = new_name.to_string();
622 cx.spawn(|this, mut cx| async move {
623 let channel = client
624 .request(proto::RenameChannel { channel_id, name })
625 .await?
626 .channel
627 .ok_or_else(|| anyhow!("missing channel in response"))?;
628 this.update(&mut cx, |this, cx| {
629 let task = this.update_channels(
630 proto::UpdateChannels {
631 channels: vec![channel],
632 ..Default::default()
633 },
634 cx,
635 );
636 assert!(task.is_none());
637
638 // This event is emitted because the collab panel wants to clear the pending edit state
639 // before this frame is rendered. But we can't guarantee that the collab panel's future
640 // will resolve before this flush_effects finishes. Synchronously emitting this event
641 // ensures that the collab panel will observe this creation before the frame complete
642 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
643 });
644 Ok(())
645 })
646 }
647
648 pub fn respond_to_channel_invite(
649 &mut self,
650 channel_id: ChannelId,
651 accept: bool,
652 ) -> impl Future<Output = Result<()>> {
653 let client = self.client.clone();
654 async move {
655 client
656 .request(proto::RespondToChannelInvite { channel_id, accept })
657 .await?;
658 Ok(())
659 }
660 }
661
662 pub fn get_channel_member_details(
663 &self,
664 channel_id: ChannelId,
665 cx: &mut ModelContext<Self>,
666 ) -> Task<Result<Vec<ChannelMembership>>> {
667 let client = self.client.clone();
668 let user_store = self.user_store.downgrade();
669 cx.spawn(|_, mut cx| async move {
670 let response = client
671 .request(proto::GetChannelMembers { channel_id })
672 .await?;
673
674 let user_ids = response.members.iter().map(|m| m.user_id).collect();
675 let user_store = user_store
676 .upgrade(&cx)
677 .ok_or_else(|| anyhow!("user store dropped"))?;
678 let users = user_store
679 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
680 .await?;
681
682 Ok(users
683 .into_iter()
684 .zip(response.members)
685 .filter_map(|(user, member)| {
686 Some(ChannelMembership {
687 user,
688 admin: member.admin,
689 kind: proto::channel_member::Kind::from_i32(member.kind)?,
690 })
691 })
692 .collect())
693 })
694 }
695
696 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
697 let client = self.client.clone();
698 async move {
699 client.request(proto::DeleteChannel { channel_id }).await?;
700 Ok(())
701 }
702 }
703
704 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
705 false
706 }
707
708 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
709 self.outgoing_invites.contains(&(channel_id, user_id))
710 }
711
712 async fn handle_update_channels(
713 this: ModelHandle<Self>,
714 message: TypedEnvelope<proto::UpdateChannels>,
715 _: Arc<Client>,
716 mut cx: AsyncAppContext,
717 ) -> Result<()> {
718 this.update(&mut cx, |this, _| {
719 this.update_channels_tx
720 .unbounded_send(message.payload)
721 .unwrap();
722 });
723 Ok(())
724 }
725
726 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
727 self.disconnect_channel_buffers_task.take();
728
729 for chat in self.opened_chats.values() {
730 if let OpenedModelHandle::Open(chat) = chat {
731 if let Some(chat) = chat.upgrade(cx) {
732 chat.update(cx, |chat, cx| {
733 chat.rejoin(cx);
734 });
735 }
736 }
737 }
738
739 let mut buffer_versions = Vec::new();
740 for buffer in self.opened_buffers.values() {
741 if let OpenedModelHandle::Open(buffer) = buffer {
742 if let Some(buffer) = buffer.upgrade(cx) {
743 let channel_buffer = buffer.read(cx);
744 let buffer = channel_buffer.buffer().read(cx);
745 buffer_versions.push(proto::ChannelBufferVersion {
746 channel_id: channel_buffer.channel().id,
747 epoch: channel_buffer.epoch(),
748 version: language::proto::serialize_version(&buffer.version()),
749 });
750 }
751 }
752 }
753
754 if buffer_versions.is_empty() {
755 return Task::ready(Ok(()));
756 }
757
758 let response = self.client.request(proto::RejoinChannelBuffers {
759 buffers: buffer_versions,
760 });
761
762 cx.spawn(|this, mut cx| async move {
763 let mut response = response.await?;
764
765 this.update(&mut cx, |this, cx| {
766 this.opened_buffers.retain(|_, buffer| match buffer {
767 OpenedModelHandle::Open(channel_buffer) => {
768 let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
769 return false;
770 };
771
772 channel_buffer.update(cx, |channel_buffer, cx| {
773 let channel_id = channel_buffer.channel().id;
774 if let Some(remote_buffer) = response
775 .buffers
776 .iter_mut()
777 .find(|buffer| buffer.channel_id == channel_id)
778 {
779 let channel_id = channel_buffer.channel().id;
780 let remote_version =
781 language::proto::deserialize_version(&remote_buffer.version);
782
783 channel_buffer.replace_collaborators(
784 mem::take(&mut remote_buffer.collaborators),
785 cx,
786 );
787
788 let operations = channel_buffer
789 .buffer()
790 .update(cx, |buffer, cx| {
791 let outgoing_operations =
792 buffer.serialize_ops(Some(remote_version), cx);
793 let incoming_operations =
794 mem::take(&mut remote_buffer.operations)
795 .into_iter()
796 .map(language::proto::deserialize_operation)
797 .collect::<Result<Vec<_>>>()?;
798 buffer.apply_ops(incoming_operations, cx)?;
799 anyhow::Ok(outgoing_operations)
800 })
801 .log_err();
802
803 if let Some(operations) = operations {
804 let client = this.client.clone();
805 cx.background()
806 .spawn(async move {
807 let operations = operations.await;
808 for chunk in
809 language::proto::split_operations(operations)
810 {
811 client
812 .send(proto::UpdateChannelBuffer {
813 channel_id,
814 operations: chunk,
815 })
816 .ok();
817 }
818 })
819 .detach();
820 return true;
821 }
822 }
823
824 channel_buffer.disconnect(cx);
825 false
826 })
827 }
828 OpenedModelHandle::Loading(_) => true,
829 });
830 });
831 anyhow::Ok(())
832 })
833 }
834
835 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
836 self.channel_index.clear();
837 self.channel_invitations.clear();
838 self.channel_participants.clear();
839 self.channels_with_admin_privileges.clear();
840 self.channel_index.clear();
841 self.outgoing_invites.clear();
842 cx.notify();
843
844 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
845 cx.spawn_weak(|this, mut cx| async move {
846 if wait_for_reconnect {
847 cx.background().timer(RECONNECT_TIMEOUT).await;
848 }
849
850 if let Some(this) = this.upgrade(&cx) {
851 this.update(&mut cx, |this, cx| {
852 for (_, buffer) in this.opened_buffers.drain() {
853 if let OpenedModelHandle::Open(buffer) = buffer {
854 if let Some(buffer) = buffer.upgrade(cx) {
855 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
856 }
857 }
858 }
859 });
860 }
861 })
862 });
863 }
864
865 pub(crate) fn update_channels(
866 &mut self,
867 payload: proto::UpdateChannels,
868 cx: &mut ModelContext<ChannelStore>,
869 ) -> Option<Task<Result<()>>> {
870 if !payload.remove_channel_invitations.is_empty() {
871 self.channel_invitations
872 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
873 }
874 for channel in payload.channel_invitations {
875 match self
876 .channel_invitations
877 .binary_search_by_key(&channel.id, |c| c.id)
878 {
879 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
880 Err(ix) => self.channel_invitations.insert(
881 ix,
882 Arc::new(Channel {
883 id: channel.id,
884 name: channel.name,
885 unseen_note_version: None,
886 unseen_message_id: None,
887 }),
888 ),
889 }
890 }
891
892 let channels_changed = !payload.channels.is_empty()
893 || !payload.delete_channels.is_empty()
894 || !payload.insert_edge.is_empty()
895 || !payload.delete_edge.is_empty()
896 || !payload.unseen_channel_messages.is_empty()
897 || !payload.unseen_channel_buffer_changes.is_empty();
898
899 if channels_changed {
900 if !payload.delete_channels.is_empty() {
901 self.channel_index.delete_channels(&payload.delete_channels);
902 self.channel_participants
903 .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
904 self.channels_with_admin_privileges
905 .retain(|channel_id| !payload.delete_channels.contains(channel_id));
906
907 for channel_id in &payload.delete_channels {
908 let channel_id = *channel_id;
909 if let Some(OpenedModelHandle::Open(buffer)) =
910 self.opened_buffers.remove(&channel_id)
911 {
912 if let Some(buffer) = buffer.upgrade(cx) {
913 buffer.update(cx, ChannelBuffer::disconnect);
914 }
915 }
916 }
917 }
918
919 let mut index = self.channel_index.bulk_insert();
920 for channel in payload.channels {
921 index.insert(channel)
922 }
923
924 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
925 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
926 index.note_changed(
927 unseen_buffer_change.channel_id,
928 unseen_buffer_change.epoch,
929 &version,
930 );
931 }
932
933 for unseen_channel_message in payload.unseen_channel_messages {
934 index.new_messages(
935 unseen_channel_message.channel_id,
936 unseen_channel_message.message_id,
937 );
938 }
939
940 for edge in payload.insert_edge {
941 index.insert_edge(edge.channel_id, edge.parent_id);
942 }
943
944 for edge in payload.delete_edge {
945 index.delete_edge(edge.parent_id, edge.channel_id);
946 }
947 }
948
949 for permission in payload.channel_permissions {
950 if permission.is_admin {
951 self.channels_with_admin_privileges
952 .insert(permission.channel_id);
953 } else {
954 self.channels_with_admin_privileges
955 .remove(&permission.channel_id);
956 }
957 }
958
959 cx.notify();
960 if payload.channel_participants.is_empty() {
961 return None;
962 }
963
964 let mut all_user_ids = Vec::new();
965 let channel_participants = payload.channel_participants;
966 for entry in &channel_participants {
967 for user_id in entry.participant_user_ids.iter() {
968 if let Err(ix) = all_user_ids.binary_search(user_id) {
969 all_user_ids.insert(ix, *user_id);
970 }
971 }
972 }
973
974 let users = self
975 .user_store
976 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
977 Some(cx.spawn(|this, mut cx| async move {
978 let users = users.await?;
979
980 this.update(&mut cx, |this, cx| {
981 for entry in &channel_participants {
982 let mut participants: Vec<_> = entry
983 .participant_user_ids
984 .iter()
985 .filter_map(|user_id| {
986 users
987 .binary_search_by_key(&user_id, |user| &user.id)
988 .ok()
989 .map(|ix| users[ix].clone())
990 })
991 .collect();
992
993 participants.sort_by_key(|u| u.id);
994
995 this.channel_participants
996 .insert(entry.channel_id, participants);
997 }
998
999 cx.notify();
1000 });
1001 anyhow::Ok(())
1002 }))
1003 }
1004}
1005
1006impl Deref for ChannelPath {
1007 type Target = [ChannelId];
1008
1009 fn deref(&self) -> &Self::Target {
1010 &self.0
1011 }
1012}
1013
1014impl ChannelPath {
1015 pub fn new(path: Arc<[ChannelId]>) -> Self {
1016 debug_assert!(path.len() >= 1);
1017 Self(path)
1018 }
1019
1020 pub fn parent_id(&self) -> Option<ChannelId> {
1021 self.0.len().checked_sub(2).map(|i| self.0[i])
1022 }
1023
1024 pub fn channel_id(&self) -> ChannelId {
1025 self.0[self.0.len() - 1]
1026 }
1027}
1028
1029impl From<ChannelPath> for Cow<'static, ChannelPath> {
1030 fn from(value: ChannelPath) -> Self {
1031 Cow::Owned(value)
1032 }
1033}
1034
1035impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1036 fn from(value: &'a ChannelPath) -> Self {
1037 Cow::Borrowed(value)
1038 }
1039}
1040
1041impl Default for ChannelPath {
1042 fn default() -> Self {
1043 ChannelPath(Arc::from([]))
1044 }
1045}