1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// A version of a channel's notes: an `epoch` paired with a `clock::Global`
/// version vector.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
35
/// Client-side record of a hosted project, converted from `proto::HostedProject`.
#[derive(Debug, Clone)]
pub struct HostedProject {
    /// Identifier of the project; also the key in `ChannelStore::hosted_projects`.
    project_id: ProjectId,
    /// Channel this project is listed under.
    channel_id: ChannelId,
    /// Display name of the project.
    name: SharedString,
    /// Visibility reported by the server; stored but not read in this file.
    _visibility: proto::ChannelVisibility,
}
43impl From<proto::HostedProject> for HostedProject {
44 fn from(project: proto::HostedProject) -> Self {
45 Self {
46 project_id: ProjectId(project.project_id),
47 channel_id: ChannelId(project.channel_id),
48 _visibility: project.visibility(),
49 name: project.name.into(),
50 }
51 }
52}
/// Model holding the client's view of channels: the channel index, invitations,
/// participants, per-channel state, hosted projects, and handles to opened
/// channel buffers and chats.
pub struct ChannelStore {
    /// Index of the known channels (see the `channel_index` module).
    pub channel_index: ChannelIndex,
    /// Channel invitations received in `UpdateChannels` payloads; inserted via
    /// binary search on the channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants of each channel, keyed by channel id.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state (latest/observed versions, role, projects).
    channel_states: HashMap<ChannelId, ChannelState>,
    /// Hosted projects keyed by project id.
    hosted_projects: HashMap<ProjectId, HostedProject>,

    /// `(channel, user)` pairs that currently have an invite, removal, or
    /// role-change request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently loading, keyed by channel id.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently loading, keyed by channel id.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    /// Set once `SubscribeToChannels` has been sent; reset on disconnect.
    did_subscribe: bool,
    user_store: Model<UserStore>,
    /// Subscriptions returned by `Client::add_message_handler` in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to changes of the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects opened channel buffers after a
    /// disconnect; taken (dropped) again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
72
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of this channel's ancestors; empty for a root channel, and the
    /// first entry is treated as the root (see `root_id`).
    pub parent_path: Vec<ChannelId>,
}
80
/// Mutable per-channel bookkeeping kept by the [`ChannelStore`].
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Id of the latest known chat message in the channel, if any.
    latest_chat_message: Option<u64>,
    /// Latest known version of the channel notes.
    latest_notes_version: NotesVersion,
    /// Version of the channel notes that has been observed.
    observed_notes_version: NotesVersion,
    /// Id of the chat message that has been observed, if any.
    observed_chat_message: Option<u64>,
    /// Role for this channel, set from `UpdateUserChannels` memberships.
    role: Option<ChannelRole>,
    /// Ids of hosted projects listed under this channel.
    projects: HashSet<ProjectId>,
}
90
91impl Channel {
92 pub fn link(&self, cx: &AppContext) -> String {
93 format!(
94 "{}/channel/{}-{}",
95 ClientSettings::get_global(cx).server_url,
96 Self::slug(&self.name),
97 self.id
98 )
99 }
100
101 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
102 self.link(cx)
103 + "/notes"
104 + &heading
105 .map(|h| format!("#{}", Self::slug(&h)))
106 .unwrap_or_default()
107 }
108
109 pub fn is_root_channel(&self) -> bool {
110 self.parent_path.is_empty()
111 }
112
113 pub fn root_id(&self) -> ChannelId {
114 self.parent_path.first().copied().unwrap_or(self.id)
115 }
116
117 pub fn slug(str: &str) -> String {
118 let slug: String = str
119 .chars()
120 .map(|c| if c.is_alphanumeric() { c } else { '-' })
121 .collect();
122
123 slug.trim_matches(|c| c == '-').to_string()
124 }
125}
126
/// A user's membership in a channel, as returned by `fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
133impl ChannelMembership {
134 pub fn sort_key(&self) -> MembershipSortKey {
135 MembershipSortKey {
136 role_order: match self.role {
137 proto::ChannelRole::Admin => 0,
138 proto::ChannelRole::Member => 1,
139 proto::ChannelRole::Banned => 2,
140 proto::ChannelRole::Talker => 3,
141 proto::ChannelRole::Guest => 4,
142 },
143 kind_order: match self.kind {
144 proto::channel_member::Kind::Member => 0,
145 proto::channel_member::Kind::Invitee => 1,
146 },
147 username_order: self.user.github_login.as_str(),
148 }
149 }
150}
151
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below is significant.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
158
/// Events emitted by the [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
165
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; it is only referenced weakly here.
    Open(WeakModel<E>),
    /// The resource is still loading; the task is shared so that several
    /// callers can await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
