1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
/// How long the store waits after a connection loss (when a reconnect is still
/// expected) before it disconnects the open channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// Identifies a point in the history of a channel's notes: an `epoch` plus the
/// buffer version within that epoch.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    /// Epoch of the notes buffer that `version` belongs to.
    epoch: u64,
    /// Buffer version (a `clock::Global`) within `epoch`.
    version: clock::Global,
}
35
/// Client-side record of a project attached to a channel, converted from
/// `proto::HostedProject`.
#[derive(Debug, Clone)]
pub struct HostedProject {
    /// Id of the project.
    project_id: ProjectId,
    /// Id of the channel the project is attached to.
    channel_id: ChannelId,
    /// Display name of the project.
    name: SharedString,
    /// Visibility reported by the server; stored but not read in this file.
    _visibility: proto::ChannelVisibility,
}
impl From<proto::HostedProject> for HostedProject {
    /// Converts the wire representation of a hosted project into the
    /// client-side [`HostedProject`].
    fn from(project: proto::HostedProject) -> Self {
        // Decode the visibility first, while `project` is still fully intact.
        let visibility = project.visibility();
        let project_id = ProjectId(project.project_id);
        let channel_id = ChannelId(project.channel_id);
        let name = project.name.into();
        Self {
            project_id,
            channel_id,
            name,
            _visibility: visibility,
        }
    }
}
/// Model holding the client's view of channels: the channel index, pending
/// invitations, participants, per-channel state, hosted projects, and the
/// channel buffers/chats that are currently open.
pub struct ChannelStore {
    /// Index of all known channels, addressable by id and in display order.
    pub channel_index: ChannelIndex,
    /// Channels the current user has been invited to; `update_channels`
    /// binary-searches this list by channel id when inserting.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users currently participating in each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state (latest/observed chat and notes versions, role, projects).
    channel_states: HashMap<ChannelId, ChannelState>,
    /// Hosted projects known to the store, keyed by project id.
    hosted_projects: HashMap<ProjectId, HostedProject>,

    /// `(channel, user)` pairs that have a membership request (invite, removal
    /// or role change) in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender used by the RPC handler to queue `UpdateChannels` payloads for
    /// the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    /// RPC client used for all channel requests.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: Model<UserStore>,
    /// Keeps the `UpdateChannels` / `UpdateUserChannels` handlers registered.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a connection
    /// loss; `handle_connect` takes it out of this slot (presumably cancelling
    /// it by dropping the task — TODO confirm).
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
71
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Id of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: SharedString,
    /// Visibility of the channel as reported by the server.
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors; empty for a root channel, otherwise the
    /// first entry is the root channel's id.
    pub parent_path: Vec<ChannelId>,
}
79
/// Per-channel bookkeeping kept by the [`ChannelStore`].
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Id of the latest chat message known for the channel, if any.
    latest_chat_message: Option<u64>,
    /// Latest known version of the channel's notes.
    latest_notes_version: NotesVersion,
    /// Version of the channel's notes that has been observed.
    observed_notes_version: NotesVersion,
    /// Id of the chat message that has been observed, if any.
    observed_chat_message: Option<u64>,
    /// The current user's role in this channel, when known.
    role: Option<ChannelRole>,
    /// Ids of the hosted projects attached to this channel.
    projects: HashSet<ProjectId>,
}
89
impl Channel {
    /// Returns the URL of this channel, of the form
    /// `<server_url>/channel/<slug>-<id>`.
    pub fn link(&self, cx: &AppContext) -> String {
        let server_url = &ClientSettings::get_global(cx).server_url;
        let slug = Self::slug(&self.name);
        format!("{}/channel/{}-{}", server_url, slug, self.id)
    }

    /// Returns the URL of this channel's notes, optionally followed by a
    /// `#<slug>` fragment for the given heading.
    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
        let mut link = self.link(cx);
        link.push_str("/notes");
        if let Some(heading) = heading {
            link.push('#');
            link.push_str(&Self::slug(&heading));
        }
        link
    }

    /// Whether this channel has no ancestors.
    pub fn is_root_channel(&self) -> bool {
        self.parent_path.is_empty()
    }

    /// Returns the id of the first ancestor in `parent_path`, or this
    /// channel's own id when it has no ancestors.
    pub fn root_id(&self) -> ChannelId {
        match self.parent_path.first() {
            Some(root_id) => *root_id,
            None => self.id,
        }
    }

    /// Turns arbitrary text into a URL slug: every non-alphanumeric character
    /// becomes `-`, and leading/trailing dashes are stripped.
    pub fn slug(str: &str) -> String {
        let dashed = str
            .chars()
            .map(|ch| match ch {
                ch if ch.is_alphanumeric() => ch,
                _ => '-',
            })
            .collect::<String>();

        dashed.trim_matches('-').to_string()
    }
}
125
/// A user's membership in a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The member.
    pub user: Arc<User>,
    /// Whether the user is a full member or still an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
