1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36pub struct ChannelStore {
37 pub channel_index: ChannelIndex,
38 channel_invitations: Vec<Arc<Channel>>,
39 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
40 channel_states: HashMap<ChannelId, ChannelState>,
41 outgoing_invites: HashSet<(ChannelId, UserId)>,
42 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
43 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
44 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
45 client: Arc<Client>,
46 did_subscribe: bool,
47 user_store: Model<UserStore>,
48 _rpc_subscriptions: [Subscription; 2],
49 _watch_connection_status: Task<Option<()>>,
50 disconnect_channel_buffers_task: Option<Task<()>>,
51 _update_channels: Task<()>,
52}
53
54#[derive(Clone, Debug)]
55pub struct Channel {
56 pub id: ChannelId,
57 pub name: SharedString,
58 pub visibility: proto::ChannelVisibility,
59 pub parent_path: Vec<ChannelId>,
60}
61
62#[derive(Default, Debug)]
63pub struct ChannelState {
64 latest_chat_message: Option<u64>,
65 latest_notes_version: NotesVersion,
66 observed_notes_version: NotesVersion,
67 observed_chat_message: Option<u64>,
68 role: Option<ChannelRole>,
69}
70
71impl Channel {
72 pub fn link(&self, cx: &AppContext) -> String {
73 format!(
74 "{}/channel/{}-{}",
75 ClientSettings::get_global(cx).server_url,
76 Self::slug(&self.name),
77 self.id
78 )
79 }
80
81 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
82 self.link(cx)
83 + "/notes"
84 + &heading
85 .map(|h| format!("#{}", Self::slug(&h)))
86 .unwrap_or_default()
87 }
88
89 pub fn is_root_channel(&self) -> bool {
90 self.parent_path.is_empty()
91 }
92
93 pub fn root_id(&self) -> ChannelId {
94 self.parent_path.first().copied().unwrap_or(self.id)
95 }
96
97 pub fn slug(str: &str) -> String {
98 let slug: String = str
99 .chars()
100 .map(|c| if c.is_alphanumeric() { c } else { '-' })
101 .collect();
102
103 slug.trim_matches(|c| c == '-').to_string()
104 }
105}
106
107#[derive(Debug)]
108pub struct ChannelMembership {
109 pub user: Arc<User>,
110 pub kind: proto::channel_member::Kind,
111 pub role: proto::ChannelRole,
112}
113impl ChannelMembership {
114 pub fn sort_key(&self) -> MembershipSortKey {
115 MembershipSortKey {
116 role_order: match self.role {
117 proto::ChannelRole::Admin => 0,
118 proto::ChannelRole::Member => 1,
119 proto::ChannelRole::Banned => 2,
120 proto::ChannelRole::Talker => 3,
121 proto::ChannelRole::Guest => 4,
122 },
123 kind_order: match self.kind {
124 proto::channel_member::Kind::Member => 0,
125 proto::channel_member::Kind::Invitee => 1,
126 },
127 username_order: self.user.github_login.as_str(),
128 }
129 }
130}
131
132#[derive(PartialOrd, Ord, PartialEq, Eq)]
133pub struct MembershipSortKey<'a> {
134 role_order: u8,
135 kind_order: u8,
136 username_order: &'a str,
137}
138
139pub enum ChannelEvent {
140 ChannelCreated(ChannelId),
141 ChannelRenamed(ChannelId),
142}
143
144impl EventEmitter<ChannelEvent> for ChannelStore {}
145
146enum OpenedModelHandle<E> {
147 Open(WeakModel<E>),
148 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
149}
150
151struct GlobalChannelStore(Model<ChannelStore>);
152
153impl Global for GlobalChannelStore {}
154
155impl ChannelStore {
156 pub fn global(cx: &AppContext) -> Model<Self> {
157 cx.global::<GlobalChannelStore>().0.clone()
158 }
159
160 pub fn new(
161 client: Arc<Client>,
162 user_store: Model<UserStore>,
163 cx: &mut ModelContext<Self>,
164 ) -> Self {
165 let rpc_subscriptions = [
166 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
167 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
168 ];
169
170 let mut connection_status = client.status();
171 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
172 let watch_connection_status = cx.spawn(|this, mut cx| async move {
173 while let Some(status) = connection_status.next().await {
174 let this = this.upgrade()?;
175 match status {
176 client::Status::Connected { .. } => {
177 this.update(&mut cx, |this, cx| this.handle_connect(cx))
178 .ok()?
179 .await
180 .log_err()?;
181 }
182 client::Status::SignedOut | client::Status::UpgradeRequired => {
183 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
184 .ok();
185 }
186 _ => {
187 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
188 .ok();
189 }
190 }
191 }
192 Some(())
193 });
194
195 Self {
196 channel_invitations: Vec::default(),
197 channel_index: ChannelIndex::default(),
198 channel_participants: Default::default(),
199 outgoing_invites: Default::default(),
200 opened_buffers: Default::default(),
201 opened_chats: Default::default(),
202 update_channels_tx,
203 client,
204 user_store,
205 _rpc_subscriptions: rpc_subscriptions,
206 _watch_connection_status: watch_connection_status,
207 disconnect_channel_buffers_task: None,
208 _update_channels: cx.spawn(|this, mut cx| async move {
209 maybe!(async move {
210 while let Some(update_channels) = update_channels_rx.next().await {
211 if let Some(this) = this.upgrade() {
212 let update_task = this.update(&mut cx, |this, cx| {
213 this.update_channels(update_channels, cx)
214 })?;
215 if let Some(update_task) = update_task {
216 update_task.await.log_err();
217 }
218 }
219 }
220 anyhow::Ok(())
221 })
222 .await
223 .log_err();
224 }),
225 channel_states: Default::default(),
226 did_subscribe: false,
227 }
228 }
229
230 pub fn initialize(&mut self) {
231 if !self.did_subscribe
232 && self
233 .client
234 .send(proto::SubscribeToChannels {})
235 .log_err()
236 .is_some()
237 {
238 self.did_subscribe = true;
239 }
240 }
241
242 pub fn client(&self) -> Arc<Client> {
243 self.client.clone()
244 }
245
246 /// Returns the number of unique channels in the store
247 pub fn channel_count(&self) -> usize {
248 self.channel_index.by_id().len()
249 }
250
251 /// Returns the index of a channel ID in the list of unique channels
252 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
253 self.channel_index
254 .by_id()
255 .keys()
256 .position(|id| *id == channel_id)
257 }
258
259 /// Returns an iterator over all unique channels
260 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
261 self.channel_index.by_id().values()
262 }
263
264 /// Iterate over all entries in the channel DAG
265 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
266 self.channel_index
267 .ordered_channels()
268 .iter()
269 .filter_map(move |id| {
270 let channel = self.channel_index.by_id().get(id)?;
271 Some((channel.parent_path.len(), channel))
272 })
273 }
274
275 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
276 let channel_id = self.channel_index.ordered_channels().get(ix)?;
277 self.channel_index.by_id().get(channel_id)
278 }
279
280 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
281 self.channel_index.by_id().values().nth(ix)
282 }
283
284 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
285 self.channel_invitations
286 .iter()
287 .any(|channel| channel.id == channel_id)
288 }
289
290 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
291 &self.channel_invitations
292 }
293
294 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
295 self.channel_index.by_id().get(&channel_id)
296 }
297
298 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
299 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
300 if let OpenedModelHandle::Open(buffer) = buffer {
301 return buffer.upgrade().is_some();
302 }
303 }
304 false
305 }
306
307 pub fn open_channel_buffer(
308 &mut self,
309 channel_id: ChannelId,
310 cx: &mut ModelContext<Self>,
311 ) -> Task<Result<Model<ChannelBuffer>>> {
312 let client = self.client.clone();
313 let user_store = self.user_store.clone();
314 let channel_store = cx.handle();
315 self.open_channel_resource(
316 channel_id,
317 |this| &mut this.opened_buffers,
318 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
319 cx,
320 )
321 }
322
323 pub fn fetch_channel_messages(
324 &self,
325 message_ids: Vec<u64>,
326 cx: &mut ModelContext<Self>,
327 ) -> Task<Result<Vec<ChannelMessage>>> {
328 let request = if message_ids.is_empty() {
329 None
330 } else {
331 Some(
332 self.client
333 .request(proto::GetChannelMessagesById { message_ids }),
334 )
335 };
336 cx.spawn(|this, mut cx| async move {
337 if let Some(request) = request {
338 let response = request.await?;
339 let this = this
340 .upgrade()
341 .ok_or_else(|| anyhow!("channel store dropped"))?;
342 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
343 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
344 } else {
345 Ok(Vec::new())
346 }
347 })
348 }
349
350 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
351 self.channel_states
352 .get(&channel_id)
353 .is_some_and(|state| state.has_channel_buffer_changed())
354 }
355
356 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
357 self.channel_states
358 .get(&channel_id)
359 .is_some_and(|state| state.has_new_messages())
360 }
361
362 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
363 if let Some(state) = self.channel_states.get_mut(&channel_id) {
364 state.latest_chat_message = message_id;
365 }
366 }
367
368 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
369 self.channel_states.get(&channel_id).and_then(|state| {
370 if let Some(last_message_id) = state.latest_chat_message {
371 if state
372 .last_acknowledged_message_id()
373 .is_some_and(|id| id < last_message_id)
374 {
375 return state.last_acknowledged_message_id();
376 }
377 }
378
379 None
380 })
381 }
382
383 pub fn acknowledge_message_id(
384 &mut self,
385 channel_id: ChannelId,
386 message_id: u64,
387 cx: &mut ModelContext<Self>,
388 ) {
389 self.channel_states
390 .entry(channel_id)
391 .or_default()
392 .acknowledge_message_id(message_id);
393 cx.notify();
394 }
395
396 pub fn update_latest_message_id(
397 &mut self,
398 channel_id: ChannelId,
399 message_id: u64,
400 cx: &mut ModelContext<Self>,
401 ) {
402 self.channel_states
403 .entry(channel_id)
404 .or_default()
405 .update_latest_message_id(message_id);
406 cx.notify();
407 }
408
409 pub fn acknowledge_notes_version(
410 &mut self,
411 channel_id: ChannelId,
412 epoch: u64,
413 version: &clock::Global,
414 cx: &mut ModelContext<Self>,
415 ) {
416 self.channel_states
417 .entry(channel_id)
418 .or_default()
419 .acknowledge_notes_version(epoch, version);
420 cx.notify()
421 }
422
423 pub fn update_latest_notes_version(
424 &mut self,
425 channel_id: ChannelId,
426 epoch: u64,
427 version: &clock::Global,
428 cx: &mut ModelContext<Self>,
429 ) {
430 self.channel_states
431 .entry(channel_id)
432 .or_default()
433 .update_latest_notes_version(epoch, version);
434 cx.notify()
435 }
436
437 pub fn open_channel_chat(
438 &mut self,
439 channel_id: ChannelId,
440 cx: &mut ModelContext<Self>,
441 ) -> Task<Result<Model<ChannelChat>>> {
442 let client = self.client.clone();
443 let user_store = self.user_store.clone();
444 let this = cx.handle();
445 self.open_channel_resource(
446 channel_id,
447 |this| &mut this.opened_chats,
448 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
449 cx,
450 )
451 }
452
453 /// Asynchronously open a given resource associated with a channel.
454 ///
455 /// Make sure that the resource is only opened once, even if this method
456 /// is called multiple times with the same channel id while the first task
457 /// is still running.
458 fn open_channel_resource<T, F, Fut>(
459 &mut self,
460 channel_id: ChannelId,
461 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
462 load: F,
463 cx: &mut ModelContext<Self>,
464 ) -> Task<Result<Model<T>>>
465 where
466 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
467 Fut: Future<Output = Result<Model<T>>>,
468 T: 'static,
469 {
470 let task = loop {
471 match get_map(self).entry(channel_id) {
472 hash_map::Entry::Occupied(e) => match e.get() {
473 OpenedModelHandle::Open(model) => {
474 if let Some(model) = model.upgrade() {
475 break Task::ready(Ok(model)).shared();
476 } else {
477 get_map(self).remove(&channel_id);
478 continue;
479 }
480 }
481 OpenedModelHandle::Loading(task) => {
482 break task.clone();
483 }
484 },
485 hash_map::Entry::Vacant(e) => {
486 let task = cx
487 .spawn(move |this, mut cx| async move {
488 let channel = this.update(&mut cx, |this, _| {
489 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
490 Arc::new(anyhow!("no channel for id: {}", channel_id))
491 })
492 })??;
493
494 load(channel, cx).await.map_err(Arc::new)
495 })
496 .shared();
497
498 e.insert(OpenedModelHandle::Loading(task.clone()));
499 cx.spawn({
500 let task = task.clone();
501 move |this, mut cx| async move {
502 let result = task.await;
503 this.update(&mut cx, |this, _| match result {
504 Ok(model) => {
505 get_map(this).insert(
506 channel_id,
507 OpenedModelHandle::Open(model.downgrade()),
508 );
509 }
510 Err(_) => {
511 get_map(this).remove(&channel_id);
512 }
513 })
514 .ok();
515 }
516 })
517 .detach();
518 break task;
519 }
520 }
521 };
522 cx.background_executor()
523 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
524 }
525
526 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
527 self.channel_role(channel_id) == proto::ChannelRole::Admin
528 }
529
530 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
531 self.channel_index
532 .by_id()
533 .get(&channel_id)
534 .map_or(false, |channel| channel.is_root_channel())
535 }
536
537 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
538 self.channel_index
539 .by_id()
540 .get(&channel_id)
541 .map_or(false, |channel| {
542 channel.visibility == ChannelVisibility::Public
543 })
544 }
545
546 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
547 match self.channel_role(channel_id) {
548 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
549 _ => Capability::ReadOnly,
550 }
551 }
552
553 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
554 maybe!({
555 let mut channel = self.channel_for_id(channel_id)?;
556 if !channel.is_root_channel() {
557 channel = self.channel_for_id(channel.root_id())?;
558 }
559 let root_channel_state = self.channel_states.get(&channel.id);
560 root_channel_state?.role
561 })
562 .unwrap_or(proto::ChannelRole::Guest)
563 }
564
565 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
566 self.channel_participants
567 .get(&channel_id)
568 .map_or(&[], |v| v.as_slice())
569 }
570
571 pub fn create_channel(
572 &self,
573 name: &str,
574 parent_id: Option<ChannelId>,
575 cx: &mut ModelContext<Self>,
576 ) -> Task<Result<ChannelId>> {
577 let client = self.client.clone();
578 let name = name.trim_start_matches('#').to_owned();
579 cx.spawn(move |this, mut cx| async move {
580 let response = client
581 .request(proto::CreateChannel {
582 name,
583 parent_id: parent_id.map(|cid| cid.0),
584 })
585 .await?;
586
587 let channel = response
588 .channel
589 .ok_or_else(|| anyhow!("missing channel in response"))?;
590 let channel_id = ChannelId(channel.id);
591
592 this.update(&mut cx, |this, cx| {
593 let task = this.update_channels(
594 proto::UpdateChannels {
595 channels: vec![channel],
596 ..Default::default()
597 },
598 cx,
599 );
600 assert!(task.is_none());
601
602 // This event is emitted because the collab panel wants to clear the pending edit state
603 // before this frame is rendered. But we can't guarantee that the collab panel's future
604 // will resolve before this flush_effects finishes. Synchronously emitting this event
605 // ensures that the collab panel will observe this creation before the frame completes
606 cx.emit(ChannelEvent::ChannelCreated(channel_id));
607 })?;
608
609 Ok(channel_id)
610 })
611 }
612
613 pub fn move_channel(
614 &mut self,
615 channel_id: ChannelId,
616 to: ChannelId,
617 cx: &mut ModelContext<Self>,
618 ) -> Task<Result<()>> {
619 let client = self.client.clone();
620 cx.spawn(move |_, _| async move {
621 let _ = client
622 .request(proto::MoveChannel {
623 channel_id: channel_id.0,
624 to: to.0,
625 })
626 .await?;
627
628 Ok(())
629 })
630 }
631
632 pub fn set_channel_visibility(
633 &mut self,
634 channel_id: ChannelId,
635 visibility: ChannelVisibility,
636 cx: &mut ModelContext<Self>,
637 ) -> Task<Result<()>> {
638 let client = self.client.clone();
639 cx.spawn(move |_, _| async move {
640 let _ = client
641 .request(proto::SetChannelVisibility {
642 channel_id: channel_id.0,
643 visibility: visibility.into(),
644 })
645 .await?;
646
647 Ok(())
648 })
649 }
650
651 pub fn invite_member(
652 &mut self,
653 channel_id: ChannelId,
654 user_id: UserId,
655 role: proto::ChannelRole,
656 cx: &mut ModelContext<Self>,
657 ) -> Task<Result<()>> {
658 if !self.outgoing_invites.insert((channel_id, user_id)) {
659 return Task::ready(Err(anyhow!("invite request already in progress")));
660 }
661
662 cx.notify();
663 let client = self.client.clone();
664 cx.spawn(move |this, mut cx| async move {
665 let result = client
666 .request(proto::InviteChannelMember {
667 channel_id: channel_id.0,
668 user_id,
669 role: role.into(),
670 })
671 .await;
672
673 this.update(&mut cx, |this, cx| {
674 this.outgoing_invites.remove(&(channel_id, user_id));
675 cx.notify();
676 })?;
677
678 result?;
679
680 Ok(())
681 })
682 }
683
684 pub fn remove_member(
685 &mut self,
686 channel_id: ChannelId,
687 user_id: u64,
688 cx: &mut ModelContext<Self>,
689 ) -> Task<Result<()>> {
690 if !self.outgoing_invites.insert((channel_id, user_id)) {
691 return Task::ready(Err(anyhow!("invite request already in progress")));
692 }
693
694 cx.notify();
695 let client = self.client.clone();
696 cx.spawn(move |this, mut cx| async move {
697 let result = client
698 .request(proto::RemoveChannelMember {
699 channel_id: channel_id.0,
700 user_id,
701 })
702 .await;
703
704 this.update(&mut cx, |this, cx| {
705 this.outgoing_invites.remove(&(channel_id, user_id));
706 cx.notify();
707 })?;
708 result?;
709 Ok(())
710 })
711 }
712
713 pub fn set_member_role(
714 &mut self,
715 channel_id: ChannelId,
716 user_id: UserId,
717 role: proto::ChannelRole,
718 cx: &mut ModelContext<Self>,
719 ) -> Task<Result<()>> {
720 if !self.outgoing_invites.insert((channel_id, user_id)) {
721 return Task::ready(Err(anyhow!("member request already in progress")));
722 }
723
724 cx.notify();
725 let client = self.client.clone();
726 cx.spawn(move |this, mut cx| async move {
727 let result = client
728 .request(proto::SetChannelMemberRole {
729 channel_id: channel_id.0,
730 user_id,
731 role: role.into(),
732 })
733 .await;
734
735 this.update(&mut cx, |this, cx| {
736 this.outgoing_invites.remove(&(channel_id, user_id));
737 cx.notify();
738 })?;
739
740 result?;
741 Ok(())
742 })
743 }
744
745 pub fn rename(
746 &mut self,
747 channel_id: ChannelId,
748 new_name: &str,
749 cx: &mut ModelContext<Self>,
750 ) -> Task<Result<()>> {
751 let client = self.client.clone();
752 let name = new_name.to_string();
753 cx.spawn(move |this, mut cx| async move {
754 let channel = client
755 .request(proto::RenameChannel {
756 channel_id: channel_id.0,
757 name,
758 })
759 .await?
760 .channel
761 .ok_or_else(|| anyhow!("missing channel in response"))?;
762 this.update(&mut cx, |this, cx| {
763 let task = this.update_channels(
764 proto::UpdateChannels {
765 channels: vec![channel],
766 ..Default::default()
767 },
768 cx,
769 );
770 assert!(task.is_none());
771
772 // This event is emitted because the collab panel wants to clear the pending edit state
773 // before this frame is rendered. But we can't guarantee that the collab panel's future
774 // will resolve before this flush_effects finishes. Synchronously emitting this event
775 // ensures that the collab panel will observe this creation before the frame complete
776 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
777 })?;
778 Ok(())
779 })
780 }
781
782 pub fn respond_to_channel_invite(
783 &mut self,
784 channel_id: ChannelId,
785 accept: bool,
786 cx: &mut ModelContext<Self>,
787 ) -> Task<Result<()>> {
788 let client = self.client.clone();
789 cx.background_executor().spawn(async move {
790 client
791 .request(proto::RespondToChannelInvite {
792 channel_id: channel_id.0,
793 accept,
794 })
795 .await?;
796 Ok(())
797 })
798 }
799 pub fn fuzzy_search_members(
800 &self,
801 channel_id: ChannelId,
802 query: String,
803 limit: u16,
804 cx: &mut ModelContext<Self>,
805 ) -> Task<Result<Vec<ChannelMembership>>> {
806 let client = self.client.clone();
807 let user_store = self.user_store.downgrade();
808 cx.spawn(move |_, mut cx| async move {
809 let response = client
810 .request(proto::GetChannelMembers {
811 channel_id: channel_id.0,
812 query,
813 limit: limit as u64,
814 })
815 .await?;
816 user_store.update(&mut cx, |user_store, _| {
817 user_store.insert(response.users);
818 response
819 .members
820 .into_iter()
821 .filter_map(|member| {
822 Some(ChannelMembership {
823 user: user_store.get_cached_user(member.user_id)?,
824 role: member.role(),
825 kind: member.kind(),
826 })
827 })
828 .collect()
829 })
830 })
831 }
832
833 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
834 let client = self.client.clone();
835 async move {
836 client
837 .request(proto::DeleteChannel {
838 channel_id: channel_id.0,
839 })
840 .await?;
841 Ok(())
842 }
843 }
844
845 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
846 false
847 }
848
849 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
850 self.outgoing_invites.contains(&(channel_id, user_id))
851 }
852
853 async fn handle_update_channels(
854 this: Model<Self>,
855 message: TypedEnvelope<proto::UpdateChannels>,
856 mut cx: AsyncAppContext,
857 ) -> Result<()> {
858 this.update(&mut cx, |this, _| {
859 this.update_channels_tx
860 .unbounded_send(message.payload)
861 .unwrap();
862 })?;
863 Ok(())
864 }
865
866 async fn handle_update_user_channels(
867 this: Model<Self>,
868 message: TypedEnvelope<proto::UpdateUserChannels>,
869 mut cx: AsyncAppContext,
870 ) -> Result<()> {
871 this.update(&mut cx, |this, cx| {
872 for buffer_version in message.payload.observed_channel_buffer_version {
873 let version = language::proto::deserialize_version(&buffer_version.version);
874 this.acknowledge_notes_version(
875 ChannelId(buffer_version.channel_id),
876 buffer_version.epoch,
877 &version,
878 cx,
879 );
880 }
881 for message_id in message.payload.observed_channel_message_id {
882 this.acknowledge_message_id(
883 ChannelId(message_id.channel_id),
884 message_id.message_id,
885 cx,
886 );
887 }
888 for membership in message.payload.channel_memberships {
889 if let Some(role) = ChannelRole::from_i32(membership.role) {
890 this.channel_states
891 .entry(ChannelId(membership.channel_id))
892 .or_default()
893 .set_role(role)
894 }
895 }
896 })
897 }
898
899 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
900 self.channel_index.clear();
901 self.channel_invitations.clear();
902 self.channel_participants.clear();
903 self.channel_index.clear();
904 self.outgoing_invites.clear();
905 self.disconnect_channel_buffers_task.take();
906
907 for chat in self.opened_chats.values() {
908 if let OpenedModelHandle::Open(chat) = chat {
909 if let Some(chat) = chat.upgrade() {
910 chat.update(cx, |chat, cx| {
911 chat.rejoin(cx);
912 });
913 }
914 }
915 }
916
917 let mut buffer_versions = Vec::new();
918 for buffer in self.opened_buffers.values() {
919 if let OpenedModelHandle::Open(buffer) = buffer {
920 if let Some(buffer) = buffer.upgrade() {
921 let channel_buffer = buffer.read(cx);
922 let buffer = channel_buffer.buffer().read(cx);
923 buffer_versions.push(proto::ChannelBufferVersion {
924 channel_id: channel_buffer.channel_id.0,
925 epoch: channel_buffer.epoch(),
926 version: language::proto::serialize_version(&buffer.version()),
927 });
928 }
929 }
930 }
931
932 if buffer_versions.is_empty() {
933 return Task::ready(Ok(()));
934 }
935
936 let response = self.client.request(proto::RejoinChannelBuffers {
937 buffers: buffer_versions,
938 });
939
940 cx.spawn(|this, mut cx| async move {
941 let mut response = response.await?;
942
943 this.update(&mut cx, |this, cx| {
944 this.opened_buffers.retain(|_, buffer| match buffer {
945 OpenedModelHandle::Open(channel_buffer) => {
946 let Some(channel_buffer) = channel_buffer.upgrade() else {
947 return false;
948 };
949
950 channel_buffer.update(cx, |channel_buffer, cx| {
951 let channel_id = channel_buffer.channel_id;
952 if let Some(remote_buffer) = response
953 .buffers
954 .iter_mut()
955 .find(|buffer| buffer.channel_id == channel_id.0)
956 {
957 let channel_id = channel_buffer.channel_id;
958 let remote_version =
959 language::proto::deserialize_version(&remote_buffer.version);
960
961 channel_buffer.replace_collaborators(
962 mem::take(&mut remote_buffer.collaborators),
963 cx,
964 );
965
966 let operations = channel_buffer
967 .buffer()
968 .update(cx, |buffer, cx| {
969 let outgoing_operations =
970 buffer.serialize_ops(Some(remote_version), cx);
971 let incoming_operations =
972 mem::take(&mut remote_buffer.operations)
973 .into_iter()
974 .map(language::proto::deserialize_operation)
975 .collect::<Result<Vec<_>>>()?;
976 buffer.apply_ops(incoming_operations, cx);
977 anyhow::Ok(outgoing_operations)
978 })
979 .log_err();
980
981 if let Some(operations) = operations {
982 let client = this.client.clone();
983 cx.background_executor()
984 .spawn(async move {
985 let operations = operations.await;
986 for chunk in
987 language::proto::split_operations(operations)
988 {
989 client
990 .send(proto::UpdateChannelBuffer {
991 channel_id: channel_id.0,
992 operations: chunk,
993 })
994 .ok();
995 }
996 })
997 .detach();
998 return true;
999 }
1000 }
1001
1002 channel_buffer.disconnect(cx);
1003 false
1004 })
1005 }
1006 OpenedModelHandle::Loading(_) => true,
1007 });
1008 })
1009 .ok();
1010 anyhow::Ok(())
1011 })
1012 }
1013
1014 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1015 cx.notify();
1016 self.did_subscribe = false;
1017 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1018 cx.spawn(move |this, mut cx| async move {
1019 if wait_for_reconnect {
1020 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1021 }
1022
1023 if let Some(this) = this.upgrade() {
1024 this.update(&mut cx, |this, cx| {
1025 for (_, buffer) in this.opened_buffers.drain() {
1026 if let OpenedModelHandle::Open(buffer) = buffer {
1027 if let Some(buffer) = buffer.upgrade() {
1028 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1029 }
1030 }
1031 }
1032 })
1033 .ok();
1034 }
1035 })
1036 });
1037 }
1038
1039 pub(crate) fn update_channels(
1040 &mut self,
1041 payload: proto::UpdateChannels,
1042 cx: &mut ModelContext<ChannelStore>,
1043 ) -> Option<Task<Result<()>>> {
1044 if !payload.remove_channel_invitations.is_empty() {
1045 self.channel_invitations
1046 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1047 }
1048 for channel in payload.channel_invitations {
1049 match self
1050 .channel_invitations
1051 .binary_search_by_key(&channel.id, |c| c.id.0)
1052 {
1053 Ok(ix) => {
1054 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1055 }
1056 Err(ix) => self.channel_invitations.insert(
1057 ix,
1058 Arc::new(Channel {
1059 id: ChannelId(channel.id),
1060 visibility: channel.visibility(),
1061 name: channel.name.into(),
1062 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1063 }),
1064 ),
1065 }
1066 }
1067
1068 let channels_changed = !payload.channels.is_empty()
1069 || !payload.delete_channels.is_empty()
1070 || !payload.latest_channel_message_ids.is_empty()
1071 || !payload.latest_channel_buffer_versions.is_empty();
1072
1073 if channels_changed {
1074 if !payload.delete_channels.is_empty() {
1075 let delete_channels: Vec<ChannelId> =
1076 payload.delete_channels.into_iter().map(ChannelId).collect();
1077 self.channel_index.delete_channels(&delete_channels);
1078 self.channel_participants
1079 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1080
1081 for channel_id in &delete_channels {
1082 let channel_id = *channel_id;
1083 if payload
1084 .channels
1085 .iter()
1086 .any(|channel| channel.id == channel_id.0)
1087 {
1088 continue;
1089 }
1090 if let Some(OpenedModelHandle::Open(buffer)) =
1091 self.opened_buffers.remove(&channel_id)
1092 {
1093 if let Some(buffer) = buffer.upgrade() {
1094 buffer.update(cx, ChannelBuffer::disconnect);
1095 }
1096 }
1097 }
1098 }
1099
1100 let mut index = self.channel_index.bulk_insert();
1101 for channel in payload.channels {
1102 let id = ChannelId(channel.id);
1103 let channel_changed = index.insert(channel);
1104
1105 if channel_changed {
1106 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1107 if let Some(buffer) = buffer.upgrade() {
1108 buffer.update(cx, ChannelBuffer::channel_changed);
1109 }
1110 }
1111 }
1112 }
1113
1114 for latest_buffer_version in payload.latest_channel_buffer_versions {
1115 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1116 self.channel_states
1117 .entry(ChannelId(latest_buffer_version.channel_id))
1118 .or_default()
1119 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1120 }
1121
1122 for latest_channel_message in payload.latest_channel_message_ids {
1123 self.channel_states
1124 .entry(ChannelId(latest_channel_message.channel_id))
1125 .or_default()
1126 .update_latest_message_id(latest_channel_message.message_id);
1127 }
1128 }
1129
1130 cx.notify();
1131 if payload.channel_participants.is_empty() {
1132 return None;
1133 }
1134
1135 let mut all_user_ids = Vec::new();
1136 let channel_participants = payload.channel_participants;
1137 for entry in &channel_participants {
1138 for user_id in entry.participant_user_ids.iter() {
1139 if let Err(ix) = all_user_ids.binary_search(user_id) {
1140 all_user_ids.insert(ix, *user_id);
1141 }
1142 }
1143 }
1144
1145 let users = self
1146 .user_store
1147 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1148 Some(cx.spawn(|this, mut cx| async move {
1149 let users = users.await?;
1150
1151 this.update(&mut cx, |this, cx| {
1152 for entry in &channel_participants {
1153 let mut participants: Vec<_> = entry
1154 .participant_user_ids
1155 .iter()
1156 .filter_map(|user_id| {
1157 users
1158 .binary_search_by_key(&user_id, |user| &user.id)
1159 .ok()
1160 .map(|ix| users[ix].clone())
1161 })
1162 .collect();
1163
1164 participants.sort_by_key(|u| u.id);
1165
1166 this.channel_participants
1167 .insert(ChannelId(entry.channel_id), participants);
1168 }
1169
1170 cx.notify();
1171 })
1172 }))
1173 }
1174}
1175
1176impl ChannelState {
1177 fn set_role(&mut self, role: ChannelRole) {
1178 self.role = Some(role);
1179 }
1180
1181 fn has_channel_buffer_changed(&self) -> bool {
1182 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1183 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1184 && self
1185 .latest_notes_version
1186 .version
1187 .changed_since(&self.observed_notes_version.version))
1188 }
1189
1190 fn has_new_messages(&self) -> bool {
1191 let latest_message_id = self.latest_chat_message;
1192 let observed_message_id = self.observed_chat_message;
1193
1194 latest_message_id.is_some_and(|latest_message_id| {
1195 latest_message_id > observed_message_id.unwrap_or_default()
1196 })
1197 }
1198
1199 fn last_acknowledged_message_id(&self) -> Option<u64> {
1200 self.observed_chat_message
1201 }
1202
1203 fn acknowledge_message_id(&mut self, message_id: u64) {
1204 let observed = self.observed_chat_message.get_or_insert(message_id);
1205 *observed = (*observed).max(message_id);
1206 }
1207
1208 fn update_latest_message_id(&mut self, message_id: u64) {
1209 self.latest_chat_message =
1210 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1211 }
1212
1213 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1214 if self.observed_notes_version.epoch == epoch {
1215 self.observed_notes_version.version.join(version);
1216 } else {
1217 self.observed_notes_version = NotesVersion {
1218 epoch,
1219 version: version.clone(),
1220 };
1221 }
1222 }
1223
1224 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1225 if self.latest_notes_version.epoch == epoch {
1226 self.latest_notes_version.version.join(version);
1227 } else {
1228 self.latest_notes_version = NotesVersion {
1229 epoch,
1230 version: version.clone(),
1231 };
1232 }
1233 }
1234}