1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36#[derive(Debug, Clone)]
37pub struct HostedProject {
38 project_id: ProjectId,
39 channel_id: ChannelId,
40 name: SharedString,
41 _visibility: proto::ChannelVisibility,
42}
43
44impl From<proto::HostedProject> for HostedProject {
45 fn from(project: proto::HostedProject) -> Self {
46 Self {
47 project_id: ProjectId(project.project_id),
48 channel_id: ChannelId(project.channel_id),
49 _visibility: project.visibility(),
50 name: project.name.into(),
51 }
52 }
53}
54
55pub struct ChannelStore {
56 pub channel_index: ChannelIndex,
57 channel_invitations: Vec<Arc<Channel>>,
58 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
59 channel_states: HashMap<ChannelId, ChannelState>,
60 hosted_projects: HashMap<ProjectId, HostedProject>,
61
62 outgoing_invites: HashSet<(ChannelId, UserId)>,
63 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
64 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
65 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
66 client: Arc<Client>,
67 user_store: Model<UserStore>,
68 _rpc_subscriptions: [Subscription; 2],
69 _watch_connection_status: Task<Option<()>>,
70 disconnect_channel_buffers_task: Option<Task<()>>,
71 _update_channels: Task<()>,
72}
73
74#[derive(Clone, Debug)]
75pub struct Channel {
76 pub id: ChannelId,
77 pub name: SharedString,
78 pub visibility: proto::ChannelVisibility,
79 pub parent_path: Vec<ChannelId>,
80}
81
82#[derive(Default, Debug)]
83pub struct ChannelState {
84 latest_chat_message: Option<u64>,
85 latest_notes_version: NotesVersion,
86 observed_notes_version: NotesVersion,
87 observed_chat_message: Option<u64>,
88 role: Option<ChannelRole>,
89 projects: HashSet<ProjectId>,
90}
91
92impl Channel {
93 pub fn link(&self, cx: &AppContext) -> String {
94 format!(
95 "{}/channel/{}-{}",
96 ClientSettings::get_global(cx).server_url,
97 Self::slug(&self.name),
98 self.id
99 )
100 }
101
102 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
103 self.link(cx)
104 + "/notes"
105 + &heading
106 .map(|h| format!("#{}", Self::slug(&h)))
107 .unwrap_or_default()
108 }
109
110 pub fn is_root_channel(&self) -> bool {
111 self.parent_path.is_empty()
112 }
113
114 pub fn root_id(&self) -> ChannelId {
115 self.parent_path.first().copied().unwrap_or(self.id)
116 }
117
118 pub fn slug(str: &str) -> String {
119 let slug: String = str
120 .chars()
121 .map(|c| if c.is_alphanumeric() { c } else { '-' })
122 .collect();
123
124 slug.trim_matches(|c| c == '-').to_string()
125 }
126}
127
128pub struct ChannelMembership {
129 pub user: Arc<User>,
130 pub kind: proto::channel_member::Kind,
131 pub role: proto::ChannelRole,
132}
133impl ChannelMembership {
134 pub fn sort_key(&self) -> MembershipSortKey {
135 MembershipSortKey {
136 role_order: match self.role {
137 proto::ChannelRole::Admin => 0,
138 proto::ChannelRole::Member => 1,
139 proto::ChannelRole::Banned => 2,
140 proto::ChannelRole::Talker => 3,
141 proto::ChannelRole::Guest => 4,
142 },
143 kind_order: match self.kind {
144 proto::channel_member::Kind::Member => 0,
145 proto::channel_member::Kind::Invitee => 1,
146 },
147 username_order: self.user.github_login.as_str(),
148 }
149 }
150}
151
152#[derive(PartialOrd, Ord, PartialEq, Eq)]
153pub struct MembershipSortKey<'a> {
154 role_order: u8,
155 kind_order: u8,
156 username_order: &'a str,
157}
158
159pub enum ChannelEvent {
160 ChannelCreated(ChannelId),
161 ChannelRenamed(ChannelId),
162}
163
164impl EventEmitter<ChannelEvent> for ChannelStore {}
165
166enum OpenedModelHandle<E> {
167 Open(WeakModel<E>),
168 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
169}
170
171struct GlobalChannelStore(Model<ChannelStore>);
172
173impl Global for GlobalChannelStore {}
174
175impl ChannelStore {
176 pub fn global(cx: &AppContext) -> Model<Self> {
177 cx.global::<GlobalChannelStore>().0.clone()
178 }
179
180 pub fn new(
181 client: Arc<Client>,
182 user_store: Model<UserStore>,
183 cx: &mut ModelContext<Self>,
184 ) -> Self {
185 let rpc_subscriptions = [
186 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
187 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
188 ];
189
190 let mut connection_status = client.status();
191 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
192 let watch_connection_status = cx.spawn(|this, mut cx| async move {
193 while let Some(status) = connection_status.next().await {
194 let this = this.upgrade()?;
195 match status {
196 client::Status::Connected { .. } => {
197 this.update(&mut cx, |this, cx| this.handle_connect(cx))
198 .ok()?
199 .await
200 .log_err()?;
201 }
202 client::Status::SignedOut | client::Status::UpgradeRequired => {
203 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
204 .ok();
205 }
206 _ => {
207 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
208 .ok();
209 }
210 }
211 }
212 Some(())
213 });
214
215 Self {
216 channel_invitations: Vec::default(),
217 channel_index: ChannelIndex::default(),
218 channel_participants: Default::default(),
219 hosted_projects: Default::default(),
220 outgoing_invites: Default::default(),
221 opened_buffers: Default::default(),
222 opened_chats: Default::default(),
223 update_channels_tx,
224 client,
225 user_store,
226 _rpc_subscriptions: rpc_subscriptions,
227 _watch_connection_status: watch_connection_status,
228 disconnect_channel_buffers_task: None,
229 _update_channels: cx.spawn(|this, mut cx| async move {
230 maybe!(async move {
231 while let Some(update_channels) = update_channels_rx.next().await {
232 if let Some(this) = this.upgrade() {
233 let update_task = this.update(&mut cx, |this, cx| {
234 this.update_channels(update_channels, cx)
235 })?;
236 if let Some(update_task) = update_task {
237 update_task.await.log_err();
238 }
239 }
240 }
241 anyhow::Ok(())
242 })
243 .await
244 .log_err();
245 }),
246 channel_states: Default::default(),
247 }
248 }
249
250 pub fn client(&self) -> Arc<Client> {
251 self.client.clone()
252 }
253
254 /// Returns the number of unique channels in the store
255 pub fn channel_count(&self) -> usize {
256 self.channel_index.by_id().len()
257 }
258
259 /// Returns the index of a channel ID in the list of unique channels
260 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
261 self.channel_index
262 .by_id()
263 .keys()
264 .position(|id| *id == channel_id)
265 }
266
267 /// Returns an iterator over all unique channels
268 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
269 self.channel_index.by_id().values()
270 }
271
272 /// Iterate over all entries in the channel DAG
273 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
274 self.channel_index
275 .ordered_channels()
276 .iter()
277 .filter_map(move |id| {
278 let channel = self.channel_index.by_id().get(id)?;
279 Some((channel.parent_path.len(), channel))
280 })
281 }
282
283 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
284 let channel_id = self.channel_index.ordered_channels().get(ix)?;
285 self.channel_index.by_id().get(channel_id)
286 }
287
288 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
289 self.channel_index.by_id().values().nth(ix)
290 }
291
292 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
293 self.channel_invitations
294 .iter()
295 .any(|channel| channel.id == channel_id)
296 }
297
298 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
299 &self.channel_invitations
300 }
301
302 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
303 self.channel_index.by_id().get(&channel_id)
304 }
305
306 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
307 let mut projects: Vec<(SharedString, ProjectId)> = self
308 .channel_states
309 .get(&channel_id)
310 .map(|state| state.projects.clone())
311 .unwrap_or_default()
312 .into_iter()
313 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
314 .collect();
315 projects.sort();
316 projects
317 }
318
319 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
320 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
321 if let OpenedModelHandle::Open(buffer) = buffer {
322 return buffer.upgrade().is_some();
323 }
324 }
325 false
326 }
327
328 pub fn open_channel_buffer(
329 &mut self,
330 channel_id: ChannelId,
331 cx: &mut ModelContext<Self>,
332 ) -> Task<Result<Model<ChannelBuffer>>> {
333 let client = self.client.clone();
334 let user_store = self.user_store.clone();
335 let channel_store = cx.handle();
336 self.open_channel_resource(
337 channel_id,
338 |this| &mut this.opened_buffers,
339 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
340 cx,
341 )
342 }
343
344 pub fn fetch_channel_messages(
345 &self,
346 message_ids: Vec<u64>,
347 cx: &mut ModelContext<Self>,
348 ) -> Task<Result<Vec<ChannelMessage>>> {
349 let request = if message_ids.is_empty() {
350 None
351 } else {
352 Some(
353 self.client
354 .request(proto::GetChannelMessagesById { message_ids }),
355 )
356 };
357 cx.spawn(|this, mut cx| async move {
358 if let Some(request) = request {
359 let response = request.await?;
360 let this = this
361 .upgrade()
362 .ok_or_else(|| anyhow!("channel store dropped"))?;
363 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
364 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
365 } else {
366 Ok(Vec::new())
367 }
368 })
369 }
370
371 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
372 self.channel_states
373 .get(&channel_id)
374 .is_some_and(|state| state.has_channel_buffer_changed())
375 }
376
377 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
378 self.channel_states
379 .get(&channel_id)
380 .is_some_and(|state| state.has_new_messages())
381 }
382
383 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
384 if let Some(state) = self.channel_states.get_mut(&channel_id) {
385 state.latest_chat_message = message_id;
386 }
387 }
388
389 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
390 self.channel_states.get(&channel_id).and_then(|state| {
391 if let Some(last_message_id) = state.latest_chat_message {
392 if state
393 .last_acknowledged_message_id()
394 .is_some_and(|id| id < last_message_id)
395 {
396 return state.last_acknowledged_message_id();
397 }
398 }
399
400 None
401 })
402 }
403
404 pub fn acknowledge_message_id(
405 &mut self,
406 channel_id: ChannelId,
407 message_id: u64,
408 cx: &mut ModelContext<Self>,
409 ) {
410 self.channel_states
411 .entry(channel_id)
412 .or_insert_with(|| Default::default())
413 .acknowledge_message_id(message_id);
414 cx.notify();
415 }
416
417 pub fn update_latest_message_id(
418 &mut self,
419 channel_id: ChannelId,
420 message_id: u64,
421 cx: &mut ModelContext<Self>,
422 ) {
423 self.channel_states
424 .entry(channel_id)
425 .or_insert_with(|| Default::default())
426 .update_latest_message_id(message_id);
427 cx.notify();
428 }
429
430 pub fn acknowledge_notes_version(
431 &mut self,
432 channel_id: ChannelId,
433 epoch: u64,
434 version: &clock::Global,
435 cx: &mut ModelContext<Self>,
436 ) {
437 self.channel_states
438 .entry(channel_id)
439 .or_insert_with(|| Default::default())
440 .acknowledge_notes_version(epoch, version);
441 cx.notify()
442 }
443
444 pub fn update_latest_notes_version(
445 &mut self,
446 channel_id: ChannelId,
447 epoch: u64,
448 version: &clock::Global,
449 cx: &mut ModelContext<Self>,
450 ) {
451 self.channel_states
452 .entry(channel_id)
453 .or_insert_with(|| Default::default())
454 .update_latest_notes_version(epoch, version);
455 cx.notify()
456 }
457
458 pub fn open_channel_chat(
459 &mut self,
460 channel_id: ChannelId,
461 cx: &mut ModelContext<Self>,
462 ) -> Task<Result<Model<ChannelChat>>> {
463 let client = self.client.clone();
464 let user_store = self.user_store.clone();
465 let this = cx.handle();
466 self.open_channel_resource(
467 channel_id,
468 |this| &mut this.opened_chats,
469 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
470 cx,
471 )
472 }
473
474 /// Asynchronously open a given resource associated with a channel.
475 ///
476 /// Make sure that the resource is only opened once, even if this method
477 /// is called multiple times with the same channel id while the first task
478 /// is still running.
479 fn open_channel_resource<T, F, Fut>(
480 &mut self,
481 channel_id: ChannelId,
482 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
483 load: F,
484 cx: &mut ModelContext<Self>,
485 ) -> Task<Result<Model<T>>>
486 where
487 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
488 Fut: Future<Output = Result<Model<T>>>,
489 T: 'static,
490 {
491 let task = loop {
492 match get_map(self).entry(channel_id) {
493 hash_map::Entry::Occupied(e) => match e.get() {
494 OpenedModelHandle::Open(model) => {
495 if let Some(model) = model.upgrade() {
496 break Task::ready(Ok(model)).shared();
497 } else {
498 get_map(self).remove(&channel_id);
499 continue;
500 }
501 }
502 OpenedModelHandle::Loading(task) => {
503 break task.clone();
504 }
505 },
506 hash_map::Entry::Vacant(e) => {
507 let task = cx
508 .spawn(move |this, mut cx| async move {
509 let channel = this.update(&mut cx, |this, _| {
510 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
511 Arc::new(anyhow!("no channel for id: {}", channel_id))
512 })
513 })??;
514
515 load(channel, cx).await.map_err(Arc::new)
516 })
517 .shared();
518
519 e.insert(OpenedModelHandle::Loading(task.clone()));
520 cx.spawn({
521 let task = task.clone();
522 move |this, mut cx| async move {
523 let result = task.await;
524 this.update(&mut cx, |this, _| match result {
525 Ok(model) => {
526 get_map(this).insert(
527 channel_id,
528 OpenedModelHandle::Open(model.downgrade()),
529 );
530 }
531 Err(_) => {
532 get_map(this).remove(&channel_id);
533 }
534 })
535 .ok();
536 }
537 })
538 .detach();
539 break task;
540 }
541 }
542 };
543 cx.background_executor()
544 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
545 }
546
547 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
548 self.channel_role(channel_id) == proto::ChannelRole::Admin
549 }
550
551 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
552 self.channel_index
553 .by_id()
554 .get(&channel_id)
555 .map_or(false, |channel| channel.is_root_channel())
556 }
557
558 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
559 self.channel_index
560 .by_id()
561 .get(&channel_id)
562 .map_or(false, |channel| {
563 channel.visibility == ChannelVisibility::Public
564 })
565 }
566
567 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
568 match self.channel_role(channel_id) {
569 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
570 _ => Capability::ReadOnly,
571 }
572 }
573
574 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
575 maybe!({
576 let mut channel = self.channel_for_id(channel_id)?;
577 if !channel.is_root_channel() {
578 channel = self.channel_for_id(channel.root_id())?;
579 }
580 let root_channel_state = self.channel_states.get(&channel.id);
581 root_channel_state?.role
582 })
583 .unwrap_or(proto::ChannelRole::Guest)
584 }
585
586 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
587 self.channel_participants
588 .get(&channel_id)
589 .map_or(&[], |v| v.as_slice())
590 }
591
592 pub fn create_channel(
593 &self,
594 name: &str,
595 parent_id: Option<ChannelId>,
596 cx: &mut ModelContext<Self>,
597 ) -> Task<Result<ChannelId>> {
598 let client = self.client.clone();
599 let name = name.trim_start_matches('#').to_owned();
600 cx.spawn(move |this, mut cx| async move {
601 let response = client
602 .request(proto::CreateChannel {
603 name,
604 parent_id: parent_id.map(|cid| cid.0),
605 })
606 .await?;
607
608 let channel = response
609 .channel
610 .ok_or_else(|| anyhow!("missing channel in response"))?;
611 let channel_id = ChannelId(channel.id);
612
613 this.update(&mut cx, |this, cx| {
614 let task = this.update_channels(
615 proto::UpdateChannels {
616 channels: vec![channel],
617 ..Default::default()
618 },
619 cx,
620 );
621 assert!(task.is_none());
622
623 // This event is emitted because the collab panel wants to clear the pending edit state
624 // before this frame is rendered. But we can't guarantee that the collab panel's future
625 // will resolve before this flush_effects finishes. Synchronously emitting this event
626 // ensures that the collab panel will observe this creation before the frame completes
627 cx.emit(ChannelEvent::ChannelCreated(channel_id));
628 })?;
629
630 Ok(channel_id)
631 })
632 }
633
634 pub fn move_channel(
635 &mut self,
636 channel_id: ChannelId,
637 to: ChannelId,
638 cx: &mut ModelContext<Self>,
639 ) -> Task<Result<()>> {
640 let client = self.client.clone();
641 cx.spawn(move |_, _| async move {
642 let _ = client
643 .request(proto::MoveChannel {
644 channel_id: channel_id.0,
645 to: to.0,
646 })
647 .await?;
648
649 Ok(())
650 })
651 }
652
653 pub fn set_channel_visibility(
654 &mut self,
655 channel_id: ChannelId,
656 visibility: ChannelVisibility,
657 cx: &mut ModelContext<Self>,
658 ) -> Task<Result<()>> {
659 let client = self.client.clone();
660 cx.spawn(move |_, _| async move {
661 let _ = client
662 .request(proto::SetChannelVisibility {
663 channel_id: channel_id.0,
664 visibility: visibility.into(),
665 })
666 .await?;
667
668 Ok(())
669 })
670 }
671
672 pub fn invite_member(
673 &mut self,
674 channel_id: ChannelId,
675 user_id: UserId,
676 role: proto::ChannelRole,
677 cx: &mut ModelContext<Self>,
678 ) -> Task<Result<()>> {
679 if !self.outgoing_invites.insert((channel_id, user_id)) {
680 return Task::ready(Err(anyhow!("invite request already in progress")));
681 }
682
683 cx.notify();
684 let client = self.client.clone();
685 cx.spawn(move |this, mut cx| async move {
686 let result = client
687 .request(proto::InviteChannelMember {
688 channel_id: channel_id.0,
689 user_id,
690 role: role.into(),
691 })
692 .await;
693
694 this.update(&mut cx, |this, cx| {
695 this.outgoing_invites.remove(&(channel_id, user_id));
696 cx.notify();
697 })?;
698
699 result?;
700
701 Ok(())
702 })
703 }
704
705 pub fn remove_member(
706 &mut self,
707 channel_id: ChannelId,
708 user_id: u64,
709 cx: &mut ModelContext<Self>,
710 ) -> Task<Result<()>> {
711 if !self.outgoing_invites.insert((channel_id, user_id)) {
712 return Task::ready(Err(anyhow!("invite request already in progress")));
713 }
714
715 cx.notify();
716 let client = self.client.clone();
717 cx.spawn(move |this, mut cx| async move {
718 let result = client
719 .request(proto::RemoveChannelMember {
720 channel_id: channel_id.0,
721 user_id,
722 })
723 .await;
724
725 this.update(&mut cx, |this, cx| {
726 this.outgoing_invites.remove(&(channel_id, user_id));
727 cx.notify();
728 })?;
729 result?;
730 Ok(())
731 })
732 }
733
734 pub fn set_member_role(
735 &mut self,
736 channel_id: ChannelId,
737 user_id: UserId,
738 role: proto::ChannelRole,
739 cx: &mut ModelContext<Self>,
740 ) -> Task<Result<()>> {
741 if !self.outgoing_invites.insert((channel_id, user_id)) {
742 return Task::ready(Err(anyhow!("member request already in progress")));
743 }
744
745 cx.notify();
746 let client = self.client.clone();
747 cx.spawn(move |this, mut cx| async move {
748 let result = client
749 .request(proto::SetChannelMemberRole {
750 channel_id: channel_id.0,
751 user_id,
752 role: role.into(),
753 })
754 .await;
755
756 this.update(&mut cx, |this, cx| {
757 this.outgoing_invites.remove(&(channel_id, user_id));
758 cx.notify();
759 })?;
760
761 result?;
762 Ok(())
763 })
764 }
765
766 pub fn rename(
767 &mut self,
768 channel_id: ChannelId,
769 new_name: &str,
770 cx: &mut ModelContext<Self>,
771 ) -> Task<Result<()>> {
772 let client = self.client.clone();
773 let name = new_name.to_string();
774 cx.spawn(move |this, mut cx| async move {
775 let channel = client
776 .request(proto::RenameChannel {
777 channel_id: channel_id.0,
778 name,
779 })
780 .await?
781 .channel
782 .ok_or_else(|| anyhow!("missing channel in response"))?;
783 this.update(&mut cx, |this, cx| {
784 let task = this.update_channels(
785 proto::UpdateChannels {
786 channels: vec![channel],
787 ..Default::default()
788 },
789 cx,
790 );
791 assert!(task.is_none());
792
793 // This event is emitted because the collab panel wants to clear the pending edit state
794 // before this frame is rendered. But we can't guarantee that the collab panel's future
795 // will resolve before this flush_effects finishes. Synchronously emitting this event
796 // ensures that the collab panel will observe this creation before the frame complete
797 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
798 })?;
799 Ok(())
800 })
801 }
802
803 pub fn respond_to_channel_invite(
804 &mut self,
805 channel_id: ChannelId,
806 accept: bool,
807 cx: &mut ModelContext<Self>,
808 ) -> Task<Result<()>> {
809 let client = self.client.clone();
810 cx.background_executor().spawn(async move {
811 client
812 .request(proto::RespondToChannelInvite {
813 channel_id: channel_id.0,
814 accept,
815 })
816 .await?;
817 Ok(())
818 })
819 }
820
821 pub fn get_channel_member_details(
822 &self,
823 channel_id: ChannelId,
824 cx: &mut ModelContext<Self>,
825 ) -> Task<Result<Vec<ChannelMembership>>> {
826 let client = self.client.clone();
827 let user_store = self.user_store.downgrade();
828 cx.spawn(move |_, mut cx| async move {
829 let response = client
830 .request(proto::GetChannelMembers {
831 channel_id: channel_id.0,
832 })
833 .await?;
834
835 let user_ids = response.members.iter().map(|m| m.user_id).collect();
836 let user_store = user_store
837 .upgrade()
838 .ok_or_else(|| anyhow!("user store dropped"))?;
839 let users = user_store
840 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
841 .await?;
842
843 Ok(users
844 .into_iter()
845 .zip(response.members)
846 .map(|(user, member)| ChannelMembership {
847 user,
848 role: member.role(),
849 kind: member.kind(),
850 })
851 .collect())
852 })
853 }
854
855 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
856 let client = self.client.clone();
857 async move {
858 client
859 .request(proto::DeleteChannel {
860 channel_id: channel_id.0,
861 })
862 .await?;
863 Ok(())
864 }
865 }
866
867 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
868 false
869 }
870
871 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
872 self.outgoing_invites.contains(&(channel_id, user_id))
873 }
874
875 async fn handle_update_channels(
876 this: Model<Self>,
877 message: TypedEnvelope<proto::UpdateChannels>,
878 _: Arc<Client>,
879 mut cx: AsyncAppContext,
880 ) -> Result<()> {
881 this.update(&mut cx, |this, _| {
882 this.update_channels_tx
883 .unbounded_send(message.payload)
884 .unwrap();
885 })?;
886 Ok(())
887 }
888
889 async fn handle_update_user_channels(
890 this: Model<Self>,
891 message: TypedEnvelope<proto::UpdateUserChannels>,
892 _: Arc<Client>,
893 mut cx: AsyncAppContext,
894 ) -> Result<()> {
895 this.update(&mut cx, |this, cx| {
896 for buffer_version in message.payload.observed_channel_buffer_version {
897 let version = language::proto::deserialize_version(&buffer_version.version);
898 this.acknowledge_notes_version(
899 ChannelId(buffer_version.channel_id),
900 buffer_version.epoch,
901 &version,
902 cx,
903 );
904 }
905 for message_id in message.payload.observed_channel_message_id {
906 this.acknowledge_message_id(
907 ChannelId(message_id.channel_id),
908 message_id.message_id,
909 cx,
910 );
911 }
912 for membership in message.payload.channel_memberships {
913 if let Some(role) = ChannelRole::from_i32(membership.role) {
914 this.channel_states
915 .entry(ChannelId(membership.channel_id))
916 .or_insert_with(|| ChannelState::default())
917 .set_role(role)
918 }
919 }
920 })
921 }
922
923 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
924 self.channel_index.clear();
925 self.channel_invitations.clear();
926 self.channel_participants.clear();
927 self.channel_index.clear();
928 self.outgoing_invites.clear();
929 self.disconnect_channel_buffers_task.take();
930
931 for chat in self.opened_chats.values() {
932 if let OpenedModelHandle::Open(chat) = chat {
933 if let Some(chat) = chat.upgrade() {
934 chat.update(cx, |chat, cx| {
935 chat.rejoin(cx);
936 });
937 }
938 }
939 }
940
941 let mut buffer_versions = Vec::new();
942 for buffer in self.opened_buffers.values() {
943 if let OpenedModelHandle::Open(buffer) = buffer {
944 if let Some(buffer) = buffer.upgrade() {
945 let channel_buffer = buffer.read(cx);
946 let buffer = channel_buffer.buffer().read(cx);
947 buffer_versions.push(proto::ChannelBufferVersion {
948 channel_id: channel_buffer.channel_id.0,
949 epoch: channel_buffer.epoch(),
950 version: language::proto::serialize_version(&buffer.version()),
951 });
952 }
953 }
954 }
955
956 if buffer_versions.is_empty() {
957 return Task::ready(Ok(()));
958 }
959
960 let response = self.client.request(proto::RejoinChannelBuffers {
961 buffers: buffer_versions,
962 });
963
964 cx.spawn(|this, mut cx| async move {
965 let mut response = response.await?;
966
967 this.update(&mut cx, |this, cx| {
968 this.opened_buffers.retain(|_, buffer| match buffer {
969 OpenedModelHandle::Open(channel_buffer) => {
970 let Some(channel_buffer) = channel_buffer.upgrade() else {
971 return false;
972 };
973
974 channel_buffer.update(cx, |channel_buffer, cx| {
975 let channel_id = channel_buffer.channel_id;
976 if let Some(remote_buffer) = response
977 .buffers
978 .iter_mut()
979 .find(|buffer| buffer.channel_id == channel_id.0)
980 {
981 let channel_id = channel_buffer.channel_id;
982 let remote_version =
983 language::proto::deserialize_version(&remote_buffer.version);
984
985 channel_buffer.replace_collaborators(
986 mem::take(&mut remote_buffer.collaborators),
987 cx,
988 );
989
990 let operations = channel_buffer
991 .buffer()
992 .update(cx, |buffer, cx| {
993 let outgoing_operations =
994 buffer.serialize_ops(Some(remote_version), cx);
995 let incoming_operations =
996 mem::take(&mut remote_buffer.operations)
997 .into_iter()
998 .map(language::proto::deserialize_operation)
999 .collect::<Result<Vec<_>>>()?;
1000 buffer.apply_ops(incoming_operations, cx)?;
1001 anyhow::Ok(outgoing_operations)
1002 })
1003 .log_err();
1004
1005 if let Some(operations) = operations {
1006 let client = this.client.clone();
1007 cx.background_executor()
1008 .spawn(async move {
1009 let operations = operations.await;
1010 for chunk in
1011 language::proto::split_operations(operations)
1012 {
1013 client
1014 .send(proto::UpdateChannelBuffer {
1015 channel_id: channel_id.0,
1016 operations: chunk,
1017 })
1018 .ok();
1019 }
1020 })
1021 .detach();
1022 return true;
1023 }
1024 }
1025
1026 channel_buffer.disconnect(cx);
1027 false
1028 })
1029 }
1030 OpenedModelHandle::Loading(_) => true,
1031 });
1032 })
1033 .ok();
1034 anyhow::Ok(())
1035 })
1036 }
1037
1038 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1039 cx.notify();
1040
1041 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1042 cx.spawn(move |this, mut cx| async move {
1043 if wait_for_reconnect {
1044 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1045 }
1046
1047 if let Some(this) = this.upgrade() {
1048 this.update(&mut cx, |this, cx| {
1049 for (_, buffer) in this.opened_buffers.drain() {
1050 if let OpenedModelHandle::Open(buffer) = buffer {
1051 if let Some(buffer) = buffer.upgrade() {
1052 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1053 }
1054 }
1055 }
1056 })
1057 .ok();
1058 }
1059 })
1060 });
1061 }
1062
1063 pub(crate) fn update_channels(
1064 &mut self,
1065 payload: proto::UpdateChannels,
1066 cx: &mut ModelContext<ChannelStore>,
1067 ) -> Option<Task<Result<()>>> {
1068 if !payload.remove_channel_invitations.is_empty() {
1069 self.channel_invitations
1070 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1071 }
1072 for channel in payload.channel_invitations {
1073 match self
1074 .channel_invitations
1075 .binary_search_by_key(&channel.id, |c| c.id.0)
1076 {
1077 Ok(ix) => {
1078 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1079 }
1080 Err(ix) => self.channel_invitations.insert(
1081 ix,
1082 Arc::new(Channel {
1083 id: ChannelId(channel.id),
1084 visibility: channel.visibility(),
1085 name: channel.name.into(),
1086 parent_path: channel
1087 .parent_path
1088 .into_iter()
1089 .map(|cid| ChannelId(cid))
1090 .collect(),
1091 }),
1092 ),
1093 }
1094 }
1095
1096 let channels_changed = !payload.channels.is_empty()
1097 || !payload.delete_channels.is_empty()
1098 || !payload.latest_channel_message_ids.is_empty()
1099 || !payload.latest_channel_buffer_versions.is_empty()
1100 || !payload.hosted_projects.is_empty()
1101 || !payload.deleted_hosted_projects.is_empty();
1102
1103 if channels_changed {
1104 if !payload.delete_channels.is_empty() {
1105 let delete_channels: Vec<ChannelId> = payload
1106 .delete_channels
1107 .into_iter()
1108 .map(|cid| ChannelId(cid))
1109 .collect();
1110 self.channel_index.delete_channels(&delete_channels);
1111 self.channel_participants
1112 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1113
1114 for channel_id in &delete_channels {
1115 let channel_id = *channel_id;
1116 if payload
1117 .channels
1118 .iter()
1119 .any(|channel| channel.id == channel_id.0)
1120 {
1121 continue;
1122 }
1123 if let Some(OpenedModelHandle::Open(buffer)) =
1124 self.opened_buffers.remove(&channel_id)
1125 {
1126 if let Some(buffer) = buffer.upgrade() {
1127 buffer.update(cx, ChannelBuffer::disconnect);
1128 }
1129 }
1130 }
1131 }
1132
1133 let mut index = self.channel_index.bulk_insert();
1134 for channel in payload.channels {
1135 let id = ChannelId(channel.id);
1136 let channel_changed = index.insert(channel);
1137
1138 if channel_changed {
1139 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1140 if let Some(buffer) = buffer.upgrade() {
1141 buffer.update(cx, ChannelBuffer::channel_changed);
1142 }
1143 }
1144 }
1145 }
1146
1147 for latest_buffer_version in payload.latest_channel_buffer_versions {
1148 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1149 self.channel_states
1150 .entry(ChannelId(latest_buffer_version.channel_id))
1151 .or_default()
1152 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1153 }
1154
1155 for latest_channel_message in payload.latest_channel_message_ids {
1156 self.channel_states
1157 .entry(ChannelId(latest_channel_message.channel_id))
1158 .or_default()
1159 .update_latest_message_id(latest_channel_message.message_id);
1160 }
1161
1162 for hosted_project in payload.hosted_projects {
1163 let hosted_project: HostedProject = hosted_project.into();
1164 if let Some(old_project) = self
1165 .hosted_projects
1166 .insert(hosted_project.project_id, hosted_project.clone())
1167 {
1168 self.channel_states
1169 .entry(old_project.channel_id)
1170 .or_default()
1171 .remove_hosted_project(old_project.project_id);
1172 }
1173 self.channel_states
1174 .entry(hosted_project.channel_id)
1175 .or_default()
1176 .add_hosted_project(hosted_project.project_id);
1177 }
1178
1179 for hosted_project_id in payload.deleted_hosted_projects {
1180 let hosted_project_id = ProjectId(hosted_project_id);
1181
1182 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1183 self.channel_states
1184 .entry(old_project.channel_id)
1185 .or_default()
1186 .remove_hosted_project(old_project.project_id);
1187 }
1188 }
1189 }
1190
1191 cx.notify();
1192 if payload.channel_participants.is_empty() {
1193 return None;
1194 }
1195
1196 let mut all_user_ids = Vec::new();
1197 let channel_participants = payload.channel_participants;
1198 for entry in &channel_participants {
1199 for user_id in entry.participant_user_ids.iter() {
1200 if let Err(ix) = all_user_ids.binary_search(user_id) {
1201 all_user_ids.insert(ix, *user_id);
1202 }
1203 }
1204 }
1205
1206 let users = self
1207 .user_store
1208 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1209 Some(cx.spawn(|this, mut cx| async move {
1210 let users = users.await?;
1211
1212 this.update(&mut cx, |this, cx| {
1213 for entry in &channel_participants {
1214 let mut participants: Vec<_> = entry
1215 .participant_user_ids
1216 .iter()
1217 .filter_map(|user_id| {
1218 users
1219 .binary_search_by_key(&user_id, |user| &user.id)
1220 .ok()
1221 .map(|ix| users[ix].clone())
1222 })
1223 .collect();
1224
1225 participants.sort_by_key(|u| u.id);
1226
1227 this.channel_participants
1228 .insert(ChannelId(entry.channel_id), participants);
1229 }
1230
1231 cx.notify();
1232 })
1233 }))
1234 }
1235}
1236
1237impl ChannelState {
1238 fn set_role(&mut self, role: ChannelRole) {
1239 self.role = Some(role);
1240 }
1241
1242 fn has_channel_buffer_changed(&self) -> bool {
1243 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1244 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1245 && self
1246 .latest_notes_version
1247 .version
1248 .changed_since(&self.observed_notes_version.version))
1249 }
1250
1251 fn has_new_messages(&self) -> bool {
1252 let latest_message_id = self.latest_chat_message;
1253 let observed_message_id = self.observed_chat_message;
1254
1255 latest_message_id.is_some_and(|latest_message_id| {
1256 latest_message_id > observed_message_id.unwrap_or_default()
1257 })
1258 }
1259
1260 fn last_acknowledged_message_id(&self) -> Option<u64> {
1261 self.observed_chat_message
1262 }
1263
1264 fn acknowledge_message_id(&mut self, message_id: u64) {
1265 let observed = self.observed_chat_message.get_or_insert(message_id);
1266 *observed = (*observed).max(message_id);
1267 }
1268
1269 fn update_latest_message_id(&mut self, message_id: u64) {
1270 self.latest_chat_message =
1271 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1272 }
1273
1274 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1275 if self.observed_notes_version.epoch == epoch {
1276 self.observed_notes_version.version.join(version);
1277 } else {
1278 self.observed_notes_version = NotesVersion {
1279 epoch,
1280 version: version.clone(),
1281 };
1282 }
1283 }
1284
1285 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1286 if self.latest_notes_version.epoch == epoch {
1287 self.latest_notes_version.version.join(version);
1288 } else {
1289 self.latest_notes_version = NotesVersion {
1290 epoch,
1291 version: version.clone(),
1292 };
1293 }
1294 }
1295
1296 fn add_hosted_project(&mut self, project_id: ProjectId) {
1297 self.projects.insert(project_id);
1298 }
1299
1300 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1301 self.projects.remove(&project_id);
1302 }
1303}