1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use release_channel::RELEASE_CHANNEL;
15use rpc::{
16 proto::{self, ChannelRole, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
31pub struct HostedProjectId(pub u64);
32
33#[derive(Debug, Clone, Default)]
34struct NotesVersion {
35 epoch: u64,
36 version: clock::Global,
37}
38
39#[derive(Debug, Clone)]
40pub struct HostedProject {
41 id: HostedProjectId,
42 channel_id: ChannelId,
43 name: SharedString,
44 _visibility: proto::ChannelVisibility,
45}
46
47impl From<proto::HostedProject> for HostedProject {
48 fn from(project: proto::HostedProject) -> Self {
49 Self {
50 id: HostedProjectId(project.id),
51 channel_id: ChannelId(project.channel_id),
52 _visibility: project.visibility(),
53 name: project.name.into(),
54 }
55 }
56}
57
58pub struct ChannelStore {
59 pub channel_index: ChannelIndex,
60 channel_invitations: Vec<Arc<Channel>>,
61 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
62 channel_states: HashMap<ChannelId, ChannelState>,
63 hosted_projects: HashMap<HostedProjectId, HostedProject>,
64
65 outgoing_invites: HashSet<(ChannelId, UserId)>,
66 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
67 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
68 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
69 client: Arc<Client>,
70 user_store: Model<UserStore>,
71 _rpc_subscriptions: [Subscription; 2],
72 _watch_connection_status: Task<Option<()>>,
73 disconnect_channel_buffers_task: Option<Task<()>>,
74 _update_channels: Task<()>,
75}
76
77#[derive(Clone, Debug)]
78pub struct Channel {
79 pub id: ChannelId,
80 pub name: SharedString,
81 pub visibility: proto::ChannelVisibility,
82 pub parent_path: Vec<ChannelId>,
83}
84
85#[derive(Default)]
86pub struct ChannelState {
87 latest_chat_message: Option<u64>,
88 latest_notes_versions: Option<NotesVersion>,
89 observed_chat_message: Option<u64>,
90 observed_notes_versions: Option<NotesVersion>,
91 role: Option<ChannelRole>,
92 projects: HashSet<HostedProjectId>,
93}
94
95impl Channel {
96 pub fn link(&self) -> String {
97 RELEASE_CHANNEL.link_prefix().to_owned()
98 + "channel/"
99 + &Self::slug(&self.name)
100 + "-"
101 + &self.id.to_string()
102 }
103
104 pub fn notes_link(&self, heading: Option<String>) -> String {
105 self.link()
106 + "/notes"
107 + &heading
108 .map(|h| format!("#{}", Self::slug(&h)))
109 .unwrap_or_default()
110 }
111
112 pub fn is_root_channel(&self) -> bool {
113 self.parent_path.is_empty()
114 }
115
116 pub fn root_id(&self) -> ChannelId {
117 self.parent_path.first().copied().unwrap_or(self.id)
118 }
119
120 pub fn slug(str: &str) -> String {
121 let slug: String = str
122 .chars()
123 .map(|c| if c.is_alphanumeric() { c } else { '-' })
124 .collect();
125
126 slug.trim_matches(|c| c == '-').to_string()
127 }
128}
129
130pub struct ChannelMembership {
131 pub user: Arc<User>,
132 pub kind: proto::channel_member::Kind,
133 pub role: proto::ChannelRole,
134}
135impl ChannelMembership {
136 pub fn sort_key(&self) -> MembershipSortKey {
137 MembershipSortKey {
138 role_order: match self.role {
139 proto::ChannelRole::Admin => 0,
140 proto::ChannelRole::Member => 1,
141 proto::ChannelRole::Banned => 2,
142 proto::ChannelRole::Talker => 3,
143 proto::ChannelRole::Guest => 4,
144 },
145 kind_order: match self.kind {
146 proto::channel_member::Kind::Member => 0,
147 proto::channel_member::Kind::Invitee => 1,
148 },
149 username_order: self.user.github_login.as_str(),
150 }
151 }
152}
153
154#[derive(PartialOrd, Ord, PartialEq, Eq)]
155pub struct MembershipSortKey<'a> {
156 role_order: u8,
157 kind_order: u8,
158 username_order: &'a str,
159}
160
161pub enum ChannelEvent {
162 ChannelCreated(ChannelId),
163 ChannelRenamed(ChannelId),
164}
165
166impl EventEmitter<ChannelEvent> for ChannelStore {}
167
168enum OpenedModelHandle<E> {
169 Open(WeakModel<E>),
170 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
171}
172
173struct GlobalChannelStore(Model<ChannelStore>);
174
175impl Global for GlobalChannelStore {}
176
177impl ChannelStore {
178 pub fn global(cx: &AppContext) -> Model<Self> {
179 cx.global::<GlobalChannelStore>().0.clone()
180 }
181
182 pub fn new(
183 client: Arc<Client>,
184 user_store: Model<UserStore>,
185 cx: &mut ModelContext<Self>,
186 ) -> Self {
187 let rpc_subscriptions = [
188 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
189 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
190 ];
191
192 let mut connection_status = client.status();
193 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
194 let watch_connection_status = cx.spawn(|this, mut cx| async move {
195 while let Some(status) = connection_status.next().await {
196 let this = this.upgrade()?;
197 match status {
198 client::Status::Connected { .. } => {
199 this.update(&mut cx, |this, cx| this.handle_connect(cx))
200 .ok()?
201 .await
202 .log_err()?;
203 }
204 client::Status::SignedOut | client::Status::UpgradeRequired => {
205 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
206 .ok();
207 }
208 _ => {
209 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
210 .ok();
211 }
212 }
213 }
214 Some(())
215 });
216
217 Self {
218 channel_invitations: Vec::default(),
219 channel_index: ChannelIndex::default(),
220 channel_participants: Default::default(),
221 hosted_projects: Default::default(),
222 outgoing_invites: Default::default(),
223 opened_buffers: Default::default(),
224 opened_chats: Default::default(),
225 update_channels_tx,
226 client,
227 user_store,
228 _rpc_subscriptions: rpc_subscriptions,
229 _watch_connection_status: watch_connection_status,
230 disconnect_channel_buffers_task: None,
231 _update_channels: cx.spawn(|this, mut cx| async move {
232 async_maybe!({
233 while let Some(update_channels) = update_channels_rx.next().await {
234 if let Some(this) = this.upgrade() {
235 let update_task = this.update(&mut cx, |this, cx| {
236 this.update_channels(update_channels, cx)
237 })?;
238 if let Some(update_task) = update_task {
239 update_task.await.log_err();
240 }
241 }
242 }
243 anyhow::Ok(())
244 })
245 .await
246 .log_err();
247 }),
248 channel_states: Default::default(),
249 }
250 }
251
252 pub fn client(&self) -> Arc<Client> {
253 self.client.clone()
254 }
255
256 /// Returns the number of unique channels in the store
257 pub fn channel_count(&self) -> usize {
258 self.channel_index.by_id().len()
259 }
260
261 /// Returns the index of a channel ID in the list of unique channels
262 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
263 self.channel_index
264 .by_id()
265 .keys()
266 .position(|id| *id == channel_id)
267 }
268
269 /// Returns an iterator over all unique channels
270 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
271 self.channel_index.by_id().values()
272 }
273
274 /// Iterate over all entries in the channel DAG
275 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
276 self.channel_index
277 .ordered_channels()
278 .iter()
279 .filter_map(move |id| {
280 let channel = self.channel_index.by_id().get(id)?;
281 Some((channel.parent_path.len(), channel))
282 })
283 }
284
285 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
286 let channel_id = self.channel_index.ordered_channels().get(ix)?;
287 self.channel_index.by_id().get(channel_id)
288 }
289
290 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
291 self.channel_index.by_id().values().nth(ix)
292 }
293
294 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
295 self.channel_invitations
296 .iter()
297 .any(|channel| channel.id == channel_id)
298 }
299
300 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
301 &self.channel_invitations
302 }
303
304 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
305 self.channel_index.by_id().get(&channel_id)
306 }
307
308 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, HostedProjectId)> {
309 let mut projects: Vec<(SharedString, HostedProjectId)> = self
310 .channel_states
311 .get(&channel_id)
312 .map(|state| state.projects.clone())
313 .unwrap_or_default()
314 .into_iter()
315 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
316 .collect();
317 projects.sort();
318 projects
319 }
320
321 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
322 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
323 if let OpenedModelHandle::Open(buffer) = buffer {
324 return buffer.upgrade().is_some();
325 }
326 }
327 false
328 }
329
330 pub fn open_channel_buffer(
331 &mut self,
332 channel_id: ChannelId,
333 cx: &mut ModelContext<Self>,
334 ) -> Task<Result<Model<ChannelBuffer>>> {
335 let client = self.client.clone();
336 let user_store = self.user_store.clone();
337 let channel_store = cx.handle();
338 self.open_channel_resource(
339 channel_id,
340 |this| &mut this.opened_buffers,
341 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
342 cx,
343 )
344 }
345
346 pub fn fetch_channel_messages(
347 &self,
348 message_ids: Vec<u64>,
349 cx: &mut ModelContext<Self>,
350 ) -> Task<Result<Vec<ChannelMessage>>> {
351 let request = if message_ids.is_empty() {
352 None
353 } else {
354 Some(
355 self.client
356 .request(proto::GetChannelMessagesById { message_ids }),
357 )
358 };
359 cx.spawn(|this, mut cx| async move {
360 if let Some(request) = request {
361 let response = request.await?;
362 let this = this
363 .upgrade()
364 .ok_or_else(|| anyhow!("channel store dropped"))?;
365 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
366 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
367 } else {
368 Ok(Vec::new())
369 }
370 })
371 }
372
373 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
374 self.channel_states
375 .get(&channel_id)
376 .is_some_and(|state| state.has_channel_buffer_changed())
377 }
378
379 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
380 self.channel_states
381 .get(&channel_id)
382 .is_some_and(|state| state.has_new_messages())
383 }
384
385 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
386 self.channel_states.get(&channel_id).and_then(|state| {
387 if let Some(last_message_id) = state.latest_chat_message {
388 if state
389 .last_acknowledged_message_id()
390 .is_some_and(|id| id < last_message_id)
391 {
392 return state.last_acknowledged_message_id();
393 }
394 }
395
396 None
397 })
398 }
399
400 pub fn acknowledge_message_id(
401 &mut self,
402 channel_id: ChannelId,
403 message_id: u64,
404 cx: &mut ModelContext<Self>,
405 ) {
406 self.channel_states
407 .entry(channel_id)
408 .or_insert_with(|| Default::default())
409 .acknowledge_message_id(message_id);
410 cx.notify();
411 }
412
413 pub fn update_latest_message_id(
414 &mut self,
415 channel_id: ChannelId,
416 message_id: u64,
417 cx: &mut ModelContext<Self>,
418 ) {
419 self.channel_states
420 .entry(channel_id)
421 .or_insert_with(|| Default::default())
422 .update_latest_message_id(message_id);
423 cx.notify();
424 }
425
426 pub fn acknowledge_notes_version(
427 &mut self,
428 channel_id: ChannelId,
429 epoch: u64,
430 version: &clock::Global,
431 cx: &mut ModelContext<Self>,
432 ) {
433 self.channel_states
434 .entry(channel_id)
435 .or_insert_with(|| Default::default())
436 .acknowledge_notes_version(epoch, version);
437 cx.notify()
438 }
439
440 pub fn update_latest_notes_version(
441 &mut self,
442 channel_id: ChannelId,
443 epoch: u64,
444 version: &clock::Global,
445 cx: &mut ModelContext<Self>,
446 ) {
447 self.channel_states
448 .entry(channel_id)
449 .or_insert_with(|| Default::default())
450 .update_latest_notes_version(epoch, version);
451 cx.notify()
452 }
453
454 pub fn open_channel_chat(
455 &mut self,
456 channel_id: ChannelId,
457 cx: &mut ModelContext<Self>,
458 ) -> Task<Result<Model<ChannelChat>>> {
459 let client = self.client.clone();
460 let user_store = self.user_store.clone();
461 let this = cx.handle();
462 self.open_channel_resource(
463 channel_id,
464 |this| &mut this.opened_chats,
465 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
466 cx,
467 )
468 }
469
470 /// Asynchronously open a given resource associated with a channel.
471 ///
472 /// Make sure that the resource is only opened once, even if this method
473 /// is called multiple times with the same channel id while the first task
474 /// is still running.
475 fn open_channel_resource<T, F, Fut>(
476 &mut self,
477 channel_id: ChannelId,
478 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
479 load: F,
480 cx: &mut ModelContext<Self>,
481 ) -> Task<Result<Model<T>>>
482 where
483 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
484 Fut: Future<Output = Result<Model<T>>>,
485 T: 'static,
486 {
487 let task = loop {
488 match get_map(self).entry(channel_id) {
489 hash_map::Entry::Occupied(e) => match e.get() {
490 OpenedModelHandle::Open(model) => {
491 if let Some(model) = model.upgrade() {
492 break Task::ready(Ok(model)).shared();
493 } else {
494 get_map(self).remove(&channel_id);
495 continue;
496 }
497 }
498 OpenedModelHandle::Loading(task) => {
499 break task.clone();
500 }
501 },
502 hash_map::Entry::Vacant(e) => {
503 let task = cx
504 .spawn(move |this, mut cx| async move {
505 let channel = this.update(&mut cx, |this, _| {
506 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
507 Arc::new(anyhow!("no channel for id: {}", channel_id))
508 })
509 })??;
510
511 load(channel, cx).await.map_err(Arc::new)
512 })
513 .shared();
514
515 e.insert(OpenedModelHandle::Loading(task.clone()));
516 cx.spawn({
517 let task = task.clone();
518 move |this, mut cx| async move {
519 let result = task.await;
520 this.update(&mut cx, |this, _| match result {
521 Ok(model) => {
522 get_map(this).insert(
523 channel_id,
524 OpenedModelHandle::Open(model.downgrade()),
525 );
526 }
527 Err(_) => {
528 get_map(this).remove(&channel_id);
529 }
530 })
531 .ok();
532 }
533 })
534 .detach();
535 break task;
536 }
537 }
538 };
539 cx.background_executor()
540 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
541 }
542
543 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
544 self.channel_role(channel_id) == proto::ChannelRole::Admin
545 }
546
547 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
548 self.channel_index
549 .by_id()
550 .get(&channel_id)
551 .map_or(false, |channel| channel.is_root_channel())
552 }
553
554 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
555 self.channel_index
556 .by_id()
557 .get(&channel_id)
558 .map_or(false, |channel| {
559 channel.visibility == ChannelVisibility::Public
560 })
561 }
562
563 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
564 match self.channel_role(channel_id) {
565 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
566 _ => Capability::ReadOnly,
567 }
568 }
569
570 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
571 maybe!({
572 let mut channel = self.channel_for_id(channel_id)?;
573 if !channel.is_root_channel() {
574 channel = self.channel_for_id(channel.root_id())?;
575 }
576 let root_channel_state = self.channel_states.get(&channel.id);
577 root_channel_state?.role
578 })
579 .unwrap_or(proto::ChannelRole::Guest)
580 }
581
582 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
583 self.channel_participants
584 .get(&channel_id)
585 .map_or(&[], |v| v.as_slice())
586 }
587
588 pub fn create_channel(
589 &self,
590 name: &str,
591 parent_id: Option<ChannelId>,
592 cx: &mut ModelContext<Self>,
593 ) -> Task<Result<ChannelId>> {
594 let client = self.client.clone();
595 let name = name.trim_start_matches("#").to_owned();
596 cx.spawn(move |this, mut cx| async move {
597 let response = client
598 .request(proto::CreateChannel {
599 name,
600 parent_id: parent_id.map(|cid| cid.0),
601 })
602 .await?;
603
604 let channel = response
605 .channel
606 .ok_or_else(|| anyhow!("missing channel in response"))?;
607 let channel_id = ChannelId(channel.id);
608
609 this.update(&mut cx, |this, cx| {
610 let task = this.update_channels(
611 proto::UpdateChannels {
612 channels: vec![channel],
613 ..Default::default()
614 },
615 cx,
616 );
617 assert!(task.is_none());
618
619 // This event is emitted because the collab panel wants to clear the pending edit state
620 // before this frame is rendered. But we can't guarantee that the collab panel's future
621 // will resolve before this flush_effects finishes. Synchronously emitting this event
622 // ensures that the collab panel will observe this creation before the frame completes
623 cx.emit(ChannelEvent::ChannelCreated(channel_id));
624 })?;
625
626 Ok(channel_id)
627 })
628 }
629
630 pub fn move_channel(
631 &mut self,
632 channel_id: ChannelId,
633 to: ChannelId,
634 cx: &mut ModelContext<Self>,
635 ) -> Task<Result<()>> {
636 let client = self.client.clone();
637 cx.spawn(move |_, _| async move {
638 let _ = client
639 .request(proto::MoveChannel {
640 channel_id: channel_id.0,
641 to: to.0,
642 })
643 .await?;
644
645 Ok(())
646 })
647 }
648
649 pub fn set_channel_visibility(
650 &mut self,
651 channel_id: ChannelId,
652 visibility: ChannelVisibility,
653 cx: &mut ModelContext<Self>,
654 ) -> Task<Result<()>> {
655 let client = self.client.clone();
656 cx.spawn(move |_, _| async move {
657 let _ = client
658 .request(proto::SetChannelVisibility {
659 channel_id: channel_id.0,
660 visibility: visibility.into(),
661 })
662 .await?;
663
664 Ok(())
665 })
666 }
667
668 pub fn invite_member(
669 &mut self,
670 channel_id: ChannelId,
671 user_id: UserId,
672 role: proto::ChannelRole,
673 cx: &mut ModelContext<Self>,
674 ) -> Task<Result<()>> {
675 if !self.outgoing_invites.insert((channel_id, user_id)) {
676 return Task::ready(Err(anyhow!("invite request already in progress")));
677 }
678
679 cx.notify();
680 let client = self.client.clone();
681 cx.spawn(move |this, mut cx| async move {
682 let result = client
683 .request(proto::InviteChannelMember {
684 channel_id: channel_id.0,
685 user_id,
686 role: role.into(),
687 })
688 .await;
689
690 this.update(&mut cx, |this, cx| {
691 this.outgoing_invites.remove(&(channel_id, user_id));
692 cx.notify();
693 })?;
694
695 result?;
696
697 Ok(())
698 })
699 }
700
701 pub fn remove_member(
702 &mut self,
703 channel_id: ChannelId,
704 user_id: u64,
705 cx: &mut ModelContext<Self>,
706 ) -> Task<Result<()>> {
707 if !self.outgoing_invites.insert((channel_id, user_id)) {
708 return Task::ready(Err(anyhow!("invite request already in progress")));
709 }
710
711 cx.notify();
712 let client = self.client.clone();
713 cx.spawn(move |this, mut cx| async move {
714 let result = client
715 .request(proto::RemoveChannelMember {
716 channel_id: channel_id.0,
717 user_id,
718 })
719 .await;
720
721 this.update(&mut cx, |this, cx| {
722 this.outgoing_invites.remove(&(channel_id, user_id));
723 cx.notify();
724 })?;
725 result?;
726 Ok(())
727 })
728 }
729
730 pub fn set_member_role(
731 &mut self,
732 channel_id: ChannelId,
733 user_id: UserId,
734 role: proto::ChannelRole,
735 cx: &mut ModelContext<Self>,
736 ) -> Task<Result<()>> {
737 if !self.outgoing_invites.insert((channel_id, user_id)) {
738 return Task::ready(Err(anyhow!("member request already in progress")));
739 }
740
741 cx.notify();
742 let client = self.client.clone();
743 cx.spawn(move |this, mut cx| async move {
744 let result = client
745 .request(proto::SetChannelMemberRole {
746 channel_id: channel_id.0,
747 user_id,
748 role: role.into(),
749 })
750 .await;
751
752 this.update(&mut cx, |this, cx| {
753 this.outgoing_invites.remove(&(channel_id, user_id));
754 cx.notify();
755 })?;
756
757 result?;
758 Ok(())
759 })
760 }
761
762 pub fn rename(
763 &mut self,
764 channel_id: ChannelId,
765 new_name: &str,
766 cx: &mut ModelContext<Self>,
767 ) -> Task<Result<()>> {
768 let client = self.client.clone();
769 let name = new_name.to_string();
770 cx.spawn(move |this, mut cx| async move {
771 let channel = client
772 .request(proto::RenameChannel {
773 channel_id: channel_id.0,
774 name,
775 })
776 .await?
777 .channel
778 .ok_or_else(|| anyhow!("missing channel in response"))?;
779 this.update(&mut cx, |this, cx| {
780 let task = this.update_channels(
781 proto::UpdateChannels {
782 channels: vec![channel],
783 ..Default::default()
784 },
785 cx,
786 );
787 assert!(task.is_none());
788
789 // This event is emitted because the collab panel wants to clear the pending edit state
790 // before this frame is rendered. But we can't guarantee that the collab panel's future
791 // will resolve before this flush_effects finishes. Synchronously emitting this event
792 // ensures that the collab panel will observe this creation before the frame complete
793 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
794 })?;
795 Ok(())
796 })
797 }
798
799 pub fn respond_to_channel_invite(
800 &mut self,
801 channel_id: ChannelId,
802 accept: bool,
803 cx: &mut ModelContext<Self>,
804 ) -> Task<Result<()>> {
805 let client = self.client.clone();
806 cx.background_executor().spawn(async move {
807 client
808 .request(proto::RespondToChannelInvite {
809 channel_id: channel_id.0,
810 accept,
811 })
812 .await?;
813 Ok(())
814 })
815 }
816
817 pub fn get_channel_member_details(
818 &self,
819 channel_id: ChannelId,
820 cx: &mut ModelContext<Self>,
821 ) -> Task<Result<Vec<ChannelMembership>>> {
822 let client = self.client.clone();
823 let user_store = self.user_store.downgrade();
824 cx.spawn(move |_, mut cx| async move {
825 let response = client
826 .request(proto::GetChannelMembers {
827 channel_id: channel_id.0,
828 })
829 .await?;
830
831 let user_ids = response.members.iter().map(|m| m.user_id).collect();
832 let user_store = user_store
833 .upgrade()
834 .ok_or_else(|| anyhow!("user store dropped"))?;
835 let users = user_store
836 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
837 .await?;
838
839 Ok(users
840 .into_iter()
841 .zip(response.members)
842 .filter_map(|(user, member)| {
843 Some(ChannelMembership {
844 user,
845 role: member.role(),
846 kind: member.kind(),
847 })
848 })
849 .collect())
850 })
851 }
852
853 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
854 let client = self.client.clone();
855 async move {
856 client
857 .request(proto::DeleteChannel {
858 channel_id: channel_id.0,
859 })
860 .await?;
861 Ok(())
862 }
863 }
864
865 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
866 false
867 }
868
869 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
870 self.outgoing_invites.contains(&(channel_id, user_id))
871 }
872
873 async fn handle_update_channels(
874 this: Model<Self>,
875 message: TypedEnvelope<proto::UpdateChannels>,
876 _: Arc<Client>,
877 mut cx: AsyncAppContext,
878 ) -> Result<()> {
879 this.update(&mut cx, |this, _| {
880 this.update_channels_tx
881 .unbounded_send(message.payload)
882 .unwrap();
883 })?;
884 Ok(())
885 }
886
887 async fn handle_update_user_channels(
888 this: Model<Self>,
889 message: TypedEnvelope<proto::UpdateUserChannels>,
890 _: Arc<Client>,
891 mut cx: AsyncAppContext,
892 ) -> Result<()> {
893 this.update(&mut cx, |this, cx| {
894 for buffer_version in message.payload.observed_channel_buffer_version {
895 let version = language::proto::deserialize_version(&buffer_version.version);
896 this.acknowledge_notes_version(
897 ChannelId(buffer_version.channel_id),
898 buffer_version.epoch,
899 &version,
900 cx,
901 );
902 }
903 for message_id in message.payload.observed_channel_message_id {
904 this.acknowledge_message_id(
905 ChannelId(message_id.channel_id),
906 message_id.message_id,
907 cx,
908 );
909 }
910 for membership in message.payload.channel_memberships {
911 if let Some(role) = ChannelRole::from_i32(membership.role) {
912 this.channel_states
913 .entry(ChannelId(membership.channel_id))
914 .or_insert_with(|| ChannelState::default())
915 .set_role(role)
916 }
917 }
918 })
919 }
920
921 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
922 self.channel_index.clear();
923 self.channel_invitations.clear();
924 self.channel_participants.clear();
925 self.channel_index.clear();
926 self.outgoing_invites.clear();
927 self.disconnect_channel_buffers_task.take();
928
929 for chat in self.opened_chats.values() {
930 if let OpenedModelHandle::Open(chat) = chat {
931 if let Some(chat) = chat.upgrade() {
932 chat.update(cx, |chat, cx| {
933 chat.rejoin(cx);
934 });
935 }
936 }
937 }
938
939 let mut buffer_versions = Vec::new();
940 for buffer in self.opened_buffers.values() {
941 if let OpenedModelHandle::Open(buffer) = buffer {
942 if let Some(buffer) = buffer.upgrade() {
943 let channel_buffer = buffer.read(cx);
944 let buffer = channel_buffer.buffer().read(cx);
945 buffer_versions.push(proto::ChannelBufferVersion {
946 channel_id: channel_buffer.channel_id.0,
947 epoch: channel_buffer.epoch(),
948 version: language::proto::serialize_version(&buffer.version()),
949 });
950 }
951 }
952 }
953
954 if buffer_versions.is_empty() {
955 return Task::ready(Ok(()));
956 }
957
958 let response = self.client.request(proto::RejoinChannelBuffers {
959 buffers: buffer_versions,
960 });
961
962 cx.spawn(|this, mut cx| async move {
963 let mut response = response.await?;
964
965 this.update(&mut cx, |this, cx| {
966 this.opened_buffers.retain(|_, buffer| match buffer {
967 OpenedModelHandle::Open(channel_buffer) => {
968 let Some(channel_buffer) = channel_buffer.upgrade() else {
969 return false;
970 };
971
972 channel_buffer.update(cx, |channel_buffer, cx| {
973 let channel_id = channel_buffer.channel_id;
974 if let Some(remote_buffer) = response
975 .buffers
976 .iter_mut()
977 .find(|buffer| buffer.channel_id == channel_id.0)
978 {
979 let channel_id = channel_buffer.channel_id;
980 let remote_version =
981 language::proto::deserialize_version(&remote_buffer.version);
982
983 channel_buffer.replace_collaborators(
984 mem::take(&mut remote_buffer.collaborators),
985 cx,
986 );
987
988 let operations = channel_buffer
989 .buffer()
990 .update(cx, |buffer, cx| {
991 let outgoing_operations =
992 buffer.serialize_ops(Some(remote_version), cx);
993 let incoming_operations =
994 mem::take(&mut remote_buffer.operations)
995 .into_iter()
996 .map(language::proto::deserialize_operation)
997 .collect::<Result<Vec<_>>>()?;
998 buffer.apply_ops(incoming_operations, cx)?;
999 anyhow::Ok(outgoing_operations)
1000 })
1001 .log_err();
1002
1003 if let Some(operations) = operations {
1004 let client = this.client.clone();
1005 cx.background_executor()
1006 .spawn(async move {
1007 let operations = operations.await;
1008 for chunk in
1009 language::proto::split_operations(operations)
1010 {
1011 client
1012 .send(proto::UpdateChannelBuffer {
1013 channel_id: channel_id.0,
1014 operations: chunk,
1015 })
1016 .ok();
1017 }
1018 })
1019 .detach();
1020 return true;
1021 }
1022 }
1023
1024 channel_buffer.disconnect(cx);
1025 false
1026 })
1027 }
1028 OpenedModelHandle::Loading(_) => true,
1029 });
1030 })
1031 .ok();
1032 anyhow::Ok(())
1033 })
1034 }
1035
1036 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1037 cx.notify();
1038
1039 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1040 cx.spawn(move |this, mut cx| async move {
1041 if wait_for_reconnect {
1042 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1043 }
1044
1045 if let Some(this) = this.upgrade() {
1046 this.update(&mut cx, |this, cx| {
1047 for (_, buffer) in this.opened_buffers.drain() {
1048 if let OpenedModelHandle::Open(buffer) = buffer {
1049 if let Some(buffer) = buffer.upgrade() {
1050 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1051 }
1052 }
1053 }
1054 })
1055 .ok();
1056 }
1057 })
1058 });
1059 }
1060
1061 pub(crate) fn update_channels(
1062 &mut self,
1063 payload: proto::UpdateChannels,
1064 cx: &mut ModelContext<ChannelStore>,
1065 ) -> Option<Task<Result<()>>> {
1066 if !payload.remove_channel_invitations.is_empty() {
1067 self.channel_invitations
1068 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1069 }
1070 for channel in payload.channel_invitations {
1071 match self
1072 .channel_invitations
1073 .binary_search_by_key(&channel.id, |c| c.id.0)
1074 {
1075 Ok(ix) => {
1076 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1077 }
1078 Err(ix) => self.channel_invitations.insert(
1079 ix,
1080 Arc::new(Channel {
1081 id: ChannelId(channel.id),
1082 visibility: channel.visibility(),
1083 name: channel.name.into(),
1084 parent_path: channel
1085 .parent_path
1086 .into_iter()
1087 .map(|cid| ChannelId(cid))
1088 .collect(),
1089 }),
1090 ),
1091 }
1092 }
1093
1094 let channels_changed = !payload.channels.is_empty()
1095 || !payload.delete_channels.is_empty()
1096 || !payload.latest_channel_message_ids.is_empty()
1097 || !payload.latest_channel_buffer_versions.is_empty()
1098 || !payload.hosted_projects.is_empty()
1099 || !payload.deleted_hosted_projects.is_empty();
1100
1101 if channels_changed {
1102 if !payload.delete_channels.is_empty() {
1103 let delete_channels: Vec<ChannelId> = payload
1104 .delete_channels
1105 .into_iter()
1106 .map(|cid| ChannelId(cid))
1107 .collect();
1108 self.channel_index.delete_channels(&delete_channels);
1109 self.channel_participants
1110 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1111
1112 for channel_id in &delete_channels {
1113 let channel_id = *channel_id;
1114 if payload
1115 .channels
1116 .iter()
1117 .any(|channel| channel.id == channel_id.0)
1118 {
1119 continue;
1120 }
1121 if let Some(OpenedModelHandle::Open(buffer)) =
1122 self.opened_buffers.remove(&channel_id)
1123 {
1124 if let Some(buffer) = buffer.upgrade() {
1125 buffer.update(cx, ChannelBuffer::disconnect);
1126 }
1127 }
1128 }
1129 }
1130
1131 let mut index = self.channel_index.bulk_insert();
1132 for channel in payload.channels {
1133 let id = ChannelId(channel.id);
1134 let channel_changed = index.insert(channel);
1135
1136 if channel_changed {
1137 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1138 if let Some(buffer) = buffer.upgrade() {
1139 buffer.update(cx, ChannelBuffer::channel_changed);
1140 }
1141 }
1142 }
1143 }
1144
1145 for latest_buffer_version in payload.latest_channel_buffer_versions {
1146 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1147 self.channel_states
1148 .entry(ChannelId(latest_buffer_version.channel_id))
1149 .or_default()
1150 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1151 }
1152
1153 for latest_channel_message in payload.latest_channel_message_ids {
1154 self.channel_states
1155 .entry(ChannelId(latest_channel_message.channel_id))
1156 .or_default()
1157 .update_latest_message_id(latest_channel_message.message_id);
1158 }
1159
1160 for hosted_project in payload.hosted_projects {
1161 let hosted_project: HostedProject = hosted_project.into();
1162 if let Some(old_project) = self
1163 .hosted_projects
1164 .insert(hosted_project.id, hosted_project.clone())
1165 {
1166 self.channel_states
1167 .entry(old_project.channel_id)
1168 .or_default()
1169 .remove_hosted_project(old_project.id);
1170 }
1171 self.channel_states
1172 .entry(hosted_project.channel_id)
1173 .or_default()
1174 .add_hosted_project(hosted_project.id);
1175 }
1176
1177 for hosted_project_id in payload.deleted_hosted_projects {
1178 let hosted_project_id = HostedProjectId(hosted_project_id);
1179
1180 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1181 self.channel_states
1182 .entry(old_project.channel_id)
1183 .or_default()
1184 .remove_hosted_project(old_project.id);
1185 }
1186 }
1187 }
1188
1189 cx.notify();
1190 if payload.channel_participants.is_empty() {
1191 return None;
1192 }
1193
1194 let mut all_user_ids = Vec::new();
1195 let channel_participants = payload.channel_participants;
1196 for entry in &channel_participants {
1197 for user_id in entry.participant_user_ids.iter() {
1198 if let Err(ix) = all_user_ids.binary_search(user_id) {
1199 all_user_ids.insert(ix, *user_id);
1200 }
1201 }
1202 }
1203
1204 let users = self
1205 .user_store
1206 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1207 Some(cx.spawn(|this, mut cx| async move {
1208 let users = users.await?;
1209
1210 this.update(&mut cx, |this, cx| {
1211 for entry in &channel_participants {
1212 let mut participants: Vec<_> = entry
1213 .participant_user_ids
1214 .iter()
1215 .filter_map(|user_id| {
1216 users
1217 .binary_search_by_key(&user_id, |user| &user.id)
1218 .ok()
1219 .map(|ix| users[ix].clone())
1220 })
1221 .collect();
1222
1223 participants.sort_by_key(|u| u.id);
1224
1225 this.channel_participants
1226 .insert(ChannelId(entry.channel_id), participants);
1227 }
1228
1229 cx.notify();
1230 })
1231 }))
1232 }
1233}
1234
1235impl ChannelState {
1236 fn set_role(&mut self, role: ChannelRole) {
1237 self.role = Some(role);
1238 }
1239
1240 fn has_channel_buffer_changed(&self) -> bool {
1241 if let Some(latest_version) = &self.latest_notes_versions {
1242 if let Some(observed_version) = &self.observed_notes_versions {
1243 latest_version.epoch > observed_version.epoch
1244 || (latest_version.epoch == observed_version.epoch
1245 && latest_version
1246 .version
1247 .changed_since(&observed_version.version))
1248 } else {
1249 true
1250 }
1251 } else {
1252 false
1253 }
1254 }
1255
1256 fn has_new_messages(&self) -> bool {
1257 let latest_message_id = self.latest_chat_message;
1258 let observed_message_id = self.observed_chat_message;
1259
1260 latest_message_id.is_some_and(|latest_message_id| {
1261 latest_message_id > observed_message_id.unwrap_or_default()
1262 })
1263 }
1264
1265 fn last_acknowledged_message_id(&self) -> Option<u64> {
1266 self.observed_chat_message
1267 }
1268
1269 fn acknowledge_message_id(&mut self, message_id: u64) {
1270 let observed = self.observed_chat_message.get_or_insert(message_id);
1271 *observed = (*observed).max(message_id);
1272 }
1273
1274 fn update_latest_message_id(&mut self, message_id: u64) {
1275 self.latest_chat_message =
1276 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1277 }
1278
1279 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1280 if let Some(existing) = &mut self.observed_notes_versions {
1281 if existing.epoch == epoch {
1282 existing.version.join(version);
1283 return;
1284 }
1285 }
1286 self.observed_notes_versions = Some(NotesVersion {
1287 epoch,
1288 version: version.clone(),
1289 });
1290 }
1291
1292 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1293 if let Some(existing) = &mut self.latest_notes_versions {
1294 if existing.epoch == epoch {
1295 existing.version.join(version);
1296 return;
1297 }
1298 }
1299 self.latest_notes_versions = Some(NotesVersion {
1300 epoch,
1301 version: version.clone(),
1302 });
1303 }
1304
1305 fn add_hosted_project(&mut self, project_id: HostedProjectId) {
1306 self.projects.insert(project_id);
1307 }
1308
1309 fn remove_hosted_project(&mut self, project_id: HostedProjectId) {
1310 self.projects.remove(&project_id);
1311 }
1312}