1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
11use rpc::{
12 proto::{self, ChannelEdge, ChannelPermission},
13 TypedEnvelope,
14};
15use serde_derive::{Deserialize, Serialize};
16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
17use util::ResultExt;
18
19pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
20 let channel_store =
21 cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
22 cx.set_global(channel_store);
23}
24
25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
27pub type ChannelId = u64;
28
29pub struct ChannelStore {
30 channel_index: ChannelIndex,
31 channel_invitations: Vec<Arc<Channel>>,
32 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
33 channels_with_admin_privileges: HashSet<ChannelId>,
34 outgoing_invites: HashSet<(ChannelId, UserId)>,
35 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
36 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
37 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
38 client: Arc<Client>,
39 user_store: ModelHandle<UserStore>,
40 _rpc_subscription: Subscription,
41 _watch_connection_status: Task<Option<()>>,
42 disconnect_channel_buffers_task: Option<Task<()>>,
43 _update_channels: Task<()>,
44}
45
46pub type ChannelData = (Channel, ChannelPath);
47
48#[derive(Clone, Debug, PartialEq)]
49pub struct Channel {
50 pub id: ChannelId,
51 pub name: String,
52 pub unseen_note_version: Option<(u64, clock::Global)>,
53 pub unseen_message_id: Option<u64>,
54}
55
56impl Channel {
57 pub fn link(&self) -> String {
58 RELEASE_CHANNEL.link_prefix().to_owned()
59 + "channel/"
60 + &self.slug()
61 + "-"
62 + &self.id.to_string()
63 }
64
65 pub fn slug(&self) -> String {
66 let slug: String = self
67 .name
68 .chars()
69 .map(|c| if c.is_alphanumeric() { c } else { '-' })
70 .collect();
71
72 slug.trim_matches(|c| c == '-').to_string()
73 }
74}
75
76#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
77pub struct ChannelPath(Arc<[ChannelId]>);
78
79pub struct ChannelMembership {
80 pub user: Arc<User>,
81 pub kind: proto::channel_member::Kind,
82 pub admin: bool,
83}
84
85pub enum ChannelEvent {
86 ChannelCreated(ChannelId),
87 ChannelRenamed(ChannelId),
88}
89
90impl Entity for ChannelStore {
91 type Event = ChannelEvent;
92}
93
94enum OpenedModelHandle<E: Entity> {
95 Open(WeakModelHandle<E>),
96 Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
97}
98
99impl ChannelStore {
100 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
101 cx.global::<ModelHandle<Self>>().clone()
102 }
103
104 pub fn new(
105 client: Arc<Client>,
106 user_store: ModelHandle<UserStore>,
107 cx: &mut ModelContext<Self>,
108 ) -> Self {
109 let rpc_subscription =
110 client.add_message_handler(cx.handle(), Self::handle_update_channels);
111
112 let mut connection_status = client.status();
113 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
114 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
115 while let Some(status) = connection_status.next().await {
116 let this = this.upgrade(&cx)?;
117 if status.is_connected() {
118 this.update(&mut cx, |this, cx| this.handle_connect(cx))
119 .await
120 .log_err()?;
121 } else {
122 this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
123 }
124 }
125 Some(())
126 });
127
128 Self {
129 channel_invitations: Vec::default(),
130 channel_index: ChannelIndex::default(),
131 channel_participants: Default::default(),
132 channels_with_admin_privileges: Default::default(),
133 outgoing_invites: Default::default(),
134 opened_buffers: Default::default(),
135 opened_chats: Default::default(),
136 update_channels_tx,
137 client,
138 user_store,
139 _rpc_subscription: rpc_subscription,
140 _watch_connection_status: watch_connection_status,
141 disconnect_channel_buffers_task: None,
142 _update_channels: cx.spawn_weak(|this, mut cx| async move {
143 while let Some(update_channels) = update_channels_rx.next().await {
144 if let Some(this) = this.upgrade(&cx) {
145 let update_task = this.update(&mut cx, |this, cx| {
146 this.update_channels(update_channels, cx)
147 });
148 if let Some(update_task) = update_task {
149 update_task.await.log_err();
150 }
151 }
152 }
153 }),
154 }
155 }
156
157 pub fn client(&self) -> Arc<Client> {
158 self.client.clone()
159 }
160
161 pub fn has_children(&self, channel_id: ChannelId) -> bool {
162 self.channel_index.iter().any(|path| {
163 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
164 path.len() > ix + 1
165 } else {
166 false
167 }
168 })
169 }
170
171 /// Returns the number of unique channels in the store
172 pub fn channel_count(&self) -> usize {
173 self.channel_index.by_id().len()
174 }
175
176 /// Returns the index of a channel ID in the list of unique channels
177 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
178 self.channel_index
179 .by_id()
180 .keys()
181 .position(|id| *id == channel_id)
182 }
183
184 /// Returns an iterator over all unique channels
185 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
186 self.channel_index.by_id().values()
187 }
188
189 /// Iterate over all entries in the channel DAG
190 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
191 self.channel_index.iter().map(move |path| {
192 let id = path.last().unwrap();
193 let channel = self.channel_for_id(*id).unwrap();
194 (path.len() - 1, channel)
195 })
196 }
197
198 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
199 let path = self.channel_index.get(ix)?;
200 let id = path.last().unwrap();
201 let channel = self.channel_for_id(*id).unwrap();
202
203 Some((channel, path))
204 }
205
206 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
207 self.channel_index.by_id().values().nth(ix)
208 }
209
210 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
211 &self.channel_invitations
212 }
213
214 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
215 self.channel_index.by_id().get(&channel_id)
216 }
217
218 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
219 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
220 if let OpenedModelHandle::Open(buffer) = buffer {
221 return buffer.upgrade(cx).is_some();
222 }
223 }
224 false
225 }
226
227 pub fn open_channel_buffer(
228 &mut self,
229 channel_id: ChannelId,
230 cx: &mut ModelContext<Self>,
231 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
232 let client = self.client.clone();
233 let user_store = self.user_store.clone();
234 self.open_channel_resource(
235 channel_id,
236 |this| &mut this.opened_buffers,
237 |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
238 cx,
239 )
240 }
241
242 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
243 self.channel_index
244 .by_id()
245 .get(&channel_id)
246 .map(|channel| channel.unseen_note_version.is_some())
247 }
248
249 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
250 self.channel_index
251 .by_id()
252 .get(&channel_id)
253 .map(|channel| channel.unseen_message_id.is_some())
254 }
255
256 pub fn notes_changed(
257 &mut self,
258 channel_id: ChannelId,
259 epoch: u64,
260 version: &clock::Global,
261 cx: &mut ModelContext<Self>,
262 ) {
263 self.channel_index.note_changed(channel_id, epoch, version);
264 cx.notify();
265 }
266
267 pub fn new_message(
268 &mut self,
269 channel_id: ChannelId,
270 message_id: u64,
271 cx: &mut ModelContext<Self>,
272 ) {
273 self.channel_index.new_message(channel_id, message_id);
274 cx.notify();
275 }
276
277 pub fn acknowledge_message_id(
278 &mut self,
279 channel_id: ChannelId,
280 message_id: u64,
281 cx: &mut ModelContext<Self>,
282 ) {
283 self.channel_index
284 .acknowledge_message_id(channel_id, message_id);
285 cx.notify();
286 }
287
288 pub fn acknowledge_notes_version(
289 &mut self,
290 channel_id: ChannelId,
291 epoch: u64,
292 version: &clock::Global,
293 cx: &mut ModelContext<Self>,
294 ) {
295 self.channel_index
296 .acknowledge_note_version(channel_id, epoch, version);
297 cx.notify();
298 }
299
300 pub fn open_channel_chat(
301 &mut self,
302 channel_id: ChannelId,
303 cx: &mut ModelContext<Self>,
304 ) -> Task<Result<ModelHandle<ChannelChat>>> {
305 let client = self.client.clone();
306 let user_store = self.user_store.clone();
307 let this = cx.handle();
308 self.open_channel_resource(
309 channel_id,
310 |this| &mut this.opened_chats,
311 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
312 cx,
313 )
314 }
315
316 /// Asynchronously open a given resource associated with a channel.
317 ///
318 /// Make sure that the resource is only opened once, even if this method
319 /// is called multiple times with the same channel id while the first task
320 /// is still running.
321 fn open_channel_resource<T: Entity, F, Fut>(
322 &mut self,
323 channel_id: ChannelId,
324 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
325 load: F,
326 cx: &mut ModelContext<Self>,
327 ) -> Task<Result<ModelHandle<T>>>
328 where
329 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
330 Fut: Future<Output = Result<ModelHandle<T>>>,
331 {
332 let task = loop {
333 match get_map(self).entry(channel_id) {
334 hash_map::Entry::Occupied(e) => match e.get() {
335 OpenedModelHandle::Open(model) => {
336 if let Some(model) = model.upgrade(cx) {
337 break Task::ready(Ok(model)).shared();
338 } else {
339 get_map(self).remove(&channel_id);
340 continue;
341 }
342 }
343 OpenedModelHandle::Loading(task) => {
344 break task.clone();
345 }
346 },
347 hash_map::Entry::Vacant(e) => {
348 let task = cx
349 .spawn(|this, cx| async move {
350 let channel = this.read_with(&cx, |this, _| {
351 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
352 Arc::new(anyhow!("no channel for id: {}", channel_id))
353 })
354 })?;
355
356 load(channel, cx).await.map_err(Arc::new)
357 })
358 .shared();
359
360 e.insert(OpenedModelHandle::Loading(task.clone()));
361 cx.spawn({
362 let task = task.clone();
363 |this, mut cx| async move {
364 let result = task.await;
365 this.update(&mut cx, |this, _| match result {
366 Ok(model) => {
367 get_map(this).insert(
368 channel_id,
369 OpenedModelHandle::Open(model.downgrade()),
370 );
371 }
372 Err(_) => {
373 get_map(this).remove(&channel_id);
374 }
375 });
376 }
377 })
378 .detach();
379 break task;
380 }
381 }
382 };
383 cx.foreground()
384 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
385 }
386
387 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
388 self.channel_index.iter().any(|path| {
389 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
390 path[..=ix]
391 .iter()
392 .any(|id| self.channels_with_admin_privileges.contains(id))
393 } else {
394 false
395 }
396 })
397 }
398
399 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
400 self.channel_participants
401 .get(&channel_id)
402 .map_or(&[], |v| v.as_slice())
403 }
404
405 pub fn create_channel(
406 &self,
407 name: &str,
408 parent_id: Option<ChannelId>,
409 cx: &mut ModelContext<Self>,
410 ) -> Task<Result<ChannelId>> {
411 let client = self.client.clone();
412 let name = name.trim_start_matches("#").to_owned();
413 cx.spawn(|this, mut cx| async move {
414 let response = client
415 .request(proto::CreateChannel { name, parent_id })
416 .await?;
417
418 let channel = response
419 .channel
420 .ok_or_else(|| anyhow!("missing channel in response"))?;
421 let channel_id = channel.id;
422
423 let parent_edge = if let Some(parent_id) = parent_id {
424 vec![ChannelEdge {
425 channel_id: channel.id,
426 parent_id,
427 }]
428 } else {
429 vec![]
430 };
431
432 this.update(&mut cx, |this, cx| {
433 let task = this.update_channels(
434 proto::UpdateChannels {
435 channels: vec![channel],
436 insert_edge: parent_edge,
437 channel_permissions: vec![ChannelPermission {
438 channel_id,
439 is_admin: true,
440 }],
441 ..Default::default()
442 },
443 cx,
444 );
445 assert!(task.is_none());
446
447 // This event is emitted because the collab panel wants to clear the pending edit state
448 // before this frame is rendered. But we can't guarantee that the collab panel's future
449 // will resolve before this flush_effects finishes. Synchronously emitting this event
450 // ensures that the collab panel will observe this creation before the frame completes
451 cx.emit(ChannelEvent::ChannelCreated(channel_id));
452 });
453
454 Ok(channel_id)
455 })
456 }
457
458 pub fn link_channel(
459 &mut self,
460 channel_id: ChannelId,
461 to: ChannelId,
462 cx: &mut ModelContext<Self>,
463 ) -> Task<Result<()>> {
464 let client = self.client.clone();
465 cx.spawn(|_, _| async move {
466 let _ = client
467 .request(proto::LinkChannel { channel_id, to })
468 .await?;
469
470 Ok(())
471 })
472 }
473
474 pub fn unlink_channel(
475 &mut self,
476 channel_id: ChannelId,
477 from: ChannelId,
478 cx: &mut ModelContext<Self>,
479 ) -> Task<Result<()>> {
480 let client = self.client.clone();
481 cx.spawn(|_, _| async move {
482 let _ = client
483 .request(proto::UnlinkChannel { channel_id, from })
484 .await?;
485
486 Ok(())
487 })
488 }
489
490 pub fn move_channel(
491 &mut self,
492 channel_id: ChannelId,
493 from: ChannelId,
494 to: ChannelId,
495 cx: &mut ModelContext<Self>,
496 ) -> Task<Result<()>> {
497 let client = self.client.clone();
498 cx.spawn(|_, _| async move {
499 let _ = client
500 .request(proto::MoveChannel {
501 channel_id,
502 from,
503 to,
504 })
505 .await?;
506
507 Ok(())
508 })
509 }
510
511 pub fn invite_member(
512 &mut self,
513 channel_id: ChannelId,
514 user_id: UserId,
515 admin: bool,
516 cx: &mut ModelContext<Self>,
517 ) -> Task<Result<()>> {
518 if !self.outgoing_invites.insert((channel_id, user_id)) {
519 return Task::ready(Err(anyhow!("invite request already in progress")));
520 }
521
522 cx.notify();
523 let client = self.client.clone();
524 cx.spawn(|this, mut cx| async move {
525 let result = client
526 .request(proto::InviteChannelMember {
527 channel_id,
528 user_id,
529 admin,
530 })
531 .await;
532
533 this.update(&mut cx, |this, cx| {
534 this.outgoing_invites.remove(&(channel_id, user_id));
535 cx.notify();
536 });
537
538 result?;
539
540 Ok(())
541 })
542 }
543
544 pub fn remove_member(
545 &mut self,
546 channel_id: ChannelId,
547 user_id: u64,
548 cx: &mut ModelContext<Self>,
549 ) -> Task<Result<()>> {
550 if !self.outgoing_invites.insert((channel_id, user_id)) {
551 return Task::ready(Err(anyhow!("invite request already in progress")));
552 }
553
554 cx.notify();
555 let client = self.client.clone();
556 cx.spawn(|this, mut cx| async move {
557 let result = client
558 .request(proto::RemoveChannelMember {
559 channel_id,
560 user_id,
561 })
562 .await;
563
564 this.update(&mut cx, |this, cx| {
565 this.outgoing_invites.remove(&(channel_id, user_id));
566 cx.notify();
567 });
568 result?;
569 Ok(())
570 })
571 }
572
573 pub fn set_member_admin(
574 &mut self,
575 channel_id: ChannelId,
576 user_id: UserId,
577 admin: bool,
578 cx: &mut ModelContext<Self>,
579 ) -> Task<Result<()>> {
580 if !self.outgoing_invites.insert((channel_id, user_id)) {
581 return Task::ready(Err(anyhow!("member request already in progress")));
582 }
583
584 cx.notify();
585 let client = self.client.clone();
586 cx.spawn(|this, mut cx| async move {
587 let result = client
588 .request(proto::SetChannelMemberAdmin {
589 channel_id,
590 user_id,
591 admin,
592 })
593 .await;
594
595 this.update(&mut cx, |this, cx| {
596 this.outgoing_invites.remove(&(channel_id, user_id));
597 cx.notify();
598 });
599
600 result?;
601 Ok(())
602 })
603 }
604
605 pub fn rename(
606 &mut self,
607 channel_id: ChannelId,
608 new_name: &str,
609 cx: &mut ModelContext<Self>,
610 ) -> Task<Result<()>> {
611 let client = self.client.clone();
612 let name = new_name.to_string();
613 cx.spawn(|this, mut cx| async move {
614 let channel = client
615 .request(proto::RenameChannel { channel_id, name })
616 .await?
617 .channel
618 .ok_or_else(|| anyhow!("missing channel in response"))?;
619 this.update(&mut cx, |this, cx| {
620 let task = this.update_channels(
621 proto::UpdateChannels {
622 channels: vec![channel],
623 ..Default::default()
624 },
625 cx,
626 );
627 assert!(task.is_none());
628
629 // This event is emitted because the collab panel wants to clear the pending edit state
630 // before this frame is rendered. But we can't guarantee that the collab panel's future
631 // will resolve before this flush_effects finishes. Synchronously emitting this event
632 // ensures that the collab panel will observe this creation before the frame complete
633 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
634 });
635 Ok(())
636 })
637 }
638
639 pub fn respond_to_channel_invite(
640 &mut self,
641 channel_id: ChannelId,
642 accept: bool,
643 ) -> impl Future<Output = Result<()>> {
644 let client = self.client.clone();
645 async move {
646 client
647 .request(proto::RespondToChannelInvite { channel_id, accept })
648 .await?;
649 Ok(())
650 }
651 }
652
653 pub fn get_channel_member_details(
654 &self,
655 channel_id: ChannelId,
656 cx: &mut ModelContext<Self>,
657 ) -> Task<Result<Vec<ChannelMembership>>> {
658 let client = self.client.clone();
659 let user_store = self.user_store.downgrade();
660 cx.spawn(|_, mut cx| async move {
661 let response = client
662 .request(proto::GetChannelMembers { channel_id })
663 .await?;
664
665 let user_ids = response.members.iter().map(|m| m.user_id).collect();
666 let user_store = user_store
667 .upgrade(&cx)
668 .ok_or_else(|| anyhow!("user store dropped"))?;
669 let users = user_store
670 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
671 .await?;
672
673 Ok(users
674 .into_iter()
675 .zip(response.members)
676 .filter_map(|(user, member)| {
677 Some(ChannelMembership {
678 user,
679 admin: member.admin,
680 kind: proto::channel_member::Kind::from_i32(member.kind)?,
681 })
682 })
683 .collect())
684 })
685 }
686
687 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
688 let client = self.client.clone();
689 async move {
690 client.request(proto::DeleteChannel { channel_id }).await?;
691 Ok(())
692 }
693 }
694
695 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
696 false
697 }
698
699 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
700 self.outgoing_invites.contains(&(channel_id, user_id))
701 }
702
703 async fn handle_update_channels(
704 this: ModelHandle<Self>,
705 message: TypedEnvelope<proto::UpdateChannels>,
706 _: Arc<Client>,
707 mut cx: AsyncAppContext,
708 ) -> Result<()> {
709 this.update(&mut cx, |this, _| {
710 this.update_channels_tx
711 .unbounded_send(message.payload)
712 .unwrap();
713 });
714 Ok(())
715 }
716
717 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
718 self.disconnect_channel_buffers_task.take();
719
720 for chat in self.opened_chats.values() {
721 if let OpenedModelHandle::Open(chat) = chat {
722 if let Some(chat) = chat.upgrade(cx) {
723 chat.update(cx, |chat, cx| {
724 chat.rejoin(cx);
725 });
726 }
727 }
728 }
729
730 let mut buffer_versions = Vec::new();
731 for buffer in self.opened_buffers.values() {
732 if let OpenedModelHandle::Open(buffer) = buffer {
733 if let Some(buffer) = buffer.upgrade(cx) {
734 let channel_buffer = buffer.read(cx);
735 let buffer = channel_buffer.buffer().read(cx);
736 buffer_versions.push(proto::ChannelBufferVersion {
737 channel_id: channel_buffer.channel().id,
738 epoch: channel_buffer.epoch(),
739 version: language::proto::serialize_version(&buffer.version()),
740 });
741 }
742 }
743 }
744
745 if buffer_versions.is_empty() {
746 return Task::ready(Ok(()));
747 }
748
749 let response = self.client.request(proto::RejoinChannelBuffers {
750 buffers: buffer_versions,
751 });
752
753 cx.spawn(|this, mut cx| async move {
754 let mut response = response.await?;
755
756 this.update(&mut cx, |this, cx| {
757 this.opened_buffers.retain(|_, buffer| match buffer {
758 OpenedModelHandle::Open(channel_buffer) => {
759 let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
760 return false;
761 };
762
763 channel_buffer.update(cx, |channel_buffer, cx| {
764 let channel_id = channel_buffer.channel().id;
765 if let Some(remote_buffer) = response
766 .buffers
767 .iter_mut()
768 .find(|buffer| buffer.channel_id == channel_id)
769 {
770 let channel_id = channel_buffer.channel().id;
771 let remote_version =
772 language::proto::deserialize_version(&remote_buffer.version);
773
774 channel_buffer.replace_collaborators(
775 mem::take(&mut remote_buffer.collaborators),
776 cx,
777 );
778
779 let operations = channel_buffer
780 .buffer()
781 .update(cx, |buffer, cx| {
782 let outgoing_operations =
783 buffer.serialize_ops(Some(remote_version), cx);
784 let incoming_operations =
785 mem::take(&mut remote_buffer.operations)
786 .into_iter()
787 .map(language::proto::deserialize_operation)
788 .collect::<Result<Vec<_>>>()?;
789 buffer.apply_ops(incoming_operations, cx)?;
790 anyhow::Ok(outgoing_operations)
791 })
792 .log_err();
793
794 if let Some(operations) = operations {
795 let client = this.client.clone();
796 cx.background()
797 .spawn(async move {
798 let operations = operations.await;
799 for chunk in
800 language::proto::split_operations(operations)
801 {
802 client
803 .send(proto::UpdateChannelBuffer {
804 channel_id,
805 operations: chunk,
806 })
807 .ok();
808 }
809 })
810 .detach();
811 return true;
812 }
813 }
814
815 channel_buffer.disconnect(cx);
816 false
817 })
818 }
819 OpenedModelHandle::Loading(_) => true,
820 });
821 });
822 anyhow::Ok(())
823 })
824 }
825
826 fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
827 self.channel_index.clear();
828 self.channel_invitations.clear();
829 self.channel_participants.clear();
830 self.channels_with_admin_privileges.clear();
831 self.channel_index.clear();
832 self.outgoing_invites.clear();
833 cx.notify();
834
835 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
836 cx.spawn_weak(|this, mut cx| async move {
837 cx.background().timer(RECONNECT_TIMEOUT).await;
838 if let Some(this) = this.upgrade(&cx) {
839 this.update(&mut cx, |this, cx| {
840 for (_, buffer) in this.opened_buffers.drain() {
841 if let OpenedModelHandle::Open(buffer) = buffer {
842 if let Some(buffer) = buffer.upgrade(cx) {
843 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
844 }
845 }
846 }
847 });
848 }
849 })
850 });
851 }
852
853 pub(crate) fn update_channels(
854 &mut self,
855 payload: proto::UpdateChannels,
856 cx: &mut ModelContext<ChannelStore>,
857 ) -> Option<Task<Result<()>>> {
858 if !payload.remove_channel_invitations.is_empty() {
859 self.channel_invitations
860 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
861 }
862 for channel in payload.channel_invitations {
863 match self
864 .channel_invitations
865 .binary_search_by_key(&channel.id, |c| c.id)
866 {
867 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
868 Err(ix) => self.channel_invitations.insert(
869 ix,
870 Arc::new(Channel {
871 id: channel.id,
872 name: channel.name,
873 unseen_note_version: None,
874 unseen_message_id: None,
875 }),
876 ),
877 }
878 }
879
880 let channels_changed = !payload.channels.is_empty()
881 || !payload.delete_channels.is_empty()
882 || !payload.insert_edge.is_empty()
883 || !payload.delete_edge.is_empty()
884 || !payload.unseen_channel_messages.is_empty()
885 || !payload.unseen_channel_buffer_changes.is_empty();
886
887 if channels_changed {
888 if !payload.delete_channels.is_empty() {
889 self.channel_index.delete_channels(&payload.delete_channels);
890 self.channel_participants
891 .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
892 self.channels_with_admin_privileges
893 .retain(|channel_id| !payload.delete_channels.contains(channel_id));
894
895 for channel_id in &payload.delete_channels {
896 let channel_id = *channel_id;
897 if let Some(OpenedModelHandle::Open(buffer)) =
898 self.opened_buffers.remove(&channel_id)
899 {
900 if let Some(buffer) = buffer.upgrade(cx) {
901 buffer.update(cx, ChannelBuffer::disconnect);
902 }
903 }
904 }
905 }
906
907 let mut index = self.channel_index.bulk_insert();
908 for channel in payload.channels {
909 index.insert(channel)
910 }
911
912 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
913 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
914 index.note_changed(
915 unseen_buffer_change.channel_id,
916 unseen_buffer_change.epoch,
917 &version,
918 );
919 }
920
921 for unseen_channel_message in payload.unseen_channel_messages {
922 index.new_messages(
923 unseen_channel_message.channel_id,
924 unseen_channel_message.message_id,
925 );
926 }
927
928 for edge in payload.insert_edge {
929 index.insert_edge(edge.channel_id, edge.parent_id);
930 }
931
932 for edge in payload.delete_edge {
933 index.delete_edge(edge.parent_id, edge.channel_id);
934 }
935 }
936
937 for permission in payload.channel_permissions {
938 if permission.is_admin {
939 self.channels_with_admin_privileges
940 .insert(permission.channel_id);
941 } else {
942 self.channels_with_admin_privileges
943 .remove(&permission.channel_id);
944 }
945 }
946
947 cx.notify();
948 if payload.channel_participants.is_empty() {
949 return None;
950 }
951
952 let mut all_user_ids = Vec::new();
953 let channel_participants = payload.channel_participants;
954 for entry in &channel_participants {
955 for user_id in entry.participant_user_ids.iter() {
956 if let Err(ix) = all_user_ids.binary_search(user_id) {
957 all_user_ids.insert(ix, *user_id);
958 }
959 }
960 }
961
962 let users = self
963 .user_store
964 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
965 Some(cx.spawn(|this, mut cx| async move {
966 let users = users.await?;
967
968 this.update(&mut cx, |this, cx| {
969 for entry in &channel_participants {
970 let mut participants: Vec<_> = entry
971 .participant_user_ids
972 .iter()
973 .filter_map(|user_id| {
974 users
975 .binary_search_by_key(&user_id, |user| &user.id)
976 .ok()
977 .map(|ix| users[ix].clone())
978 })
979 .collect();
980
981 participants.sort_by_key(|u| u.id);
982
983 this.channel_participants
984 .insert(entry.channel_id, participants);
985 }
986
987 cx.notify();
988 });
989 anyhow::Ok(())
990 }))
991 }
992}
993
994impl Deref for ChannelPath {
995 type Target = [ChannelId];
996
997 fn deref(&self) -> &Self::Target {
998 &self.0
999 }
1000}
1001
1002impl ChannelPath {
1003 pub fn new(path: Arc<[ChannelId]>) -> Self {
1004 debug_assert!(path.len() >= 1);
1005 Self(path)
1006 }
1007
1008 pub fn parent_id(&self) -> Option<ChannelId> {
1009 self.0.len().checked_sub(2).map(|i| self.0[i])
1010 }
1011
1012 pub fn channel_id(&self) -> ChannelId {
1013 self.0[self.0.len() - 1]
1014 }
1015}
1016
1017impl From<ChannelPath> for Cow<'static, ChannelPath> {
1018 fn from(value: ChannelPath) -> Self {
1019 Cow::Owned(value)
1020 }
1021}
1022
1023impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1024 fn from(value: &'a ChannelPath) -> Self {
1025 Cow::Borrowed(value)
1026 }
1027}
1028
1029impl Default for ChannelPath {
1030 fn default() -> Self {
1031 ChannelPath(Arc::from([]))
1032 }
1033}