1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36#[derive(Debug, Clone)]
37pub struct HostedProject {
38 project_id: ProjectId,
39 channel_id: ChannelId,
40 name: SharedString,
41 _visibility: proto::ChannelVisibility,
42}
43impl From<proto::HostedProject> for HostedProject {
44 fn from(project: proto::HostedProject) -> Self {
45 Self {
46 project_id: ProjectId(project.project_id),
47 channel_id: ChannelId(project.channel_id),
48 _visibility: project.visibility(),
49 name: project.name.into(),
50 }
51 }
52}
53pub struct ChannelStore {
54 pub channel_index: ChannelIndex,
55 channel_invitations: Vec<Arc<Channel>>,
56 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
57 channel_states: HashMap<ChannelId, ChannelState>,
58 hosted_projects: HashMap<ProjectId, HostedProject>,
59
60 outgoing_invites: HashSet<(ChannelId, UserId)>,
61 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
62 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
63 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
64 client: Arc<Client>,
65 did_subscribe: bool,
66 user_store: Model<UserStore>,
67 _rpc_subscriptions: [Subscription; 2],
68 _watch_connection_status: Task<Option<()>>,
69 disconnect_channel_buffers_task: Option<Task<()>>,
70 _update_channels: Task<()>,
71}
72
73#[derive(Clone, Debug)]
74pub struct Channel {
75 pub id: ChannelId,
76 pub name: SharedString,
77 pub visibility: proto::ChannelVisibility,
78 pub parent_path: Vec<ChannelId>,
79}
80
81#[derive(Default, Debug)]
82pub struct ChannelState {
83 latest_chat_message: Option<u64>,
84 latest_notes_version: NotesVersion,
85 observed_notes_version: NotesVersion,
86 observed_chat_message: Option<u64>,
87 role: Option<ChannelRole>,
88 projects: HashSet<ProjectId>,
89}
90
91impl Channel {
92 pub fn link(&self, cx: &AppContext) -> String {
93 format!(
94 "{}/channel/{}-{}",
95 ClientSettings::get_global(cx).server_url,
96 Self::slug(&self.name),
97 self.id
98 )
99 }
100
101 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
102 self.link(cx)
103 + "/notes"
104 + &heading
105 .map(|h| format!("#{}", Self::slug(&h)))
106 .unwrap_or_default()
107 }
108
109 pub fn is_root_channel(&self) -> bool {
110 self.parent_path.is_empty()
111 }
112
113 pub fn root_id(&self) -> ChannelId {
114 self.parent_path.first().copied().unwrap_or(self.id)
115 }
116
117 pub fn slug(str: &str) -> String {
118 let slug: String = str
119 .chars()
120 .map(|c| if c.is_alphanumeric() { c } else { '-' })
121 .collect();
122
123 slug.trim_matches(|c| c == '-').to_string()
124 }
125}
126
127#[derive(Debug)]
128pub struct ChannelMembership {
129 pub user: Arc<User>,
130 pub kind: proto::channel_member::Kind,
131 pub role: proto::ChannelRole,
132}
133impl ChannelMembership {
134 pub fn sort_key(&self) -> MembershipSortKey {
135 MembershipSortKey {
136 role_order: match self.role {
137 proto::ChannelRole::Admin => 0,
138 proto::ChannelRole::Member => 1,
139 proto::ChannelRole::Banned => 2,
140 proto::ChannelRole::Talker => 3,
141 proto::ChannelRole::Guest => 4,
142 },
143 kind_order: match self.kind {
144 proto::channel_member::Kind::Member => 0,
145 proto::channel_member::Kind::Invitee => 1,
146 },
147 username_order: self.user.github_login.as_str(),
148 }
149 }
150}
151
152#[derive(PartialOrd, Ord, PartialEq, Eq)]
153pub struct MembershipSortKey<'a> {
154 role_order: u8,
155 kind_order: u8,
156 username_order: &'a str,
157}
158
159pub enum ChannelEvent {
160 ChannelCreated(ChannelId),
161 ChannelRenamed(ChannelId),
162}
163
164impl EventEmitter<ChannelEvent> for ChannelStore {}
165
166enum OpenedModelHandle<E> {
167 Open(WeakModel<E>),
168 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
169}
170
171struct GlobalChannelStore(Model<ChannelStore>);
172
173impl Global for GlobalChannelStore {}
174
175impl ChannelStore {
176 pub fn global(cx: &AppContext) -> Model<Self> {
177 cx.global::<GlobalChannelStore>().0.clone()
178 }
179
180 pub fn new(
181 client: Arc<Client>,
182 user_store: Model<UserStore>,
183 cx: &mut ModelContext<Self>,
184 ) -> Self {
185 let rpc_subscriptions = [
186 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
187 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
188 ];
189
190 let mut connection_status = client.status();
191 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
192 let watch_connection_status = cx.spawn(|this, mut cx| async move {
193 while let Some(status) = connection_status.next().await {
194 let this = this.upgrade()?;
195 match status {
196 client::Status::Connected { .. } => {
197 this.update(&mut cx, |this, cx| this.handle_connect(cx))
198 .ok()?
199 .await
200 .log_err()?;
201 }
202 client::Status::SignedOut | client::Status::UpgradeRequired => {
203 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
204 .ok();
205 }
206 _ => {
207 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
208 .ok();
209 }
210 }
211 }
212 Some(())
213 });
214
215 Self {
216 channel_invitations: Vec::default(),
217 channel_index: ChannelIndex::default(),
218 channel_participants: Default::default(),
219 hosted_projects: Default::default(),
220 outgoing_invites: Default::default(),
221 opened_buffers: Default::default(),
222 opened_chats: Default::default(),
223 update_channels_tx,
224 client,
225 user_store,
226 _rpc_subscriptions: rpc_subscriptions,
227 _watch_connection_status: watch_connection_status,
228 disconnect_channel_buffers_task: None,
229 _update_channels: cx.spawn(|this, mut cx| async move {
230 maybe!(async move {
231 while let Some(update_channels) = update_channels_rx.next().await {
232 if let Some(this) = this.upgrade() {
233 let update_task = this.update(&mut cx, |this, cx| {
234 this.update_channels(update_channels, cx)
235 })?;
236 if let Some(update_task) = update_task {
237 update_task.await.log_err();
238 }
239 }
240 }
241 anyhow::Ok(())
242 })
243 .await
244 .log_err();
245 }),
246 channel_states: Default::default(),
247 did_subscribe: false,
248 }
249 }
250
251 pub fn initialize(&mut self) {
252 if !self.did_subscribe
253 && self
254 .client
255 .send(proto::SubscribeToChannels {})
256 .log_err()
257 .is_some()
258 {
259 self.did_subscribe = true;
260 }
261 }
262
263 pub fn client(&self) -> Arc<Client> {
264 self.client.clone()
265 }
266
267 /// Returns the number of unique channels in the store
268 pub fn channel_count(&self) -> usize {
269 self.channel_index.by_id().len()
270 }
271
272 /// Returns the index of a channel ID in the list of unique channels
273 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
274 self.channel_index
275 .by_id()
276 .keys()
277 .position(|id| *id == channel_id)
278 }
279
280 /// Returns an iterator over all unique channels
281 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
282 self.channel_index.by_id().values()
283 }
284
285 /// Iterate over all entries in the channel DAG
286 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
287 self.channel_index
288 .ordered_channels()
289 .iter()
290 .filter_map(move |id| {
291 let channel = self.channel_index.by_id().get(id)?;
292 Some((channel.parent_path.len(), channel))
293 })
294 }
295
296 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
297 let channel_id = self.channel_index.ordered_channels().get(ix)?;
298 self.channel_index.by_id().get(channel_id)
299 }
300
301 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
302 self.channel_index.by_id().values().nth(ix)
303 }
304
305 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
306 self.channel_invitations
307 .iter()
308 .any(|channel| channel.id == channel_id)
309 }
310
311 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
312 &self.channel_invitations
313 }
314
315 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
316 self.channel_index.by_id().get(&channel_id)
317 }
318
319 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
320 let mut projects: Vec<(SharedString, ProjectId)> = self
321 .channel_states
322 .get(&channel_id)
323 .map(|state| state.projects.clone())
324 .unwrap_or_default()
325 .into_iter()
326 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
327 .collect();
328 projects.sort();
329 projects
330 }
331
332 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
333 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
334 if let OpenedModelHandle::Open(buffer) = buffer {
335 return buffer.upgrade().is_some();
336 }
337 }
338 false
339 }
340
341 pub fn open_channel_buffer(
342 &mut self,
343 channel_id: ChannelId,
344 cx: &mut ModelContext<Self>,
345 ) -> Task<Result<Model<ChannelBuffer>>> {
346 let client = self.client.clone();
347 let user_store = self.user_store.clone();
348 let channel_store = cx.handle();
349 self.open_channel_resource(
350 channel_id,
351 |this| &mut this.opened_buffers,
352 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
353 cx,
354 )
355 }
356
357 pub fn fetch_channel_messages(
358 &self,
359 message_ids: Vec<u64>,
360 cx: &mut ModelContext<Self>,
361 ) -> Task<Result<Vec<ChannelMessage>>> {
362 let request = if message_ids.is_empty() {
363 None
364 } else {
365 Some(
366 self.client
367 .request(proto::GetChannelMessagesById { message_ids }),
368 )
369 };
370 cx.spawn(|this, mut cx| async move {
371 if let Some(request) = request {
372 let response = request.await?;
373 let this = this
374 .upgrade()
375 .ok_or_else(|| anyhow!("channel store dropped"))?;
376 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
377 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
378 } else {
379 Ok(Vec::new())
380 }
381 })
382 }
383
384 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
385 self.channel_states
386 .get(&channel_id)
387 .is_some_and(|state| state.has_channel_buffer_changed())
388 }
389
390 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
391 self.channel_states
392 .get(&channel_id)
393 .is_some_and(|state| state.has_new_messages())
394 }
395
396 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
397 if let Some(state) = self.channel_states.get_mut(&channel_id) {
398 state.latest_chat_message = message_id;
399 }
400 }
401
402 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
403 self.channel_states.get(&channel_id).and_then(|state| {
404 if let Some(last_message_id) = state.latest_chat_message {
405 if state
406 .last_acknowledged_message_id()
407 .is_some_and(|id| id < last_message_id)
408 {
409 return state.last_acknowledged_message_id();
410 }
411 }
412
413 None
414 })
415 }
416
417 pub fn acknowledge_message_id(
418 &mut self,
419 channel_id: ChannelId,
420 message_id: u64,
421 cx: &mut ModelContext<Self>,
422 ) {
423 self.channel_states
424 .entry(channel_id)
425 .or_default()
426 .acknowledge_message_id(message_id);
427 cx.notify();
428 }
429
430 pub fn update_latest_message_id(
431 &mut self,
432 channel_id: ChannelId,
433 message_id: u64,
434 cx: &mut ModelContext<Self>,
435 ) {
436 self.channel_states
437 .entry(channel_id)
438 .or_default()
439 .update_latest_message_id(message_id);
440 cx.notify();
441 }
442
443 pub fn acknowledge_notes_version(
444 &mut self,
445 channel_id: ChannelId,
446 epoch: u64,
447 version: &clock::Global,
448 cx: &mut ModelContext<Self>,
449 ) {
450 self.channel_states
451 .entry(channel_id)
452 .or_default()
453 .acknowledge_notes_version(epoch, version);
454 cx.notify()
455 }
456
457 pub fn update_latest_notes_version(
458 &mut self,
459 channel_id: ChannelId,
460 epoch: u64,
461 version: &clock::Global,
462 cx: &mut ModelContext<Self>,
463 ) {
464 self.channel_states
465 .entry(channel_id)
466 .or_default()
467 .update_latest_notes_version(epoch, version);
468 cx.notify()
469 }
470
471 pub fn open_channel_chat(
472 &mut self,
473 channel_id: ChannelId,
474 cx: &mut ModelContext<Self>,
475 ) -> Task<Result<Model<ChannelChat>>> {
476 let client = self.client.clone();
477 let user_store = self.user_store.clone();
478 let this = cx.handle();
479 self.open_channel_resource(
480 channel_id,
481 |this| &mut this.opened_chats,
482 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
483 cx,
484 )
485 }
486
487 /// Asynchronously open a given resource associated with a channel.
488 ///
489 /// Make sure that the resource is only opened once, even if this method
490 /// is called multiple times with the same channel id while the first task
491 /// is still running.
492 fn open_channel_resource<T, F, Fut>(
493 &mut self,
494 channel_id: ChannelId,
495 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
496 load: F,
497 cx: &mut ModelContext<Self>,
498 ) -> Task<Result<Model<T>>>
499 where
500 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
501 Fut: Future<Output = Result<Model<T>>>,
502 T: 'static,
503 {
504 let task = loop {
505 match get_map(self).entry(channel_id) {
506 hash_map::Entry::Occupied(e) => match e.get() {
507 OpenedModelHandle::Open(model) => {
508 if let Some(model) = model.upgrade() {
509 break Task::ready(Ok(model)).shared();
510 } else {
511 get_map(self).remove(&channel_id);
512 continue;
513 }
514 }
515 OpenedModelHandle::Loading(task) => {
516 break task.clone();
517 }
518 },
519 hash_map::Entry::Vacant(e) => {
520 let task = cx
521 .spawn(move |this, mut cx| async move {
522 let channel = this.update(&mut cx, |this, _| {
523 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
524 Arc::new(anyhow!("no channel for id: {}", channel_id))
525 })
526 })??;
527
528 load(channel, cx).await.map_err(Arc::new)
529 })
530 .shared();
531
532 e.insert(OpenedModelHandle::Loading(task.clone()));
533 cx.spawn({
534 let task = task.clone();
535 move |this, mut cx| async move {
536 let result = task.await;
537 this.update(&mut cx, |this, _| match result {
538 Ok(model) => {
539 get_map(this).insert(
540 channel_id,
541 OpenedModelHandle::Open(model.downgrade()),
542 );
543 }
544 Err(_) => {
545 get_map(this).remove(&channel_id);
546 }
547 })
548 .ok();
549 }
550 })
551 .detach();
552 break task;
553 }
554 }
555 };
556 cx.background_executor()
557 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
558 }
559
560 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
561 self.channel_role(channel_id) == proto::ChannelRole::Admin
562 }
563
564 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
565 self.channel_index
566 .by_id()
567 .get(&channel_id)
568 .map_or(false, |channel| channel.is_root_channel())
569 }
570
571 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
572 self.channel_index
573 .by_id()
574 .get(&channel_id)
575 .map_or(false, |channel| {
576 channel.visibility == ChannelVisibility::Public
577 })
578 }
579
580 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
581 match self.channel_role(channel_id) {
582 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
583 _ => Capability::ReadOnly,
584 }
585 }
586
587 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
588 maybe!({
589 let mut channel = self.channel_for_id(channel_id)?;
590 if !channel.is_root_channel() {
591 channel = self.channel_for_id(channel.root_id())?;
592 }
593 let root_channel_state = self.channel_states.get(&channel.id);
594 root_channel_state?.role
595 })
596 .unwrap_or(proto::ChannelRole::Guest)
597 }
598
599 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
600 self.channel_participants
601 .get(&channel_id)
602 .map_or(&[], |v| v.as_slice())
603 }
604
605 pub fn create_channel(
606 &self,
607 name: &str,
608 parent_id: Option<ChannelId>,
609 cx: &mut ModelContext<Self>,
610 ) -> Task<Result<ChannelId>> {
611 let client = self.client.clone();
612 let name = name.trim_start_matches('#').to_owned();
613 cx.spawn(move |this, mut cx| async move {
614 let response = client
615 .request(proto::CreateChannel {
616 name,
617 parent_id: parent_id.map(|cid| cid.0),
618 })
619 .await?;
620
621 let channel = response
622 .channel
623 .ok_or_else(|| anyhow!("missing channel in response"))?;
624 let channel_id = ChannelId(channel.id);
625
626 this.update(&mut cx, |this, cx| {
627 let task = this.update_channels(
628 proto::UpdateChannels {
629 channels: vec![channel],
630 ..Default::default()
631 },
632 cx,
633 );
634 assert!(task.is_none());
635
636 // This event is emitted because the collab panel wants to clear the pending edit state
637 // before this frame is rendered. But we can't guarantee that the collab panel's future
638 // will resolve before this flush_effects finishes. Synchronously emitting this event
639 // ensures that the collab panel will observe this creation before the frame completes
640 cx.emit(ChannelEvent::ChannelCreated(channel_id));
641 })?;
642
643 Ok(channel_id)
644 })
645 }
646
647 pub fn move_channel(
648 &mut self,
649 channel_id: ChannelId,
650 to: ChannelId,
651 cx: &mut ModelContext<Self>,
652 ) -> Task<Result<()>> {
653 let client = self.client.clone();
654 cx.spawn(move |_, _| async move {
655 let _ = client
656 .request(proto::MoveChannel {
657 channel_id: channel_id.0,
658 to: to.0,
659 })
660 .await?;
661
662 Ok(())
663 })
664 }
665
666 pub fn set_channel_visibility(
667 &mut self,
668 channel_id: ChannelId,
669 visibility: ChannelVisibility,
670 cx: &mut ModelContext<Self>,
671 ) -> Task<Result<()>> {
672 let client = self.client.clone();
673 cx.spawn(move |_, _| async move {
674 let _ = client
675 .request(proto::SetChannelVisibility {
676 channel_id: channel_id.0,
677 visibility: visibility.into(),
678 })
679 .await?;
680
681 Ok(())
682 })
683 }
684
685 pub fn invite_member(
686 &mut self,
687 channel_id: ChannelId,
688 user_id: UserId,
689 role: proto::ChannelRole,
690 cx: &mut ModelContext<Self>,
691 ) -> Task<Result<()>> {
692 if !self.outgoing_invites.insert((channel_id, user_id)) {
693 return Task::ready(Err(anyhow!("invite request already in progress")));
694 }
695
696 cx.notify();
697 let client = self.client.clone();
698 cx.spawn(move |this, mut cx| async move {
699 let result = client
700 .request(proto::InviteChannelMember {
701 channel_id: channel_id.0,
702 user_id,
703 role: role.into(),
704 })
705 .await;
706
707 this.update(&mut cx, |this, cx| {
708 this.outgoing_invites.remove(&(channel_id, user_id));
709 cx.notify();
710 })?;
711
712 result?;
713
714 Ok(())
715 })
716 }
717
718 pub fn remove_member(
719 &mut self,
720 channel_id: ChannelId,
721 user_id: u64,
722 cx: &mut ModelContext<Self>,
723 ) -> Task<Result<()>> {
724 if !self.outgoing_invites.insert((channel_id, user_id)) {
725 return Task::ready(Err(anyhow!("invite request already in progress")));
726 }
727
728 cx.notify();
729 let client = self.client.clone();
730 cx.spawn(move |this, mut cx| async move {
731 let result = client
732 .request(proto::RemoveChannelMember {
733 channel_id: channel_id.0,
734 user_id,
735 })
736 .await;
737
738 this.update(&mut cx, |this, cx| {
739 this.outgoing_invites.remove(&(channel_id, user_id));
740 cx.notify();
741 })?;
742 result?;
743 Ok(())
744 })
745 }
746
747 pub fn set_member_role(
748 &mut self,
749 channel_id: ChannelId,
750 user_id: UserId,
751 role: proto::ChannelRole,
752 cx: &mut ModelContext<Self>,
753 ) -> Task<Result<()>> {
754 if !self.outgoing_invites.insert((channel_id, user_id)) {
755 return Task::ready(Err(anyhow!("member request already in progress")));
756 }
757
758 cx.notify();
759 let client = self.client.clone();
760 cx.spawn(move |this, mut cx| async move {
761 let result = client
762 .request(proto::SetChannelMemberRole {
763 channel_id: channel_id.0,
764 user_id,
765 role: role.into(),
766 })
767 .await;
768
769 this.update(&mut cx, |this, cx| {
770 this.outgoing_invites.remove(&(channel_id, user_id));
771 cx.notify();
772 })?;
773
774 result?;
775 Ok(())
776 })
777 }
778
779 pub fn rename(
780 &mut self,
781 channel_id: ChannelId,
782 new_name: &str,
783 cx: &mut ModelContext<Self>,
784 ) -> Task<Result<()>> {
785 let client = self.client.clone();
786 let name = new_name.to_string();
787 cx.spawn(move |this, mut cx| async move {
788 let channel = client
789 .request(proto::RenameChannel {
790 channel_id: channel_id.0,
791 name,
792 })
793 .await?
794 .channel
795 .ok_or_else(|| anyhow!("missing channel in response"))?;
796 this.update(&mut cx, |this, cx| {
797 let task = this.update_channels(
798 proto::UpdateChannels {
799 channels: vec![channel],
800 ..Default::default()
801 },
802 cx,
803 );
804 assert!(task.is_none());
805
806 // This event is emitted because the collab panel wants to clear the pending edit state
807 // before this frame is rendered. But we can't guarantee that the collab panel's future
808 // will resolve before this flush_effects finishes. Synchronously emitting this event
809 // ensures that the collab panel will observe this creation before the frame complete
810 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
811 })?;
812 Ok(())
813 })
814 }
815
816 pub fn respond_to_channel_invite(
817 &mut self,
818 channel_id: ChannelId,
819 accept: bool,
820 cx: &mut ModelContext<Self>,
821 ) -> Task<Result<()>> {
822 let client = self.client.clone();
823 cx.background_executor().spawn(async move {
824 client
825 .request(proto::RespondToChannelInvite {
826 channel_id: channel_id.0,
827 accept,
828 })
829 .await?;
830 Ok(())
831 })
832 }
833 pub fn fuzzy_search_members(
834 &self,
835 channel_id: ChannelId,
836 query: String,
837 limit: u16,
838 cx: &mut ModelContext<Self>,
839 ) -> Task<Result<Vec<ChannelMembership>>> {
840 let client = self.client.clone();
841 let user_store = self.user_store.downgrade();
842 cx.spawn(move |_, mut cx| async move {
843 let response = client
844 .request(proto::GetChannelMembers {
845 channel_id: channel_id.0,
846 query,
847 limit: limit as u64,
848 })
849 .await?;
850 user_store.update(&mut cx, |user_store, _| {
851 user_store.insert(response.users);
852 response
853 .members
854 .into_iter()
855 .filter_map(|member| {
856 Some(ChannelMembership {
857 user: user_store.get_cached_user(member.user_id)?,
858 role: member.role(),
859 kind: member.kind(),
860 })
861 })
862 .collect()
863 })
864 })
865 }
866
867 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
868 let client = self.client.clone();
869 async move {
870 client
871 .request(proto::DeleteChannel {
872 channel_id: channel_id.0,
873 })
874 .await?;
875 Ok(())
876 }
877 }
878
879 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
880 false
881 }
882
883 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
884 self.outgoing_invites.contains(&(channel_id, user_id))
885 }
886
887 async fn handle_update_channels(
888 this: Model<Self>,
889 message: TypedEnvelope<proto::UpdateChannels>,
890 mut cx: AsyncAppContext,
891 ) -> Result<()> {
892 this.update(&mut cx, |this, _| {
893 this.update_channels_tx
894 .unbounded_send(message.payload)
895 .unwrap();
896 })?;
897 Ok(())
898 }
899
900 async fn handle_update_user_channels(
901 this: Model<Self>,
902 message: TypedEnvelope<proto::UpdateUserChannels>,
903 mut cx: AsyncAppContext,
904 ) -> Result<()> {
905 this.update(&mut cx, |this, cx| {
906 for buffer_version in message.payload.observed_channel_buffer_version {
907 let version = language::proto::deserialize_version(&buffer_version.version);
908 this.acknowledge_notes_version(
909 ChannelId(buffer_version.channel_id),
910 buffer_version.epoch,
911 &version,
912 cx,
913 );
914 }
915 for message_id in message.payload.observed_channel_message_id {
916 this.acknowledge_message_id(
917 ChannelId(message_id.channel_id),
918 message_id.message_id,
919 cx,
920 );
921 }
922 for membership in message.payload.channel_memberships {
923 if let Some(role) = ChannelRole::from_i32(membership.role) {
924 this.channel_states
925 .entry(ChannelId(membership.channel_id))
926 .or_default()
927 .set_role(role)
928 }
929 }
930 })
931 }
932
933 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
934 self.channel_index.clear();
935 self.channel_invitations.clear();
936 self.channel_participants.clear();
937 self.channel_index.clear();
938 self.outgoing_invites.clear();
939 self.disconnect_channel_buffers_task.take();
940
941 for chat in self.opened_chats.values() {
942 if let OpenedModelHandle::Open(chat) = chat {
943 if let Some(chat) = chat.upgrade() {
944 chat.update(cx, |chat, cx| {
945 chat.rejoin(cx);
946 });
947 }
948 }
949 }
950
951 let mut buffer_versions = Vec::new();
952 for buffer in self.opened_buffers.values() {
953 if let OpenedModelHandle::Open(buffer) = buffer {
954 if let Some(buffer) = buffer.upgrade() {
955 let channel_buffer = buffer.read(cx);
956 let buffer = channel_buffer.buffer().read(cx);
957 buffer_versions.push(proto::ChannelBufferVersion {
958 channel_id: channel_buffer.channel_id.0,
959 epoch: channel_buffer.epoch(),
960 version: language::proto::serialize_version(&buffer.version()),
961 });
962 }
963 }
964 }
965
966 if buffer_versions.is_empty() {
967 return Task::ready(Ok(()));
968 }
969
970 let response = self.client.request(proto::RejoinChannelBuffers {
971 buffers: buffer_versions,
972 });
973
974 cx.spawn(|this, mut cx| async move {
975 let mut response = response.await?;
976
977 this.update(&mut cx, |this, cx| {
978 this.opened_buffers.retain(|_, buffer| match buffer {
979 OpenedModelHandle::Open(channel_buffer) => {
980 let Some(channel_buffer) = channel_buffer.upgrade() else {
981 return false;
982 };
983
984 channel_buffer.update(cx, |channel_buffer, cx| {
985 let channel_id = channel_buffer.channel_id;
986 if let Some(remote_buffer) = response
987 .buffers
988 .iter_mut()
989 .find(|buffer| buffer.channel_id == channel_id.0)
990 {
991 let channel_id = channel_buffer.channel_id;
992 let remote_version =
993 language::proto::deserialize_version(&remote_buffer.version);
994
995 channel_buffer.replace_collaborators(
996 mem::take(&mut remote_buffer.collaborators),
997 cx,
998 );
999
1000 let operations = channel_buffer
1001 .buffer()
1002 .update(cx, |buffer, cx| {
1003 let outgoing_operations =
1004 buffer.serialize_ops(Some(remote_version), cx);
1005 let incoming_operations =
1006 mem::take(&mut remote_buffer.operations)
1007 .into_iter()
1008 .map(language::proto::deserialize_operation)
1009 .collect::<Result<Vec<_>>>()?;
1010 buffer.apply_ops(incoming_operations, cx);
1011 anyhow::Ok(outgoing_operations)
1012 })
1013 .log_err();
1014
1015 if let Some(operations) = operations {
1016 let client = this.client.clone();
1017 cx.background_executor()
1018 .spawn(async move {
1019 let operations = operations.await;
1020 for chunk in
1021 language::proto::split_operations(operations)
1022 {
1023 client
1024 .send(proto::UpdateChannelBuffer {
1025 channel_id: channel_id.0,
1026 operations: chunk,
1027 })
1028 .ok();
1029 }
1030 })
1031 .detach();
1032 return true;
1033 }
1034 }
1035
1036 channel_buffer.disconnect(cx);
1037 false
1038 })
1039 }
1040 OpenedModelHandle::Loading(_) => true,
1041 });
1042 })
1043 .ok();
1044 anyhow::Ok(())
1045 })
1046 }
1047
1048 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1049 cx.notify();
1050 self.did_subscribe = false;
1051 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1052 cx.spawn(move |this, mut cx| async move {
1053 if wait_for_reconnect {
1054 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1055 }
1056
1057 if let Some(this) = this.upgrade() {
1058 this.update(&mut cx, |this, cx| {
1059 for (_, buffer) in this.opened_buffers.drain() {
1060 if let OpenedModelHandle::Open(buffer) = buffer {
1061 if let Some(buffer) = buffer.upgrade() {
1062 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1063 }
1064 }
1065 }
1066 })
1067 .ok();
1068 }
1069 })
1070 });
1071 }
1072
1073 pub(crate) fn update_channels(
1074 &mut self,
1075 payload: proto::UpdateChannels,
1076 cx: &mut ModelContext<ChannelStore>,
1077 ) -> Option<Task<Result<()>>> {
1078 if !payload.remove_channel_invitations.is_empty() {
1079 self.channel_invitations
1080 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1081 }
1082 for channel in payload.channel_invitations {
1083 match self
1084 .channel_invitations
1085 .binary_search_by_key(&channel.id, |c| c.id.0)
1086 {
1087 Ok(ix) => {
1088 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1089 }
1090 Err(ix) => self.channel_invitations.insert(
1091 ix,
1092 Arc::new(Channel {
1093 id: ChannelId(channel.id),
1094 visibility: channel.visibility(),
1095 name: channel.name.into(),
1096 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1097 }),
1098 ),
1099 }
1100 }
1101
1102 let channels_changed = !payload.channels.is_empty()
1103 || !payload.delete_channels.is_empty()
1104 || !payload.latest_channel_message_ids.is_empty()
1105 || !payload.latest_channel_buffer_versions.is_empty()
1106 || !payload.hosted_projects.is_empty()
1107 || !payload.deleted_hosted_projects.is_empty();
1108
1109 if channels_changed {
1110 if !payload.delete_channels.is_empty() {
1111 let delete_channels: Vec<ChannelId> =
1112 payload.delete_channels.into_iter().map(ChannelId).collect();
1113 self.channel_index.delete_channels(&delete_channels);
1114 self.channel_participants
1115 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1116
1117 for channel_id in &delete_channels {
1118 let channel_id = *channel_id;
1119 if payload
1120 .channels
1121 .iter()
1122 .any(|channel| channel.id == channel_id.0)
1123 {
1124 continue;
1125 }
1126 if let Some(OpenedModelHandle::Open(buffer)) =
1127 self.opened_buffers.remove(&channel_id)
1128 {
1129 if let Some(buffer) = buffer.upgrade() {
1130 buffer.update(cx, ChannelBuffer::disconnect);
1131 }
1132 }
1133 }
1134 }
1135
1136 let mut index = self.channel_index.bulk_insert();
1137 for channel in payload.channels {
1138 let id = ChannelId(channel.id);
1139 let channel_changed = index.insert(channel);
1140
1141 if channel_changed {
1142 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1143 if let Some(buffer) = buffer.upgrade() {
1144 buffer.update(cx, ChannelBuffer::channel_changed);
1145 }
1146 }
1147 }
1148 }
1149
1150 for latest_buffer_version in payload.latest_channel_buffer_versions {
1151 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1152 self.channel_states
1153 .entry(ChannelId(latest_buffer_version.channel_id))
1154 .or_default()
1155 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1156 }
1157
1158 for latest_channel_message in payload.latest_channel_message_ids {
1159 self.channel_states
1160 .entry(ChannelId(latest_channel_message.channel_id))
1161 .or_default()
1162 .update_latest_message_id(latest_channel_message.message_id);
1163 }
1164
1165 for hosted_project in payload.hosted_projects {
1166 let hosted_project: HostedProject = hosted_project.into();
1167 if let Some(old_project) = self
1168 .hosted_projects
1169 .insert(hosted_project.project_id, hosted_project.clone())
1170 {
1171 self.channel_states
1172 .entry(old_project.channel_id)
1173 .or_default()
1174 .remove_hosted_project(old_project.project_id);
1175 }
1176 self.channel_states
1177 .entry(hosted_project.channel_id)
1178 .or_default()
1179 .add_hosted_project(hosted_project.project_id);
1180 }
1181
1182 for hosted_project_id in payload.deleted_hosted_projects {
1183 let hosted_project_id = ProjectId(hosted_project_id);
1184
1185 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1186 self.channel_states
1187 .entry(old_project.channel_id)
1188 .or_default()
1189 .remove_hosted_project(old_project.project_id);
1190 }
1191 }
1192 }
1193
1194 cx.notify();
1195 if payload.channel_participants.is_empty() {
1196 return None;
1197 }
1198
1199 let mut all_user_ids = Vec::new();
1200 let channel_participants = payload.channel_participants;
1201 for entry in &channel_participants {
1202 for user_id in entry.participant_user_ids.iter() {
1203 if let Err(ix) = all_user_ids.binary_search(user_id) {
1204 all_user_ids.insert(ix, *user_id);
1205 }
1206 }
1207 }
1208
1209 let users = self
1210 .user_store
1211 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1212 Some(cx.spawn(|this, mut cx| async move {
1213 let users = users.await?;
1214
1215 this.update(&mut cx, |this, cx| {
1216 for entry in &channel_participants {
1217 let mut participants: Vec<_> = entry
1218 .participant_user_ids
1219 .iter()
1220 .filter_map(|user_id| {
1221 users
1222 .binary_search_by_key(&user_id, |user| &user.id)
1223 .ok()
1224 .map(|ix| users[ix].clone())
1225 })
1226 .collect();
1227
1228 participants.sort_by_key(|u| u.id);
1229
1230 this.channel_participants
1231 .insert(ChannelId(entry.channel_id), participants);
1232 }
1233
1234 cx.notify();
1235 })
1236 }))
1237 }
1238}
1239
1240impl ChannelState {
1241 fn set_role(&mut self, role: ChannelRole) {
1242 self.role = Some(role);
1243 }
1244
1245 fn has_channel_buffer_changed(&self) -> bool {
1246 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1247 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1248 && self
1249 .latest_notes_version
1250 .version
1251 .changed_since(&self.observed_notes_version.version))
1252 }
1253
1254 fn has_new_messages(&self) -> bool {
1255 let latest_message_id = self.latest_chat_message;
1256 let observed_message_id = self.observed_chat_message;
1257
1258 latest_message_id.is_some_and(|latest_message_id| {
1259 latest_message_id > observed_message_id.unwrap_or_default()
1260 })
1261 }
1262
1263 fn last_acknowledged_message_id(&self) -> Option<u64> {
1264 self.observed_chat_message
1265 }
1266
1267 fn acknowledge_message_id(&mut self, message_id: u64) {
1268 let observed = self.observed_chat_message.get_or_insert(message_id);
1269 *observed = (*observed).max(message_id);
1270 }
1271
1272 fn update_latest_message_id(&mut self, message_id: u64) {
1273 self.latest_chat_message =
1274 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1275 }
1276
1277 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1278 if self.observed_notes_version.epoch == epoch {
1279 self.observed_notes_version.version.join(version);
1280 } else {
1281 self.observed_notes_version = NotesVersion {
1282 epoch,
1283 version: version.clone(),
1284 };
1285 }
1286 }
1287
1288 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1289 if self.latest_notes_version.epoch == epoch {
1290 self.latest_notes_version.version.join(version);
1291 } else {
1292 self.latest_notes_version = NotesVersion {
1293 epoch,
1294 version: version.clone(),
1295 };
1296 }
1297 }
1298
1299 fn add_hosted_project(&mut self, project_id: ProjectId) {
1300 self.projects.insert(project_id);
1301 }
1302
1303 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1304 self.projects.remove(&project_id);
1305 }
1306}