1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
/// Grace period granted to the client to reconnect before
/// `ChannelStore::handle_disconnect` disconnects the channel buffers that are
/// still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
25 let channel_store =
26 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36#[derive(Debug, Clone)]
37pub struct HostedProject {
38 project_id: ProjectId,
39 channel_id: ChannelId,
40 name: SharedString,
41 _visibility: proto::ChannelVisibility,
42}
43
44impl From<proto::HostedProject> for HostedProject {
45 fn from(project: proto::HostedProject) -> Self {
46 Self {
47 project_id: ProjectId(project.project_id),
48 channel_id: ChannelId(project.channel_id),
49 _visibility: project.visibility(),
50 name: project.name.into(),
51 }
52 }
53}
54
55pub struct ChannelStore {
56 pub channel_index: ChannelIndex,
57 channel_invitations: Vec<Arc<Channel>>,
58 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
59 channel_states: HashMap<ChannelId, ChannelState>,
60 hosted_projects: HashMap<ProjectId, HostedProject>,
61
62 outgoing_invites: HashSet<(ChannelId, UserId)>,
63 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
64 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
65 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
66 client: Arc<Client>,
67 user_store: Model<UserStore>,
68 _rpc_subscriptions: [Subscription; 2],
69 _watch_connection_status: Task<Option<()>>,
70 disconnect_channel_buffers_task: Option<Task<()>>,
71 _update_channels: Task<()>,
72}
73
74#[derive(Clone, Debug)]
75pub struct Channel {
76 pub id: ChannelId,
77 pub name: SharedString,
78 pub visibility: proto::ChannelVisibility,
79 pub parent_path: Vec<ChannelId>,
80}
81
82#[derive(Default, Debug)]
83pub struct ChannelState {
84 latest_chat_message: Option<u64>,
85 latest_notes_version: NotesVersion,
86 observed_notes_version: NotesVersion,
87 observed_chat_message: Option<u64>,
88 role: Option<ChannelRole>,
89 projects: HashSet<ProjectId>,
90}
91
92impl Channel {
93 pub fn link(&self, cx: &AppContext) -> String {
94 format!(
95 "{}/channel/{}-{}",
96 ClientSettings::get_global(cx).server_url,
97 Self::slug(&self.name),
98 self.id
99 )
100 }
101
102 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
103 self.link(cx)
104 + "/notes"
105 + &heading
106 .map(|h| format!("#{}", Self::slug(&h)))
107 .unwrap_or_default()
108 }
109
110 pub fn is_root_channel(&self) -> bool {
111 self.parent_path.is_empty()
112 }
113
114 pub fn root_id(&self) -> ChannelId {
115 self.parent_path.first().copied().unwrap_or(self.id)
116 }
117
118 pub fn slug(str: &str) -> String {
119 let slug: String = str
120 .chars()
121 .map(|c| if c.is_alphanumeric() { c } else { '-' })
122 .collect();
123
124 slug.trim_matches(|c| c == '-').to_string()
125 }
126}
127
128pub struct ChannelMembership {
129 pub user: Arc<User>,
130 pub kind: proto::channel_member::Kind,
131 pub role: proto::ChannelRole,
132}
133impl ChannelMembership {
134 pub fn sort_key(&self) -> MembershipSortKey {
135 MembershipSortKey {
136 role_order: match self.role {
137 proto::ChannelRole::Admin => 0,
138 proto::ChannelRole::Member => 1,
139 proto::ChannelRole::Banned => 2,
140 proto::ChannelRole::Talker => 3,
141 proto::ChannelRole::Guest => 4,
142 },
143 kind_order: match self.kind {
144 proto::channel_member::Kind::Member => 0,
145 proto::channel_member::Kind::Invitee => 1,
146 },
147 username_order: self.user.github_login.as_str(),
148 }
149 }
150}
151
/// Sort key for a `ChannelMembership`.
///
/// The derived ordering compares the fields in declaration order, so the
/// field order below is significant: role first, then kind, then username.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MembershipSortKey<'a> {
    /// Rank of the role; lower sorts first.
    role_order: u8,
    /// Rank of the membership kind; lower sorts first.
    kind_order: u8,
    /// Username used as the final tie-breaker.
    username_order: &'a str,
}
158
159pub enum ChannelEvent {
160 ChannelCreated(ChannelId),
161 ChannelRenamed(ChannelId),
162}
163
164impl EventEmitter<ChannelEvent> for ChannelStore {}
165
166enum OpenedModelHandle<E> {
167 Open(WeakModel<E>),
168 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
169}
170
171struct GlobalChannelStore(Model<ChannelStore>);
172
173impl Global for GlobalChannelStore {}
174
175impl ChannelStore {
176 pub fn global(cx: &AppContext) -> Model<Self> {
177 cx.global::<GlobalChannelStore>().0.clone()
178 }
179
180 pub fn new(
181 client: Arc<Client>,
182 user_store: Model<UserStore>,
183 cx: &mut ModelContext<Self>,
184 ) -> Self {
185 let rpc_subscriptions = [
186 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
187 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
188 ];
189
190 let mut connection_status = client.status();
191 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
192 let watch_connection_status = cx.spawn(|this, mut cx| async move {
193 while let Some(status) = connection_status.next().await {
194 let this = this.upgrade()?;
195 match status {
196 client::Status::Connected { .. } => {
197 this.update(&mut cx, |this, cx| this.handle_connect(cx))
198 .ok()?
199 .await
200 .log_err()?;
201 }
202 client::Status::SignedOut | client::Status::UpgradeRequired => {
203 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
204 .ok();
205 }
206 _ => {
207 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
208 .ok();
209 }
210 }
211 }
212 Some(())
213 });
214
215 Self {
216 channel_invitations: Vec::default(),
217 channel_index: ChannelIndex::default(),
218 channel_participants: Default::default(),
219 hosted_projects: Default::default(),
220 outgoing_invites: Default::default(),
221 opened_buffers: Default::default(),
222 opened_chats: Default::default(),
223 update_channels_tx,
224 client,
225 user_store,
226 _rpc_subscriptions: rpc_subscriptions,
227 _watch_connection_status: watch_connection_status,
228 disconnect_channel_buffers_task: None,
229 _update_channels: cx.spawn(|this, mut cx| async move {
230 maybe!(async move {
231 while let Some(update_channels) = update_channels_rx.next().await {
232 if let Some(this) = this.upgrade() {
233 let update_task = this.update(&mut cx, |this, cx| {
234 this.update_channels(update_channels, cx)
235 })?;
236 if let Some(update_task) = update_task {
237 update_task.await.log_err();
238 }
239 }
240 }
241 anyhow::Ok(())
242 })
243 .await
244 .log_err();
245 }),
246 channel_states: Default::default(),
247 }
248 }
249
250 pub fn client(&self) -> Arc<Client> {
251 self.client.clone()
252 }
253
254 /// Returns the number of unique channels in the store
255 pub fn channel_count(&self) -> usize {
256 self.channel_index.by_id().len()
257 }
258
259 /// Returns the index of a channel ID in the list of unique channels
260 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
261 self.channel_index
262 .by_id()
263 .keys()
264 .position(|id| *id == channel_id)
265 }
266
267 /// Returns an iterator over all unique channels
268 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
269 self.channel_index.by_id().values()
270 }
271
272 /// Iterate over all entries in the channel DAG
273 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
274 self.channel_index
275 .ordered_channels()
276 .iter()
277 .filter_map(move |id| {
278 let channel = self.channel_index.by_id().get(id)?;
279 Some((channel.parent_path.len(), channel))
280 })
281 }
282
283 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
284 let channel_id = self.channel_index.ordered_channels().get(ix)?;
285 self.channel_index.by_id().get(channel_id)
286 }
287
288 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
289 self.channel_index.by_id().values().nth(ix)
290 }
291
292 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
293 self.channel_invitations
294 .iter()
295 .any(|channel| channel.id == channel_id)
296 }
297
298 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
299 &self.channel_invitations
300 }
301
302 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
303 self.channel_index.by_id().get(&channel_id)
304 }
305
306 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
307 let mut projects: Vec<(SharedString, ProjectId)> = self
308 .channel_states
309 .get(&channel_id)
310 .map(|state| state.projects.clone())
311 .unwrap_or_default()
312 .into_iter()
313 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
314 .collect();
315 projects.sort();
316 projects
317 }
318
319 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
320 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
321 if let OpenedModelHandle::Open(buffer) = buffer {
322 return buffer.upgrade().is_some();
323 }
324 }
325 false
326 }
327
328 pub fn open_channel_buffer(
329 &mut self,
330 channel_id: ChannelId,
331 cx: &mut ModelContext<Self>,
332 ) -> Task<Result<Model<ChannelBuffer>>> {
333 let client = self.client.clone();
334 let user_store = self.user_store.clone();
335 let channel_store = cx.handle();
336 self.open_channel_resource(
337 channel_id,
338 |this| &mut this.opened_buffers,
339 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
340 cx,
341 )
342 }
343
344 pub fn fetch_channel_messages(
345 &self,
346 message_ids: Vec<u64>,
347 cx: &mut ModelContext<Self>,
348 ) -> Task<Result<Vec<ChannelMessage>>> {
349 let request = if message_ids.is_empty() {
350 None
351 } else {
352 Some(
353 self.client
354 .request(proto::GetChannelMessagesById { message_ids }),
355 )
356 };
357 cx.spawn(|this, mut cx| async move {
358 if let Some(request) = request {
359 let response = request.await?;
360 let this = this
361 .upgrade()
362 .ok_or_else(|| anyhow!("channel store dropped"))?;
363 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
364 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
365 } else {
366 Ok(Vec::new())
367 }
368 })
369 }
370
371 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
372 self.channel_states
373 .get(&channel_id)
374 .is_some_and(|state| state.has_channel_buffer_changed())
375 }
376
377 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
378 self.channel_states
379 .get(&channel_id)
380 .is_some_and(|state| state.has_new_messages())
381 }
382
383 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
384 self.channel_states.get(&channel_id).and_then(|state| {
385 if let Some(last_message_id) = state.latest_chat_message {
386 if state
387 .last_acknowledged_message_id()
388 .is_some_and(|id| id < last_message_id)
389 {
390 return state.last_acknowledged_message_id();
391 }
392 }
393
394 None
395 })
396 }
397
398 pub fn acknowledge_message_id(
399 &mut self,
400 channel_id: ChannelId,
401 message_id: u64,
402 cx: &mut ModelContext<Self>,
403 ) {
404 self.channel_states
405 .entry(channel_id)
406 .or_insert_with(|| Default::default())
407 .acknowledge_message_id(message_id);
408 cx.notify();
409 }
410
411 pub fn update_latest_message_id(
412 &mut self,
413 channel_id: ChannelId,
414 message_id: u64,
415 cx: &mut ModelContext<Self>,
416 ) {
417 self.channel_states
418 .entry(channel_id)
419 .or_insert_with(|| Default::default())
420 .update_latest_message_id(message_id);
421 cx.notify();
422 }
423
424 pub fn acknowledge_notes_version(
425 &mut self,
426 channel_id: ChannelId,
427 epoch: u64,
428 version: &clock::Global,
429 cx: &mut ModelContext<Self>,
430 ) {
431 self.channel_states
432 .entry(channel_id)
433 .or_insert_with(|| Default::default())
434 .acknowledge_notes_version(epoch, version);
435 cx.notify()
436 }
437
438 pub fn update_latest_notes_version(
439 &mut self,
440 channel_id: ChannelId,
441 epoch: u64,
442 version: &clock::Global,
443 cx: &mut ModelContext<Self>,
444 ) {
445 self.channel_states
446 .entry(channel_id)
447 .or_insert_with(|| Default::default())
448 .update_latest_notes_version(epoch, version);
449 cx.notify()
450 }
451
452 pub fn open_channel_chat(
453 &mut self,
454 channel_id: ChannelId,
455 cx: &mut ModelContext<Self>,
456 ) -> Task<Result<Model<ChannelChat>>> {
457 let client = self.client.clone();
458 let user_store = self.user_store.clone();
459 let this = cx.handle();
460 self.open_channel_resource(
461 channel_id,
462 |this| &mut this.opened_chats,
463 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
464 cx,
465 )
466 }
467
468 /// Asynchronously open a given resource associated with a channel.
469 ///
470 /// Make sure that the resource is only opened once, even if this method
471 /// is called multiple times with the same channel id while the first task
472 /// is still running.
473 fn open_channel_resource<T, F, Fut>(
474 &mut self,
475 channel_id: ChannelId,
476 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
477 load: F,
478 cx: &mut ModelContext<Self>,
479 ) -> Task<Result<Model<T>>>
480 where
481 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
482 Fut: Future<Output = Result<Model<T>>>,
483 T: 'static,
484 {
485 let task = loop {
486 match get_map(self).entry(channel_id) {
487 hash_map::Entry::Occupied(e) => match e.get() {
488 OpenedModelHandle::Open(model) => {
489 if let Some(model) = model.upgrade() {
490 break Task::ready(Ok(model)).shared();
491 } else {
492 get_map(self).remove(&channel_id);
493 continue;
494 }
495 }
496 OpenedModelHandle::Loading(task) => {
497 break task.clone();
498 }
499 },
500 hash_map::Entry::Vacant(e) => {
501 let task = cx
502 .spawn(move |this, mut cx| async move {
503 let channel = this.update(&mut cx, |this, _| {
504 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
505 Arc::new(anyhow!("no channel for id: {}", channel_id))
506 })
507 })??;
508
509 load(channel, cx).await.map_err(Arc::new)
510 })
511 .shared();
512
513 e.insert(OpenedModelHandle::Loading(task.clone()));
514 cx.spawn({
515 let task = task.clone();
516 move |this, mut cx| async move {
517 let result = task.await;
518 this.update(&mut cx, |this, _| match result {
519 Ok(model) => {
520 get_map(this).insert(
521 channel_id,
522 OpenedModelHandle::Open(model.downgrade()),
523 );
524 }
525 Err(_) => {
526 get_map(this).remove(&channel_id);
527 }
528 })
529 .ok();
530 }
531 })
532 .detach();
533 break task;
534 }
535 }
536 };
537 cx.background_executor()
538 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
539 }
540
541 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
542 self.channel_role(channel_id) == proto::ChannelRole::Admin
543 }
544
545 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
546 self.channel_index
547 .by_id()
548 .get(&channel_id)
549 .map_or(false, |channel| channel.is_root_channel())
550 }
551
552 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
553 self.channel_index
554 .by_id()
555 .get(&channel_id)
556 .map_or(false, |channel| {
557 channel.visibility == ChannelVisibility::Public
558 })
559 }
560
561 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
562 match self.channel_role(channel_id) {
563 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
564 _ => Capability::ReadOnly,
565 }
566 }
567
568 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
569 maybe!({
570 let mut channel = self.channel_for_id(channel_id)?;
571 if !channel.is_root_channel() {
572 channel = self.channel_for_id(channel.root_id())?;
573 }
574 let root_channel_state = self.channel_states.get(&channel.id);
575 root_channel_state?.role
576 })
577 .unwrap_or(proto::ChannelRole::Guest)
578 }
579
580 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
581 self.channel_participants
582 .get(&channel_id)
583 .map_or(&[], |v| v.as_slice())
584 }
585
586 pub fn create_channel(
587 &self,
588 name: &str,
589 parent_id: Option<ChannelId>,
590 cx: &mut ModelContext<Self>,
591 ) -> Task<Result<ChannelId>> {
592 let client = self.client.clone();
593 let name = name.trim_start_matches('#').to_owned();
594 cx.spawn(move |this, mut cx| async move {
595 let response = client
596 .request(proto::CreateChannel {
597 name,
598 parent_id: parent_id.map(|cid| cid.0),
599 })
600 .await?;
601
602 let channel = response
603 .channel
604 .ok_or_else(|| anyhow!("missing channel in response"))?;
605 let channel_id = ChannelId(channel.id);
606
607 this.update(&mut cx, |this, cx| {
608 let task = this.update_channels(
609 proto::UpdateChannels {
610 channels: vec![channel],
611 ..Default::default()
612 },
613 cx,
614 );
615 assert!(task.is_none());
616
617 // This event is emitted because the collab panel wants to clear the pending edit state
618 // before this frame is rendered. But we can't guarantee that the collab panel's future
619 // will resolve before this flush_effects finishes. Synchronously emitting this event
620 // ensures that the collab panel will observe this creation before the frame completes
621 cx.emit(ChannelEvent::ChannelCreated(channel_id));
622 })?;
623
624 Ok(channel_id)
625 })
626 }
627
628 pub fn move_channel(
629 &mut self,
630 channel_id: ChannelId,
631 to: ChannelId,
632 cx: &mut ModelContext<Self>,
633 ) -> Task<Result<()>> {
634 let client = self.client.clone();
635 cx.spawn(move |_, _| async move {
636 let _ = client
637 .request(proto::MoveChannel {
638 channel_id: channel_id.0,
639 to: to.0,
640 })
641 .await?;
642
643 Ok(())
644 })
645 }
646
647 pub fn set_channel_visibility(
648 &mut self,
649 channel_id: ChannelId,
650 visibility: ChannelVisibility,
651 cx: &mut ModelContext<Self>,
652 ) -> Task<Result<()>> {
653 let client = self.client.clone();
654 cx.spawn(move |_, _| async move {
655 let _ = client
656 .request(proto::SetChannelVisibility {
657 channel_id: channel_id.0,
658 visibility: visibility.into(),
659 })
660 .await?;
661
662 Ok(())
663 })
664 }
665
666 pub fn invite_member(
667 &mut self,
668 channel_id: ChannelId,
669 user_id: UserId,
670 role: proto::ChannelRole,
671 cx: &mut ModelContext<Self>,
672 ) -> Task<Result<()>> {
673 if !self.outgoing_invites.insert((channel_id, user_id)) {
674 return Task::ready(Err(anyhow!("invite request already in progress")));
675 }
676
677 cx.notify();
678 let client = self.client.clone();
679 cx.spawn(move |this, mut cx| async move {
680 let result = client
681 .request(proto::InviteChannelMember {
682 channel_id: channel_id.0,
683 user_id,
684 role: role.into(),
685 })
686 .await;
687
688 this.update(&mut cx, |this, cx| {
689 this.outgoing_invites.remove(&(channel_id, user_id));
690 cx.notify();
691 })?;
692
693 result?;
694
695 Ok(())
696 })
697 }
698
699 pub fn remove_member(
700 &mut self,
701 channel_id: ChannelId,
702 user_id: u64,
703 cx: &mut ModelContext<Self>,
704 ) -> Task<Result<()>> {
705 if !self.outgoing_invites.insert((channel_id, user_id)) {
706 return Task::ready(Err(anyhow!("invite request already in progress")));
707 }
708
709 cx.notify();
710 let client = self.client.clone();
711 cx.spawn(move |this, mut cx| async move {
712 let result = client
713 .request(proto::RemoveChannelMember {
714 channel_id: channel_id.0,
715 user_id,
716 })
717 .await;
718
719 this.update(&mut cx, |this, cx| {
720 this.outgoing_invites.remove(&(channel_id, user_id));
721 cx.notify();
722 })?;
723 result?;
724 Ok(())
725 })
726 }
727
728 pub fn set_member_role(
729 &mut self,
730 channel_id: ChannelId,
731 user_id: UserId,
732 role: proto::ChannelRole,
733 cx: &mut ModelContext<Self>,
734 ) -> Task<Result<()>> {
735 if !self.outgoing_invites.insert((channel_id, user_id)) {
736 return Task::ready(Err(anyhow!("member request already in progress")));
737 }
738
739 cx.notify();
740 let client = self.client.clone();
741 cx.spawn(move |this, mut cx| async move {
742 let result = client
743 .request(proto::SetChannelMemberRole {
744 channel_id: channel_id.0,
745 user_id,
746 role: role.into(),
747 })
748 .await;
749
750 this.update(&mut cx, |this, cx| {
751 this.outgoing_invites.remove(&(channel_id, user_id));
752 cx.notify();
753 })?;
754
755 result?;
756 Ok(())
757 })
758 }
759
760 pub fn rename(
761 &mut self,
762 channel_id: ChannelId,
763 new_name: &str,
764 cx: &mut ModelContext<Self>,
765 ) -> Task<Result<()>> {
766 let client = self.client.clone();
767 let name = new_name.to_string();
768 cx.spawn(move |this, mut cx| async move {
769 let channel = client
770 .request(proto::RenameChannel {
771 channel_id: channel_id.0,
772 name,
773 })
774 .await?
775 .channel
776 .ok_or_else(|| anyhow!("missing channel in response"))?;
777 this.update(&mut cx, |this, cx| {
778 let task = this.update_channels(
779 proto::UpdateChannels {
780 channels: vec![channel],
781 ..Default::default()
782 },
783 cx,
784 );
785 assert!(task.is_none());
786
787 // This event is emitted because the collab panel wants to clear the pending edit state
788 // before this frame is rendered. But we can't guarantee that the collab panel's future
789 // will resolve before this flush_effects finishes. Synchronously emitting this event
790 // ensures that the collab panel will observe this creation before the frame complete
791 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
792 })?;
793 Ok(())
794 })
795 }
796
797 pub fn respond_to_channel_invite(
798 &mut self,
799 channel_id: ChannelId,
800 accept: bool,
801 cx: &mut ModelContext<Self>,
802 ) -> Task<Result<()>> {
803 let client = self.client.clone();
804 cx.background_executor().spawn(async move {
805 client
806 .request(proto::RespondToChannelInvite {
807 channel_id: channel_id.0,
808 accept,
809 })
810 .await?;
811 Ok(())
812 })
813 }
814
815 pub fn get_channel_member_details(
816 &self,
817 channel_id: ChannelId,
818 cx: &mut ModelContext<Self>,
819 ) -> Task<Result<Vec<ChannelMembership>>> {
820 let client = self.client.clone();
821 let user_store = self.user_store.downgrade();
822 cx.spawn(move |_, mut cx| async move {
823 let response = client
824 .request(proto::GetChannelMembers {
825 channel_id: channel_id.0,
826 })
827 .await?;
828
829 let user_ids = response.members.iter().map(|m| m.user_id).collect();
830 let user_store = user_store
831 .upgrade()
832 .ok_or_else(|| anyhow!("user store dropped"))?;
833 let users = user_store
834 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
835 .await?;
836
837 Ok(users
838 .into_iter()
839 .zip(response.members)
840 .map(|(user, member)| ChannelMembership {
841 user,
842 role: member.role(),
843 kind: member.kind(),
844 })
845 .collect())
846 })
847 }
848
849 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
850 let client = self.client.clone();
851 async move {
852 client
853 .request(proto::DeleteChannel {
854 channel_id: channel_id.0,
855 })
856 .await?;
857 Ok(())
858 }
859 }
860
861 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
862 false
863 }
864
865 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
866 self.outgoing_invites.contains(&(channel_id, user_id))
867 }
868
869 async fn handle_update_channels(
870 this: Model<Self>,
871 message: TypedEnvelope<proto::UpdateChannels>,
872 _: Arc<Client>,
873 mut cx: AsyncAppContext,
874 ) -> Result<()> {
875 this.update(&mut cx, |this, _| {
876 this.update_channels_tx
877 .unbounded_send(message.payload)
878 .unwrap();
879 })?;
880 Ok(())
881 }
882
883 async fn handle_update_user_channels(
884 this: Model<Self>,
885 message: TypedEnvelope<proto::UpdateUserChannels>,
886 _: Arc<Client>,
887 mut cx: AsyncAppContext,
888 ) -> Result<()> {
889 this.update(&mut cx, |this, cx| {
890 for buffer_version in message.payload.observed_channel_buffer_version {
891 let version = language::proto::deserialize_version(&buffer_version.version);
892 this.acknowledge_notes_version(
893 ChannelId(buffer_version.channel_id),
894 buffer_version.epoch,
895 &version,
896 cx,
897 );
898 }
899 for message_id in message.payload.observed_channel_message_id {
900 this.acknowledge_message_id(
901 ChannelId(message_id.channel_id),
902 message_id.message_id,
903 cx,
904 );
905 }
906 for membership in message.payload.channel_memberships {
907 if let Some(role) = ChannelRole::from_i32(membership.role) {
908 this.channel_states
909 .entry(ChannelId(membership.channel_id))
910 .or_insert_with(|| ChannelState::default())
911 .set_role(role)
912 }
913 }
914 })
915 }
916
917 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
918 self.channel_index.clear();
919 self.channel_invitations.clear();
920 self.channel_participants.clear();
921 self.channel_index.clear();
922 self.outgoing_invites.clear();
923 self.disconnect_channel_buffers_task.take();
924
925 for chat in self.opened_chats.values() {
926 if let OpenedModelHandle::Open(chat) = chat {
927 if let Some(chat) = chat.upgrade() {
928 chat.update(cx, |chat, cx| {
929 chat.rejoin(cx);
930 });
931 }
932 }
933 }
934
935 let mut buffer_versions = Vec::new();
936 for buffer in self.opened_buffers.values() {
937 if let OpenedModelHandle::Open(buffer) = buffer {
938 if let Some(buffer) = buffer.upgrade() {
939 let channel_buffer = buffer.read(cx);
940 let buffer = channel_buffer.buffer().read(cx);
941 buffer_versions.push(proto::ChannelBufferVersion {
942 channel_id: channel_buffer.channel_id.0,
943 epoch: channel_buffer.epoch(),
944 version: language::proto::serialize_version(&buffer.version()),
945 });
946 }
947 }
948 }
949
950 if buffer_versions.is_empty() {
951 return Task::ready(Ok(()));
952 }
953
954 let response = self.client.request(proto::RejoinChannelBuffers {
955 buffers: buffer_versions,
956 });
957
958 cx.spawn(|this, mut cx| async move {
959 let mut response = response.await?;
960
961 this.update(&mut cx, |this, cx| {
962 this.opened_buffers.retain(|_, buffer| match buffer {
963 OpenedModelHandle::Open(channel_buffer) => {
964 let Some(channel_buffer) = channel_buffer.upgrade() else {
965 return false;
966 };
967
968 channel_buffer.update(cx, |channel_buffer, cx| {
969 let channel_id = channel_buffer.channel_id;
970 if let Some(remote_buffer) = response
971 .buffers
972 .iter_mut()
973 .find(|buffer| buffer.channel_id == channel_id.0)
974 {
975 let channel_id = channel_buffer.channel_id;
976 let remote_version =
977 language::proto::deserialize_version(&remote_buffer.version);
978
979 channel_buffer.replace_collaborators(
980 mem::take(&mut remote_buffer.collaborators),
981 cx,
982 );
983
984 let operations = channel_buffer
985 .buffer()
986 .update(cx, |buffer, cx| {
987 let outgoing_operations =
988 buffer.serialize_ops(Some(remote_version), cx);
989 let incoming_operations =
990 mem::take(&mut remote_buffer.operations)
991 .into_iter()
992 .map(language::proto::deserialize_operation)
993 .collect::<Result<Vec<_>>>()?;
994 buffer.apply_ops(incoming_operations, cx)?;
995 anyhow::Ok(outgoing_operations)
996 })
997 .log_err();
998
999 if let Some(operations) = operations {
1000 let client = this.client.clone();
1001 cx.background_executor()
1002 .spawn(async move {
1003 let operations = operations.await;
1004 for chunk in
1005 language::proto::split_operations(operations)
1006 {
1007 client
1008 .send(proto::UpdateChannelBuffer {
1009 channel_id: channel_id.0,
1010 operations: chunk,
1011 })
1012 .ok();
1013 }
1014 })
1015 .detach();
1016 return true;
1017 }
1018 }
1019
1020 channel_buffer.disconnect(cx);
1021 false
1022 })
1023 }
1024 OpenedModelHandle::Loading(_) => true,
1025 });
1026 })
1027 .ok();
1028 anyhow::Ok(())
1029 })
1030 }
1031
1032 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1033 cx.notify();
1034
1035 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1036 cx.spawn(move |this, mut cx| async move {
1037 if wait_for_reconnect {
1038 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1039 }
1040
1041 if let Some(this) = this.upgrade() {
1042 this.update(&mut cx, |this, cx| {
1043 for (_, buffer) in this.opened_buffers.drain() {
1044 if let OpenedModelHandle::Open(buffer) = buffer {
1045 if let Some(buffer) = buffer.upgrade() {
1046 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1047 }
1048 }
1049 }
1050 })
1051 .ok();
1052 }
1053 })
1054 });
1055 }
1056
1057 pub(crate) fn update_channels(
1058 &mut self,
1059 payload: proto::UpdateChannels,
1060 cx: &mut ModelContext<ChannelStore>,
1061 ) -> Option<Task<Result<()>>> {
1062 if !payload.remove_channel_invitations.is_empty() {
1063 self.channel_invitations
1064 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1065 }
1066 for channel in payload.channel_invitations {
1067 match self
1068 .channel_invitations
1069 .binary_search_by_key(&channel.id, |c| c.id.0)
1070 {
1071 Ok(ix) => {
1072 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1073 }
1074 Err(ix) => self.channel_invitations.insert(
1075 ix,
1076 Arc::new(Channel {
1077 id: ChannelId(channel.id),
1078 visibility: channel.visibility(),
1079 name: channel.name.into(),
1080 parent_path: channel
1081 .parent_path
1082 .into_iter()
1083 .map(|cid| ChannelId(cid))
1084 .collect(),
1085 }),
1086 ),
1087 }
1088 }
1089
1090 let channels_changed = !payload.channels.is_empty()
1091 || !payload.delete_channels.is_empty()
1092 || !payload.latest_channel_message_ids.is_empty()
1093 || !payload.latest_channel_buffer_versions.is_empty()
1094 || !payload.hosted_projects.is_empty()
1095 || !payload.deleted_hosted_projects.is_empty();
1096
1097 if channels_changed {
1098 if !payload.delete_channels.is_empty() {
1099 let delete_channels: Vec<ChannelId> = payload
1100 .delete_channels
1101 .into_iter()
1102 .map(|cid| ChannelId(cid))
1103 .collect();
1104 self.channel_index.delete_channels(&delete_channels);
1105 self.channel_participants
1106 .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1107
1108 for channel_id in &delete_channels {
1109 let channel_id = *channel_id;
1110 if payload
1111 .channels
1112 .iter()
1113 .any(|channel| channel.id == channel_id.0)
1114 {
1115 continue;
1116 }
1117 if let Some(OpenedModelHandle::Open(buffer)) =
1118 self.opened_buffers.remove(&channel_id)
1119 {
1120 if let Some(buffer) = buffer.upgrade() {
1121 buffer.update(cx, ChannelBuffer::disconnect);
1122 }
1123 }
1124 }
1125 }
1126
1127 let mut index = self.channel_index.bulk_insert();
1128 for channel in payload.channels {
1129 let id = ChannelId(channel.id);
1130 let channel_changed = index.insert(channel);
1131
1132 if channel_changed {
1133 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1134 if let Some(buffer) = buffer.upgrade() {
1135 buffer.update(cx, ChannelBuffer::channel_changed);
1136 }
1137 }
1138 }
1139 }
1140
1141 for latest_buffer_version in payload.latest_channel_buffer_versions {
1142 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1143 self.channel_states
1144 .entry(ChannelId(latest_buffer_version.channel_id))
1145 .or_default()
1146 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1147 }
1148
1149 for latest_channel_message in payload.latest_channel_message_ids {
1150 self.channel_states
1151 .entry(ChannelId(latest_channel_message.channel_id))
1152 .or_default()
1153 .update_latest_message_id(latest_channel_message.message_id);
1154 }
1155
1156 for hosted_project in payload.hosted_projects {
1157 let hosted_project: HostedProject = hosted_project.into();
1158 if let Some(old_project) = self
1159 .hosted_projects
1160 .insert(hosted_project.project_id, hosted_project.clone())
1161 {
1162 self.channel_states
1163 .entry(old_project.channel_id)
1164 .or_default()
1165 .remove_hosted_project(old_project.project_id);
1166 }
1167 self.channel_states
1168 .entry(hosted_project.channel_id)
1169 .or_default()
1170 .add_hosted_project(hosted_project.project_id);
1171 }
1172
1173 for hosted_project_id in payload.deleted_hosted_projects {
1174 let hosted_project_id = ProjectId(hosted_project_id);
1175
1176 if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1177 self.channel_states
1178 .entry(old_project.channel_id)
1179 .or_default()
1180 .remove_hosted_project(old_project.project_id);
1181 }
1182 }
1183 }
1184
1185 cx.notify();
1186 if payload.channel_participants.is_empty() {
1187 return None;
1188 }
1189
1190 let mut all_user_ids = Vec::new();
1191 let channel_participants = payload.channel_participants;
1192 for entry in &channel_participants {
1193 for user_id in entry.participant_user_ids.iter() {
1194 if let Err(ix) = all_user_ids.binary_search(user_id) {
1195 all_user_ids.insert(ix, *user_id);
1196 }
1197 }
1198 }
1199
1200 let users = self
1201 .user_store
1202 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1203 Some(cx.spawn(|this, mut cx| async move {
1204 let users = users.await?;
1205
1206 this.update(&mut cx, |this, cx| {
1207 for entry in &channel_participants {
1208 let mut participants: Vec<_> = entry
1209 .participant_user_ids
1210 .iter()
1211 .filter_map(|user_id| {
1212 users
1213 .binary_search_by_key(&user_id, |user| &user.id)
1214 .ok()
1215 .map(|ix| users[ix].clone())
1216 })
1217 .collect();
1218
1219 participants.sort_by_key(|u| u.id);
1220
1221 this.channel_participants
1222 .insert(ChannelId(entry.channel_id), participants);
1223 }
1224
1225 cx.notify();
1226 })
1227 }))
1228 }
1229}
1230
1231impl ChannelState {
1232 fn set_role(&mut self, role: ChannelRole) {
1233 self.role = Some(role);
1234 }
1235
1236 fn has_channel_buffer_changed(&self) -> bool {
1237 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1238 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1239 && self
1240 .latest_notes_version
1241 .version
1242 .changed_since(&self.observed_notes_version.version))
1243 }
1244
1245 fn has_new_messages(&self) -> bool {
1246 let latest_message_id = self.latest_chat_message;
1247 let observed_message_id = self.observed_chat_message;
1248
1249 latest_message_id.is_some_and(|latest_message_id| {
1250 latest_message_id > observed_message_id.unwrap_or_default()
1251 })
1252 }
1253
1254 fn last_acknowledged_message_id(&self) -> Option<u64> {
1255 self.observed_chat_message
1256 }
1257
1258 fn acknowledge_message_id(&mut self, message_id: u64) {
1259 let observed = self.observed_chat_message.get_or_insert(message_id);
1260 *observed = (*observed).max(message_id);
1261 }
1262
1263 fn update_latest_message_id(&mut self, message_id: u64) {
1264 self.latest_chat_message =
1265 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1266 }
1267
1268 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1269 if self.observed_notes_version.epoch == epoch {
1270 self.observed_notes_version.version.join(version);
1271 } else {
1272 self.observed_notes_version = NotesVersion {
1273 epoch,
1274 version: version.clone(),
1275 };
1276 }
1277 }
1278
1279 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1280 if self.latest_notes_version.epoch == epoch {
1281 self.latest_notes_version.version.join(version);
1282 } else {
1283 self.latest_notes_version = NotesVersion {
1284 epoch,
1285 version: version.clone(),
1286 };
1287 }
1288 }
1289
1290 fn add_hosted_project(&mut self, project_id: ProjectId) {
1291 self.projects.insert(project_id);
1292 }
1293
1294 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1295 self.projects.remove(&project_id);
1296 }
1297}