1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36#[derive(Debug, Clone)]
37pub struct HostedProject {
38 project_id: ProjectId,
39 channel_id: ChannelId,
40 name: SharedString,
41 _visibility: proto::ChannelVisibility,
42}
43impl From<proto::HostedProject> for HostedProject {
44 fn from(project: proto::HostedProject) -> Self {
45 Self {
46 project_id: ProjectId(project.project_id),
47 channel_id: ChannelId(project.channel_id),
48 _visibility: project.visibility(),
49 name: project.name.into(),
50 }
51 }
52}
53pub struct ChannelStore {
54 pub channel_index: ChannelIndex,
55 channel_invitations: Vec<Arc<Channel>>,
56 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
57 channel_states: HashMap<ChannelId, ChannelState>,
58 hosted_projects: HashMap<ProjectId, HostedProject>,
59
60 outgoing_invites: HashSet<(ChannelId, UserId)>,
61 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
62 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
63 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
64 client: Arc<Client>,
65 user_store: Model<UserStore>,
66 _rpc_subscriptions: [Subscription; 2],
67 _watch_connection_status: Task<Option<()>>,
68 disconnect_channel_buffers_task: Option<Task<()>>,
69 _update_channels: Task<()>,
70}
71
72#[derive(Clone, Debug)]
73pub struct Channel {
74 pub id: ChannelId,
75 pub name: SharedString,
76 pub visibility: proto::ChannelVisibility,
77 pub parent_path: Vec<ChannelId>,
78}
79
80#[derive(Default, Debug)]
81pub struct ChannelState {
82 latest_chat_message: Option<u64>,
83 latest_notes_version: NotesVersion,
84 observed_notes_version: NotesVersion,
85 observed_chat_message: Option<u64>,
86 role: Option<ChannelRole>,
87 projects: HashSet<ProjectId>,
88}
89
90impl Channel {
91 pub fn link(&self, cx: &AppContext) -> String {
92 format!(
93 "{}/channel/{}-{}",
94 ClientSettings::get_global(cx).server_url,
95 Self::slug(&self.name),
96 self.id
97 )
98 }
99
100 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
101 self.link(cx)
102 + "/notes"
103 + &heading
104 .map(|h| format!("#{}", Self::slug(&h)))
105 .unwrap_or_default()
106 }
107
108 pub fn is_root_channel(&self) -> bool {
109 self.parent_path.is_empty()
110 }
111
112 pub fn root_id(&self) -> ChannelId {
113 self.parent_path.first().copied().unwrap_or(self.id)
114 }
115
116 pub fn slug(str: &str) -> String {
117 let slug: String = str
118 .chars()
119 .map(|c| if c.is_alphanumeric() { c } else { '-' })
120 .collect();
121
122 slug.trim_matches(|c| c == '-').to_string()
123 }
124}
125
126#[derive(Debug)]
127pub struct ChannelMembership {
128 pub user: Arc<User>,
129 pub kind: proto::channel_member::Kind,
130 pub role: proto::ChannelRole,
131}
132impl ChannelMembership {
133 pub fn sort_key(&self) -> MembershipSortKey {
134 MembershipSortKey {
135 role_order: match self.role {
136 proto::ChannelRole::Admin => 0,
137 proto::ChannelRole::Member => 1,
138 proto::ChannelRole::Banned => 2,
139 proto::ChannelRole::Talker => 3,
140 proto::ChannelRole::Guest => 4,
141 },
142 kind_order: match self.kind {
143 proto::channel_member::Kind::Member => 0,
144 proto::channel_member::Kind::Invitee => 1,
145 },
146 username_order: self.user.github_login.as_str(),
147 }
148 }
149}
150
151#[derive(PartialOrd, Ord, PartialEq, Eq)]
152pub struct MembershipSortKey<'a> {
153 role_order: u8,
154 kind_order: u8,
155 username_order: &'a str,
156}
157
158pub enum ChannelEvent {
159 ChannelCreated(ChannelId),
160 ChannelRenamed(ChannelId),
161}
162
163impl EventEmitter<ChannelEvent> for ChannelStore {}
164
165enum OpenedModelHandle<E> {
166 Open(WeakModel<E>),
167 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
168}
169
170struct GlobalChannelStore(Model<ChannelStore>);
171
172impl Global for GlobalChannelStore {}
173
174impl ChannelStore {
175 pub fn global(cx: &AppContext) -> Model<Self> {
176 cx.global::<GlobalChannelStore>().0.clone()
177 }
178
179 pub fn new(
180 client: Arc<Client>,
181 user_store: Model<UserStore>,
182 cx: &mut ModelContext<Self>,
183 ) -> Self {
184 let rpc_subscriptions = [
185 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
186 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
187 ];
188
189 let mut connection_status = client.status();
190 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
191 let watch_connection_status = cx.spawn(|this, mut cx| async move {
192 while let Some(status) = connection_status.next().await {
193 let this = this.upgrade()?;
194 match status {
195 client::Status::Connected { .. } => {
196 this.update(&mut cx, |this, cx| this.handle_connect(cx))
197 .ok()?
198 .await
199 .log_err()?;
200 }
201 client::Status::SignedOut | client::Status::UpgradeRequired => {
202 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
203 .ok();
204 }
205 _ => {
206 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
207 .ok();
208 }
209 }
210 }
211 Some(())
212 });
213
214 Self {
215 channel_invitations: Vec::default(),
216 channel_index: ChannelIndex::default(),
217 channel_participants: Default::default(),
218 hosted_projects: Default::default(),
219 outgoing_invites: Default::default(),
220 opened_buffers: Default::default(),
221 opened_chats: Default::default(),
222 update_channels_tx,
223 client,
224 user_store,
225 _rpc_subscriptions: rpc_subscriptions,
226 _watch_connection_status: watch_connection_status,
227 disconnect_channel_buffers_task: None,
228 _update_channels: cx.spawn(|this, mut cx| async move {
229 maybe!(async move {
230 while let Some(update_channels) = update_channels_rx.next().await {
231 if let Some(this) = this.upgrade() {
232 let update_task = this.update(&mut cx, |this, cx| {
233 this.update_channels(update_channels, cx)
234 })?;
235 if let Some(update_task) = update_task {
236 update_task.await.log_err();
237 }
238 }
239 }
240 anyhow::Ok(())
241 })
242 .await
243 .log_err();
244 }),
245 channel_states: Default::default(),
246 }
247 }
248
249 pub fn client(&self) -> Arc<Client> {
250 self.client.clone()
251 }
252
253 /// Returns the number of unique channels in the store
254 pub fn channel_count(&self) -> usize {
255 self.channel_index.by_id().len()
256 }
257
258 /// Returns the index of a channel ID in the list of unique channels
259 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
260 self.channel_index
261 .by_id()
262 .keys()
263 .position(|id| *id == channel_id)
264 }
265
266 /// Returns an iterator over all unique channels
267 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
268 self.channel_index.by_id().values()
269 }
270
271 /// Iterate over all entries in the channel DAG
272 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
273 self.channel_index
274 .ordered_channels()
275 .iter()
276 .filter_map(move |id| {
277 let channel = self.channel_index.by_id().get(id)?;
278 Some((channel.parent_path.len(), channel))
279 })
280 }
281
282 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
283 let channel_id = self.channel_index.ordered_channels().get(ix)?;
284 self.channel_index.by_id().get(channel_id)
285 }
286
287 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
288 self.channel_index.by_id().values().nth(ix)
289 }
290
291 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
292 self.channel_invitations
293 .iter()
294 .any(|channel| channel.id == channel_id)
295 }
296
297 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
298 &self.channel_invitations
299 }
300
301 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
302 self.channel_index.by_id().get(&channel_id)
303 }
304
305 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
306 let mut projects: Vec<(SharedString, ProjectId)> = self
307 .channel_states
308 .get(&channel_id)
309 .map(|state| state.projects.clone())
310 .unwrap_or_default()
311 .into_iter()
312 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
313 .collect();
314 projects.sort();
315 projects
316 }
317
318 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
319 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
320 if let OpenedModelHandle::Open(buffer) = buffer {
321 return buffer.upgrade().is_some();
322 }
323 }
324 false
325 }
326
327 pub fn open_channel_buffer(
328 &mut self,
329 channel_id: ChannelId,
330 cx: &mut ModelContext<Self>,
331 ) -> Task<Result<Model<ChannelBuffer>>> {
332 let client = self.client.clone();
333 let user_store = self.user_store.clone();
334 let channel_store = cx.handle();
335 self.open_channel_resource(
336 channel_id,
337 |this| &mut this.opened_buffers,
338 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
339 cx,
340 )
341 }
342
343 pub fn fetch_channel_messages(
344 &self,
345 message_ids: Vec<u64>,
346 cx: &mut ModelContext<Self>,
347 ) -> Task<Result<Vec<ChannelMessage>>> {
348 let request = if message_ids.is_empty() {
349 None
350 } else {
351 Some(
352 self.client
353 .request(proto::GetChannelMessagesById { message_ids }),
354 )
355 };
356 cx.spawn(|this, mut cx| async move {
357 if let Some(request) = request {
358 let response = request.await?;
359 let this = this
360 .upgrade()
361 .ok_or_else(|| anyhow!("channel store dropped"))?;
362 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
363 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
364 } else {
365 Ok(Vec::new())
366 }
367 })
368 }
369
370 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
371 self.channel_states
372 .get(&channel_id)
373 .is_some_and(|state| state.has_channel_buffer_changed())
374 }
375
376 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
377 self.channel_states
378 .get(&channel_id)
379 .is_some_and(|state| state.has_new_messages())
380 }
381
382 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
383 if let Some(state) = self.channel_states.get_mut(&channel_id) {
384 state.latest_chat_message = message_id;
385 }
386 }
387
388 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
389 self.channel_states.get(&channel_id).and_then(|state| {
390 if let Some(last_message_id) = state.latest_chat_message {
391 if state
392 .last_acknowledged_message_id()
393 .is_some_and(|id| id < last_message_id)
394 {
395 return state.last_acknowledged_message_id();
396 }
397 }
398
399 None
400 })
401 }
402
403 pub fn acknowledge_message_id(
404 &mut self,
405 channel_id: ChannelId,
406 message_id: u64,
407 cx: &mut ModelContext<Self>,
408 ) {
409 self.channel_states
410 .entry(channel_id)
411 .or_insert_with(|| Default::default())
412 .acknowledge_message_id(message_id);
413 cx.notify();
414 }
415
416 pub fn update_latest_message_id(
417 &mut self,
418 channel_id: ChannelId,
419 message_id: u64,
420 cx: &mut ModelContext<Self>,
421 ) {
422 self.channel_states
423 .entry(channel_id)
424 .or_insert_with(|| Default::default())
425 .update_latest_message_id(message_id);
426 cx.notify();
427 }
428
429 pub fn acknowledge_notes_version(
430 &mut self,
431 channel_id: ChannelId,
432 epoch: u64,
433 version: &clock::Global,
434 cx: &mut ModelContext<Self>,
435 ) {
436 self.channel_states
437 .entry(channel_id)
438 .or_insert_with(|| Default::default())
439 .acknowledge_notes_version(epoch, version);
440 cx.notify()
441 }
442
443 pub fn update_latest_notes_version(
444 &mut self,
445 channel_id: ChannelId,
446 epoch: u64,
447 version: &clock::Global,
448 cx: &mut ModelContext<Self>,
449 ) {
450 self.channel_states
451 .entry(channel_id)
452 .or_insert_with(|| Default::default())
453 .update_latest_notes_version(epoch, version);
454 cx.notify()
455 }
456
457 pub fn open_channel_chat(
458 &mut self,
459 channel_id: ChannelId,
460 cx: &mut ModelContext<Self>,
461 ) -> Task<Result<Model<ChannelChat>>> {
462 let client = self.client.clone();
463 let user_store = self.user_store.clone();
464 let this = cx.handle();
465 self.open_channel_resource(
466 channel_id,
467 |this| &mut this.opened_chats,
468 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
469 cx,
470 )
471 }
472
473 /// Asynchronously open a given resource associated with a channel.
474 ///
475 /// Make sure that the resource is only opened once, even if this method
476 /// is called multiple times with the same channel id while the first task
477 /// is still running.
478 fn open_channel_resource<T, F, Fut>(
479 &mut self,
480 channel_id: ChannelId,
481 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
482 load: F,
483 cx: &mut ModelContext<Self>,
484 ) -> Task<Result<Model<T>>>
485 where
486 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
487 Fut: Future<Output = Result<Model<T>>>,
488 T: 'static,
489 {
490 let task = loop {
491 match get_map(self).entry(channel_id) {
492 hash_map::Entry::Occupied(e) => match e.get() {
493 OpenedModelHandle::Open(model) => {
494 if let Some(model) = model.upgrade() {
495 break Task::ready(Ok(model)).shared();
496 } else {
497 get_map(self).remove(&channel_id);
498 continue;
499 }
500 }
501 OpenedModelHandle::Loading(task) => {
502 break task.clone();
503 }
504 },
505 hash_map::Entry::Vacant(e) => {
506 let task = cx
507 .spawn(move |this, mut cx| async move {
508 let channel = this.update(&mut cx, |this, _| {
509 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
510 Arc::new(anyhow!("no channel for id: {}", channel_id))
511 })
512 })??;
513
514 load(channel, cx).await.map_err(Arc::new)
515 })
516 .shared();
517
518 e.insert(OpenedModelHandle::Loading(task.clone()));
519 cx.spawn({
520 let task = task.clone();
521 move |this, mut cx| async move {
522 let result = task.await;
523 this.update(&mut cx, |this, _| match result {
524 Ok(model) => {
525 get_map(this).insert(
526 channel_id,
527 OpenedModelHandle::Open(model.downgrade()),
528 );
529 }
530 Err(_) => {
531 get_map(this).remove(&channel_id);
532 }
533 })
534 .ok();
535 }
536 })
537 .detach();
538 break task;
539 }
540 }
541 };
542 cx.background_executor()
543 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
544 }
545
546 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
547 self.channel_role(channel_id) == proto::ChannelRole::Admin
548 }
549
550 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
551 self.channel_index
552 .by_id()
553 .get(&channel_id)
554 .map_or(false, |channel| channel.is_root_channel())
555 }
556
557 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
558 self.channel_index
559 .by_id()
560 .get(&channel_id)
561 .map_or(false, |channel| {
562 channel.visibility == ChannelVisibility::Public
563 })
564 }
565
566 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
567 match self.channel_role(channel_id) {
568 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
569 _ => Capability::ReadOnly,
570 }
571 }
572
573 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
574 maybe!({
575 let mut channel = self.channel_for_id(channel_id)?;
576 if !channel.is_root_channel() {
577 channel = self.channel_for_id(channel.root_id())?;
578 }
579 let root_channel_state = self.channel_states.get(&channel.id);
580 root_channel_state?.role
581 })
582 .unwrap_or(proto::ChannelRole::Guest)
583 }
584
585 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
586 self.channel_participants
587 .get(&channel_id)
588 .map_or(&[], |v| v.as_slice())
589 }
590
591 pub fn create_channel(
592 &self,
593 name: &str,
594 parent_id: Option<ChannelId>,
595 cx: &mut ModelContext<Self>,
596 ) -> Task<Result<ChannelId>> {
597 let client = self.client.clone();
598 let name = name.trim_start_matches('#').to_owned();
599 cx.spawn(move |this, mut cx| async move {
600 let response = client
601 .request(proto::CreateChannel {
602 name,
603 parent_id: parent_id.map(|cid| cid.0),
604 })
605 .await?;
606
607 let channel = response
608 .channel
609 .ok_or_else(|| anyhow!("missing channel in response"))?;
610 let channel_id = ChannelId(channel.id);
611
612 this.update(&mut cx, |this, cx| {
613 let task = this.update_channels(
614 proto::UpdateChannels {
615 channels: vec![channel],
616 ..Default::default()
617 },
618 cx,
619 );
620 assert!(task.is_none());
621
622 // This event is emitted because the collab panel wants to clear the pending edit state
623 // before this frame is rendered. But we can't guarantee that the collab panel's future
624 // will resolve before this flush_effects finishes. Synchronously emitting this event
625 // ensures that the collab panel will observe this creation before the frame completes
626 cx.emit(ChannelEvent::ChannelCreated(channel_id));
627 })?;
628
629 Ok(channel_id)
630 })
631 }
632
633 pub fn move_channel(
634 &mut self,
635 channel_id: ChannelId,
636 to: ChannelId,
637 cx: &mut ModelContext<Self>,
638 ) -> Task<Result<()>> {
639 let client = self.client.clone();
640 cx.spawn(move |_, _| async move {
641 let _ = client
642 .request(proto::MoveChannel {
643 channel_id: channel_id.0,
644 to: to.0,
645 })
646 .await?;
647
648 Ok(())
649 })
650 }
651
652 pub fn set_channel_visibility(
653 &mut self,
654 channel_id: ChannelId,
655 visibility: ChannelVisibility,
656 cx: &mut ModelContext<Self>,
657 ) -> Task<Result<()>> {
658 let client = self.client.clone();
659 cx.spawn(move |_, _| async move {
660 let _ = client
661 .request(proto::SetChannelVisibility {
662 channel_id: channel_id.0,
663 visibility: visibility.into(),
664 })
665 .await?;
666
667 Ok(())
668 })
669 }
670
671 pub fn invite_member(
672 &mut self,
673 channel_id: ChannelId,
674 user_id: UserId,
675 role: proto::ChannelRole,
676 cx: &mut ModelContext<Self>,
677 ) -> Task<Result<()>> {
678 if !self.outgoing_invites.insert((channel_id, user_id)) {
679 return Task::ready(Err(anyhow!("invite request already in progress")));
680 }
681
682 cx.notify();
683 let client = self.client.clone();
684 cx.spawn(move |this, mut cx| async move {
685 let result = client
686 .request(proto::InviteChannelMember {
687 channel_id: channel_id.0,
688 user_id,
689 role: role.into(),
690 })
691 .await;
692
693 this.update(&mut cx, |this, cx| {
694 this.outgoing_invites.remove(&(channel_id, user_id));
695 cx.notify();
696 })?;
697
698 result?;
699
700 Ok(())
701 })
702 }
703
704 pub fn remove_member(
705 &mut self,
706 channel_id: ChannelId,
707 user_id: u64,
708 cx: &mut ModelContext<Self>,
709 ) -> Task<Result<()>> {
710 if !self.outgoing_invites.insert((channel_id, user_id)) {
711 return Task::ready(Err(anyhow!("invite request already in progress")));
712 }
713
714 cx.notify();
715 let client = self.client.clone();
716 cx.spawn(move |this, mut cx| async move {
717 let result = client
718 .request(proto::RemoveChannelMember {
719 channel_id: channel_id.0,
720 user_id,
721 })
722 .await;
723
724 this.update(&mut cx, |this, cx| {
725 this.outgoing_invites.remove(&(channel_id, user_id));
726 cx.notify();
727 })?;
728 result?;
729 Ok(())
730 })
731 }
732
733 pub fn set_member_role(
734 &mut self,
735 channel_id: ChannelId,
736 user_id: UserId,
737 role: proto::ChannelRole,
738 cx: &mut ModelContext<Self>,
739 ) -> Task<Result<()>> {
740 if !self.outgoing_invites.insert((channel_id, user_id)) {
741 return Task::ready(Err(anyhow!("member request already in progress")));
742 }
743
744 cx.notify();
745 let client = self.client.clone();
746 cx.spawn(move |this, mut cx| async move {
747 let result = client
748 .request(proto::SetChannelMemberRole {
749 channel_id: channel_id.0,
750 user_id,
751 role: role.into(),
752 })
753 .await;
754
755 this.update(&mut cx, |this, cx| {
756 this.outgoing_invites.remove(&(channel_id, user_id));
757 cx.notify();
758 })?;
759
760 result?;
761 Ok(())
762 })
763 }
764
765 pub fn rename(
766 &mut self,
767 channel_id: ChannelId,
768 new_name: &str,
769 cx: &mut ModelContext<Self>,
770 ) -> Task<Result<()>> {
771 let client = self.client.clone();
772 let name = new_name.to_string();
773 cx.spawn(move |this, mut cx| async move {
774 let channel = client
775 .request(proto::RenameChannel {
776 channel_id: channel_id.0,
777 name,
778 })
779 .await?
780 .channel
781 .ok_or_else(|| anyhow!("missing channel in response"))?;
782 this.update(&mut cx, |this, cx| {
783 let task = this.update_channels(
784 proto::UpdateChannels {
785 channels: vec![channel],
786 ..Default::default()
787 },
788 cx,
789 );
790 assert!(task.is_none());
791
792 // This event is emitted because the collab panel wants to clear the pending edit state
793 // before this frame is rendered. But we can't guarantee that the collab panel's future
794 // will resolve before this flush_effects finishes. Synchronously emitting this event
795 // ensures that the collab panel will observe this creation before the frame complete
796 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
797 })?;
798 Ok(())
799 })
800 }
801
802 pub fn respond_to_channel_invite(
803 &mut self,
804 channel_id: ChannelId,
805 accept: bool,
806 cx: &mut ModelContext<Self>,
807 ) -> Task<Result<()>> {
808 let client = self.client.clone();
809 cx.background_executor().spawn(async move {
810 client
811 .request(proto::RespondToChannelInvite {
812 channel_id: channel_id.0,
813 accept,
814 })
815 .await?;
816 Ok(())
817 })
818 }
819 pub fn fuzzy_search_members(
820 &self,
821 channel_id: ChannelId,
822 query: String,
823 limit: u16,
824 cx: &mut ModelContext<Self>,
825 ) -> Task<Result<Vec<ChannelMembership>>> {
826 let client = self.client.clone();
827 let user_store = self.user_store.downgrade();
828 cx.spawn(move |_, mut cx| async move {
829 let response = client
830 .request(proto::GetChannelMembers {
831 channel_id: channel_id.0,
832 query,
833 limit: limit as u64,
834 })
835 .await?;
836 user_store.update(&mut cx, |user_store, _| {
837 user_store.insert(response.users);
838 response
839 .members
840 .into_iter()
841 .filter_map(|member| {
842 Some(ChannelMembership {
843 user: user_store.get_cached_user(member.user_id)?,
844 role: member.role(),
845 kind: member.kind(),
846 })
847 })
848 .collect()
849 })
850 })
851 }
852
853 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
854 let client = self.client.clone();
855 async move {
856 client
857 .request(proto::DeleteChannel {
858 channel_id: channel_id.0,
859 })
860 .await?;
861 Ok(())
862 }
863 }
864
865 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
866 false
867 }
868
869 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
870 self.outgoing_invites.contains(&(channel_id, user_id))
871 }
872
873 async fn handle_update_channels(
874 this: Model<Self>,
875 message: TypedEnvelope<proto::UpdateChannels>,
876 _: Arc<Client>,
877 mut cx: AsyncAppContext,
878 ) -> Result<()> {
879 this.update(&mut cx, |this, _| {
880 this.update_channels_tx
881 .unbounded_send(message.payload)
882 .unwrap();
883 })?;
884 Ok(())
885 }
886
887 async fn handle_update_user_channels(
888 this: Model<Self>,
889 message: TypedEnvelope<proto::UpdateUserChannels>,
890 _: Arc<Client>,
891 mut cx: AsyncAppContext,
892 ) -> Result<()> {
893 this.update(&mut cx, |this, cx| {
894 for buffer_version in message.payload.observed_channel_buffer_version {
895 let version = language::proto::deserialize_version(&buffer_version.version);
896 this.acknowledge_notes_version(
897 ChannelId(buffer_version.channel_id),
898 buffer_version.epoch,
899 &version,
900 cx,
901 );
902 }
903 for message_id in message.payload.observed_channel_message_id {
904 this.acknowledge_message_id(
905 ChannelId(message_id.channel_id),
906 message_id.message_id,
907 cx,
908 );
909 }
910 for membership in message.payload.channel_memberships {
911 if let Some(role) = ChannelRole::from_i32(membership.role) {
912 this.channel_states
913 .entry(ChannelId(membership.channel_id))
914 .or_insert_with(|| ChannelState::default())
915 .set_role(role)
916 }
917 }
918 })
919 }
920
921 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
922 self.channel_index.clear();
923 self.channel_invitations.clear();
924 self.channel_participants.clear();
925 self.channel_index.clear();
926 self.outgoing_invites.clear();
927 self.disconnect_channel_buffers_task.take();
928
929 for chat in self.opened_chats.values() {
930 if let OpenedModelHandle::Open(chat) = chat {
931 if let Some(chat) = chat.upgrade() {
932 chat.update(cx, |chat, cx| {
933 chat.rejoin(cx);
934 });
935 }
936 }
937 }
938
939 let mut buffer_versions = Vec::new();
940 for buffer in self.opened_buffers.values() {
941 if let OpenedModelHandle::Open(buffer) = buffer {
942 if let Some(buffer) = buffer.upgrade() {
943 let channel_buffer = buffer.read(cx);
944 let buffer = channel_buffer.buffer().read(cx);
945 buffer_versions.push(proto::ChannelBufferVersion {
946 channel_id: channel_buffer.channel_id.0,
947 epoch: channel_buffer.epoch(),
948 version: language::proto::serialize_version(&buffer.version()),
949 });
950 }
951 }
952 }
953
954 if buffer_versions.is_empty() {
955 return Task::ready(Ok(()));
956 }
957
958 let response = self.client.request(proto::RejoinChannelBuffers {
959 buffers: buffer_versions,
960 });
961
962 cx.spawn(|this, mut cx| async move {
963 let mut response = response.await?;
964
965 this.update(&mut cx, |this, cx| {
966 this.opened_buffers.retain(|_, buffer| match buffer {
967 OpenedModelHandle::Open(channel_buffer) => {
968 let Some(channel_buffer) = channel_buffer.upgrade() else {
969 return false;
970 };
971
972 channel_buffer.update(cx, |channel_buffer, cx| {
973 let channel_id = channel_buffer.channel_id;
974 if let Some(remote_buffer) = response
975 .buffers
976 .iter_mut()
977 .find(|buffer| buffer.channel_id == channel_id.0)
978 {
979 let channel_id = channel_buffer.channel_id;
980 let remote_version =
981 language::proto::deserialize_version(&remote_buffer.version);
982
983 channel_buffer.replace_collaborators(
984 mem::take(&mut remote_buffer.collaborators),
985 cx,
986 );
987
988 let operations = channel_buffer
989 .buffer()
990 .update(cx, |buffer, cx| {
991 let outgoing_operations =
992 buffer.serialize_ops(Some(remote_version), cx);
993 let incoming_operations =
994 mem::take(&mut remote_buffer.operations)
995 .into_iter()
996 .map(language::proto::deserialize_operation)
997 .collect::<Result<Vec<_>>>()?;
998 buffer.apply_ops(incoming_operations, cx)?;
999 anyhow::Ok(outgoing_operations)
1000 })
1001 .log_err();
1002
1003 if let Some(operations) = operations {
1004 let client = this.client.clone();
1005 cx.background_executor()
1006 .spawn(async move {
1007 let operations = operations.await;
1008 for chunk in
1009 language::proto::split_operations(operations)
1010 {
1011 client
1012 .send(proto::UpdateChannelBuffer {
1013 channel_id: channel_id.0,
1014 operations: chunk,
1015 })
1016 .ok();
1017 }
1018 })
1019 .detach();
1020 return true;
1021 }
1022 }
1023
1024 channel_buffer.disconnect(cx);
1025 false
1026 })
1027 }
1028 OpenedModelHandle::Loading(_) => true,
1029 });
1030 })
1031 .ok();
1032 anyhow::Ok(())
1033 })
1034 }
1035
1036 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1037 cx.notify();
1038
1039 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1040 cx.spawn(move |this, mut cx| async move {
1041 if wait_for_reconnect {
1042 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1043 }
1044
1045 if let Some(this) = this.upgrade() {
1046 this.update(&mut cx, |this, cx| {
1047 for (_, buffer) in this.opened_buffers.drain() {
1048 if let OpenedModelHandle::Open(buffer) = buffer {
1049 if let Some(buffer) = buffer.upgrade() {
1050 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1051 }
1052 }
1053 }
1054 })
1055 .ok();
1056 }
1057 })
1058 });
1059 }
1060
1061 pub(crate) fn update_channels(
1062 &mut self,
1063 payload: proto::UpdateChannels,
1064 cx: &mut ModelContext<ChannelStore>,
1065 ) -> Option<Task<Result<()>>> {
1066 if !payload.remove_channel_invitations.is_empty() {
1067 self.channel_invitations
1068 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1069 }
1070 for channel in payload.channel_invitations {
1071 match self
1072 .channel_invitations
1073 .binary_search_by_key(&channel.id, |c| c.id.0)
1074 {
1075 Ok(ix) => {
1076 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1077 }
1078 Err(ix) => self.channel_invitations.insert(
1079 ix,
1080 Arc::new(Channel {
1081 id: ChannelId(channel.id),
1082 visibility: channel.visibility(),
1083 name: channel.name.into(),
1084 parent_path: channel
1085 .parent_path
1086 .into_iter()
1087 .map(|cid| ChannelId(cid))
1088 .collect(),
1089 }),
1090 ),
1091 }
1092 }
1093
1094 let channels_changed = !payload.channels.is_empty()
1095 || !payload.delete_channels.is_empty()
1096 || !payload.latest_channel_message_ids.is_empty()
1097 || !payload.latest_channel_buffer_versions.is_empty()
1098 || !payload.hosted_projects.is_empty()
1099 || !payload.deleted_hosted_projects.is_empty();
1100
1101 if channels_changed {
1102 if !payload.delete_channels.is_empty() {
1103 let delete_channels: Vec<ChannelId> = payload
1104 .delete_channels
1105 .into_iter()
1106 .map(|cid| ChannelId(cid))
1107 .collect();
1108 self.channel_index.delete_channels(&delete_channels);
1109 self.channel_participants
1110 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1111
1112 for channel_id in &delete_channels {
1113 let channel_id = *channel_id;
1114 if payload
1115 .channels
1116 .iter()
1117 .any(|channel| channel.id == channel_id.0)
1118 {
1119 continue;
1120 }
1121 if let Some(OpenedModelHandle::Open(buffer)) =
1122 self.opened_buffers.remove(&channel_id)
1123 {
1124 if let Some(buffer) = buffer.upgrade() {
1125 buffer.update(cx, ChannelBuffer::disconnect);
1126 }
1127 }
1128 }
1129 }
1130
1131 let mut index = self.channel_index.bulk_insert();
1132 for channel in payload.channels {
1133 let id = ChannelId(channel.id);
1134 let channel_changed = index.insert(channel);
1135
1136 if channel_changed {
1137 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1138 if let Some(buffer) = buffer.upgrade() {
1139 buffer.update(cx, ChannelBuffer::channel_changed);
1140 }
1141 }
1142 }
1143 }
1144
1145 for latest_buffer_version in payload.latest_channel_buffer_versions {
1146 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1147 self.channel_states
1148 .entry(ChannelId(latest_buffer_version.channel_id))
1149 .or_default()
1150 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1151 }
1152
1153 for latest_channel_message in payload.latest_channel_message_ids {
1154 self.channel_states
1155 .entry(ChannelId(latest_channel_message.channel_id))
1156 .or_default()
1157 .update_latest_message_id(latest_channel_message.message_id);
1158 }
1159
1160 for hosted_project in payload.hosted_projects {
1161 let hosted_project: HostedProject = hosted_project.into();
1162 if let Some(old_project) = self
1163 .hosted_projects
1164 .insert(hosted_project.project_id, hosted_project.clone())
1165 {
1166 self.channel_states
1167 .entry(old_project.channel_id)
1168 .or_default()
1169 .remove_hosted_project(old_project.project_id);
1170 }
1171 self.channel_states
1172 .entry(hosted_project.channel_id)
1173 .or_default()
1174 .add_hosted_project(hosted_project.project_id);
1175 }
1176
1177 for hosted_project_id in payload.deleted_hosted_projects {
1178 let hosted_project_id = ProjectId(hosted_project_id);
1179
1180 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1181 self.channel_states
1182 .entry(old_project.channel_id)
1183 .or_default()
1184 .remove_hosted_project(old_project.project_id);
1185 }
1186 }
1187 }
1188
1189 cx.notify();
1190 if payload.channel_participants.is_empty() {
1191 return None;
1192 }
1193
1194 let mut all_user_ids = Vec::new();
1195 let channel_participants = payload.channel_participants;
1196 for entry in &channel_participants {
1197 for user_id in entry.participant_user_ids.iter() {
1198 if let Err(ix) = all_user_ids.binary_search(user_id) {
1199 all_user_ids.insert(ix, *user_id);
1200 }
1201 }
1202 }
1203
1204 let users = self
1205 .user_store
1206 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1207 Some(cx.spawn(|this, mut cx| async move {
1208 let users = users.await?;
1209
1210 this.update(&mut cx, |this, cx| {
1211 for entry in &channel_participants {
1212 let mut participants: Vec<_> = entry
1213 .participant_user_ids
1214 .iter()
1215 .filter_map(|user_id| {
1216 users
1217 .binary_search_by_key(&user_id, |user| &user.id)
1218 .ok()
1219 .map(|ix| users[ix].clone())
1220 })
1221 .collect();
1222
1223 participants.sort_by_key(|u| u.id);
1224
1225 this.channel_participants
1226 .insert(ChannelId(entry.channel_id), participants);
1227 }
1228
1229 cx.notify();
1230 })
1231 }))
1232 }
1233}
1234
1235impl ChannelState {
1236 fn set_role(&mut self, role: ChannelRole) {
1237 self.role = Some(role);
1238 }
1239
1240 fn has_channel_buffer_changed(&self) -> bool {
1241 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1242 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1243 && self
1244 .latest_notes_version
1245 .version
1246 .changed_since(&self.observed_notes_version.version))
1247 }
1248
1249 fn has_new_messages(&self) -> bool {
1250 let latest_message_id = self.latest_chat_message;
1251 let observed_message_id = self.observed_chat_message;
1252
1253 latest_message_id.is_some_and(|latest_message_id| {
1254 latest_message_id > observed_message_id.unwrap_or_default()
1255 })
1256 }
1257
1258 fn last_acknowledged_message_id(&self) -> Option<u64> {
1259 self.observed_chat_message
1260 }
1261
1262 fn acknowledge_message_id(&mut self, message_id: u64) {
1263 let observed = self.observed_chat_message.get_or_insert(message_id);
1264 *observed = (*observed).max(message_id);
1265 }
1266
1267 fn update_latest_message_id(&mut self, message_id: u64) {
1268 self.latest_chat_message =
1269 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1270 }
1271
1272 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1273 if self.observed_notes_version.epoch == epoch {
1274 self.observed_notes_version.version.join(version);
1275 } else {
1276 self.observed_notes_version = NotesVersion {
1277 epoch,
1278 version: version.clone(),
1279 };
1280 }
1281 }
1282
1283 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1284 if self.latest_notes_version.epoch == epoch {
1285 self.latest_notes_version.version.join(version);
1286 } else {
1287 self.latest_notes_version = NotesVersion {
1288 epoch,
1289 version: version.clone(),
1290 };
1291 }
1292 }
1293
1294 fn add_hosted_project(&mut self, project_id: ProjectId) {
1295 self.projects.insert(project_id);
1296 }
1297
1298 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1299 self.projects.remove(&project_id);
1300 }
1301}