impl ChannelMembership {
    /// Builds the key by which memberships are ordered: role first, then
    /// membership kind, then GitHub login.
    pub fn sort_key(&self) -> MembershipSortKey {
        let role_order = match self.role {
            proto::ChannelRole::Admin => 0,
            proto::ChannelRole::Member => 1,
            proto::ChannelRole::Banned => 2,
            proto::ChannelRole::Talker => 3,
            proto::ChannelRole::Guest => 4,
        };
        let kind_order = match self.kind {
            proto::channel_member::Kind::Member => 0,
            proto::channel_member::Kind::Invitee => 1,
        };
        MembershipSortKey {
            role_order,
            kind_order,
            username_order: self.user.github_login.as_str(),
        }
    }
}
149
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below is significant: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role (lower sorts first).
    role_order: u8,
    /// Rank of the membership kind (members before invitees).
    kind_order: u8,
    /// The member's GitHub login, used as the final tie-breaker.
    username_order: &'a str,
}
156
/// Events emitted by the [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied locally.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied locally.
    ChannelRenamed(ChannelId),
}

// Allows `ChannelStore` to emit `ChannelEvent`s through its model context.
impl EventEmitter<ChannelEvent> for ChannelStore {}
163
/// Tracks a per-channel resource (buffer or chat) that is open or being opened.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is retained here.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
168
/// Wrapper that lets the [`ChannelStore`] model be stored as an app global
/// (set in `init`, read by `ChannelStore::global`).
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
172
173impl ChannelStore {
174 pub fn global(cx: &AppContext) -> Model<Self> {
175 cx.global::<GlobalChannelStore>().0.clone()
176 }
177
178 pub fn new(
179 client: Arc<Client>,
180 user_store: Model<UserStore>,
181 cx: &mut ModelContext<Self>,
182 ) -> Self {
183 let rpc_subscriptions = [
184 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
185 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
186 ];
187
188 let mut connection_status = client.status();
189 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
190 let watch_connection_status = cx.spawn(|this, mut cx| async move {
191 while let Some(status) = connection_status.next().await {
192 let this = this.upgrade()?;
193 match status {
194 client::Status::Connected { .. } => {
195 this.update(&mut cx, |this, cx| this.handle_connect(cx))
196 .ok()?
197 .await
198 .log_err()?;
199 }
200 client::Status::SignedOut | client::Status::UpgradeRequired => {
201 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
202 .ok();
203 }
204 _ => {
205 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
206 .ok();
207 }
208 }
209 }
210 Some(())
211 });
212
213 Self {
214 channel_invitations: Vec::default(),
215 channel_index: ChannelIndex::default(),
216 channel_participants: Default::default(),
217 hosted_projects: Default::default(),
218 outgoing_invites: Default::default(),
219 opened_buffers: Default::default(),
220 opened_chats: Default::default(),
221 update_channels_tx,
222 client,
223 user_store,
224 _rpc_subscriptions: rpc_subscriptions,
225 _watch_connection_status: watch_connection_status,
226 disconnect_channel_buffers_task: None,
227 _update_channels: cx.spawn(|this, mut cx| async move {
228 maybe!(async move {
229 while let Some(update_channels) = update_channels_rx.next().await {
230 if let Some(this) = this.upgrade() {
231 let update_task = this.update(&mut cx, |this, cx| {
232 this.update_channels(update_channels, cx)
233 })?;
234 if let Some(update_task) = update_task {
235 update_task.await.log_err();
236 }
237 }
238 }
239 anyhow::Ok(())
240 })
241 .await
242 .log_err();
243 }),
244 channel_states: Default::default(),
245 }
246 }
247
248 pub fn client(&self) -> Arc<Client> {
249 self.client.clone()
250 }
251
252 /// Returns the number of unique channels in the store
253 pub fn channel_count(&self) -> usize {
254 self.channel_index.by_id().len()
255 }
256
257 /// Returns the index of a channel ID in the list of unique channels
258 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
259 self.channel_index
260 .by_id()
261 .keys()
262 .position(|id| *id == channel_id)
263 }
264
265 /// Returns an iterator over all unique channels
266 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
267 self.channel_index.by_id().values()
268 }
269
270 /// Iterate over all entries in the channel DAG
271 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
272 self.channel_index
273 .ordered_channels()
274 .iter()
275 .filter_map(move |id| {
276 let channel = self.channel_index.by_id().get(id)?;
277 Some((channel.parent_path.len(), channel))
278 })
279 }
280
281 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
282 let channel_id = self.channel_index.ordered_channels().get(ix)?;
283 self.channel_index.by_id().get(channel_id)
284 }
285
286 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
287 self.channel_index.by_id().values().nth(ix)
288 }
289
290 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
291 self.channel_invitations
292 .iter()
293 .any(|channel| channel.id == channel_id)
294 }
295
296 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
297 &self.channel_invitations
298 }
299
300 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
301 self.channel_index.by_id().get(&channel_id)
302 }
303
304 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
305 let mut projects: Vec<(SharedString, ProjectId)> = self
306 .channel_states
307 .get(&channel_id)
308 .map(|state| state.projects.clone())
309 .unwrap_or_default()
310 .into_iter()
311 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
312 .collect();
313 projects.sort();
314 projects
315 }
316
317 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
318 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
319 if let OpenedModelHandle::Open(buffer) = buffer {
320 return buffer.upgrade().is_some();
321 }
322 }
323 false
324 }
325
326 pub fn open_channel_buffer(
327 &mut self,
328 channel_id: ChannelId,
329 cx: &mut ModelContext<Self>,
330 ) -> Task<Result<Model<ChannelBuffer>>> {
331 let client = self.client.clone();
332 let user_store = self.user_store.clone();
333 let channel_store = cx.handle();
334 self.open_channel_resource(
335 channel_id,
336 |this| &mut this.opened_buffers,
337 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
338 cx,
339 )
340 }
341
342 pub fn fetch_channel_messages(
343 &self,
344 message_ids: Vec<u64>,
345 cx: &mut ModelContext<Self>,
346 ) -> Task<Result<Vec<ChannelMessage>>> {
347 let request = if message_ids.is_empty() {
348 None
349 } else {
350 Some(
351 self.client
352 .request(proto::GetChannelMessagesById { message_ids }),
353 )
354 };
355 cx.spawn(|this, mut cx| async move {
356 if let Some(request) = request {
357 let response = request.await?;
358 let this = this
359 .upgrade()
360 .ok_or_else(|| anyhow!("channel store dropped"))?;
361 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
362 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
363 } else {
364 Ok(Vec::new())
365 }
366 })
367 }
368
369 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
370 self.channel_states
371 .get(&channel_id)
372 .is_some_and(|state| state.has_channel_buffer_changed())
373 }
374
375 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
376 self.channel_states
377 .get(&channel_id)
378 .is_some_and(|state| state.has_new_messages())
379 }
380
381 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
382 if let Some(state) = self.channel_states.get_mut(&channel_id) {
383 state.latest_chat_message = message_id;
384 }
385 }
386
387 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
388 self.channel_states.get(&channel_id).and_then(|state| {
389 if let Some(last_message_id) = state.latest_chat_message {
390 if state
391 .last_acknowledged_message_id()
392 .is_some_and(|id| id < last_message_id)
393 {
394 return state.last_acknowledged_message_id();
395 }
396 }
397
398 None
399 })
400 }
401
402 pub fn acknowledge_message_id(
403 &mut self,
404 channel_id: ChannelId,
405 message_id: u64,
406 cx: &mut ModelContext<Self>,
407 ) {
408 self.channel_states
409 .entry(channel_id)
410 .or_insert_with(|| Default::default())
411 .acknowledge_message_id(message_id);
412 cx.notify();
413 }
414
415 pub fn update_latest_message_id(
416 &mut self,
417 channel_id: ChannelId,
418 message_id: u64,
419 cx: &mut ModelContext<Self>,
420 ) {
421 self.channel_states
422 .entry(channel_id)
423 .or_insert_with(|| Default::default())
424 .update_latest_message_id(message_id);
425 cx.notify();
426 }
427
428 pub fn acknowledge_notes_version(
429 &mut self,
430 channel_id: ChannelId,
431 epoch: u64,
432 version: &clock::Global,
433 cx: &mut ModelContext<Self>,
434 ) {
435 self.channel_states
436 .entry(channel_id)
437 .or_insert_with(|| Default::default())
438 .acknowledge_notes_version(epoch, version);
439 cx.notify()
440 }
441
442 pub fn update_latest_notes_version(
443 &mut self,
444 channel_id: ChannelId,
445 epoch: u64,
446 version: &clock::Global,
447 cx: &mut ModelContext<Self>,
448 ) {
449 self.channel_states
450 .entry(channel_id)
451 .or_insert_with(|| Default::default())
452 .update_latest_notes_version(epoch, version);
453 cx.notify()
454 }
455
456 pub fn open_channel_chat(
457 &mut self,
458 channel_id: ChannelId,
459 cx: &mut ModelContext<Self>,
460 ) -> Task<Result<Model<ChannelChat>>> {
461 let client = self.client.clone();
462 let user_store = self.user_store.clone();
463 let this = cx.handle();
464 self.open_channel_resource(
465 channel_id,
466 |this| &mut this.opened_chats,
467 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
468 cx,
469 )
470 }
471
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map of opened handles (buffers or chats) to use,
    /// and `load` performs the actual loading once the channel has been
    /// resolved. The returned task fails if the channel id is unknown or if
    /// `load` fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup
        // is retried, which then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it out directly.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the stale handle.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is in flight: share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            // Errors are wrapped in `Arc` so the shared task's
                            // output can be cloned.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a
                    // weak `Open` handle, or drop the entry on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