170
/// Newtype wrapper used to store the [`ChannelStore`] model as a gpui global.
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
174
175impl ChannelStore {
176 pub fn global(cx: &AppContext) -> Model<Self> {
177 cx.global::<GlobalChannelStore>().0.clone()
178 }
179
    /// Creates a channel store bound to `client` and `user_store`.
    ///
    /// Registers message handlers for `UpdateChannels` and
    /// `UpdateUserChannels`, and spawns two tasks that are stored on the
    /// struct: one reacting to connection-status changes and one applying
    /// queued `UpdateChannels` payloads one at a time.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // Note: a failed `handle_connect` is logged and then
                        // ends this watcher task via `?`.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Signed out / upgrade required: disconnect buffers
                    // without waiting for a reconnect.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: give the connection `RECONNECT_TIMEOUT`
                    // to come back before disconnecting buffers.
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            hosted_projects: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(|this, mut cx| async move {
                maybe!(async move {
                    // Apply updates sequentially: each update's follow-up task
                    // is awaited before the next payload is taken off the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
        }
    }
250
251 pub fn initialize(&mut self) {
252 if !self.did_subscribe {
253 if self
254 .client
255 .send(proto::SubscribeToChannels {})
256 .log_err()
257 .is_some()
258 {
259 self.did_subscribe = true;
260 }
261 }
262 }
263
264 pub fn client(&self) -> Arc<Client> {
265 self.client.clone()
266 }
267
268 /// Returns the number of unique channels in the store
269 pub fn channel_count(&self) -> usize {
270 self.channel_index.by_id().len()
271 }
272
273 /// Returns the index of a channel ID in the list of unique channels
274 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
275 self.channel_index
276 .by_id()
277 .keys()
278 .position(|id| *id == channel_id)
279 }
280
281 /// Returns an iterator over all unique channels
282 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
283 self.channel_index.by_id().values()
284 }
285
286 /// Iterate over all entries in the channel DAG
287 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
288 self.channel_index
289 .ordered_channels()
290 .iter()
291 .filter_map(move |id| {
292 let channel = self.channel_index.by_id().get(id)?;
293 Some((channel.parent_path.len(), channel))
294 })
295 }
296
297 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
298 let channel_id = self.channel_index.ordered_channels().get(ix)?;
299 self.channel_index.by_id().get(channel_id)
300 }
301
302 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
303 self.channel_index.by_id().values().nth(ix)
304 }
305
306 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
307 self.channel_invitations
308 .iter()
309 .any(|channel| channel.id == channel_id)
310 }
311
312 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
313 &self.channel_invitations
314 }
315
316 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
317 self.channel_index.by_id().get(&channel_id)
318 }
319
320 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
321 let mut projects: Vec<(SharedString, ProjectId)> = self
322 .channel_states
323 .get(&channel_id)
324 .map(|state| state.projects.clone())
325 .unwrap_or_default()
326 .into_iter()
327 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
328 .collect();
329 projects.sort();
330 projects
331 }
332
333 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
334 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
335 if let OpenedModelHandle::Open(buffer) = buffer {
336 return buffer.upgrade().is_some();
337 }
338 }
339 false
340 }
341
342 pub fn open_channel_buffer(
343 &mut self,
344 channel_id: ChannelId,
345 cx: &mut ModelContext<Self>,
346 ) -> Task<Result<Model<ChannelBuffer>>> {
347 let client = self.client.clone();
348 let user_store = self.user_store.clone();
349 let channel_store = cx.handle();
350 self.open_channel_resource(
351 channel_id,
352 |this| &mut this.opened_buffers,
353 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
354 cx,
355 )
356 }
357
358 pub fn fetch_channel_messages(
359 &self,
360 message_ids: Vec<u64>,
361 cx: &mut ModelContext<Self>,
362 ) -> Task<Result<Vec<ChannelMessage>>> {
363 let request = if message_ids.is_empty() {
364 None
365 } else {
366 Some(
367 self.client
368 .request(proto::GetChannelMessagesById { message_ids }),
369 )
370 };
371 cx.spawn(|this, mut cx| async move {
372 if let Some(request) = request {
373 let response = request.await?;
374 let this = this
375 .upgrade()
376 .ok_or_else(|| anyhow!("channel store dropped"))?;
377 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
378 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
379 } else {
380 Ok(Vec::new())
381 }
382 })
383 }
384
385 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
386 self.channel_states
387 .get(&channel_id)
388 .is_some_and(|state| state.has_channel_buffer_changed())
389 }
390
391 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
392 self.channel_states
393 .get(&channel_id)
394 .is_some_and(|state| state.has_new_messages())
395 }
396
397 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
398 if let Some(state) = self.channel_states.get_mut(&channel_id) {
399 state.latest_chat_message = message_id;
400 }
401 }
402
403 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
404 self.channel_states.get(&channel_id).and_then(|state| {
405 if let Some(last_message_id) = state.latest_chat_message {
406 if state
407 .last_acknowledged_message_id()
408 .is_some_and(|id| id < last_message_id)
409 {
410 return state.last_acknowledged_message_id();
411 }
412 }
413
414 None
415 })
416 }
417
418 pub fn acknowledge_message_id(
419 &mut self,
420 channel_id: ChannelId,
421 message_id: u64,
422 cx: &mut ModelContext<Self>,
423 ) {
424 self.channel_states
425 .entry(channel_id)
426 .or_insert_with(|| Default::default())
427 .acknowledge_message_id(message_id);
428 cx.notify();
429 }
430
431 pub fn update_latest_message_id(
432 &mut self,
433 channel_id: ChannelId,
434 message_id: u64,
435 cx: &mut ModelContext<Self>,
436 ) {
437 self.channel_states
438 .entry(channel_id)
439 .or_insert_with(|| Default::default())
440 .update_latest_message_id(message_id);
441 cx.notify();
442 }
443
444 pub fn acknowledge_notes_version(
445 &mut self,
446 channel_id: ChannelId,
447 epoch: u64,
448 version: &clock::Global,
449 cx: &mut ModelContext<Self>,
450 ) {
451 self.channel_states
452 .entry(channel_id)
453 .or_insert_with(|| Default::default())
454 .acknowledge_notes_version(epoch, version);
455 cx.notify()
456 }
457
458 pub fn update_latest_notes_version(
459 &mut self,
460 channel_id: ChannelId,
461 epoch: u64,
462 version: &clock::Global,
463 cx: &mut ModelContext<Self>,
464 ) {
465 self.channel_states
466 .entry(channel_id)
467 .or_insert_with(|| Default::default())
468 .update_latest_notes_version(epoch, version);
469 cx.notify()
470 }
471
472 pub fn open_channel_chat(
473 &mut self,
474 channel_id: ChannelId,
475 cx: &mut ModelContext<Self>,
476 ) -> Task<Result<Model<ChannelChat>>> {
477 let client = self.client.clone();
478 let user_store = self.user_store.clone();
479 let this = cx.handle();
480 self.open_channel_resource(
481 channel_id,
482 |this| &mut this.opened_chats,
483 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
484 cx,
485 )
486 }
487
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map on the store tracks this resource kind, and
    /// `load` creates the resource for the resolved channel. The returned task
    /// fails if the channel id is unknown or if `load` fails; the shared
    /// `Arc<anyhow::Error>` is re-wrapped as a plain `anyhow::Error` by
    /// formatting it.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the stale entry and
                            // retry, which takes the `Vacant` branch.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight: share it.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    // Publish the in-flight load so concurrent callers share it.
                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once the load settles, either replace the entry with a
                    // weak handle to the model or remove it on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
