1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{
7 ChannelId, Client, ClientSettings, HostedProjectId, Subscription, User, UserId, UserStore,
8};
9use collections::{hash_map, HashMap, HashSet};
10use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
11use gpui::{
12 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
13 Task, WeakModel,
14};
15use language::Capability;
16use rpc::{
17 proto::{self, ChannelRole, ChannelVisibility},
18 TypedEnvelope,
19};
20use settings::Settings;
21use std::{mem, sync::Arc, time::Duration};
22use util::{async_maybe, maybe, ResultExt};
23
24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
25
26pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
27 let channel_store =
28 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
29 cx.set_global(GlobalChannelStore(channel_store));
30}
31
32#[derive(Debug, Clone, Default)]
33struct NotesVersion {
34 epoch: u64,
35 version: clock::Global,
36}
37
38#[derive(Debug, Clone)]
39pub struct HostedProject {
40 id: HostedProjectId,
41 channel_id: ChannelId,
42 name: SharedString,
43 _visibility: proto::ChannelVisibility,
44}
45
46impl From<proto::HostedProject> for HostedProject {
47 fn from(project: proto::HostedProject) -> Self {
48 Self {
49 id: HostedProjectId(project.id),
50 channel_id: ChannelId(project.channel_id),
51 _visibility: project.visibility(),
52 name: project.name.into(),
53 }
54 }
55}
56
57pub struct ChannelStore {
58 pub channel_index: ChannelIndex,
59 channel_invitations: Vec<Arc<Channel>>,
60 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
61 channel_states: HashMap<ChannelId, ChannelState>,
62 hosted_projects: HashMap<HostedProjectId, HostedProject>,
63
64 outgoing_invites: HashSet<(ChannelId, UserId)>,
65 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
66 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
67 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
68 client: Arc<Client>,
69 user_store: Model<UserStore>,
70 _rpc_subscriptions: [Subscription; 2],
71 _watch_connection_status: Task<Option<()>>,
72 disconnect_channel_buffers_task: Option<Task<()>>,
73 _update_channels: Task<()>,
74}
75
76#[derive(Clone, Debug)]
77pub struct Channel {
78 pub id: ChannelId,
79 pub name: SharedString,
80 pub visibility: proto::ChannelVisibility,
81 pub parent_path: Vec<ChannelId>,
82}
83
84#[derive(Default)]
85pub struct ChannelState {
86 latest_chat_message: Option<u64>,
87 latest_notes_versions: Option<NotesVersion>,
88 observed_chat_message: Option<u64>,
89 observed_notes_versions: Option<NotesVersion>,
90 role: Option<ChannelRole>,
91 projects: HashSet<HostedProjectId>,
92}
93
94impl Channel {
95 pub fn link(&self, cx: &AppContext) -> String {
96 format!(
97 "{}/channel/{}-{}",
98 ClientSettings::get_global(cx).server_url,
99 Self::slug(&self.name),
100 self.id
101 )
102 }
103
104 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
105 self.link(cx)
106 + "/notes"
107 + &heading
108 .map(|h| format!("#{}", Self::slug(&h)))
109 .unwrap_or_default()
110 }
111
112 pub fn is_root_channel(&self) -> bool {
113 self.parent_path.is_empty()
114 }
115
116 pub fn root_id(&self) -> ChannelId {
117 self.parent_path.first().copied().unwrap_or(self.id)
118 }
119
120 pub fn slug(str: &str) -> String {
121 let slug: String = str
122 .chars()
123 .map(|c| if c.is_alphanumeric() { c } else { '-' })
124 .collect();
125
126 slug.trim_matches(|c| c == '-').to_string()
127 }
128}
129
130pub struct ChannelMembership {
131 pub user: Arc<User>,
132 pub kind: proto::channel_member::Kind,
133 pub role: proto::ChannelRole,
134}
135impl ChannelMembership {
136 pub fn sort_key(&self) -> MembershipSortKey {
137 MembershipSortKey {
138 role_order: match self.role {
139 proto::ChannelRole::Admin => 0,
140 proto::ChannelRole::Member => 1,
141 proto::ChannelRole::Banned => 2,
142 proto::ChannelRole::Talker => 3,
143 proto::ChannelRole::Guest => 4,
144 },
145 kind_order: match self.kind {
146 proto::channel_member::Kind::Member => 0,
147 proto::channel_member::Kind::Invitee => 1,
148 },
149 username_order: self.user.github_login.as_str(),
150 }
151 }
152}
153
154#[derive(PartialOrd, Ord, PartialEq, Eq)]
155pub struct MembershipSortKey<'a> {
156 role_order: u8,
157 kind_order: u8,
158 username_order: &'a str,
159}
160
161pub enum ChannelEvent {
162 ChannelCreated(ChannelId),
163 ChannelRenamed(ChannelId),
164}
165
166impl EventEmitter<ChannelEvent> for ChannelStore {}
167
168enum OpenedModelHandle<E> {
169 Open(WeakModel<E>),
170 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
171}
172
173struct GlobalChannelStore(Model<ChannelStore>);
174
175impl Global for GlobalChannelStore {}
176
177impl ChannelStore {
178 pub fn global(cx: &AppContext) -> Model<Self> {
179 cx.global::<GlobalChannelStore>().0.clone()
180 }
181
182 pub fn new(
183 client: Arc<Client>,
184 user_store: Model<UserStore>,
185 cx: &mut ModelContext<Self>,
186 ) -> Self {
187 let rpc_subscriptions = [
188 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
189 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
190 ];
191
192 let mut connection_status = client.status();
193 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
194 let watch_connection_status = cx.spawn(|this, mut cx| async move {
195 while let Some(status) = connection_status.next().await {
196 let this = this.upgrade()?;
197 match status {
198 client::Status::Connected { .. } => {
199 this.update(&mut cx, |this, cx| this.handle_connect(cx))
200 .ok()?
201 .await
202 .log_err()?;
203 }
204 client::Status::SignedOut | client::Status::UpgradeRequired => {
205 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
206 .ok();
207 }
208 _ => {
209 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
210 .ok();
211 }
212 }
213 }
214 Some(())
215 });
216
217 Self {
218 channel_invitations: Vec::default(),
219 channel_index: ChannelIndex::default(),
220 channel_participants: Default::default(),
221 hosted_projects: Default::default(),
222 outgoing_invites: Default::default(),
223 opened_buffers: Default::default(),
224 opened_chats: Default::default(),
225 update_channels_tx,
226 client,
227 user_store,
228 _rpc_subscriptions: rpc_subscriptions,
229 _watch_connection_status: watch_connection_status,
230 disconnect_channel_buffers_task: None,
231 _update_channels: cx.spawn(|this, mut cx| async move {
232 async_maybe!({
233 while let Some(update_channels) = update_channels_rx.next().await {
234 if let Some(this) = this.upgrade() {
235 let update_task = this.update(&mut cx, |this, cx| {
236 this.update_channels(update_channels, cx)
237 })?;
238 if let Some(update_task) = update_task {
239 update_task.await.log_err();
240 }
241 }
242 }
243 anyhow::Ok(())
244 })
245 .await
246 .log_err();
247 }),
248 channel_states: Default::default(),
249 }
250 }
251
252 pub fn client(&self) -> Arc<Client> {
253 self.client.clone()
254 }
255
256 /// Returns the number of unique channels in the store
257 pub fn channel_count(&self) -> usize {
258 self.channel_index.by_id().len()
259 }
260
261 /// Returns the index of a channel ID in the list of unique channels
262 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
263 self.channel_index
264 .by_id()
265 .keys()
266 .position(|id| *id == channel_id)
267 }
268
269 /// Returns an iterator over all unique channels
270 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
271 self.channel_index.by_id().values()
272 }
273
274 /// Iterate over all entries in the channel DAG
275 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
276 self.channel_index
277 .ordered_channels()
278 .iter()
279 .filter_map(move |id| {
280 let channel = self.channel_index.by_id().get(id)?;
281 Some((channel.parent_path.len(), channel))
282 })
283 }
284
285 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
286 let channel_id = self.channel_index.ordered_channels().get(ix)?;
287 self.channel_index.by_id().get(channel_id)
288 }
289
290 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
291 self.channel_index.by_id().values().nth(ix)
292 }
293
294 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
295 self.channel_invitations
296 .iter()
297 .any(|channel| channel.id == channel_id)
298 }
299
300 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
301 &self.channel_invitations
302 }
303
304 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
305 self.channel_index.by_id().get(&channel_id)
306 }
307
308 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, HostedProjectId)> {
309 let mut projects: Vec<(SharedString, HostedProjectId)> = self
310 .channel_states
311 .get(&channel_id)
312 .map(|state| state.projects.clone())
313 .unwrap_or_default()
314 .into_iter()
315 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
316 .collect();
317 projects.sort();
318 projects
319 }
320
321 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
322 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
323 if let OpenedModelHandle::Open(buffer) = buffer {
324 return buffer.upgrade().is_some();
325 }
326 }
327 false
328 }
329
330 pub fn open_channel_buffer(
331 &mut self,
332 channel_id: ChannelId,
333 cx: &mut ModelContext<Self>,
334 ) -> Task<Result<Model<ChannelBuffer>>> {
335 let client = self.client.clone();
336 let user_store = self.user_store.clone();
337 let channel_store = cx.handle();
338 self.open_channel_resource(
339 channel_id,
340 |this| &mut this.opened_buffers,
341 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
342 cx,
343 )
344 }
345
346 pub fn fetch_channel_messages(
347 &self,
348 message_ids: Vec<u64>,
349 cx: &mut ModelContext<Self>,
350 ) -> Task<Result<Vec<ChannelMessage>>> {
351 let request = if message_ids.is_empty() {
352 None
353 } else {
354 Some(
355 self.client
356 .request(proto::GetChannelMessagesById { message_ids }),
357 )
358 };
359 cx.spawn(|this, mut cx| async move {
360 if let Some(request) = request {
361 let response = request.await?;
362 let this = this
363 .upgrade()
364 .ok_or_else(|| anyhow!("channel store dropped"))?;
365 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
366 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
367 } else {
368 Ok(Vec::new())
369 }
370 })
371 }
372
373 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
374 self.channel_states
375 .get(&channel_id)
376 .is_some_and(|state| state.has_channel_buffer_changed())
377 }
378
379 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
380 self.channel_states
381 .get(&channel_id)
382 .is_some_and(|state| state.has_new_messages())
383 }
384
385 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
386 self.channel_states.get(&channel_id).and_then(|state| {
387 if let Some(last_message_id) = state.latest_chat_message {
388 if state
389 .last_acknowledged_message_id()
390 .is_some_and(|id| id < last_message_id)
391 {
392 return state.last_acknowledged_message_id();
393 }
394 }
395
396 None
397 })
398 }
399
400 pub fn acknowledge_message_id(
401 &mut self,
402 channel_id: ChannelId,
403 message_id: u64,
404 cx: &mut ModelContext<Self>,
405 ) {
406 self.channel_states
407 .entry(channel_id)
408 .or_insert_with(|| Default::default())
409 .acknowledge_message_id(message_id);
410 cx.notify();
411 }
412
413 pub fn update_latest_message_id(
414 &mut self,
415 channel_id: ChannelId,
416 message_id: u64,
417 cx: &mut ModelContext<Self>,
418 ) {
419 self.channel_states
420 .entry(channel_id)
421 .or_insert_with(|| Default::default())
422 .update_latest_message_id(message_id);
423 cx.notify();
424 }
425
426 pub fn acknowledge_notes_version(
427 &mut self,
428 channel_id: ChannelId,
429 epoch: u64,
430 version: &clock::Global,
431 cx: &mut ModelContext<Self>,
432 ) {
433 self.channel_states
434 .entry(channel_id)
435 .or_insert_with(|| Default::default())
436 .acknowledge_notes_version(epoch, version);
437 cx.notify()
438 }
439
440 pub fn update_latest_notes_version(
441 &mut self,
442 channel_id: ChannelId,
443 epoch: u64,
444 version: &clock::Global,
445 cx: &mut ModelContext<Self>,
446 ) {
447 self.channel_states
448 .entry(channel_id)
449 .or_insert_with(|| Default::default())
450 .update_latest_notes_version(epoch, version);
451 cx.notify()
452 }
453
454 pub fn open_channel_chat(
455 &mut self,
456 channel_id: ChannelId,
457 cx: &mut ModelContext<Self>,
458 ) -> Task<Result<Model<ChannelChat>>> {
459 let client = self.client.clone();
460 let user_store = self.user_store.clone();
461 let this = cx.handle();
462 self.open_channel_resource(
463 channel_id,
464 |this| &mut this.opened_chats,
465 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
466 cx,
467 )
468 }
469
470 /// Asynchronously open a given resource associated with a channel.
471 ///
472 /// Make sure that the resource is only opened once, even if this method
473 /// is called multiple times with the same channel id while the first task
474 /// is still running.
475 fn open_channel_resource<T, F, Fut>(
476 &mut self,
477 channel_id: ChannelId,
478 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
479 load: F,
480 cx: &mut ModelContext<Self>,
481 ) -> Task<Result<Model<T>>>
482 where
483 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
484 Fut: Future<Output = Result<Model<T>>>,
485 T: 'static,
486 {
487 let task = loop {
488 match get_map(self).entry(channel_id) {
489 hash_map::Entry::Occupied(e) => match e.get() {
490 OpenedModelHandle::Open(model) => {
491 if let Some(model) = model.upgrade() {
492 break Task::ready(Ok(model)).shared();
493 } else {
494 get_map(self).remove(&channel_id);
495 continue;
496 }
497 }
498 OpenedModelHandle::Loading(task) => {
499 break task.clone();
500 }
501 },
502 hash_map::Entry::Vacant(e) => {
503 let task = cx
504 .spawn(move |this, mut cx| async move {
505 let channel = this.update(&mut cx, |this, _| {
506 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
507 Arc::new(anyhow!("no channel for id: {}", channel_id))
508 })
509 })??;
510
511 load(channel, cx).await.map_err(Arc::new)
512 })
513 .shared();
514
515 e.insert(OpenedModelHandle::Loading(task.clone()));
516 cx.spawn({
517 let task = task.clone();
518 move |this, mut cx| async move {
519 let result = task.await;
520 this.update(&mut cx, |this, _| match result {
521 Ok(model) => {
522 get_map(this).insert(
523 channel_id,
524 OpenedModelHandle::Open(model.downgrade()),
525 );
526 }
527 Err(_) => {
528 get_map(this).remove(&channel_id);
529 }
530 })
531 .ok();
532 }
533 })
534 .detach();
535 break task;
536 }
537 }
538 };
539 cx.background_executor()
540 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
541 }
542
543 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
544 self.channel_role(channel_id) == proto::ChannelRole::Admin
545 }
546
547 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
548 self.channel_index
549 .by_id()
550 .get(&channel_id)
551 .map_or(false, |channel| channel.is_root_channel())
552 }
553
554 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
555 self.channel_index
556 .by_id()
557 .get(&channel_id)
558 .map_or(false, |channel| {
559 channel.visibility == ChannelVisibility::Public
560 })
561 }
562
563 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
564 match self.channel_role(channel_id) {
565 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
566 _ => Capability::ReadOnly,
567 }
568 }
569
570 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
571 maybe!({
572 let mut channel = self.channel_for_id(channel_id)?;
573 if !channel.is_root_channel() {
574 channel = self.channel_for_id(channel.root_id())?;
575 }
576 let root_channel_state = self.channel_states.get(&channel.id);
577 root_channel_state?.role
578 })
579 .unwrap_or(proto::ChannelRole::Guest)
580 }
581
582 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
583 self.channel_participants
584 .get(&channel_id)
585 .map_or(&[], |v| v.as_slice())
586 }
587
588 pub fn create_channel(
589 &self,
590 name: &str,
591 parent_id: Option<ChannelId>,
592 cx: &mut ModelContext<Self>,
593 ) -> Task<Result<ChannelId>> {
594 let client = self.client.clone();
595 let name = name.trim_start_matches('#').to_owned();
596 cx.spawn(move |this, mut cx| async move {
597 let response = client
598 .request(proto::CreateChannel {
599 name,
600 parent_id: parent_id.map(|cid| cid.0),
601 })
602 .await?;
603
604 let channel = response
605 .channel
606 .ok_or_else(|| anyhow!("missing channel in response"))?;
607 let channel_id = ChannelId(channel.id);
608
609 this.update(&mut cx, |this, cx| {
610 let task = this.update_channels(
611 proto::UpdateChannels {
612 channels: vec![channel],
613 ..Default::default()
614 },
615 cx,
616 );
617 assert!(task.is_none());
618
619 // This event is emitted because the collab panel wants to clear the pending edit state
620 // before this frame is rendered. But we can't guarantee that the collab panel's future
621 // will resolve before this flush_effects finishes. Synchronously emitting this event
622 // ensures that the collab panel will observe this creation before the frame completes
623 cx.emit(ChannelEvent::ChannelCreated(channel_id));
624 })?;
625
626 Ok(channel_id)
627 })
628 }
629
630 pub fn move_channel(
631 &mut self,
632 channel_id: ChannelId,
633 to: ChannelId,
634 cx: &mut ModelContext<Self>,
635 ) -> Task<Result<()>> {
636 let client = self.client.clone();
637 cx.spawn(move |_, _| async move {
638 let _ = client
639 .request(proto::MoveChannel {
640 channel_id: channel_id.0,
641 to: to.0,
642 })
643 .await?;
644
645 Ok(())
646 })
647 }
648
649 pub fn set_channel_visibility(
650 &mut self,
651 channel_id: ChannelId,
652 visibility: ChannelVisibility,
653 cx: &mut ModelContext<Self>,
654 ) -> Task<Result<()>> {
655 let client = self.client.clone();
656 cx.spawn(move |_, _| async move {
657 let _ = client
658 .request(proto::SetChannelVisibility {
659 channel_id: channel_id.0,
660 visibility: visibility.into(),
661 })
662 .await?;
663
664 Ok(())
665 })
666 }
667
668 pub fn invite_member(
669 &mut self,
670 channel_id: ChannelId,
671 user_id: UserId,
672 role: proto::ChannelRole,
673 cx: &mut ModelContext<Self>,
674 ) -> Task<Result<()>> {
675 if !self.outgoing_invites.insert((channel_id, user_id)) {
676 return Task::ready(Err(anyhow!("invite request already in progress")));
677 }
678
679 cx.notify();
680 let client = self.client.clone();
681 cx.spawn(move |this, mut cx| async move {
682 let result = client
683 .request(proto::InviteChannelMember {
684 channel_id: channel_id.0,
685 user_id,
686 role: role.into(),
687 })
688 .await;
689
690 this.update(&mut cx, |this, cx| {
691 this.outgoing_invites.remove(&(channel_id, user_id));
692 cx.notify();
693 })?;
694
695 result?;
696
697 Ok(())
698 })
699 }
700
701 pub fn remove_member(
702 &mut self,
703 channel_id: ChannelId,
704 user_id: u64,
705 cx: &mut ModelContext<Self>,
706 ) -> Task<Result<()>> {
707 if !self.outgoing_invites.insert((channel_id, user_id)) {
708 return Task::ready(Err(anyhow!("invite request already in progress")));
709 }
710
711 cx.notify();
712 let client = self.client.clone();
713 cx.spawn(move |this, mut cx| async move {
714 let result = client
715 .request(proto::RemoveChannelMember {
716 channel_id: channel_id.0,
717 user_id,
718 })
719 .await;
720
721 this.update(&mut cx, |this, cx| {
722 this.outgoing_invites.remove(&(channel_id, user_id));
723 cx.notify();
724 })?;
725 result?;
726 Ok(())
727 })
728 }
729
730 pub fn set_member_role(
731 &mut self,
732 channel_id: ChannelId,
733 user_id: UserId,
734 role: proto::ChannelRole,
735 cx: &mut ModelContext<Self>,
736 ) -> Task<Result<()>> {
737 if !self.outgoing_invites.insert((channel_id, user_id)) {
738 return Task::ready(Err(anyhow!("member request already in progress")));
739 }
740
741 cx.notify();
742 let client = self.client.clone();
743 cx.spawn(move |this, mut cx| async move {
744 let result = client
745 .request(proto::SetChannelMemberRole {
746 channel_id: channel_id.0,
747 user_id,
748 role: role.into(),
749 })
750 .await;
751
752 this.update(&mut cx, |this, cx| {
753 this.outgoing_invites.remove(&(channel_id, user_id));
754 cx.notify();
755 })?;
756
757 result?;
758 Ok(())
759 })
760 }
761
762 pub fn rename(
763 &mut self,
764 channel_id: ChannelId,
765 new_name: &str,
766 cx: &mut ModelContext<Self>,
767 ) -> Task<Result<()>> {
768 let client = self.client.clone();
769 let name = new_name.to_string();
770 cx.spawn(move |this, mut cx| async move {
771 let channel = client
772 .request(proto::RenameChannel {
773 channel_id: channel_id.0,
774 name,
775 })
776 .await?
777 .channel
778 .ok_or_else(|| anyhow!("missing channel in response"))?;
779 this.update(&mut cx, |this, cx| {
780 let task = this.update_channels(
781 proto::UpdateChannels {
782 channels: vec![channel],
783 ..Default::default()
784 },
785 cx,
786 );
787 assert!(task.is_none());
788
789 // This event is emitted because the collab panel wants to clear the pending edit state
790 // before this frame is rendered. But we can't guarantee that the collab panel's future
791 // will resolve before this flush_effects finishes. Synchronously emitting this event
792 // ensures that the collab panel will observe this creation before the frame complete
793 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
794 })?;
795 Ok(())
796 })
797 }
798
799 pub fn respond_to_channel_invite(
800 &mut self,
801 channel_id: ChannelId,
802 accept: bool,
803 cx: &mut ModelContext<Self>,
804 ) -> Task<Result<()>> {
805 let client = self.client.clone();
806 cx.background_executor().spawn(async move {
807 client
808 .request(proto::RespondToChannelInvite {
809 channel_id: channel_id.0,
810 accept,
811 })
812 .await?;
813 Ok(())
814 })
815 }
816
817 pub fn get_channel_member_details(
818 &self,
819 channel_id: ChannelId,
820 cx: &mut ModelContext<Self>,
821 ) -> Task<Result<Vec<ChannelMembership>>> {
822 let client = self.client.clone();
823 let user_store = self.user_store.downgrade();
824 cx.spawn(move |_, mut cx| async move {
825 let response = client
826 .request(proto::GetChannelMembers {
827 channel_id: channel_id.0,
828 })
829 .await?;
830
831 let user_ids = response.members.iter().map(|m| m.user_id).collect();
832 let user_store = user_store
833 .upgrade()
834 .ok_or_else(|| anyhow!("user store dropped"))?;
835 let users = user_store
836 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
837 .await?;
838
839 Ok(users
840 .into_iter()
841 .zip(response.members)
842 .map(|(user, member)| ChannelMembership {
843 user,
844 role: member.role(),
845 kind: member.kind(),
846 })
847 .collect())
848 })
849 }
850
851 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
852 let client = self.client.clone();
853 async move {
854 client
855 .request(proto::DeleteChannel {
856 channel_id: channel_id.0,
857 })
858 .await?;
859 Ok(())
860 }
861 }
862
863 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
864 false
865 }
866
867 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
868 self.outgoing_invites.contains(&(channel_id, user_id))
869 }
870
871 async fn handle_update_channels(
872 this: Model<Self>,
873 message: TypedEnvelope<proto::UpdateChannels>,
874 _: Arc<Client>,
875 mut cx: AsyncAppContext,
876 ) -> Result<()> {
877 this.update(&mut cx, |this, _| {
878 this.update_channels_tx
879 .unbounded_send(message.payload)
880 .unwrap();
881 })?;
882 Ok(())
883 }
884
885 async fn handle_update_user_channels(
886 this: Model<Self>,
887 message: TypedEnvelope<proto::UpdateUserChannels>,
888 _: Arc<Client>,
889 mut cx: AsyncAppContext,
890 ) -> Result<()> {
891 this.update(&mut cx, |this, cx| {
892 for buffer_version in message.payload.observed_channel_buffer_version {
893 let version = language::proto::deserialize_version(&buffer_version.version);
894 this.acknowledge_notes_version(
895 ChannelId(buffer_version.channel_id),
896 buffer_version.epoch,
897 &version,
898 cx,
899 );
900 }
901 for message_id in message.payload.observed_channel_message_id {
902 this.acknowledge_message_id(
903 ChannelId(message_id.channel_id),
904 message_id.message_id,
905 cx,
906 );
907 }
908 for membership in message.payload.channel_memberships {
909 if let Some(role) = ChannelRole::from_i32(membership.role) {
910 this.channel_states
911 .entry(ChannelId(membership.channel_id))
912 .or_insert_with(|| ChannelState::default())
913 .set_role(role)
914 }
915 }
916 })
917 }
918
919 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
920 self.channel_index.clear();
921 self.channel_invitations.clear();
922 self.channel_participants.clear();
923 self.channel_index.clear();
924 self.outgoing_invites.clear();
925 self.disconnect_channel_buffers_task.take();
926
927 for chat in self.opened_chats.values() {
928 if let OpenedModelHandle::Open(chat) = chat {
929 if let Some(chat) = chat.upgrade() {
930 chat.update(cx, |chat, cx| {
931 chat.rejoin(cx);
932 });
933 }
934 }
935 }
936
937 let mut buffer_versions = Vec::new();
938 for buffer in self.opened_buffers.values() {
939 if let OpenedModelHandle::Open(buffer) = buffer {
940 if let Some(buffer) = buffer.upgrade() {
941 let channel_buffer = buffer.read(cx);
942 let buffer = channel_buffer.buffer().read(cx);
943 buffer_versions.push(proto::ChannelBufferVersion {
944 channel_id: channel_buffer.channel_id.0,
945 epoch: channel_buffer.epoch(),
946 version: language::proto::serialize_version(&buffer.version()),
947 });
948 }
949 }
950 }
951
952 if buffer_versions.is_empty() {
953 return Task::ready(Ok(()));
954 }
955
956 let response = self.client.request(proto::RejoinChannelBuffers {
957 buffers: buffer_versions,
958 });
959
960 cx.spawn(|this, mut cx| async move {
961 let mut response = response.await?;
962
963 this.update(&mut cx, |this, cx| {
964 this.opened_buffers.retain(|_, buffer| match buffer {
965 OpenedModelHandle::Open(channel_buffer) => {
966 let Some(channel_buffer) = channel_buffer.upgrade() else {
967 return false;
968 };
969
970 channel_buffer.update(cx, |channel_buffer, cx| {
971 let channel_id = channel_buffer.channel_id;
972 if let Some(remote_buffer) = response
973 .buffers
974 .iter_mut()
975 .find(|buffer| buffer.channel_id == channel_id.0)
976 {
977 let channel_id = channel_buffer.channel_id;
978 let remote_version =
979 language::proto::deserialize_version(&remote_buffer.version);
980
981 channel_buffer.replace_collaborators(
982 mem::take(&mut remote_buffer.collaborators),
983 cx,
984 );
985
986 let operations = channel_buffer
987 .buffer()
988 .update(cx, |buffer, cx| {
989 let outgoing_operations =
990 buffer.serialize_ops(Some(remote_version), cx);
991 let incoming_operations =
992 mem::take(&mut remote_buffer.operations)
993 .into_iter()
994 .map(language::proto::deserialize_operation)
995 .collect::<Result<Vec<_>>>()?;
996 buffer.apply_ops(incoming_operations, cx)?;
997 anyhow::Ok(outgoing_operations)
998 })
999 .log_err();
1000
1001 if let Some(operations) = operations {
1002 let client = this.client.clone();
1003 cx.background_executor()
1004 .spawn(async move {
1005 let operations = operations.await;
1006 for chunk in
1007 language::proto::split_operations(operations)
1008 {
1009 client
1010 .send(proto::UpdateChannelBuffer {
1011 channel_id: channel_id.0,
1012 operations: chunk,
1013 })
1014 .ok();
1015 }
1016 })
1017 .detach();
1018 return true;
1019 }
1020 }
1021
1022 channel_buffer.disconnect(cx);
1023 false
1024 })
1025 }
1026 OpenedModelHandle::Loading(_) => true,
1027 });
1028 })
1029 .ok();
1030 anyhow::Ok(())
1031 })
1032 }
1033
1034 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1035 cx.notify();
1036
1037 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1038 cx.spawn(move |this, mut cx| async move {
1039 if wait_for_reconnect {
1040 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1041 }
1042
1043 if let Some(this) = this.upgrade() {
1044 this.update(&mut cx, |this, cx| {
1045 for (_, buffer) in this.opened_buffers.drain() {
1046 if let OpenedModelHandle::Open(buffer) = buffer {
1047 if let Some(buffer) = buffer.upgrade() {
1048 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1049 }
1050 }
1051 }
1052 })
1053 .ok();
1054 }
1055 })
1056 });
1057 }
1058
1059 pub(crate) fn update_channels(
1060 &mut self,
1061 payload: proto::UpdateChannels,
1062 cx: &mut ModelContext<ChannelStore>,
1063 ) -> Option<Task<Result<()>>> {
1064 if !payload.remove_channel_invitations.is_empty() {
1065 self.channel_invitations
1066 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1067 }
1068 for channel in payload.channel_invitations {
1069 match self
1070 .channel_invitations
1071 .binary_search_by_key(&channel.id, |c| c.id.0)
1072 {
1073 Ok(ix) => {
1074 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1075 }
1076 Err(ix) => self.channel_invitations.insert(
1077 ix,
1078 Arc::new(Channel {
1079 id: ChannelId(channel.id),
1080 visibility: channel.visibility(),
1081 name: channel.name.into(),
1082 parent_path: channel
1083 .parent_path
1084 .into_iter()
1085 .map(|cid| ChannelId(cid))
1086 .collect(),
1087 }),
1088 ),
1089 }
1090 }
1091
1092 let channels_changed = !payload.channels.is_empty()
1093 || !payload.delete_channels.is_empty()
1094 || !payload.latest_channel_message_ids.is_empty()
1095 || !payload.latest_channel_buffer_versions.is_empty()
1096 || !payload.hosted_projects.is_empty()
1097 || !payload.deleted_hosted_projects.is_empty();
1098
1099 if channels_changed {
1100 if !payload.delete_channels.is_empty() {
1101 let delete_channels: Vec<ChannelId> = payload
1102 .delete_channels
1103 .into_iter()
1104 .map(|cid| ChannelId(cid))
1105 .collect();
1106 self.channel_index.delete_channels(&delete_channels);
1107 self.channel_participants
1108 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1109
1110 for channel_id in &delete_channels {
1111 let channel_id = *channel_id;
1112 if payload
1113 .channels
1114 .iter()
1115 .any(|channel| channel.id == channel_id.0)
1116 {
1117 continue;
1118 }
1119 if let Some(OpenedModelHandle::Open(buffer)) =
1120 self.opened_buffers.remove(&channel_id)
1121 {
1122 if let Some(buffer) = buffer.upgrade() {
1123 buffer.update(cx, ChannelBuffer::disconnect);
1124 }
1125 }
1126 }
1127 }
1128
1129 let mut index = self.channel_index.bulk_insert();
1130 for channel in payload.channels {
1131 let id = ChannelId(channel.id);
1132 let channel_changed = index.insert(channel);
1133
1134 if channel_changed {
1135 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1136 if let Some(buffer) = buffer.upgrade() {
1137 buffer.update(cx, ChannelBuffer::channel_changed);
1138 }
1139 }
1140 }
1141 }
1142
1143 for latest_buffer_version in payload.latest_channel_buffer_versions {
1144 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1145 self.channel_states
1146 .entry(ChannelId(latest_buffer_version.channel_id))
1147 .or_default()
1148 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1149 }
1150
1151 for latest_channel_message in payload.latest_channel_message_ids {
1152 self.channel_states
1153 .entry(ChannelId(latest_channel_message.channel_id))
1154 .or_default()
1155 .update_latest_message_id(latest_channel_message.message_id);
1156 }
1157
1158 for hosted_project in payload.hosted_projects {
1159 let hosted_project: HostedProject = hosted_project.into();
1160 if let Some(old_project) = self
1161 .hosted_projects
1162 .insert(hosted_project.id, hosted_project.clone())
1163 {
1164 self.channel_states
1165 .entry(old_project.channel_id)
1166 .or_default()
1167 .remove_hosted_project(old_project.id);
1168 }
1169 self.channel_states
1170 .entry(hosted_project.channel_id)
1171 .or_default()
1172 .add_hosted_project(hosted_project.id);
1173 }
1174
1175 for hosted_project_id in payload.deleted_hosted_projects {
1176 let hosted_project_id = HostedProjectId(hosted_project_id);
1177
1178 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1179 self.channel_states
1180 .entry(old_project.channel_id)
1181 .or_default()
1182 .remove_hosted_project(old_project.id);
1183 }
1184 }
1185 }
1186
1187 cx.notify();
1188 if payload.channel_participants.is_empty() {
1189 return None;
1190 }
1191
1192 let mut all_user_ids = Vec::new();
1193 let channel_participants = payload.channel_participants;
1194 for entry in &channel_participants {
1195 for user_id in entry.participant_user_ids.iter() {
1196 if let Err(ix) = all_user_ids.binary_search(user_id) {
1197 all_user_ids.insert(ix, *user_id);
1198 }
1199 }
1200 }
1201
1202 let users = self
1203 .user_store
1204 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1205 Some(cx.spawn(|this, mut cx| async move {
1206 let users = users.await?;
1207
1208 this.update(&mut cx, |this, cx| {
1209 for entry in &channel_participants {
1210 let mut participants: Vec<_> = entry
1211 .participant_user_ids
1212 .iter()
1213 .filter_map(|user_id| {
1214 users
1215 .binary_search_by_key(&user_id, |user| &user.id)
1216 .ok()
1217 .map(|ix| users[ix].clone())
1218 })
1219 .collect();
1220
1221 participants.sort_by_key(|u| u.id);
1222
1223 this.channel_participants
1224 .insert(ChannelId(entry.channel_id), participants);
1225 }
1226
1227 cx.notify();
1228 })
1229 }))
1230 }
1231}
1232
1233impl ChannelState {
1234 fn set_role(&mut self, role: ChannelRole) {
1235 self.role = Some(role);
1236 }
1237
1238 fn has_channel_buffer_changed(&self) -> bool {
1239 if let Some(latest_version) = &self.latest_notes_versions {
1240 if let Some(observed_version) = &self.observed_notes_versions {
1241 latest_version.epoch > observed_version.epoch
1242 || (latest_version.epoch == observed_version.epoch
1243 && latest_version
1244 .version
1245 .changed_since(&observed_version.version))
1246 } else {
1247 true
1248 }
1249 } else {
1250 false
1251 }
1252 }
1253
1254 fn has_new_messages(&self) -> bool {
1255 let latest_message_id = self.latest_chat_message;
1256 let observed_message_id = self.observed_chat_message;
1257
1258 latest_message_id.is_some_and(|latest_message_id| {
1259 latest_message_id > observed_message_id.unwrap_or_default()
1260 })
1261 }
1262
1263 fn last_acknowledged_message_id(&self) -> Option<u64> {
1264 self.observed_chat_message
1265 }
1266
1267 fn acknowledge_message_id(&mut self, message_id: u64) {
1268 let observed = self.observed_chat_message.get_or_insert(message_id);
1269 *observed = (*observed).max(message_id);
1270 }
1271
1272 fn update_latest_message_id(&mut self, message_id: u64) {
1273 self.latest_chat_message =
1274 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1275 }
1276
1277 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1278 if let Some(existing) = &mut self.observed_notes_versions {
1279 if existing.epoch == epoch {
1280 existing.version.join(version);
1281 return;
1282 }
1283 }
1284 self.observed_notes_versions = Some(NotesVersion {
1285 epoch,
1286 version: version.clone(),
1287 });
1288 }
1289
1290 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1291 if let Some(existing) = &mut self.latest_notes_versions {
1292 if existing.epoch == epoch {
1293 existing.version.join(version);
1294 return;
1295 }
1296 }
1297 self.latest_notes_versions = Some(NotesVersion {
1298 epoch,
1299 version: version.clone(),
1300 });
1301 }
1302
1303 fn add_hosted_project(&mut self, project_id: HostedProjectId) {
1304 self.projects.insert(project_id);
1305 }
1306
1307 fn remove_hosted_project(&mut self, project_id: HostedProjectId) {
1308 self.projects.remove(&project_id);
1309 }
1310}