1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36#[derive(Debug, Clone)]
37pub struct HostedProject {
38 project_id: ProjectId,
39 channel_id: ChannelId,
40 name: SharedString,
41 _visibility: proto::ChannelVisibility,
42}
43impl From<proto::HostedProject> for HostedProject {
44 fn from(project: proto::HostedProject) -> Self {
45 Self {
46 project_id: ProjectId(project.project_id),
47 channel_id: ChannelId(project.channel_id),
48 _visibility: project.visibility(),
49 name: project.name.into(),
50 }
51 }
52}
53pub struct ChannelStore {
54 pub channel_index: ChannelIndex,
55 channel_invitations: Vec<Arc<Channel>>,
56 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
57 channel_states: HashMap<ChannelId, ChannelState>,
58 hosted_projects: HashMap<ProjectId, HostedProject>,
59
60 outgoing_invites: HashSet<(ChannelId, UserId)>,
61 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
62 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
63 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
64 client: Arc<Client>,
65 did_subscribe: bool,
66 user_store: Model<UserStore>,
67 _rpc_subscriptions: [Subscription; 2],
68 _watch_connection_status: Task<Option<()>>,
69 disconnect_channel_buffers_task: Option<Task<()>>,
70 _update_channels: Task<()>,
71}
72
73#[derive(Clone, Debug)]
74pub struct Channel {
75 pub id: ChannelId,
76 pub name: SharedString,
77 pub visibility: proto::ChannelVisibility,
78 pub parent_path: Vec<ChannelId>,
79}
80
81#[derive(Default, Debug)]
82pub struct ChannelState {
83 latest_chat_message: Option<u64>,
84 latest_notes_version: NotesVersion,
85 observed_notes_version: NotesVersion,
86 observed_chat_message: Option<u64>,
87 role: Option<ChannelRole>,
88 projects: HashSet<ProjectId>,
89}
90
91impl Channel {
92 pub fn link(&self, cx: &AppContext) -> String {
93 format!(
94 "{}/channel/{}-{}",
95 ClientSettings::get_global(cx).server_url,
96 Self::slug(&self.name),
97 self.id
98 )
99 }
100
101 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
102 self.link(cx)
103 + "/notes"
104 + &heading
105 .map(|h| format!("#{}", Self::slug(&h)))
106 .unwrap_or_default()
107 }
108
109 pub fn is_root_channel(&self) -> bool {
110 self.parent_path.is_empty()
111 }
112
113 pub fn root_id(&self) -> ChannelId {
114 self.parent_path.first().copied().unwrap_or(self.id)
115 }
116
117 pub fn slug(str: &str) -> String {
118 let slug: String = str
119 .chars()
120 .map(|c| if c.is_alphanumeric() { c } else { '-' })
121 .collect();
122
123 slug.trim_matches(|c| c == '-').to_string()
124 }
125}
126
127#[derive(Debug)]
128pub struct ChannelMembership {
129 pub user: Arc<User>,
130 pub kind: proto::channel_member::Kind,
131 pub role: proto::ChannelRole,
132}
133impl ChannelMembership {
134 pub fn sort_key(&self) -> MembershipSortKey {
135 MembershipSortKey {
136 role_order: match self.role {
137 proto::ChannelRole::Admin => 0,
138 proto::ChannelRole::Member => 1,
139 proto::ChannelRole::Banned => 2,
140 proto::ChannelRole::Talker => 3,
141 proto::ChannelRole::Guest => 4,
142 },
143 kind_order: match self.kind {
144 proto::channel_member::Kind::Member => 0,
145 proto::channel_member::Kind::Invitee => 1,
146 },
147 username_order: self.user.github_login.as_str(),
148 }
149 }
150}
151
152#[derive(PartialOrd, Ord, PartialEq, Eq)]
153pub struct MembershipSortKey<'a> {
154 role_order: u8,
155 kind_order: u8,
156 username_order: &'a str,
157}
158
159pub enum ChannelEvent {
160 ChannelCreated(ChannelId),
161 ChannelRenamed(ChannelId),
162}
163
164impl EventEmitter<ChannelEvent> for ChannelStore {}
165
166enum OpenedModelHandle<E> {
167 Open(WeakModel<E>),
168 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
169}
170
171struct GlobalChannelStore(Model<ChannelStore>);
172
173impl Global for GlobalChannelStore {}
174
175impl ChannelStore {
176 pub fn global(cx: &AppContext) -> Model<Self> {
177 cx.global::<GlobalChannelStore>().0.clone()
178 }
179
180 pub fn new(
181 client: Arc<Client>,
182 user_store: Model<UserStore>,
183 cx: &mut ModelContext<Self>,
184 ) -> Self {
185 let rpc_subscriptions = [
186 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
187 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
188 ];
189
190 let mut connection_status = client.status();
191 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
192 let watch_connection_status = cx.spawn(|this, mut cx| async move {
193 while let Some(status) = connection_status.next().await {
194 let this = this.upgrade()?;
195 match status {
196 client::Status::Connected { .. } => {
197 this.update(&mut cx, |this, cx| this.handle_connect(cx))
198 .ok()?
199 .await
200 .log_err()?;
201 }
202 client::Status::SignedOut | client::Status::UpgradeRequired => {
203 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
204 .ok();
205 }
206 _ => {
207 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
208 .ok();
209 }
210 }
211 }
212 Some(())
213 });
214
215 Self {
216 channel_invitations: Vec::default(),
217 channel_index: ChannelIndex::default(),
218 channel_participants: Default::default(),
219 hosted_projects: Default::default(),
220 outgoing_invites: Default::default(),
221 opened_buffers: Default::default(),
222 opened_chats: Default::default(),
223 update_channels_tx,
224 client,
225 user_store,
226 _rpc_subscriptions: rpc_subscriptions,
227 _watch_connection_status: watch_connection_status,
228 disconnect_channel_buffers_task: None,
229 _update_channels: cx.spawn(|this, mut cx| async move {
230 maybe!(async move {
231 while let Some(update_channels) = update_channels_rx.next().await {
232 if let Some(this) = this.upgrade() {
233 let update_task = this.update(&mut cx, |this, cx| {
234 this.update_channels(update_channels, cx)
235 })?;
236 if let Some(update_task) = update_task {
237 update_task.await.log_err();
238 }
239 }
240 }
241 anyhow::Ok(())
242 })
243 .await
244 .log_err();
245 }),
246 channel_states: Default::default(),
247 did_subscribe: false,
248 }
249 }
250
251 pub fn initialize(&mut self) {
252 if !self.did_subscribe {
253 if self
254 .client
255 .send(proto::SubscribeToChannels {})
256 .log_err()
257 .is_some()
258 {
259 self.did_subscribe = true;
260 }
261 }
262 }
263
264 pub fn client(&self) -> Arc<Client> {
265 self.client.clone()
266 }
267
268 /// Returns the number of unique channels in the store
269 pub fn channel_count(&self) -> usize {
270 self.channel_index.by_id().len()
271 }
272
273 /// Returns the index of a channel ID in the list of unique channels
274 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
275 self.channel_index
276 .by_id()
277 .keys()
278 .position(|id| *id == channel_id)
279 }
280
281 /// Returns an iterator over all unique channels
282 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
283 self.channel_index.by_id().values()
284 }
285
286 /// Iterate over all entries in the channel DAG
287 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
288 self.channel_index
289 .ordered_channels()
290 .iter()
291 .filter_map(move |id| {
292 let channel = self.channel_index.by_id().get(id)?;
293 Some((channel.parent_path.len(), channel))
294 })
295 }
296
297 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
298 let channel_id = self.channel_index.ordered_channels().get(ix)?;
299 self.channel_index.by_id().get(channel_id)
300 }
301
302 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
303 self.channel_index.by_id().values().nth(ix)
304 }
305
306 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
307 self.channel_invitations
308 .iter()
309 .any(|channel| channel.id == channel_id)
310 }
311
312 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
313 &self.channel_invitations
314 }
315
316 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
317 self.channel_index.by_id().get(&channel_id)
318 }
319
320 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
321 let mut projects: Vec<(SharedString, ProjectId)> = self
322 .channel_states
323 .get(&channel_id)
324 .map(|state| state.projects.clone())
325 .unwrap_or_default()
326 .into_iter()
327 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
328 .collect();
329 projects.sort();
330 projects
331 }
332
333 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
334 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
335 if let OpenedModelHandle::Open(buffer) = buffer {
336 return buffer.upgrade().is_some();
337 }
338 }
339 false
340 }
341
342 pub fn open_channel_buffer(
343 &mut self,
344 channel_id: ChannelId,
345 cx: &mut ModelContext<Self>,
346 ) -> Task<Result<Model<ChannelBuffer>>> {
347 let client = self.client.clone();
348 let user_store = self.user_store.clone();
349 let channel_store = cx.handle();
350 self.open_channel_resource(
351 channel_id,
352 |this| &mut this.opened_buffers,
353 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
354 cx,
355 )
356 }
357
358 pub fn fetch_channel_messages(
359 &self,
360 message_ids: Vec<u64>,
361 cx: &mut ModelContext<Self>,
362 ) -> Task<Result<Vec<ChannelMessage>>> {
363 let request = if message_ids.is_empty() {
364 None
365 } else {
366 Some(
367 self.client
368 .request(proto::GetChannelMessagesById { message_ids }),
369 )
370 };
371 cx.spawn(|this, mut cx| async move {
372 if let Some(request) = request {
373 let response = request.await?;
374 let this = this
375 .upgrade()
376 .ok_or_else(|| anyhow!("channel store dropped"))?;
377 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
378 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
379 } else {
380 Ok(Vec::new())
381 }
382 })
383 }
384
385 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
386 self.channel_states
387 .get(&channel_id)
388 .is_some_and(|state| state.has_channel_buffer_changed())
389 }
390
391 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
392 self.channel_states
393 .get(&channel_id)
394 .is_some_and(|state| state.has_new_messages())
395 }
396
397 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
398 if let Some(state) = self.channel_states.get_mut(&channel_id) {
399 state.latest_chat_message = message_id;
400 }
401 }
402
403 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
404 self.channel_states.get(&channel_id).and_then(|state| {
405 if let Some(last_message_id) = state.latest_chat_message {
406 if state
407 .last_acknowledged_message_id()
408 .is_some_and(|id| id < last_message_id)
409 {
410 return state.last_acknowledged_message_id();
411 }
412 }
413
414 None
415 })
416 }
417
418 pub fn acknowledge_message_id(
419 &mut self,
420 channel_id: ChannelId,
421 message_id: u64,
422 cx: &mut ModelContext<Self>,
423 ) {
424 self.channel_states
425 .entry(channel_id)
426 .or_insert_with(|| Default::default())
427 .acknowledge_message_id(message_id);
428 cx.notify();
429 }
430
431 pub fn update_latest_message_id(
432 &mut self,
433 channel_id: ChannelId,
434 message_id: u64,
435 cx: &mut ModelContext<Self>,
436 ) {
437 self.channel_states
438 .entry(channel_id)
439 .or_insert_with(|| Default::default())
440 .update_latest_message_id(message_id);
441 cx.notify();
442 }
443
444 pub fn acknowledge_notes_version(
445 &mut self,
446 channel_id: ChannelId,
447 epoch: u64,
448 version: &clock::Global,
449 cx: &mut ModelContext<Self>,
450 ) {
451 self.channel_states
452 .entry(channel_id)
453 .or_insert_with(|| Default::default())
454 .acknowledge_notes_version(epoch, version);
455 cx.notify()
456 }
457
458 pub fn update_latest_notes_version(
459 &mut self,
460 channel_id: ChannelId,
461 epoch: u64,
462 version: &clock::Global,
463 cx: &mut ModelContext<Self>,
464 ) {
465 self.channel_states
466 .entry(channel_id)
467 .or_insert_with(|| Default::default())
468 .update_latest_notes_version(epoch, version);
469 cx.notify()
470 }
471
472 pub fn open_channel_chat(
473 &mut self,
474 channel_id: ChannelId,
475 cx: &mut ModelContext<Self>,
476 ) -> Task<Result<Model<ChannelChat>>> {
477 let client = self.client.clone();
478 let user_store = self.user_store.clone();
479 let this = cx.handle();
480 self.open_channel_resource(
481 channel_id,
482 |this| &mut this.opened_chats,
483 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
484 cx,
485 )
486 }
487
488 /// Asynchronously open a given resource associated with a channel.
489 ///
490 /// Make sure that the resource is only opened once, even if this method
491 /// is called multiple times with the same channel id while the first task
492 /// is still running.
493 fn open_channel_resource<T, F, Fut>(
494 &mut self,
495 channel_id: ChannelId,
496 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
497 load: F,
498 cx: &mut ModelContext<Self>,
499 ) -> Task<Result<Model<T>>>
500 where
501 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
502 Fut: Future<Output = Result<Model<T>>>,
503 T: 'static,
504 {
505 let task = loop {
506 match get_map(self).entry(channel_id) {
507 hash_map::Entry::Occupied(e) => match e.get() {
508 OpenedModelHandle::Open(model) => {
509 if let Some(model) = model.upgrade() {
510 break Task::ready(Ok(model)).shared();
511 } else {
512 get_map(self).remove(&channel_id);
513 continue;
514 }
515 }
516 OpenedModelHandle::Loading(task) => {
517 break task.clone();
518 }
519 },
520 hash_map::Entry::Vacant(e) => {
521 let task = cx
522 .spawn(move |this, mut cx| async move {
523 let channel = this.update(&mut cx, |this, _| {
524 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
525 Arc::new(anyhow!("no channel for id: {}", channel_id))
526 })
527 })??;
528
529 load(channel, cx).await.map_err(Arc::new)
530 })
531 .shared();
532
533 e.insert(OpenedModelHandle::Loading(task.clone()));
534 cx.spawn({
535 let task = task.clone();
536 move |this, mut cx| async move {
537 let result = task.await;
538 this.update(&mut cx, |this, _| match result {
539 Ok(model) => {
540 get_map(this).insert(
541 channel_id,
542 OpenedModelHandle::Open(model.downgrade()),
543 );
544 }
545 Err(_) => {
546 get_map(this).remove(&channel_id);
547 }
548 })
549 .ok();
550 }
551 })
552 .detach();
553 break task;
554 }
555 }
556 };
557 cx.background_executor()
558 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
559 }
560
561 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
562 self.channel_role(channel_id) == proto::ChannelRole::Admin
563 }
564
565 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
566 self.channel_index
567 .by_id()
568 .get(&channel_id)
569 .map_or(false, |channel| channel.is_root_channel())
570 }
571
572 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
573 self.channel_index
574 .by_id()
575 .get(&channel_id)
576 .map_or(false, |channel| {
577 channel.visibility == ChannelVisibility::Public
578 })
579 }
580
581 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
582 match self.channel_role(channel_id) {
583 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
584 _ => Capability::ReadOnly,
585 }
586 }
587
588 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
589 maybe!({
590 let mut channel = self.channel_for_id(channel_id)?;
591 if !channel.is_root_channel() {
592 channel = self.channel_for_id(channel.root_id())?;
593 }
594 let root_channel_state = self.channel_states.get(&channel.id);
595 root_channel_state?.role
596 })
597 .unwrap_or(proto::ChannelRole::Guest)
598 }
599
600 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
601 self.channel_participants
602 .get(&channel_id)
603 .map_or(&[], |v| v.as_slice())
604 }
605
606 pub fn create_channel(
607 &self,
608 name: &str,
609 parent_id: Option<ChannelId>,
610 cx: &mut ModelContext<Self>,
611 ) -> Task<Result<ChannelId>> {
612 let client = self.client.clone();
613 let name = name.trim_start_matches('#').to_owned();
614 cx.spawn(move |this, mut cx| async move {
615 let response = client
616 .request(proto::CreateChannel {
617 name,
618 parent_id: parent_id.map(|cid| cid.0),
619 })
620 .await?;
621
622 let channel = response
623 .channel
624 .ok_or_else(|| anyhow!("missing channel in response"))?;
625 let channel_id = ChannelId(channel.id);
626
627 this.update(&mut cx, |this, cx| {
628 let task = this.update_channels(
629 proto::UpdateChannels {
630 channels: vec![channel],
631 ..Default::default()
632 },
633 cx,
634 );
635 assert!(task.is_none());
636
637 // This event is emitted because the collab panel wants to clear the pending edit state
638 // before this frame is rendered. But we can't guarantee that the collab panel's future
639 // will resolve before this flush_effects finishes. Synchronously emitting this event
640 // ensures that the collab panel will observe this creation before the frame completes
641 cx.emit(ChannelEvent::ChannelCreated(channel_id));
642 })?;
643
644 Ok(channel_id)
645 })
646 }
647
648 pub fn move_channel(
649 &mut self,
650 channel_id: ChannelId,
651 to: ChannelId,
652 cx: &mut ModelContext<Self>,
653 ) -> Task<Result<()>> {
654 let client = self.client.clone();
655 cx.spawn(move |_, _| async move {
656 let _ = client
657 .request(proto::MoveChannel {
658 channel_id: channel_id.0,
659 to: to.0,
660 })
661 .await?;
662
663 Ok(())
664 })
665 }
666
667 pub fn set_channel_visibility(
668 &mut self,
669 channel_id: ChannelId,
670 visibility: ChannelVisibility,
671 cx: &mut ModelContext<Self>,
672 ) -> Task<Result<()>> {
673 let client = self.client.clone();
674 cx.spawn(move |_, _| async move {
675 let _ = client
676 .request(proto::SetChannelVisibility {
677 channel_id: channel_id.0,
678 visibility: visibility.into(),
679 })
680 .await?;
681
682 Ok(())
683 })
684 }
685
686 pub fn invite_member(
687 &mut self,
688 channel_id: ChannelId,
689 user_id: UserId,
690 role: proto::ChannelRole,
691 cx: &mut ModelContext<Self>,
692 ) -> Task<Result<()>> {
693 if !self.outgoing_invites.insert((channel_id, user_id)) {
694 return Task::ready(Err(anyhow!("invite request already in progress")));
695 }
696
697 cx.notify();
698 let client = self.client.clone();
699 cx.spawn(move |this, mut cx| async move {
700 let result = client
701 .request(proto::InviteChannelMember {
702 channel_id: channel_id.0,
703 user_id,
704 role: role.into(),
705 })
706 .await;
707
708 this.update(&mut cx, |this, cx| {
709 this.outgoing_invites.remove(&(channel_id, user_id));
710 cx.notify();
711 })?;
712
713 result?;
714
715 Ok(())
716 })
717 }
718
719 pub fn remove_member(
720 &mut self,
721 channel_id: ChannelId,
722 user_id: u64,
723 cx: &mut ModelContext<Self>,
724 ) -> Task<Result<()>> {
725 if !self.outgoing_invites.insert((channel_id, user_id)) {
726 return Task::ready(Err(anyhow!("invite request already in progress")));
727 }
728
729 cx.notify();
730 let client = self.client.clone();
731 cx.spawn(move |this, mut cx| async move {
732 let result = client
733 .request(proto::RemoveChannelMember {
734 channel_id: channel_id.0,
735 user_id,
736 })
737 .await;
738
739 this.update(&mut cx, |this, cx| {
740 this.outgoing_invites.remove(&(channel_id, user_id));
741 cx.notify();
742 })?;
743 result?;
744 Ok(())
745 })
746 }
747
748 pub fn set_member_role(
749 &mut self,
750 channel_id: ChannelId,
751 user_id: UserId,
752 role: proto::ChannelRole,
753 cx: &mut ModelContext<Self>,
754 ) -> Task<Result<()>> {
755 if !self.outgoing_invites.insert((channel_id, user_id)) {
756 return Task::ready(Err(anyhow!("member request already in progress")));
757 }
758
759 cx.notify();
760 let client = self.client.clone();
761 cx.spawn(move |this, mut cx| async move {
762 let result = client
763 .request(proto::SetChannelMemberRole {
764 channel_id: channel_id.0,
765 user_id,
766 role: role.into(),
767 })
768 .await;
769
770 this.update(&mut cx, |this, cx| {
771 this.outgoing_invites.remove(&(channel_id, user_id));
772 cx.notify();
773 })?;
774
775 result?;
776 Ok(())
777 })
778 }
779
780 pub fn rename(
781 &mut self,
782 channel_id: ChannelId,
783 new_name: &str,
784 cx: &mut ModelContext<Self>,
785 ) -> Task<Result<()>> {
786 let client = self.client.clone();
787 let name = new_name.to_string();
788 cx.spawn(move |this, mut cx| async move {
789 let channel = client
790 .request(proto::RenameChannel {
791 channel_id: channel_id.0,
792 name,
793 })
794 .await?
795 .channel
796 .ok_or_else(|| anyhow!("missing channel in response"))?;
797 this.update(&mut cx, |this, cx| {
798 let task = this.update_channels(
799 proto::UpdateChannels {
800 channels: vec![channel],
801 ..Default::default()
802 },
803 cx,
804 );
805 assert!(task.is_none());
806
807 // This event is emitted because the collab panel wants to clear the pending edit state
808 // before this frame is rendered. But we can't guarantee that the collab panel's future
809 // will resolve before this flush_effects finishes. Synchronously emitting this event
810 // ensures that the collab panel will observe this creation before the frame complete
811 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
812 })?;
813 Ok(())
814 })
815 }
816
817 pub fn respond_to_channel_invite(
818 &mut self,
819 channel_id: ChannelId,
820 accept: bool,
821 cx: &mut ModelContext<Self>,
822 ) -> Task<Result<()>> {
823 let client = self.client.clone();
824 cx.background_executor().spawn(async move {
825 client
826 .request(proto::RespondToChannelInvite {
827 channel_id: channel_id.0,
828 accept,
829 })
830 .await?;
831 Ok(())
832 })
833 }
834 pub fn fuzzy_search_members(
835 &self,
836 channel_id: ChannelId,
837 query: String,
838 limit: u16,
839 cx: &mut ModelContext<Self>,
840 ) -> Task<Result<Vec<ChannelMembership>>> {
841 let client = self.client.clone();
842 let user_store = self.user_store.downgrade();
843 cx.spawn(move |_, mut cx| async move {
844 let response = client
845 .request(proto::GetChannelMembers {
846 channel_id: channel_id.0,
847 query,
848 limit: limit as u64,
849 })
850 .await?;
851 user_store.update(&mut cx, |user_store, _| {
852 user_store.insert(response.users);
853 response
854 .members
855 .into_iter()
856 .filter_map(|member| {
857 Some(ChannelMembership {
858 user: user_store.get_cached_user(member.user_id)?,
859 role: member.role(),
860 kind: member.kind(),
861 })
862 })
863 .collect()
864 })
865 })
866 }
867
868 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
869 let client = self.client.clone();
870 async move {
871 client
872 .request(proto::DeleteChannel {
873 channel_id: channel_id.0,
874 })
875 .await?;
876 Ok(())
877 }
878 }
879
880 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
881 false
882 }
883
884 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
885 self.outgoing_invites.contains(&(channel_id, user_id))
886 }
887
888 async fn handle_update_channels(
889 this: Model<Self>,
890 message: TypedEnvelope<proto::UpdateChannels>,
891 _: Arc<Client>,
892 mut cx: AsyncAppContext,
893 ) -> Result<()> {
894 this.update(&mut cx, |this, _| {
895 this.update_channels_tx
896 .unbounded_send(message.payload)
897 .unwrap();
898 })?;
899 Ok(())
900 }
901
902 async fn handle_update_user_channels(
903 this: Model<Self>,
904 message: TypedEnvelope<proto::UpdateUserChannels>,
905 _: Arc<Client>,
906 mut cx: AsyncAppContext,
907 ) -> Result<()> {
908 this.update(&mut cx, |this, cx| {
909 for buffer_version in message.payload.observed_channel_buffer_version {
910 let version = language::proto::deserialize_version(&buffer_version.version);
911 this.acknowledge_notes_version(
912 ChannelId(buffer_version.channel_id),
913 buffer_version.epoch,
914 &version,
915 cx,
916 );
917 }
918 for message_id in message.payload.observed_channel_message_id {
919 this.acknowledge_message_id(
920 ChannelId(message_id.channel_id),
921 message_id.message_id,
922 cx,
923 );
924 }
925 for membership in message.payload.channel_memberships {
926 if let Some(role) = ChannelRole::from_i32(membership.role) {
927 this.channel_states
928 .entry(ChannelId(membership.channel_id))
929 .or_insert_with(|| ChannelState::default())
930 .set_role(role)
931 }
932 }
933 })
934 }
935
936 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
937 self.channel_index.clear();
938 self.channel_invitations.clear();
939 self.channel_participants.clear();
940 self.channel_index.clear();
941 self.outgoing_invites.clear();
942 self.disconnect_channel_buffers_task.take();
943
944 for chat in self.opened_chats.values() {
945 if let OpenedModelHandle::Open(chat) = chat {
946 if let Some(chat) = chat.upgrade() {
947 chat.update(cx, |chat, cx| {
948 chat.rejoin(cx);
949 });
950 }
951 }
952 }
953
954 let mut buffer_versions = Vec::new();
955 for buffer in self.opened_buffers.values() {
956 if let OpenedModelHandle::Open(buffer) = buffer {
957 if let Some(buffer) = buffer.upgrade() {
958 let channel_buffer = buffer.read(cx);
959 let buffer = channel_buffer.buffer().read(cx);
960 buffer_versions.push(proto::ChannelBufferVersion {
961 channel_id: channel_buffer.channel_id.0,
962 epoch: channel_buffer.epoch(),
963 version: language::proto::serialize_version(&buffer.version()),
964 });
965 }
966 }
967 }
968
969 if buffer_versions.is_empty() {
970 return Task::ready(Ok(()));
971 }
972
973 let response = self.client.request(proto::RejoinChannelBuffers {
974 buffers: buffer_versions,
975 });
976
977 cx.spawn(|this, mut cx| async move {
978 let mut response = response.await?;
979
980 this.update(&mut cx, |this, cx| {
981 this.opened_buffers.retain(|_, buffer| match buffer {
982 OpenedModelHandle::Open(channel_buffer) => {
983 let Some(channel_buffer) = channel_buffer.upgrade() else {
984 return false;
985 };
986
987 channel_buffer.update(cx, |channel_buffer, cx| {
988 let channel_id = channel_buffer.channel_id;
989 if let Some(remote_buffer) = response
990 .buffers
991 .iter_mut()
992 .find(|buffer| buffer.channel_id == channel_id.0)
993 {
994 let channel_id = channel_buffer.channel_id;
995 let remote_version =
996 language::proto::deserialize_version(&remote_buffer.version);
997
998 channel_buffer.replace_collaborators(
999 mem::take(&mut remote_buffer.collaborators),
1000 cx,
1001 );
1002
1003 let operations = channel_buffer
1004 .buffer()
1005 .update(cx, |buffer, cx| {
1006 let outgoing_operations =
1007 buffer.serialize_ops(Some(remote_version), cx);
1008 let incoming_operations =
1009 mem::take(&mut remote_buffer.operations)
1010 .into_iter()
1011 .map(language::proto::deserialize_operation)
1012 .collect::<Result<Vec<_>>>()?;
1013 buffer.apply_ops(incoming_operations, cx)?;
1014 anyhow::Ok(outgoing_operations)
1015 })
1016 .log_err();
1017
1018 if let Some(operations) = operations {
1019 let client = this.client.clone();
1020 cx.background_executor()
1021 .spawn(async move {
1022 let operations = operations.await;
1023 for chunk in
1024 language::proto::split_operations(operations)
1025 {
1026 client
1027 .send(proto::UpdateChannelBuffer {
1028 channel_id: channel_id.0,
1029 operations: chunk,
1030 })
1031 .ok();
1032 }
1033 })
1034 .detach();
1035 return true;
1036 }
1037 }
1038
1039 channel_buffer.disconnect(cx);
1040 false
1041 })
1042 }
1043 OpenedModelHandle::Loading(_) => true,
1044 });
1045 })
1046 .ok();
1047 anyhow::Ok(())
1048 })
1049 }
1050
1051 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1052 cx.notify();
1053 self.did_subscribe = false;
1054 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1055 cx.spawn(move |this, mut cx| async move {
1056 if wait_for_reconnect {
1057 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1058 }
1059
1060 if let Some(this) = this.upgrade() {
1061 this.update(&mut cx, |this, cx| {
1062 for (_, buffer) in this.opened_buffers.drain() {
1063 if let OpenedModelHandle::Open(buffer) = buffer {
1064 if let Some(buffer) = buffer.upgrade() {
1065 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1066 }
1067 }
1068 }
1069 })
1070 .ok();
1071 }
1072 })
1073 });
1074 }
1075
1076 pub(crate) fn update_channels(
1077 &mut self,
1078 payload: proto::UpdateChannels,
1079 cx: &mut ModelContext<ChannelStore>,
1080 ) -> Option<Task<Result<()>>> {
1081 if !payload.remove_channel_invitations.is_empty() {
1082 self.channel_invitations
1083 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1084 }
1085 for channel in payload.channel_invitations {
1086 match self
1087 .channel_invitations
1088 .binary_search_by_key(&channel.id, |c| c.id.0)
1089 {
1090 Ok(ix) => {
1091 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1092 }
1093 Err(ix) => self.channel_invitations.insert(
1094 ix,
1095 Arc::new(Channel {
1096 id: ChannelId(channel.id),
1097 visibility: channel.visibility(),
1098 name: channel.name.into(),
1099 parent_path: channel
1100 .parent_path
1101 .into_iter()
1102 .map(|cid| ChannelId(cid))
1103 .collect(),
1104 }),
1105 ),
1106 }
1107 }
1108
1109 let channels_changed = !payload.channels.is_empty()
1110 || !payload.delete_channels.is_empty()
1111 || !payload.latest_channel_message_ids.is_empty()
1112 || !payload.latest_channel_buffer_versions.is_empty()
1113 || !payload.hosted_projects.is_empty()
1114 || !payload.deleted_hosted_projects.is_empty();
1115
1116 if channels_changed {
1117 if !payload.delete_channels.is_empty() {
1118 let delete_channels: Vec<ChannelId> = payload
1119 .delete_channels
1120 .into_iter()
1121 .map(|cid| ChannelId(cid))
1122 .collect();
1123 self.channel_index.delete_channels(&delete_channels);
1124 self.channel_participants
1125 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1126
1127 for channel_id in &delete_channels {
1128 let channel_id = *channel_id;
1129 if payload
1130 .channels
1131 .iter()
1132 .any(|channel| channel.id == channel_id.0)
1133 {
1134 continue;
1135 }
1136 if let Some(OpenedModelHandle::Open(buffer)) =
1137 self.opened_buffers.remove(&channel_id)
1138 {
1139 if let Some(buffer) = buffer.upgrade() {
1140 buffer.update(cx, ChannelBuffer::disconnect);
1141 }
1142 }
1143 }
1144 }
1145
1146 let mut index = self.channel_index.bulk_insert();
1147 for channel in payload.channels {
1148 let id = ChannelId(channel.id);
1149 let channel_changed = index.insert(channel);
1150
1151 if channel_changed {
1152 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1153 if let Some(buffer) = buffer.upgrade() {
1154 buffer.update(cx, ChannelBuffer::channel_changed);
1155 }
1156 }
1157 }
1158 }
1159
1160 for latest_buffer_version in payload.latest_channel_buffer_versions {
1161 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1162 self.channel_states
1163 .entry(ChannelId(latest_buffer_version.channel_id))
1164 .or_default()
1165 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1166 }
1167
1168 for latest_channel_message in payload.latest_channel_message_ids {
1169 self.channel_states
1170 .entry(ChannelId(latest_channel_message.channel_id))
1171 .or_default()
1172 .update_latest_message_id(latest_channel_message.message_id);
1173 }
1174
1175 for hosted_project in payload.hosted_projects {
1176 let hosted_project: HostedProject = hosted_project.into();
1177 if let Some(old_project) = self
1178 .hosted_projects
1179 .insert(hosted_project.project_id, hosted_project.clone())
1180 {
1181 self.channel_states
1182 .entry(old_project.channel_id)
1183 .or_default()
1184 .remove_hosted_project(old_project.project_id);
1185 }
1186 self.channel_states
1187 .entry(hosted_project.channel_id)
1188 .or_default()
1189 .add_hosted_project(hosted_project.project_id);
1190 }
1191
1192 for hosted_project_id in payload.deleted_hosted_projects {
1193 let hosted_project_id = ProjectId(hosted_project_id);
1194
1195 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1196 self.channel_states
1197 .entry(old_project.channel_id)
1198 .or_default()
1199 .remove_hosted_project(old_project.project_id);
1200 }
1201 }
1202 }
1203
1204 cx.notify();
1205 if payload.channel_participants.is_empty() {
1206 return None;
1207 }
1208
1209 let mut all_user_ids = Vec::new();
1210 let channel_participants = payload.channel_participants;
1211 for entry in &channel_participants {
1212 for user_id in entry.participant_user_ids.iter() {
1213 if let Err(ix) = all_user_ids.binary_search(user_id) {
1214 all_user_ids.insert(ix, *user_id);
1215 }
1216 }
1217 }
1218
1219 let users = self
1220 .user_store
1221 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1222 Some(cx.spawn(|this, mut cx| async move {
1223 let users = users.await?;
1224
1225 this.update(&mut cx, |this, cx| {
1226 for entry in &channel_participants {
1227 let mut participants: Vec<_> = entry
1228 .participant_user_ids
1229 .iter()
1230 .filter_map(|user_id| {
1231 users
1232 .binary_search_by_key(&user_id, |user| &user.id)
1233 .ok()
1234 .map(|ix| users[ix].clone())
1235 })
1236 .collect();
1237
1238 participants.sort_by_key(|u| u.id);
1239
1240 this.channel_participants
1241 .insert(ChannelId(entry.channel_id), participants);
1242 }
1243
1244 cx.notify();
1245 })
1246 }))
1247 }
1248}
1249
1250impl ChannelState {
1251 fn set_role(&mut self, role: ChannelRole) {
1252 self.role = Some(role);
1253 }
1254
1255 fn has_channel_buffer_changed(&self) -> bool {
1256 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1257 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1258 && self
1259 .latest_notes_version
1260 .version
1261 .changed_since(&self.observed_notes_version.version))
1262 }
1263
1264 fn has_new_messages(&self) -> bool {
1265 let latest_message_id = self.latest_chat_message;
1266 let observed_message_id = self.observed_chat_message;
1267
1268 latest_message_id.is_some_and(|latest_message_id| {
1269 latest_message_id > observed_message_id.unwrap_or_default()
1270 })
1271 }
1272
1273 fn last_acknowledged_message_id(&self) -> Option<u64> {
1274 self.observed_chat_message
1275 }
1276
1277 fn acknowledge_message_id(&mut self, message_id: u64) {
1278 let observed = self.observed_chat_message.get_or_insert(message_id);
1279 *observed = (*observed).max(message_id);
1280 }
1281
1282 fn update_latest_message_id(&mut self, message_id: u64) {
1283 self.latest_chat_message =
1284 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1285 }
1286
1287 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1288 if self.observed_notes_version.epoch == epoch {
1289 self.observed_notes_version.version.join(version);
1290 } else {
1291 self.observed_notes_version = NotesVersion {
1292 epoch,
1293 version: version.clone(),
1294 };
1295 }
1296 }
1297
1298 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1299 if self.latest_notes_version.epoch == epoch {
1300 self.latest_notes_version.version.join(version);
1301 } else {
1302 self.latest_notes_version = NotesVersion {
1303 epoch,
1304 version: version.clone(),
1305 };
1306 }
1307 }
1308
1309 fn add_hosted_project(&mut self, project_id: ProjectId) {
1310 self.projects.insert(project_id);
1311 }
1312
1313 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1314 self.projects.remove(&project_id);
1315 }
1316}