560
561 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
562 self.channel_role(channel_id) == proto::ChannelRole::Admin
563 }
564
565 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
566 self.channel_index
567 .by_id()
568 .get(&channel_id)
569 .map_or(false, |channel| channel.is_root_channel())
570 }
571
572 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
573 self.channel_index
574 .by_id()
575 .get(&channel_id)
576 .map_or(false, |channel| {
577 channel.visibility == ChannelVisibility::Public
578 })
579 }
580
581 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
582 match self.channel_role(channel_id) {
583 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
584 _ => Capability::ReadOnly,
585 }
586 }
587
588 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
589 maybe!({
590 let mut channel = self.channel_for_id(channel_id)?;
591 if !channel.is_root_channel() {
592 channel = self.channel_for_id(channel.root_id())?;
593 }
594 let root_channel_state = self.channel_states.get(&channel.id);
595 root_channel_state?.role
596 })
597 .unwrap_or(proto::ChannelRole::Guest)
598 }
599
600 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
601 self.channel_participants
602 .get(&channel_id)
603 .map_or(&[], |v| v.as_slice())
604 }
605
606 pub fn create_channel(
607 &self,
608 name: &str,
609 parent_id: Option<ChannelId>,
610 cx: &mut ModelContext<Self>,
611 ) -> Task<Result<ChannelId>> {
612 let client = self.client.clone();
613 let name = name.trim_start_matches('#').to_owned();
614 cx.spawn(move |this, mut cx| async move {
615 let response = client
616 .request(proto::CreateChannel {
617 name,
618 parent_id: parent_id.map(|cid| cid.0),
619 })
620 .await?;
621
622 let channel = response
623 .channel
624 .ok_or_else(|| anyhow!("missing channel in response"))?;
625 let channel_id = ChannelId(channel.id);
626
627 this.update(&mut cx, |this, cx| {
628 let task = this.update_channels(
629 proto::UpdateChannels {
630 channels: vec![channel],
631 ..Default::default()
632 },
633 cx,
634 );
635 assert!(task.is_none());
636
637 // This event is emitted because the collab panel wants to clear the pending edit state
638 // before this frame is rendered. But we can't guarantee that the collab panel's future
639 // will resolve before this flush_effects finishes. Synchronously emitting this event
640 // ensures that the collab panel will observe this creation before the frame completes
641 cx.emit(ChannelEvent::ChannelCreated(channel_id));
642 })?;
643
644 Ok(channel_id)
645 })
646 }
647
648 pub fn move_channel(
649 &mut self,
650 channel_id: ChannelId,
651 to: ChannelId,
652 cx: &mut ModelContext<Self>,
653 ) -> Task<Result<()>> {
654 let client = self.client.clone();
655 cx.spawn(move |_, _| async move {
656 let _ = client
657 .request(proto::MoveChannel {
658 channel_id: channel_id.0,
659 to: to.0,
660 })
661 .await?;
662
663 Ok(())
664 })
665 }
666
667 pub fn set_channel_visibility(
668 &mut self,
669 channel_id: ChannelId,
670 visibility: ChannelVisibility,
671 cx: &mut ModelContext<Self>,
672 ) -> Task<Result<()>> {
673 let client = self.client.clone();
674 cx.spawn(move |_, _| async move {
675 let _ = client
676 .request(proto::SetChannelVisibility {
677 channel_id: channel_id.0,
678 visibility: visibility.into(),
679 })
680 .await?;
681
682 Ok(())
683 })
684 }
685
686 pub fn invite_member(
687 &mut self,
688 channel_id: ChannelId,
689 user_id: UserId,
690 role: proto::ChannelRole,
691 cx: &mut ModelContext<Self>,
692 ) -> Task<Result<()>> {
693 if !self.outgoing_invites.insert((channel_id, user_id)) {
694 return Task::ready(Err(anyhow!("invite request already in progress")));
695 }
696
697 cx.notify();
698 let client = self.client.clone();
699 cx.spawn(move |this, mut cx| async move {
700 let result = client
701 .request(proto::InviteChannelMember {
702 channel_id: channel_id.0,
703 user_id,
704 role: role.into(),
705 })
706 .await;
707
708 this.update(&mut cx, |this, cx| {
709 this.outgoing_invites.remove(&(channel_id, user_id));
710 cx.notify();
711 })?;
712
713 result?;
714
715 Ok(())
716 })
717 }
718
719 pub fn remove_member(
720 &mut self,
721 channel_id: ChannelId,
722 user_id: u64,
723 cx: &mut ModelContext<Self>,
724 ) -> Task<Result<()>> {
725 if !self.outgoing_invites.insert((channel_id, user_id)) {
726 return Task::ready(Err(anyhow!("invite request already in progress")));
727 }
728
729 cx.notify();
730 let client = self.client.clone();
731 cx.spawn(move |this, mut cx| async move {
732 let result = client
733 .request(proto::RemoveChannelMember {
734 channel_id: channel_id.0,
735 user_id,
736 })
737 .await;
738
739 this.update(&mut cx, |this, cx| {
740 this.outgoing_invites.remove(&(channel_id, user_id));
741 cx.notify();
742 })?;
743 result?;
744 Ok(())
745 })
746 }
747
748 pub fn set_member_role(
749 &mut self,
750 channel_id: ChannelId,
751 user_id: UserId,
752 role: proto::ChannelRole,
753 cx: &mut ModelContext<Self>,
754 ) -> Task<Result<()>> {
755 if !self.outgoing_invites.insert((channel_id, user_id)) {
756 return Task::ready(Err(anyhow!("member request already in progress")));
757 }
758
759 cx.notify();
760 let client = self.client.clone();
761 cx.spawn(move |this, mut cx| async move {
762 let result = client
763 .request(proto::SetChannelMemberRole {
764 channel_id: channel_id.0,
765 user_id,
766 role: role.into(),
767 })
768 .await;
769
770 this.update(&mut cx, |this, cx| {
771 this.outgoing_invites.remove(&(channel_id, user_id));
772 cx.notify();
773 })?;
774
775 result?;
776 Ok(())
777 })
778 }
779
780 pub fn rename(
781 &mut self,
782 channel_id: ChannelId,
783 new_name: &str,
784 cx: &mut ModelContext<Self>,
785 ) -> Task<Result<()>> {
786 let client = self.client.clone();
787 let name = new_name.to_string();
788 cx.spawn(move |this, mut cx| async move {
789 let channel = client
790 .request(proto::RenameChannel {
791 channel_id: channel_id.0,
792 name,
793 })
794 .await?
795 .channel
796 .ok_or_else(|| anyhow!("missing channel in response"))?;
797 this.update(&mut cx, |this, cx| {
798 let task = this.update_channels(
799 proto::UpdateChannels {
800 channels: vec![channel],
801 ..Default::default()
802 },
803 cx,
804 );
805 assert!(task.is_none());
806
807 // This event is emitted because the collab panel wants to clear the pending edit state
808 // before this frame is rendered. But we can't guarantee that the collab panel's future
809 // will resolve before this flush_effects finishes. Synchronously emitting this event
810 // ensures that the collab panel will observe this creation before the frame complete
811 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
812 })?;
813 Ok(())
814 })
815 }
816
817 pub fn respond_to_channel_invite(
818 &mut self,
819 channel_id: ChannelId,
820 accept: bool,
821 cx: &mut ModelContext<Self>,
822 ) -> Task<Result<()>> {
823 let client = self.client.clone();
824 cx.background_executor().spawn(async move {
825 client
826 .request(proto::RespondToChannelInvite {
827 channel_id: channel_id.0,
828 accept,
829 })
830 .await?;
831 Ok(())
832 })
833 }
834 pub fn fuzzy_search_members(
835 &self,
836 channel_id: ChannelId,
837 query: String,
838 limit: u16,
839 cx: &mut ModelContext<Self>,
840 ) -> Task<Result<Vec<ChannelMembership>>> {
841 let client = self.client.clone();
842 let user_store = self.user_store.downgrade();
843 cx.spawn(move |_, mut cx| async move {
844 let response = client
845 .request(proto::GetChannelMembers {
846 channel_id: channel_id.0,
847 query,
848 limit: limit as u64,
849 })
850 .await?;
851 user_store.update(&mut cx, |user_store, _| {
852 user_store.insert(response.users);
853 response
854 .members
855 .into_iter()
856 .filter_map(|member| {
857 Some(ChannelMembership {
858 user: user_store.get_cached_user(member.user_id)?,
859 role: member.role(),
860 kind: member.kind(),
861 })
862 })
863 .collect()
864 })
865 })
866 }
867
868 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
869 let client = self.client.clone();
870 async move {
871 client
872 .request(proto::DeleteChannel {
873 channel_id: channel_id.0,
874 })
875 .await?;
876 Ok(())
877 }
878 }
879
880 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
881 false
882 }
883
884 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
885 self.outgoing_invites.contains(&(channel_id, user_id))
886 }
887
888 async fn handle_update_channels(
889 this: Model<Self>,
890 message: TypedEnvelope<proto::UpdateChannels>,
891 mut cx: AsyncAppContext,
892 ) -> Result<()> {
893 this.update(&mut cx, |this, _| {
894 this.update_channels_tx
895 .unbounded_send(message.payload)
896 .unwrap();
897 })?;
898 Ok(())
899 }
900
901 async fn handle_update_user_channels(
902 this: Model<Self>,
903 message: TypedEnvelope<proto::UpdateUserChannels>,
904 mut cx: AsyncAppContext,
905 ) -> Result<()> {
906 this.update(&mut cx, |this, cx| {
907 for buffer_version in message.payload.observed_channel_buffer_version {
908 let version = language::proto::deserialize_version(&buffer_version.version);
909 this.acknowledge_notes_version(
910 ChannelId(buffer_version.channel_id),
911 buffer_version.epoch,
912 &version,
913 cx,
914 );
915 }
916 for message_id in message.payload.observed_channel_message_id {
917 this.acknowledge_message_id(
918 ChannelId(message_id.channel_id),
919 message_id.message_id,
920 cx,
921 );
922 }
923 for membership in message.payload.channel_memberships {
924 if let Some(role) = ChannelRole::from_i32(membership.role) {
925 this.channel_states
926 .entry(ChannelId(membership.channel_id))
927 .or_insert_with(|| ChannelState::default())
928 .set_role(role)
929 }
930 }
931 })
932 }
933
934 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
935 self.channel_index.clear();
936 self.channel_invitations.clear();
937 self.channel_participants.clear();
938 self.channel_index.clear();
939 self.outgoing_invites.clear();
940 self.disconnect_channel_buffers_task.take();
941
942 for chat in self.opened_chats.values() {
943 if let OpenedModelHandle::Open(chat) = chat {
944 if let Some(chat) = chat.upgrade() {
945 chat.update(cx, |chat, cx| {
946 chat.rejoin(cx);
947 });
948 }
949 }
950 }
951
952 let mut buffer_versions = Vec::new();
953 for buffer in self.opened_buffers.values() {
954 if let OpenedModelHandle::Open(buffer) = buffer {
955 if let Some(buffer) = buffer.upgrade() {
956 let channel_buffer = buffer.read(cx);
957 let buffer = channel_buffer.buffer().read(cx);
958 buffer_versions.push(proto::ChannelBufferVersion {
959 channel_id: channel_buffer.channel_id.0,
960 epoch: channel_buffer.epoch(),
961 version: language::proto::serialize_version(&buffer.version()),
962 });
963 }
964 }
965 }
966
967 if buffer_versions.is_empty() {
968 return Task::ready(Ok(()));
969 }
970
971 let response = self.client.request(proto::RejoinChannelBuffers {
972 buffers: buffer_versions,
973 });
974
975 cx.spawn(|this, mut cx| async move {
976 let mut response = response.await?;
977
978 this.update(&mut cx, |this, cx| {
979 this.opened_buffers.retain(|_, buffer| match buffer {
980 OpenedModelHandle::Open(channel_buffer) => {
981 let Some(channel_buffer) = channel_buffer.upgrade() else {
982 return false;
983 };
984
985 channel_buffer.update(cx, |channel_buffer, cx| {
986 let channel_id = channel_buffer.channel_id;
987 if let Some(remote_buffer) = response
988 .buffers
989 .iter_mut()
990 .find(|buffer| buffer.channel_id == channel_id.0)
991 {
992 let channel_id = channel_buffer.channel_id;
993 let remote_version =
994 language::proto::deserialize_version(&remote_buffer.version);
995
996 channel_buffer.replace_collaborators(
997 mem::take(&mut remote_buffer.collaborators),
998 cx,
999 );
1000
1001 let operations = channel_buffer
1002 .buffer()
1003 .update(cx, |buffer, cx| {
1004 let outgoing_operations =
1005 buffer.serialize_ops(Some(remote_version), cx);
1006 let incoming_operations =
1007 mem::take(&mut remote_buffer.operations)
1008 .into_iter()
1009 .map(language::proto::deserialize_operation)
1010 .collect::<Result<Vec<_>>>()?;
1011 buffer.apply_ops(incoming_operations, cx)?;
1012 anyhow::Ok(outgoing_operations)
1013 })
1014 .log_err();
1015
1016 if let Some(operations) = operations {
1017 let client = this.client.clone();
1018 cx.background_executor()
1019 .spawn(async move {
1020 let operations = operations.await;
1021 for chunk in
1022 language::proto::split_operations(operations)
1023 {
1024 client
1025 .send(proto::UpdateChannelBuffer {
1026 channel_id: channel_id.0,
1027 operations: chunk,
1028 })
1029 .ok();
1030 }
1031 })
1032 .detach();
1033 return true;
1034 }
1035 }
1036
1037 channel_buffer.disconnect(cx);
1038 false
1039 })
1040 }
1041 OpenedModelHandle::Loading(_) => true,
1042 });
1043 })
1044 .ok();
1045 anyhow::Ok(())
1046 })
1047 }
1048
1049 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1050 cx.notify();
1051 self.did_subscribe = false;
1052 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1053 cx.spawn(move |this, mut cx| async move {
1054 if wait_for_reconnect {
1055 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1056 }
1057
1058 if let Some(this) = this.upgrade() {
1059 this.update(&mut cx, |this, cx| {
1060 for (_, buffer) in this.opened_buffers.drain() {
1061 if let OpenedModelHandle::Open(buffer) = buffer {
1062 if let Some(buffer) = buffer.upgrade() {
1063 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1064 }
1065 }
1066 }
1067 })
1068 .ok();
1069 }
1070 })
1071 });
1072 }
1073
1074 pub(crate) fn update_channels(
1075 &mut self,
1076 payload: proto::UpdateChannels,
1077 cx: &mut ModelContext<ChannelStore>,
1078 ) -> Option<Task<Result<()>>> {
1079 if !payload.remove_channel_invitations.is_empty() {
1080 self.channel_invitations
1081 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1082 }
1083 for channel in payload.channel_invitations {
1084 match self
1085 .channel_invitations
1086 .binary_search_by_key(&channel.id, |c| c.id.0)
1087 {
1088 Ok(ix) => {
1089 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1090 }
1091 Err(ix) => self.channel_invitations.insert(
1092 ix,
1093 Arc::new(Channel {
1094 id: ChannelId(channel.id),
1095 visibility: channel.visibility(),
1096 name: channel.name.into(),
1097 parent_path: channel
1098 .parent_path
1099 .into_iter()
1100 .map(|cid| ChannelId(cid))
1101 .collect(),
1102 }),
1103 ),
1104 }
1105 }
1106
1107 let channels_changed = !payload.channels.is_empty()
1108 || !payload.delete_channels.is_empty()
1109 || !payload.latest_channel_message_ids.is_empty()
1110 || !payload.latest_channel_buffer_versions.is_empty()
1111 || !payload.hosted_projects.is_empty()
1112 || !payload.deleted_hosted_projects.is_empty();
1113
1114 if channels_changed {
1115 if !payload.delete_channels.is_empty() {
1116 let delete_channels: Vec<ChannelId> = payload
1117 .delete_channels
1118 .into_iter()
1119 .map(|cid| ChannelId(cid))
1120 .collect();
1121 self.channel_index.delete_channels(&delete_channels);
1122 self.channel_participants
1123 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1124
1125 for channel_id in &delete_channels {
1126 let channel_id = *channel_id;
1127 if payload
1128 .channels
1129 .iter()
1130 .any(|channel| channel.id == channel_id.0)
1131 {
1132 continue;
1133 }
1134 if let Some(OpenedModelHandle::Open(buffer)) =
1135 self.opened_buffers.remove(&channel_id)
1136 {
1137 if let Some(buffer) = buffer.upgrade() {
1138 buffer.update(cx, ChannelBuffer::disconnect);
1139 }
1140 }
1141 }
1142 }
1143
1144 let mut index = self.channel_index.bulk_insert();
1145 for channel in payload.channels {
1146 let id = ChannelId(channel.id);
1147 let channel_changed = index.insert(channel);
1148
1149 if channel_changed {
1150 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1151 if let Some(buffer) = buffer.upgrade() {
1152 buffer.update(cx, ChannelBuffer::channel_changed);
1153 }
1154 }
1155 }
1156 }
1157
1158 for latest_buffer_version in payload.latest_channel_buffer_versions {
1159 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1160 self.channel_states
1161 .entry(ChannelId(latest_buffer_version.channel_id))
1162 .or_default()
1163 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1164 }
1165
1166 for latest_channel_message in payload.latest_channel_message_ids {
1167 self.channel_states
1168 .entry(ChannelId(latest_channel_message.channel_id))
1169 .or_default()
1170 .update_latest_message_id(latest_channel_message.message_id);
1171 }
1172
1173 for hosted_project in payload.hosted_projects {
1174 let hosted_project: HostedProject = hosted_project.into();
1175 if let Some(old_project) = self
1176 .hosted_projects
1177 .insert(hosted_project.project_id, hosted_project.clone())
1178 {
1179 self.channel_states
1180 .entry(old_project.channel_id)
1181 .or_default()
1182 .remove_hosted_project(old_project.project_id);
1183 }
1184 self.channel_states
1185 .entry(hosted_project.channel_id)
1186 .or_default()
1187 .add_hosted_project(hosted_project.project_id);
1188 }
1189
1190 for hosted_project_id in payload.deleted_hosted_projects {
1191 let hosted_project_id = ProjectId(hosted_project_id);
1192
1193 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1194 self.channel_states
1195 .entry(old_project.channel_id)
1196 .or_default()
1197 .remove_hosted_project(old_project.project_id);
1198 }
1199 }
1200 }
1201
1202 cx.notify();
1203 if payload.channel_participants.is_empty() {
1204 return None;
1205 }
1206
1207 let mut all_user_ids = Vec::new();
1208 let channel_participants = payload.channel_participants;
1209 for entry in &channel_participants {
1210 for user_id in entry.participant_user_ids.iter() {
1211 if let Err(ix) = all_user_ids.binary_search(user_id) {
1212 all_user_ids.insert(ix, *user_id);
1213 }
1214 }
1215 }
1216
1217 let users = self
1218 .user_store
1219 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1220 Some(cx.spawn(|this, mut cx| async move {
1221 let users = users.await?;
1222
1223 this.update(&mut cx, |this, cx| {
1224 for entry in &channel_participants {
1225 let mut participants: Vec<_> = entry
1226 .participant_user_ids
1227 .iter()
1228 .filter_map(|user_id| {
1229 users
1230 .binary_search_by_key(&user_id, |user| &user.id)
1231 .ok()
1232 .map(|ix| users[ix].clone())
1233 })
1234 .collect();
1235
1236 participants.sort_by_key(|u| u.id);
1237
1238 this.channel_participants
1239 .insert(ChannelId(entry.channel_id), participants);
1240 }
1241
1242 cx.notify();
1243 })
1244 }))
1245 }
1246}
1247
1248impl ChannelState {
1249 fn set_role(&mut self, role: ChannelRole) {
1250 self.role = Some(role);
1251 }
1252
1253 fn has_channel_buffer_changed(&self) -> bool {
1254 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1255 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1256 && self
1257 .latest_notes_version
1258 .version
1259 .changed_since(&self.observed_notes_version.version))
1260 }
1261
1262 fn has_new_messages(&self) -> bool {
1263 let latest_message_id = self.latest_chat_message;
1264 let observed_message_id = self.observed_chat_message;
1265
1266 latest_message_id.is_some_and(|latest_message_id| {
1267 latest_message_id > observed_message_id.unwrap_or_default()
1268 })
1269 }
1270
1271 fn last_acknowledged_message_id(&self) -> Option<u64> {
1272 self.observed_chat_message
1273 }
1274
1275 fn acknowledge_message_id(&mut self, message_id: u64) {
1276 let observed = self.observed_chat_message.get_or_insert(message_id);
1277 *observed = (*observed).max(message_id);
1278 }
1279
1280 fn update_latest_message_id(&mut self, message_id: u64) {
1281 self.latest_chat_message =
1282 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1283 }
1284
1285 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1286 if self.observed_notes_version.epoch == epoch {
1287 self.observed_notes_version.version.join(version);
1288 } else {
1289 self.observed_notes_version = NotesVersion {
1290 epoch,
1291 version: version.clone(),
1292 };
1293 }
1294 }
1295
1296 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1297 if self.latest_notes_version.epoch == epoch {
1298 self.latest_notes_version.version.join(version);
1299 } else {
1300 self.latest_notes_version = NotesVersion {
1301 epoch,
1302 version: version.clone(),
1303 };
1304 }
1305 }
1306
1307 fn add_hosted_project(&mut self, project_id: ProjectId) {
1308 self.projects.insert(project_id);
1309 }
1310
1311 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1312 self.projects.remove(&project_id);
1313 }
1314}