544
545 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
546 self.channel_role(channel_id) == proto::ChannelRole::Admin
547 }
548
549 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
550 self.channel_index
551 .by_id()
552 .get(&channel_id)
553 .map_or(false, |channel| channel.is_root_channel())
554 }
555
556 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
557 self.channel_index
558 .by_id()
559 .get(&channel_id)
560 .map_or(false, |channel| {
561 channel.visibility == ChannelVisibility::Public
562 })
563 }
564
565 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
566 match self.channel_role(channel_id) {
567 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
568 _ => Capability::ReadOnly,
569 }
570 }
571
572 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
573 maybe!({
574 let mut channel = self.channel_for_id(channel_id)?;
575 if !channel.is_root_channel() {
576 channel = self.channel_for_id(channel.root_id())?;
577 }
578 let root_channel_state = self.channel_states.get(&channel.id);
579 root_channel_state?.role
580 })
581 .unwrap_or(proto::ChannelRole::Guest)
582 }
583
584 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
585 self.channel_participants
586 .get(&channel_id)
587 .map_or(&[], |v| v.as_slice())
588 }
589
590 pub fn create_channel(
591 &self,
592 name: &str,
593 parent_id: Option<ChannelId>,
594 cx: &mut ModelContext<Self>,
595 ) -> Task<Result<ChannelId>> {
596 let client = self.client.clone();
597 let name = name.trim_start_matches('#').to_owned();
598 cx.spawn(move |this, mut cx| async move {
599 let response = client
600 .request(proto::CreateChannel {
601 name,
602 parent_id: parent_id.map(|cid| cid.0),
603 })
604 .await?;
605
606 let channel = response
607 .channel
608 .ok_or_else(|| anyhow!("missing channel in response"))?;
609 let channel_id = ChannelId(channel.id);
610
611 this.update(&mut cx, |this, cx| {
612 let task = this.update_channels(
613 proto::UpdateChannels {
614 channels: vec![channel],
615 ..Default::default()
616 },
617 cx,
618 );
619 assert!(task.is_none());
620
621 // This event is emitted because the collab panel wants to clear the pending edit state
622 // before this frame is rendered. But we can't guarantee that the collab panel's future
623 // will resolve before this flush_effects finishes. Synchronously emitting this event
624 // ensures that the collab panel will observe this creation before the frame completes
625 cx.emit(ChannelEvent::ChannelCreated(channel_id));
626 })?;
627
628 Ok(channel_id)
629 })
630 }
631
632 pub fn move_channel(
633 &mut self,
634 channel_id: ChannelId,
635 to: ChannelId,
636 cx: &mut ModelContext<Self>,
637 ) -> Task<Result<()>> {
638 let client = self.client.clone();
639 cx.spawn(move |_, _| async move {
640 let _ = client
641 .request(proto::MoveChannel {
642 channel_id: channel_id.0,
643 to: to.0,
644 })
645 .await?;
646
647 Ok(())
648 })
649 }
650
651 pub fn set_channel_visibility(
652 &mut self,
653 channel_id: ChannelId,
654 visibility: ChannelVisibility,
655 cx: &mut ModelContext<Self>,
656 ) -> Task<Result<()>> {
657 let client = self.client.clone();
658 cx.spawn(move |_, _| async move {
659 let _ = client
660 .request(proto::SetChannelVisibility {
661 channel_id: channel_id.0,
662 visibility: visibility.into(),
663 })
664 .await?;
665
666 Ok(())
667 })
668 }
669
670 pub fn invite_member(
671 &mut self,
672 channel_id: ChannelId,
673 user_id: UserId,
674 role: proto::ChannelRole,
675 cx: &mut ModelContext<Self>,
676 ) -> Task<Result<()>> {
677 if !self.outgoing_invites.insert((channel_id, user_id)) {
678 return Task::ready(Err(anyhow!("invite request already in progress")));
679 }
680
681 cx.notify();
682 let client = self.client.clone();
683 cx.spawn(move |this, mut cx| async move {
684 let result = client
685 .request(proto::InviteChannelMember {
686 channel_id: channel_id.0,
687 user_id,
688 role: role.into(),
689 })
690 .await;
691
692 this.update(&mut cx, |this, cx| {
693 this.outgoing_invites.remove(&(channel_id, user_id));
694 cx.notify();
695 })?;
696
697 result?;
698
699 Ok(())
700 })
701 }
702
703 pub fn remove_member(
704 &mut self,
705 channel_id: ChannelId,
706 user_id: u64,
707 cx: &mut ModelContext<Self>,
708 ) -> Task<Result<()>> {
709 if !self.outgoing_invites.insert((channel_id, user_id)) {
710 return Task::ready(Err(anyhow!("invite request already in progress")));
711 }
712
713 cx.notify();
714 let client = self.client.clone();
715 cx.spawn(move |this, mut cx| async move {
716 let result = client
717 .request(proto::RemoveChannelMember {
718 channel_id: channel_id.0,
719 user_id,
720 })
721 .await;
722
723 this.update(&mut cx, |this, cx| {
724 this.outgoing_invites.remove(&(channel_id, user_id));
725 cx.notify();
726 })?;
727 result?;
728 Ok(())
729 })
730 }
731
732 pub fn set_member_role(
733 &mut self,
734 channel_id: ChannelId,
735 user_id: UserId,
736 role: proto::ChannelRole,
737 cx: &mut ModelContext<Self>,
738 ) -> Task<Result<()>> {
739 if !self.outgoing_invites.insert((channel_id, user_id)) {
740 return Task::ready(Err(anyhow!("member request already in progress")));
741 }
742
743 cx.notify();
744 let client = self.client.clone();
745 cx.spawn(move |this, mut cx| async move {
746 let result = client
747 .request(proto::SetChannelMemberRole {
748 channel_id: channel_id.0,
749 user_id,
750 role: role.into(),
751 })
752 .await;
753
754 this.update(&mut cx, |this, cx| {
755 this.outgoing_invites.remove(&(channel_id, user_id));
756 cx.notify();
757 })?;
758
759 result?;
760 Ok(())
761 })
762 }
763
764 pub fn rename(
765 &mut self,
766 channel_id: ChannelId,
767 new_name: &str,
768 cx: &mut ModelContext<Self>,
769 ) -> Task<Result<()>> {
770 let client = self.client.clone();
771 let name = new_name.to_string();
772 cx.spawn(move |this, mut cx| async move {
773 let channel = client
774 .request(proto::RenameChannel {
775 channel_id: channel_id.0,
776 name,
777 })
778 .await?
779 .channel
780 .ok_or_else(|| anyhow!("missing channel in response"))?;
781 this.update(&mut cx, |this, cx| {
782 let task = this.update_channels(
783 proto::UpdateChannels {
784 channels: vec![channel],
785 ..Default::default()
786 },
787 cx,
788 );
789 assert!(task.is_none());
790
791 // This event is emitted because the collab panel wants to clear the pending edit state
792 // before this frame is rendered. But we can't guarantee that the collab panel's future
793 // will resolve before this flush_effects finishes. Synchronously emitting this event
794 // ensures that the collab panel will observe this creation before the frame complete
795 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
796 })?;
797 Ok(())
798 })
799 }
800
801 pub fn respond_to_channel_invite(
802 &mut self,
803 channel_id: ChannelId,
804 accept: bool,
805 cx: &mut ModelContext<Self>,
806 ) -> Task<Result<()>> {
807 let client = self.client.clone();
808 cx.background_executor().spawn(async move {
809 client
810 .request(proto::RespondToChannelInvite {
811 channel_id: channel_id.0,
812 accept,
813 })
814 .await?;
815 Ok(())
816 })
817 }
818 pub fn get_channel_member_details(
819 &self,
820 channel_id: ChannelId,
821 cx: &mut ModelContext<Self>,
822 ) -> Task<Result<Vec<ChannelMembership>>> {
823 let client = self.client.clone();
824 let user_store = self.user_store.downgrade();
825 cx.spawn(move |_, mut cx| async move {
826 let response = client
827 .request(proto::GetChannelMembers {
828 channel_id: channel_id.0,
829 })
830 .await?;
831
832 let user_ids = response.members.iter().map(|m| m.user_id).collect();
833 let user_store = user_store
834 .upgrade()
835 .ok_or_else(|| anyhow!("user store dropped"))?;
836 let users = user_store
837 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
838 .await?;
839
840 Ok(users
841 .into_iter()
842 .zip(response.members)
843 .map(|(user, member)| ChannelMembership {
844 user,
845 role: member.role(),
846 kind: member.kind(),
847 })
848 .collect())
849 })
850 }
851
852 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
853 let client = self.client.clone();
854 async move {
855 client
856 .request(proto::DeleteChannel {
857 channel_id: channel_id.0,
858 })
859 .await?;
860 Ok(())
861 }
862 }
863
864 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
865 false
866 }
867
868 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
869 self.outgoing_invites.contains(&(channel_id, user_id))
870 }
871
872 async fn handle_update_channels(
873 this: Model<Self>,
874 message: TypedEnvelope<proto::UpdateChannels>,
875 _: Arc<Client>,
876 mut cx: AsyncAppContext,
877 ) -> Result<()> {
878 this.update(&mut cx, |this, _| {
879 this.update_channels_tx
880 .unbounded_send(message.payload)
881 .unwrap();
882 })?;
883 Ok(())
884 }
885
886 async fn handle_update_user_channels(
887 this: Model<Self>,
888 message: TypedEnvelope<proto::UpdateUserChannels>,
889 _: Arc<Client>,
890 mut cx: AsyncAppContext,
891 ) -> Result<()> {
892 this.update(&mut cx, |this, cx| {
893 for buffer_version in message.payload.observed_channel_buffer_version {
894 let version = language::proto::deserialize_version(&buffer_version.version);
895 this.acknowledge_notes_version(
896 ChannelId(buffer_version.channel_id),
897 buffer_version.epoch,
898 &version,
899 cx,
900 );
901 }
902 for message_id in message.payload.observed_channel_message_id {
903 this.acknowledge_message_id(
904 ChannelId(message_id.channel_id),
905 message_id.message_id,
906 cx,
907 );
908 }
909 for membership in message.payload.channel_memberships {
910 if let Some(role) = ChannelRole::from_i32(membership.role) {
911 this.channel_states
912 .entry(ChannelId(membership.channel_id))
913 .or_insert_with(|| ChannelState::default())
914 .set_role(role)
915 }
916 }
917 })
918 }
919
920 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
921 self.channel_index.clear();
922 self.channel_invitations.clear();
923 self.channel_participants.clear();
924 self.channel_index.clear();
925 self.outgoing_invites.clear();
926 self.disconnect_channel_buffers_task.take();
927
928 for chat in self.opened_chats.values() {
929 if let OpenedModelHandle::Open(chat) = chat {
930 if let Some(chat) = chat.upgrade() {
931 chat.update(cx, |chat, cx| {
932 chat.rejoin(cx);
933 });
934 }
935 }
936 }
937
938 let mut buffer_versions = Vec::new();
939 for buffer in self.opened_buffers.values() {
940 if let OpenedModelHandle::Open(buffer) = buffer {
941 if let Some(buffer) = buffer.upgrade() {
942 let channel_buffer = buffer.read(cx);
943 let buffer = channel_buffer.buffer().read(cx);
944 buffer_versions.push(proto::ChannelBufferVersion {
945 channel_id: channel_buffer.channel_id.0,
946 epoch: channel_buffer.epoch(),
947 version: language::proto::serialize_version(&buffer.version()),
948 });
949 }
950 }
951 }
952
953 if buffer_versions.is_empty() {
954 return Task::ready(Ok(()));
955 }
956
957 let response = self.client.request(proto::RejoinChannelBuffers {
958 buffers: buffer_versions,
959 });
960
961 cx.spawn(|this, mut cx| async move {
962 let mut response = response.await?;
963
964 this.update(&mut cx, |this, cx| {
965 this.opened_buffers.retain(|_, buffer| match buffer {
966 OpenedModelHandle::Open(channel_buffer) => {
967 let Some(channel_buffer) = channel_buffer.upgrade() else {
968 return false;
969 };
970
971 channel_buffer.update(cx, |channel_buffer, cx| {
972 let channel_id = channel_buffer.channel_id;
973 if let Some(remote_buffer) = response
974 .buffers
975 .iter_mut()
976 .find(|buffer| buffer.channel_id == channel_id.0)
977 {
978 let channel_id = channel_buffer.channel_id;
979 let remote_version =
980 language::proto::deserialize_version(&remote_buffer.version);
981
982 channel_buffer.replace_collaborators(
983 mem::take(&mut remote_buffer.collaborators),
984 cx,
985 );
986
987 let operations = channel_buffer
988 .buffer()
989 .update(cx, |buffer, cx| {
990 let outgoing_operations =
991 buffer.serialize_ops(Some(remote_version), cx);
992 let incoming_operations =
993 mem::take(&mut remote_buffer.operations)
994 .into_iter()
995 .map(language::proto::deserialize_operation)
996 .collect::<Result<Vec<_>>>()?;
997 buffer.apply_ops(incoming_operations, cx)?;
998 anyhow::Ok(outgoing_operations)
999 })
1000 .log_err();
1001
1002 if let Some(operations) = operations {
1003 let client = this.client.clone();
1004 cx.background_executor()
1005 .spawn(async move {
1006 let operations = operations.await;
1007 for chunk in
1008 language::proto::split_operations(operations)
1009 {
1010 client
1011 .send(proto::UpdateChannelBuffer {
1012 channel_id: channel_id.0,
1013 operations: chunk,
1014 })
1015 .ok();
1016 }
1017 })
1018 .detach();
1019 return true;
1020 }
1021 }
1022
1023 channel_buffer.disconnect(cx);
1024 false
1025 })
1026 }
1027 OpenedModelHandle::Loading(_) => true,
1028 });
1029 })
1030 .ok();
1031 anyhow::Ok(())
1032 })
1033 }
1034
1035 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1036 cx.notify();
1037
1038 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1039 cx.spawn(move |this, mut cx| async move {
1040 if wait_for_reconnect {
1041 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1042 }
1043
1044 if let Some(this) = this.upgrade() {
1045 this.update(&mut cx, |this, cx| {
1046 for (_, buffer) in this.opened_buffers.drain() {
1047 if let OpenedModelHandle::Open(buffer) = buffer {
1048 if let Some(buffer) = buffer.upgrade() {
1049 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1050 }
1051 }
1052 }
1053 })
1054 .ok();
1055 }
1056 })
1057 });
1058 }
1059
1060 pub(crate) fn update_channels(
1061 &mut self,
1062 payload: proto::UpdateChannels,
1063 cx: &mut ModelContext<ChannelStore>,
1064 ) -> Option<Task<Result<()>>> {
1065 if !payload.remove_channel_invitations.is_empty() {
1066 self.channel_invitations
1067 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1068 }
1069 for channel in payload.channel_invitations {
1070 match self
1071 .channel_invitations
1072 .binary_search_by_key(&channel.id, |c| c.id.0)
1073 {
1074 Ok(ix) => {
1075 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1076 }
1077 Err(ix) => self.channel_invitations.insert(
1078 ix,
1079 Arc::new(Channel {
1080 id: ChannelId(channel.id),
1081 visibility: channel.visibility(),
1082 name: channel.name.into(),
1083 parent_path: channel
1084 .parent_path
1085 .into_iter()
1086 .map(|cid| ChannelId(cid))
1087 .collect(),
1088 }),
1089 ),
1090 }
1091 }
1092
1093 let channels_changed = !payload.channels.is_empty()
1094 || !payload.delete_channels.is_empty()
1095 || !payload.latest_channel_message_ids.is_empty()
1096 || !payload.latest_channel_buffer_versions.is_empty()
1097 || !payload.hosted_projects.is_empty()
1098 || !payload.deleted_hosted_projects.is_empty();
1099
1100 if channels_changed {
1101 if !payload.delete_channels.is_empty() {
1102 let delete_channels: Vec<ChannelId> = payload
1103 .delete_channels
1104 .into_iter()
1105 .map(|cid| ChannelId(cid))
1106 .collect();
1107 self.channel_index.delete_channels(&delete_channels);
1108 self.channel_participants
1109 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1110
1111 for channel_id in &delete_channels {
1112 let channel_id = *channel_id;
1113 if payload
1114 .channels
1115 .iter()
1116 .any(|channel| channel.id == channel_id.0)
1117 {
1118 continue;
1119 }
1120 if let Some(OpenedModelHandle::Open(buffer)) =
1121 self.opened_buffers.remove(&channel_id)
1122 {
1123 if let Some(buffer) = buffer.upgrade() {
1124 buffer.update(cx, ChannelBuffer::disconnect);
1125 }
1126 }
1127 }
1128 }
1129
1130 let mut index = self.channel_index.bulk_insert();
1131 for channel in payload.channels {
1132 let id = ChannelId(channel.id);
1133 let channel_changed = index.insert(channel);
1134
1135 if channel_changed {
1136 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1137 if let Some(buffer) = buffer.upgrade() {
1138 buffer.update(cx, ChannelBuffer::channel_changed);
1139 }
1140 }
1141 }
1142 }
1143
1144 for latest_buffer_version in payload.latest_channel_buffer_versions {
1145 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1146 self.channel_states
1147 .entry(ChannelId(latest_buffer_version.channel_id))
1148 .or_default()
1149 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1150 }
1151
1152 for latest_channel_message in payload.latest_channel_message_ids {
1153 self.channel_states
1154 .entry(ChannelId(latest_channel_message.channel_id))
1155 .or_default()
1156 .update_latest_message_id(latest_channel_message.message_id);
1157 }
1158
1159 for hosted_project in payload.hosted_projects {
1160 let hosted_project: HostedProject = hosted_project.into();
1161 if let Some(old_project) = self
1162 .hosted_projects
1163 .insert(hosted_project.project_id, hosted_project.clone())
1164 {
1165 self.channel_states
1166 .entry(old_project.channel_id)
1167 .or_default()
1168 .remove_hosted_project(old_project.project_id);
1169 }
1170 self.channel_states
1171 .entry(hosted_project.channel_id)
1172 .or_default()
1173 .add_hosted_project(hosted_project.project_id);
1174 }
1175
1176 for hosted_project_id in payload.deleted_hosted_projects {
1177 let hosted_project_id = ProjectId(hosted_project_id);
1178
1179 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1180 self.channel_states
1181 .entry(old_project.channel_id)
1182 .or_default()
1183 .remove_hosted_project(old_project.project_id);
1184 }
1185 }
1186 }
1187
1188 cx.notify();
1189 if payload.channel_participants.is_empty() {
1190 return None;
1191 }
1192
1193 let mut all_user_ids = Vec::new();
1194 let channel_participants = payload.channel_participants;
1195 for entry in &channel_participants {
1196 for user_id in entry.participant_user_ids.iter() {
1197 if let Err(ix) = all_user_ids.binary_search(user_id) {
1198 all_user_ids.insert(ix, *user_id);
1199 }
1200 }
1201 }
1202
1203 let users = self
1204 .user_store
1205 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1206 Some(cx.spawn(|this, mut cx| async move {
1207 let users = users.await?;
1208
1209 this.update(&mut cx, |this, cx| {
1210 for entry in &channel_participants {
1211 let mut participants: Vec<_> = entry
1212 .participant_user_ids
1213 .iter()
1214 .filter_map(|user_id| {
1215 users
1216 .binary_search_by_key(&user_id, |user| &user.id)
1217 .ok()
1218 .map(|ix| users[ix].clone())
1219 })
1220 .collect();
1221
1222 participants.sort_by_key(|u| u.id);
1223
1224 this.channel_participants
1225 .insert(ChannelId(entry.channel_id), participants);
1226 }
1227
1228 cx.notify();
1229 })
1230 }))
1231 }
1232}
1233
1234impl ChannelState {
1235 fn set_role(&mut self, role: ChannelRole) {
1236 self.role = Some(role);
1237 }
1238
1239 fn has_channel_buffer_changed(&self) -> bool {
1240 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1241 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1242 && self
1243 .latest_notes_version
1244 .version
1245 .changed_since(&self.observed_notes_version.version))
1246 }
1247
1248 fn has_new_messages(&self) -> bool {
1249 let latest_message_id = self.latest_chat_message;
1250 let observed_message_id = self.observed_chat_message;
1251
1252 latest_message_id.is_some_and(|latest_message_id| {
1253 latest_message_id > observed_message_id.unwrap_or_default()
1254 })
1255 }
1256
1257 fn last_acknowledged_message_id(&self) -> Option<u64> {
1258 self.observed_chat_message
1259 }
1260
1261 fn acknowledge_message_id(&mut self, message_id: u64) {
1262 let observed = self.observed_chat_message.get_or_insert(message_id);
1263 *observed = (*observed).max(message_id);
1264 }
1265
1266 fn update_latest_message_id(&mut self, message_id: u64) {
1267 self.latest_chat_message =
1268 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1269 }
1270
1271 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1272 if self.observed_notes_version.epoch == epoch {
1273 self.observed_notes_version.version.join(version);
1274 } else {
1275 self.observed_notes_version = NotesVersion {
1276 epoch,
1277 version: version.clone(),
1278 };
1279 }
1280 }
1281
1282 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1283 if self.latest_notes_version.epoch == epoch {
1284 self.latest_notes_version.version.join(version);
1285 } else {
1286 self.latest_notes_version = NotesVersion {
1287 epoch,
1288 version: version.clone(),
1289 };
1290 }
1291 }
1292
1293 fn add_hosted_project(&mut self, project_id: ProjectId) {
1294 self.projects.insert(project_id);
1295 }
1296
1297 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1298 self.projects.remove(&project_id);
1299 }
1300}