1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{
7 ChannelId, Client, ClientSettings, DevServerId, ProjectId, RemoteProjectId, Subscription, User,
8 UserId, UserStore,
9};
10use collections::{hash_map, HashMap, HashSet};
11use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
12use gpui::{
13 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
14 Task, WeakModel,
15};
16use language::Capability;
17use rpc::{
18 proto::{self, ChannelRole, ChannelVisibility, DevServerStatus},
19 TypedEnvelope,
20};
21use settings::Settings;
22use std::{mem, sync::Arc, time::Duration};
23use util::{maybe, ResultExt};
24
/// How long `handle_disconnect` waits (when asked to wait for a reconnect)
/// before it disconnects the currently opened channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
26
27pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
28 let channel_store =
29 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
30 cx.set_global(GlobalChannelStore(channel_store));
31}
32
/// A version of a channel's notes: an `epoch` paired with a `clock::Global`
/// version. `ChannelState` keeps one for the latest known notes and one for
/// the notes the user has observed.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
38
/// Store-side record of a hosted project, converted from
/// `proto::HostedProject`.
#[derive(Debug, Clone)]
pub struct HostedProject {
    // Id of the project itself.
    project_id: ProjectId,
    // Channel the project belongs to.
    channel_id: ChannelId,
    // Name of the project; returned by `ChannelStore::projects_for_id`.
    name: SharedString,
    // Stored from the proto message; the underscore prefix marks it as unread here.
    _visibility: proto::ChannelVisibility,
}
46impl From<proto::HostedProject> for HostedProject {
47 fn from(project: proto::HostedProject) -> Self {
48 Self {
49 project_id: ProjectId(project.project_id),
50 channel_id: ChannelId(project.channel_id),
51 _visibility: project.visibility(),
52 name: project.name.into(),
53 }
54 }
55}
56
/// Store-side record of a remote project, converted from
/// `proto::RemoteProject`.
#[derive(Debug, Clone)]
pub struct RemoteProject {
    /// Id of the remote project.
    pub id: RemoteProjectId,
    /// Optional project id carried by the proto message; `None` when the
    /// message has none.
    pub project_id: Option<ProjectId>,
    /// Channel the remote project belongs to.
    pub channel_id: ChannelId,
    /// Name of the remote project; also used as the sort key in
    /// `ChannelStore::remote_projects_for_id`.
    pub name: SharedString,
    /// Path string taken verbatim from the proto message.
    pub path: SharedString,
    /// Dev server this remote project is associated with.
    pub dev_server_id: DevServerId,
}
66
67impl From<proto::RemoteProject> for RemoteProject {
68 fn from(project: proto::RemoteProject) -> Self {
69 Self {
70 id: RemoteProjectId(project.id),
71 project_id: project.project_id.map(|id| ProjectId(id)),
72 channel_id: ChannelId(project.channel_id),
73 name: project.name.into(),
74 path: project.path.into(),
75 dev_server_id: DevServerId(project.dev_server_id),
76 }
77 }
78}
79
/// Store-side record of a dev server, converted from `proto::DevServer`.
#[derive(Debug, Clone)]
pub struct DevServer {
    /// Id of the dev server.
    pub id: DevServerId,
    /// Channel the dev server belongs to.
    pub channel_id: ChannelId,
    /// Name of the dev server; also used as the sort key in
    /// `ChannelStore::dev_servers_for_id`.
    pub name: SharedString,
    /// Status as reported by the proto message.
    pub status: DevServerStatus,
}
87
88impl From<proto::DevServer> for DevServer {
89 fn from(dev_server: proto::DevServer) -> Self {
90 Self {
91 id: DevServerId(dev_server.dev_server_id),
92 channel_id: ChannelId(dev_server.channel_id),
93 status: dev_server.status(),
94 name: dev_server.name.into(),
95 }
96 }
97}
98
/// Client-side store of channels and the state attached to them
/// (invitations, participants, projects, dev servers, opened notes buffers
/// and chats).
pub struct ChannelStore {
    /// Index of the known channels, both by id and in display order.
    pub channel_index: ChannelIndex,
    // Pending invitations; `update_channels` inserts via binary search on
    // the channel id, so the list is presumably kept sorted by id.
    channel_invitations: Vec<Arc<Channel>>,
    // Users currently listed as participants of each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    // Per-channel state: role, observed/latest versions, attached projects.
    channel_states: HashMap<ChannelId, ChannelState>,
    hosted_projects: HashMap<ProjectId, HostedProject>,
    remote_projects: HashMap<RemoteProjectId, RemoteProject>,
    dev_servers: HashMap<DevServerId, DevServer>,

    // (channel, user) pairs with a member request (invite, remove or
    // set-role) currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    // Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    // Notes buffers that are open or currently being opened, per channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    // Chats that are open or currently being opened, per channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Handlers for `UpdateChannels` and `UpdateUserChannels`, held for the
    // lifetime of the store.
    _rpc_subscriptions: [Subscription; 2],
    // Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    // Task created by `handle_disconnect` that disconnects opened buffers;
    // taken out again by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    // Task that applies queued `UpdateChannels` payloads one after another.
    _update_channels: Task<()>,
}
119
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Id of the channel.
    pub id: ChannelId,
    /// Name of the channel; also used to build the slug in `Channel::link`.
    pub name: SharedString,
    /// Visibility as reported by the server.
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors. Empty for a root channel; otherwise
    /// the first entry is treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
}
127
/// Mutable per-channel bookkeeping kept by `ChannelStore`.
#[derive(Default, Debug)]
pub struct ChannelState {
    // Id of the newest chat message known for the channel, if any.
    latest_chat_message: Option<u64>,
    // Newest notes version known for the channel.
    latest_notes_version: NotesVersion,
    // Notes version the user has observed.
    observed_notes_version: NotesVersion,
    // Id of the chat message the user has observed, if any.
    observed_chat_message: Option<u64>,
    // The user's role; `ChannelStore::channel_role` reads it from the root
    // channel's state.
    role: Option<ChannelRole>,
    // Ids of hosted projects, dev servers and remote projects attached to
    // the channel; resolved against the maps on `ChannelStore`.
    projects: HashSet<ProjectId>,
    dev_servers: HashSet<DevServerId>,
    remote_projects: HashSet<RemoteProjectId>,
}
139
140impl Channel {
141 pub fn link(&self, cx: &AppContext) -> String {
142 format!(
143 "{}/channel/{}-{}",
144 ClientSettings::get_global(cx).server_url,
145 Self::slug(&self.name),
146 self.id
147 )
148 }
149
150 pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
151 self.link(cx)
152 + "/notes"
153 + &heading
154 .map(|h| format!("#{}", Self::slug(&h)))
155 .unwrap_or_default()
156 }
157
158 pub fn is_root_channel(&self) -> bool {
159 self.parent_path.is_empty()
160 }
161
162 pub fn root_id(&self) -> ChannelId {
163 self.parent_path.first().copied().unwrap_or(self.id)
164 }
165
166 pub fn slug(str: &str) -> String {
167 let slug: String = str
168 .chars()
169 .map(|c| if c.is_alphanumeric() { c } else { '-' })
170 .collect();
171
172 slug.trim_matches(|c| c == '-').to_string()
173 }
174}
175
/// One entry of a channel's member list, as assembled by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this membership refers to.
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
181impl ChannelMembership {
182 pub fn sort_key(&self) -> MembershipSortKey {
183 MembershipSortKey {
184 role_order: match self.role {
185 proto::ChannelRole::Admin => 0,
186 proto::ChannelRole::Member => 1,
187 proto::ChannelRole::Banned => 2,
188 proto::ChannelRole::Talker => 3,
189 proto::ChannelRole::Guest => 4,
190 },
191 kind_order: match self.kind {
192 proto::channel_member::Kind::Member => 0,
193 proto::channel_member::Kind::Invitee => 1,
194 },
195 username_order: self.user.github_login.as_str(),
196 }
197 }
198}
199
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares the fields in declaration order:
/// `role_order`, then `kind_order`, then `username_order`.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
206
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied to
    /// the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the
    /// store.
    ChannelRenamed(ChannelId),
}
211
// Allows `ChannelStore` to emit `ChannelEvent`s (see `create_channel` and `rename`).
impl EventEmitter<ChannelEvent> for ChannelStore {}
213
/// Tracks a per-channel resource that is either loaded or still loading.
enum OpenedModelHandle<E> {
    /// Loading finished; only a weak handle to the model is kept here.
    Open(WeakModel<E>),
    /// Loading is in flight; the shared task can be cloned and awaited by
    /// every caller that asks for the same resource.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
218
/// Wrapper under which `init` registers the `ChannelStore` model as a global.
struct GlobalChannelStore(Model<ChannelStore>);
220
// Marker impl so the wrapper can be used with `cx.set_global` / `cx.global`.
impl Global for GlobalChannelStore {}
222
223impl ChannelStore {
224 pub fn global(cx: &AppContext) -> Model<Self> {
225 cx.global::<GlobalChannelStore>().0.clone()
226 }
227
    /// Creates a channel store bound to `client` and `user_store`.
    ///
    /// Besides initialising empty state, this registers the two RPC message
    /// handlers and spawns two long-lived tasks: one that follows the
    /// client's connection status and one that applies queued
    /// `UpdateChannels` payloads sequentially.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        // Handlers receive a weak handle to the store that is being built.
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): the trailing `?` after `log_err()`
                        // appears to end this watcher task when the
                        // reconnect work fails; confirm that is intended.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Signed out / upgrade required: disconnect without
                    // waiting for a reconnect.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: give the connection a chance to
                    // come back before disconnecting buffers.
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            hosted_projects: Default::default(),
            remote_projects: Default::default(),
            dev_servers: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Drains the update queue; each update's follow-up task is
            // awaited before the next payload is taken, so updates are
            // applied in arrival order.
            _update_channels: cx.spawn(|this, mut cx| async move {
                maybe!(async move {
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
        }
    }
299
300 pub fn client(&self) -> Arc<Client> {
301 self.client.clone()
302 }
303
304 /// Returns the number of unique channels in the store
305 pub fn channel_count(&self) -> usize {
306 self.channel_index.by_id().len()
307 }
308
309 /// Returns the index of a channel ID in the list of unique channels
310 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
311 self.channel_index
312 .by_id()
313 .keys()
314 .position(|id| *id == channel_id)
315 }
316
317 /// Returns an iterator over all unique channels
318 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
319 self.channel_index.by_id().values()
320 }
321
322 /// Iterate over all entries in the channel DAG
323 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
324 self.channel_index
325 .ordered_channels()
326 .iter()
327 .filter_map(move |id| {
328 let channel = self.channel_index.by_id().get(id)?;
329 Some((channel.parent_path.len(), channel))
330 })
331 }
332
333 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
334 let channel_id = self.channel_index.ordered_channels().get(ix)?;
335 self.channel_index.by_id().get(channel_id)
336 }
337
338 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
339 self.channel_index.by_id().values().nth(ix)
340 }
341
342 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
343 self.channel_invitations
344 .iter()
345 .any(|channel| channel.id == channel_id)
346 }
347
348 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
349 &self.channel_invitations
350 }
351
352 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
353 self.channel_index.by_id().get(&channel_id)
354 }
355
356 pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
357 let mut projects: Vec<(SharedString, ProjectId)> = self
358 .channel_states
359 .get(&channel_id)
360 .map(|state| state.projects.clone())
361 .unwrap_or_default()
362 .into_iter()
363 .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
364 .collect();
365 projects.sort();
366 projects
367 }
368
369 pub fn dev_servers_for_id(&self, channel_id: ChannelId) -> Vec<DevServer> {
370 let mut dev_servers: Vec<DevServer> = self
371 .channel_states
372 .get(&channel_id)
373 .map(|state| state.dev_servers.clone())
374 .unwrap_or_default()
375 .into_iter()
376 .flat_map(|id| self.dev_servers.get(&id).cloned())
377 .collect();
378 dev_servers.sort_by_key(|s| (s.name.clone(), s.id));
379 dev_servers
380 }
381
382 pub fn find_dev_server_by_id(&self, id: DevServerId) -> Option<&DevServer> {
383 self.dev_servers.get(&id)
384 }
385
386 pub fn find_remote_project_by_id(&self, id: RemoteProjectId) -> Option<&RemoteProject> {
387 self.remote_projects.get(&id)
388 }
389
390 pub fn remote_projects_for_id(&self, channel_id: ChannelId) -> Vec<RemoteProject> {
391 let mut remote_projects: Vec<RemoteProject> = self
392 .channel_states
393 .get(&channel_id)
394 .map(|state| state.remote_projects.clone())
395 .unwrap_or_default()
396 .into_iter()
397 .flat_map(|id| self.remote_projects.get(&id).cloned())
398 .collect();
399 remote_projects.sort_by_key(|p| (p.name.clone(), p.id));
400 remote_projects
401 }
402
403 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
404 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
405 if let OpenedModelHandle::Open(buffer) = buffer {
406 return buffer.upgrade().is_some();
407 }
408 }
409 false
410 }
411
412 pub fn open_channel_buffer(
413 &mut self,
414 channel_id: ChannelId,
415 cx: &mut ModelContext<Self>,
416 ) -> Task<Result<Model<ChannelBuffer>>> {
417 let client = self.client.clone();
418 let user_store = self.user_store.clone();
419 let channel_store = cx.handle();
420 self.open_channel_resource(
421 channel_id,
422 |this| &mut this.opened_buffers,
423 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
424 cx,
425 )
426 }
427
428 pub fn fetch_channel_messages(
429 &self,
430 message_ids: Vec<u64>,
431 cx: &mut ModelContext<Self>,
432 ) -> Task<Result<Vec<ChannelMessage>>> {
433 let request = if message_ids.is_empty() {
434 None
435 } else {
436 Some(
437 self.client
438 .request(proto::GetChannelMessagesById { message_ids }),
439 )
440 };
441 cx.spawn(|this, mut cx| async move {
442 if let Some(request) = request {
443 let response = request.await?;
444 let this = this
445 .upgrade()
446 .ok_or_else(|| anyhow!("channel store dropped"))?;
447 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
448 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
449 } else {
450 Ok(Vec::new())
451 }
452 })
453 }
454
455 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
456 self.channel_states
457 .get(&channel_id)
458 .is_some_and(|state| state.has_channel_buffer_changed())
459 }
460
461 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
462 self.channel_states
463 .get(&channel_id)
464 .is_some_and(|state| state.has_new_messages())
465 }
466
467 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
468 if let Some(state) = self.channel_states.get_mut(&channel_id) {
469 state.latest_chat_message = message_id;
470 }
471 }
472
473 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
474 self.channel_states.get(&channel_id).and_then(|state| {
475 if let Some(last_message_id) = state.latest_chat_message {
476 if state
477 .last_acknowledged_message_id()
478 .is_some_and(|id| id < last_message_id)
479 {
480 return state.last_acknowledged_message_id();
481 }
482 }
483
484 None
485 })
486 }
487
488 pub fn acknowledge_message_id(
489 &mut self,
490 channel_id: ChannelId,
491 message_id: u64,
492 cx: &mut ModelContext<Self>,
493 ) {
494 self.channel_states
495 .entry(channel_id)
496 .or_insert_with(|| Default::default())
497 .acknowledge_message_id(message_id);
498 cx.notify();
499 }
500
501 pub fn update_latest_message_id(
502 &mut self,
503 channel_id: ChannelId,
504 message_id: u64,
505 cx: &mut ModelContext<Self>,
506 ) {
507 self.channel_states
508 .entry(channel_id)
509 .or_insert_with(|| Default::default())
510 .update_latest_message_id(message_id);
511 cx.notify();
512 }
513
514 pub fn acknowledge_notes_version(
515 &mut self,
516 channel_id: ChannelId,
517 epoch: u64,
518 version: &clock::Global,
519 cx: &mut ModelContext<Self>,
520 ) {
521 self.channel_states
522 .entry(channel_id)
523 .or_insert_with(|| Default::default())
524 .acknowledge_notes_version(epoch, version);
525 cx.notify()
526 }
527
528 pub fn update_latest_notes_version(
529 &mut self,
530 channel_id: ChannelId,
531 epoch: u64,
532 version: &clock::Global,
533 cx: &mut ModelContext<Self>,
534 ) {
535 self.channel_states
536 .entry(channel_id)
537 .or_insert_with(|| Default::default())
538 .update_latest_notes_version(epoch, version);
539 cx.notify()
540 }
541
542 pub fn open_channel_chat(
543 &mut self,
544 channel_id: ChannelId,
545 cx: &mut ModelContext<Self>,
546 ) -> Task<Result<Model<ChannelChat>>> {
547 let client = self.client.clone();
548 let user_store = self.user_store.clone();
549 let this = cx.handle();
550 self.open_channel_resource(
551 channel_id,
552 |this| &mut this.opened_chats,
553 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
554 cx,
555 )
556 }
557
    /// Asynchronously opens a resource (notes buffer or chat) associated
    /// with a channel.
    ///
    /// The resource is opened at most once per channel: while a load is in
    /// flight, further calls with the same channel id share the running
    /// task, and once loading has finished, calls return the existing model
    /// for as long as it is alive.
    ///
    /// * `get_map` selects which map on the store tracks this kind of
    ///   resource.
    /// * `load` performs the actual opening, given the channel.
    ///
    /// The returned task fails if the channel id is unknown or if `load`
    /// fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and
        // the lookup is retried, which then hits the vacant branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the entry and reload.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Join the load that is already running.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            // Errors are wrapped in `Arc` so the shared
                            // task's output can be cloned.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading settles, replace the
                    // `Loading` entry with a weak `Open` handle, or remove
                    // the entry on failure so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain
        // `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
630
631 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
632 self.channel_role(channel_id) == proto::ChannelRole::Admin
633 }
634
635 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
636 self.channel_index
637 .by_id()
638 .get(&channel_id)
639 .map_or(false, |channel| channel.is_root_channel())
640 }
641
642 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
643 self.channel_index
644 .by_id()
645 .get(&channel_id)
646 .map_or(false, |channel| {
647 channel.visibility == ChannelVisibility::Public
648 })
649 }
650
651 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
652 match self.channel_role(channel_id) {
653 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
654 _ => Capability::ReadOnly,
655 }
656 }
657
658 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
659 maybe!({
660 let mut channel = self.channel_for_id(channel_id)?;
661 if !channel.is_root_channel() {
662 channel = self.channel_for_id(channel.root_id())?;
663 }
664 let root_channel_state = self.channel_states.get(&channel.id);
665 root_channel_state?.role
666 })
667 .unwrap_or(proto::ChannelRole::Guest)
668 }
669
670 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
671 self.channel_participants
672 .get(&channel_id)
673 .map_or(&[], |v| v.as_slice())
674 }
675
676 pub fn create_channel(
677 &self,
678 name: &str,
679 parent_id: Option<ChannelId>,
680 cx: &mut ModelContext<Self>,
681 ) -> Task<Result<ChannelId>> {
682 let client = self.client.clone();
683 let name = name.trim_start_matches('#').to_owned();
684 cx.spawn(move |this, mut cx| async move {
685 let response = client
686 .request(proto::CreateChannel {
687 name,
688 parent_id: parent_id.map(|cid| cid.0),
689 })
690 .await?;
691
692 let channel = response
693 .channel
694 .ok_or_else(|| anyhow!("missing channel in response"))?;
695 let channel_id = ChannelId(channel.id);
696
697 this.update(&mut cx, |this, cx| {
698 let task = this.update_channels(
699 proto::UpdateChannels {
700 channels: vec![channel],
701 ..Default::default()
702 },
703 cx,
704 );
705 assert!(task.is_none());
706
707 // This event is emitted because the collab panel wants to clear the pending edit state
708 // before this frame is rendered. But we can't guarantee that the collab panel's future
709 // will resolve before this flush_effects finishes. Synchronously emitting this event
710 // ensures that the collab panel will observe this creation before the frame completes
711 cx.emit(ChannelEvent::ChannelCreated(channel_id));
712 })?;
713
714 Ok(channel_id)
715 })
716 }
717
718 pub fn move_channel(
719 &mut self,
720 channel_id: ChannelId,
721 to: ChannelId,
722 cx: &mut ModelContext<Self>,
723 ) -> Task<Result<()>> {
724 let client = self.client.clone();
725 cx.spawn(move |_, _| async move {
726 let _ = client
727 .request(proto::MoveChannel {
728 channel_id: channel_id.0,
729 to: to.0,
730 })
731 .await?;
732
733 Ok(())
734 })
735 }
736
737 pub fn set_channel_visibility(
738 &mut self,
739 channel_id: ChannelId,
740 visibility: ChannelVisibility,
741 cx: &mut ModelContext<Self>,
742 ) -> Task<Result<()>> {
743 let client = self.client.clone();
744 cx.spawn(move |_, _| async move {
745 let _ = client
746 .request(proto::SetChannelVisibility {
747 channel_id: channel_id.0,
748 visibility: visibility.into(),
749 })
750 .await?;
751
752 Ok(())
753 })
754 }
755
756 pub fn invite_member(
757 &mut self,
758 channel_id: ChannelId,
759 user_id: UserId,
760 role: proto::ChannelRole,
761 cx: &mut ModelContext<Self>,
762 ) -> Task<Result<()>> {
763 if !self.outgoing_invites.insert((channel_id, user_id)) {
764 return Task::ready(Err(anyhow!("invite request already in progress")));
765 }
766
767 cx.notify();
768 let client = self.client.clone();
769 cx.spawn(move |this, mut cx| async move {
770 let result = client
771 .request(proto::InviteChannelMember {
772 channel_id: channel_id.0,
773 user_id,
774 role: role.into(),
775 })
776 .await;
777
778 this.update(&mut cx, |this, cx| {
779 this.outgoing_invites.remove(&(channel_id, user_id));
780 cx.notify();
781 })?;
782
783 result?;
784
785 Ok(())
786 })
787 }
788
789 pub fn remove_member(
790 &mut self,
791 channel_id: ChannelId,
792 user_id: u64,
793 cx: &mut ModelContext<Self>,
794 ) -> Task<Result<()>> {
795 if !self.outgoing_invites.insert((channel_id, user_id)) {
796 return Task::ready(Err(anyhow!("invite request already in progress")));
797 }
798
799 cx.notify();
800 let client = self.client.clone();
801 cx.spawn(move |this, mut cx| async move {
802 let result = client
803 .request(proto::RemoveChannelMember {
804 channel_id: channel_id.0,
805 user_id,
806 })
807 .await;
808
809 this.update(&mut cx, |this, cx| {
810 this.outgoing_invites.remove(&(channel_id, user_id));
811 cx.notify();
812 })?;
813 result?;
814 Ok(())
815 })
816 }
817
818 pub fn set_member_role(
819 &mut self,
820 channel_id: ChannelId,
821 user_id: UserId,
822 role: proto::ChannelRole,
823 cx: &mut ModelContext<Self>,
824 ) -> Task<Result<()>> {
825 if !self.outgoing_invites.insert((channel_id, user_id)) {
826 return Task::ready(Err(anyhow!("member request already in progress")));
827 }
828
829 cx.notify();
830 let client = self.client.clone();
831 cx.spawn(move |this, mut cx| async move {
832 let result = client
833 .request(proto::SetChannelMemberRole {
834 channel_id: channel_id.0,
835 user_id,
836 role: role.into(),
837 })
838 .await;
839
840 this.update(&mut cx, |this, cx| {
841 this.outgoing_invites.remove(&(channel_id, user_id));
842 cx.notify();
843 })?;
844
845 result?;
846 Ok(())
847 })
848 }
849
850 pub fn rename(
851 &mut self,
852 channel_id: ChannelId,
853 new_name: &str,
854 cx: &mut ModelContext<Self>,
855 ) -> Task<Result<()>> {
856 let client = self.client.clone();
857 let name = new_name.to_string();
858 cx.spawn(move |this, mut cx| async move {
859 let channel = client
860 .request(proto::RenameChannel {
861 channel_id: channel_id.0,
862 name,
863 })
864 .await?
865 .channel
866 .ok_or_else(|| anyhow!("missing channel in response"))?;
867 this.update(&mut cx, |this, cx| {
868 let task = this.update_channels(
869 proto::UpdateChannels {
870 channels: vec![channel],
871 ..Default::default()
872 },
873 cx,
874 );
875 assert!(task.is_none());
876
877 // This event is emitted because the collab panel wants to clear the pending edit state
878 // before this frame is rendered. But we can't guarantee that the collab panel's future
879 // will resolve before this flush_effects finishes. Synchronously emitting this event
880 // ensures that the collab panel will observe this creation before the frame complete
881 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
882 })?;
883 Ok(())
884 })
885 }
886
887 pub fn respond_to_channel_invite(
888 &mut self,
889 channel_id: ChannelId,
890 accept: bool,
891 cx: &mut ModelContext<Self>,
892 ) -> Task<Result<()>> {
893 let client = self.client.clone();
894 cx.background_executor().spawn(async move {
895 client
896 .request(proto::RespondToChannelInvite {
897 channel_id: channel_id.0,
898 accept,
899 })
900 .await?;
901 Ok(())
902 })
903 }
904
905 pub fn create_remote_project(
906 &mut self,
907 channel_id: ChannelId,
908 dev_server_id: DevServerId,
909 name: String,
910 path: String,
911 cx: &mut ModelContext<Self>,
912 ) -> Task<Result<proto::CreateRemoteProjectResponse>> {
913 let client = self.client.clone();
914 cx.background_executor().spawn(async move {
915 client
916 .request(proto::CreateRemoteProject {
917 channel_id: channel_id.0,
918 dev_server_id: dev_server_id.0,
919 name,
920 path,
921 })
922 .await
923 })
924 }
925
926 pub fn create_dev_server(
927 &mut self,
928 channel_id: ChannelId,
929 name: String,
930 cx: &mut ModelContext<Self>,
931 ) -> Task<Result<proto::CreateDevServerResponse>> {
932 let client = self.client.clone();
933 cx.background_executor().spawn(async move {
934 let result = client
935 .request(proto::CreateDevServer {
936 channel_id: channel_id.0,
937 name,
938 })
939 .await?;
940 Ok(result)
941 })
942 }
943
944 pub fn get_channel_member_details(
945 &self,
946 channel_id: ChannelId,
947 cx: &mut ModelContext<Self>,
948 ) -> Task<Result<Vec<ChannelMembership>>> {
949 let client = self.client.clone();
950 let user_store = self.user_store.downgrade();
951 cx.spawn(move |_, mut cx| async move {
952 let response = client
953 .request(proto::GetChannelMembers {
954 channel_id: channel_id.0,
955 })
956 .await?;
957
958 let user_ids = response.members.iter().map(|m| m.user_id).collect();
959 let user_store = user_store
960 .upgrade()
961 .ok_or_else(|| anyhow!("user store dropped"))?;
962 let users = user_store
963 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
964 .await?;
965
966 Ok(users
967 .into_iter()
968 .zip(response.members)
969 .map(|(user, member)| ChannelMembership {
970 user,
971 role: member.role(),
972 kind: member.kind(),
973 })
974 .collect())
975 })
976 }
977
978 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
979 let client = self.client.clone();
980 async move {
981 client
982 .request(proto::DeleteChannel {
983 channel_id: channel_id.0,
984 })
985 .await?;
986 Ok(())
987 }
988 }
989
    /// Always returns `false`: the channel argument is ignored and the
    /// store does not track pending responses to invitations.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
993
994 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
995 self.outgoing_invites.contains(&(channel_id, user_id))
996 }
997
998 async fn handle_update_channels(
999 this: Model<Self>,
1000 message: TypedEnvelope<proto::UpdateChannels>,
1001 _: Arc<Client>,
1002 mut cx: AsyncAppContext,
1003 ) -> Result<()> {
1004 this.update(&mut cx, |this, _| {
1005 this.update_channels_tx
1006 .unbounded_send(message.payload)
1007 .unwrap();
1008 })?;
1009 Ok(())
1010 }
1011
1012 async fn handle_update_user_channels(
1013 this: Model<Self>,
1014 message: TypedEnvelope<proto::UpdateUserChannels>,
1015 _: Arc<Client>,
1016 mut cx: AsyncAppContext,
1017 ) -> Result<()> {
1018 this.update(&mut cx, |this, cx| {
1019 for buffer_version in message.payload.observed_channel_buffer_version {
1020 let version = language::proto::deserialize_version(&buffer_version.version);
1021 this.acknowledge_notes_version(
1022 ChannelId(buffer_version.channel_id),
1023 buffer_version.epoch,
1024 &version,
1025 cx,
1026 );
1027 }
1028 for message_id in message.payload.observed_channel_message_id {
1029 this.acknowledge_message_id(
1030 ChannelId(message_id.channel_id),
1031 message_id.message_id,
1032 cx,
1033 );
1034 }
1035 for membership in message.payload.channel_memberships {
1036 if let Some(role) = ChannelRole::from_i32(membership.role) {
1037 this.channel_states
1038 .entry(ChannelId(membership.channel_id))
1039 .or_insert_with(|| ChannelState::default())
1040 .set_role(role)
1041 }
1042 }
1043 })
1044 }
1045
    /// Called when the client (re)connects.
    ///
    /// Clears the cached channel data, drops any pending "disconnect
    /// buffers" task, asks every open chat to rejoin, and sends a
    /// `RejoinChannelBuffers` request describing every open notes buffer.
    /// The returned task reconciles each open buffer with the server's
    /// response; buffers the server did not return are disconnected and
    /// removed.
    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        self.channel_index.clear();
        self.channel_invitations.clear();
        self.channel_participants.clear();
        // NOTE(review): `channel_index` was already cleared above; this
        // second call looks redundant. Confirm before removing.
        self.channel_index.clear();
        self.outgoing_invites.clear();
        // A pending disconnect is no longer wanted now that we are connected.
        self.disconnect_channel_buffers_task.take();

        for chat in self.opened_chats.values() {
            if let OpenedModelHandle::Open(chat) = chat {
                if let Some(chat) = chat.upgrade() {
                    chat.update(cx, |chat, cx| {
                        chat.rejoin(cx);
                    });
                }
            }
        }

        // Collect the local epoch and version of every open buffer.
        let mut buffer_versions = Vec::new();
        for buffer in self.opened_buffers.values() {
            if let OpenedModelHandle::Open(buffer) = buffer {
                if let Some(buffer) = buffer.upgrade() {
                    let channel_buffer = buffer.read(cx);
                    let buffer = channel_buffer.buffer().read(cx);
                    buffer_versions.push(proto::ChannelBufferVersion {
                        channel_id: channel_buffer.channel_id.0,
                        epoch: channel_buffer.epoch(),
                        version: language::proto::serialize_version(&buffer.version()),
                    });
                }
            }
        }

        // Nothing is open, so there is nothing to rejoin.
        if buffer_versions.is_empty() {
            return Task::ready(Ok(()));
        }

        let response = self.client.request(proto::RejoinChannelBuffers {
            buffers: buffer_versions,
        });

        cx.spawn(|this, mut cx| async move {
            let mut response = response.await?;

            this.update(&mut cx, |this, cx| {
                // `retain` keeps an entry only when the closure returns true.
                this.opened_buffers.retain(|_, buffer| match buffer {
                    OpenedModelHandle::Open(channel_buffer) => {
                        // Drop entries whose model no longer exists.
                        let Some(channel_buffer) = channel_buffer.upgrade() else {
                            return false;
                        };

                        channel_buffer.update(cx, |channel_buffer, cx| {
                            let channel_id = channel_buffer.channel_id;
                            if let Some(remote_buffer) = response
                                .buffers
                                .iter_mut()
                                .find(|buffer| buffer.channel_id == channel_id.0)
                            {
                                let channel_id = channel_buffer.channel_id;
                                let remote_version =
                                    language::proto::deserialize_version(&remote_buffer.version);

                                channel_buffer.replace_collaborators(
                                    mem::take(&mut remote_buffer.collaborators),
                                    cx,
                                );

                                // Serialize the operations to send (computed
                                // relative to the remote version) and apply
                                // the operations the server sent back.
                                let operations = channel_buffer
                                    .buffer()
                                    .update(cx, |buffer, cx| {
                                        let outgoing_operations =
                                            buffer.serialize_ops(Some(remote_version), cx);
                                        let incoming_operations =
                                            mem::take(&mut remote_buffer.operations)
                                                .into_iter()
                                                .map(language::proto::deserialize_operation)
                                                .collect::<Result<Vec<_>>>()?;
                                        buffer.apply_ops(incoming_operations, cx)?;
                                        anyhow::Ok(outgoing_operations)
                                    })
                                    .log_err();

                                if let Some(operations) = operations {
                                    let client = this.client.clone();
                                    // Send the outgoing operations in chunks
                                    // from the background executor; send
                                    // failures are ignored.
                                    cx.background_executor()
                                        .spawn(async move {
                                            let operations = operations.await;
                                            for chunk in
                                                language::proto::split_operations(operations)
                                            {
                                                client
                                                    .send(proto::UpdateChannelBuffer {
                                                        channel_id: channel_id.0,
                                                        operations: chunk,
                                                    })
                                                    .ok();
                                            }
                                        })
                                        .detach();
                                    return true;
                                }
                            }

                            // Either the server did not return this buffer
                            // or reconciling it failed: disconnect and drop it.
                            channel_buffer.disconnect(cx);
                            false
                        })
                    }
                    // Buffers that are still loading are left untouched.
                    OpenedModelHandle::Loading(_) => true,
                });
            })
            .ok();
            anyhow::Ok(())
        })
    }
1160
1161 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1162 cx.notify();
1163
1164 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1165 cx.spawn(move |this, mut cx| async move {
1166 if wait_for_reconnect {
1167 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1168 }
1169
1170 if let Some(this) = this.upgrade() {
1171 this.update(&mut cx, |this, cx| {
1172 for (_, buffer) in this.opened_buffers.drain() {
1173 if let OpenedModelHandle::Open(buffer) = buffer {
1174 if let Some(buffer) = buffer.upgrade() {
1175 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1176 }
1177 }
1178 }
1179 })
1180 .ok();
1181 }
1182 })
1183 });
1184 }
1185
/// Applies an `UpdateChannels` payload to the store.
///
/// In order, this:
/// 1. removes and upserts channel invitations,
/// 2. when the payload carries any channel-level change, deletes channels,
///    inserts/updates channels in the channel index, and updates the
///    per-channel state (latest notes version, latest message id, hosted
///    projects, remote projects, dev servers),
/// 3. notifies observers,
/// 4. when the payload lists channel participants, asks the user store for
///    the referenced users and stores the participants once they resolve.
///
/// Returns `None` when `payload.channel_participants` is empty. Otherwise
/// returns a task that finishes after the participants have been stored; it
/// yields an error if fetching the users fails or if the store can no longer
/// be updated.
pub(crate) fn update_channels(
    &mut self,
    payload: proto::UpdateChannels,
    cx: &mut ModelContext<ChannelStore>,
) -> Option<Task<Result<()>>> {
    // Drop every invitation the payload asks us to remove.
    if !payload.remove_channel_invitations.is_empty() {
        self.channel_invitations
            .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
    }
    // Upsert invitations. The binary search plus insert-at-index keeps
    // `channel_invitations` ordered by channel id.
    for channel in payload.channel_invitations {
        match self
            .channel_invitations
            .binary_search_by_key(&channel.id, |c| c.id.0)
        {
            // Already known: only the name is refreshed.
            Ok(ix) => {
                Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
            }
            Err(ix) => self.channel_invitations.insert(
                ix,
                Arc::new(Channel {
                    id: ChannelId(channel.id),
                    visibility: channel.visibility(),
                    name: channel.name.into(),
                    parent_path: channel
                        .parent_path
                        .into_iter()
                        .map(|cid| ChannelId(cid))
                        .collect(),
                }),
            ),
        }
    }

    // True when the payload touches anything other than invitations and
    // participants.
    let channels_changed = !payload.channels.is_empty()
        || !payload.delete_channels.is_empty()
        || !payload.latest_channel_message_ids.is_empty()
        || !payload.latest_channel_buffer_versions.is_empty()
        || !payload.hosted_projects.is_empty()
        || !payload.deleted_hosted_projects.is_empty()
        || !payload.dev_servers.is_empty()
        || !payload.deleted_dev_servers.is_empty()
        || !payload.remote_projects.is_empty()
        || !payload.deleted_remote_projects.is_empty();

    if channels_changed {
        if !payload.delete_channels.is_empty() {
            let delete_channels: Vec<ChannelId> = payload
                .delete_channels
                .into_iter()
                .map(|cid| ChannelId(cid))
                .collect();
            self.channel_index.delete_channels(&delete_channels);
            self.channel_participants
                .retain(|channel_id, _| !delete_channels.contains(&channel_id));

            for channel_id in &delete_channels {
                let channel_id = *channel_id;
                // A channel that is deleted and also listed in
                // `payload.channels` keeps its open buffer.
                if payload
                    .channels
                    .iter()
                    .any(|channel| channel.id == channel_id.0)
                {
                    continue;
                }
                // Forget the buffer entry; a fully opened buffer that is
                // still alive is disconnected as well.
                if let Some(OpenedModelHandle::Open(buffer)) =
                    self.opened_buffers.remove(&channel_id)
                {
                    if let Some(buffer) = buffer.upgrade() {
                        buffer.update(cx, ChannelBuffer::disconnect);
                    }
                }
            }
        }

        // NOTE(review): `index` is obtained from `channel_index` and stays in
        // scope until the end of this `if` block; any work it performs when
        // dropped is not visible here, so confirm before reordering code.
        let mut index = self.channel_index.bulk_insert();
        for channel in payload.channels {
            let id = ChannelId(channel.id);
            let channel_changed = index.insert(channel);

            // Tell an open, live buffer for this channel that it changed.
            if channel_changed {
                if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                    if let Some(buffer) = buffer.upgrade() {
                        buffer.update(cx, ChannelBuffer::channel_changed);
                    }
                }
            }
        }

        // Record the latest known notes version for each listed channel,
        // creating the channel state on demand.
        for latest_buffer_version in payload.latest_channel_buffer_versions {
            let version = language::proto::deserialize_version(&latest_buffer_version.version);
            self.channel_states
                .entry(ChannelId(latest_buffer_version.channel_id))
                .or_default()
                .update_latest_notes_version(latest_buffer_version.epoch, &version)
        }

        // Record the latest known chat message id for each listed channel.
        for latest_channel_message in payload.latest_channel_message_ids {
            self.channel_states
                .entry(ChannelId(latest_channel_message.channel_id))
                .or_default()
                .update_latest_message_id(latest_channel_message.message_id);
        }

        // Upsert hosted projects. When a project was already known, it is
        // first detached from the channel it was previously stored under,
        // then attached to the channel named in the payload.
        for hosted_project in payload.hosted_projects {
            let hosted_project: HostedProject = hosted_project.into();
            if let Some(old_project) = self
                .hosted_projects
                .insert(hosted_project.project_id, hosted_project.clone())
            {
                self.channel_states
                    .entry(old_project.channel_id)
                    .or_default()
                    .remove_hosted_project(old_project.project_id);
            }
            self.channel_states
                .entry(hosted_project.channel_id)
                .or_default()
                .add_hosted_project(hosted_project.project_id);
        }

        // Remove deleted hosted projects from the store and from the state of
        // the channel they belonged to.
        for hosted_project_id in payload.deleted_hosted_projects {
            let hosted_project_id = ProjectId(hosted_project_id);

            if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
                self.channel_states
                    .entry(old_project.channel_id)
                    .or_default()
                    .remove_hosted_project(old_project.project_id);
            }
        }

        // Remote projects follow the same upsert pattern as hosted projects.
        for remote_project in payload.remote_projects {
            let remote_project: RemoteProject = remote_project.into();
            if let Some(old_remote_project) = self
                .remote_projects
                .insert(remote_project.id, remote_project.clone())
            {
                self.channel_states
                    .entry(old_remote_project.channel_id)
                    .or_default()
                    .remove_remote_project(old_remote_project.id);
            }
            self.channel_states
                .entry(remote_project.channel_id)
                .or_default()
                .add_remote_project(remote_project.id);
        }

        // Remove deleted remote projects from the store and their channel state.
        for remote_project_id in payload.deleted_remote_projects {
            let remote_project_id = RemoteProjectId(remote_project_id);

            if let Some(old_project) = self.remote_projects.remove(&remote_project_id) {
                self.channel_states
                    .entry(old_project.channel_id)
                    .or_default()
                    .remove_remote_project(old_project.id);
            }
        }

        // Dev servers follow the same upsert pattern as hosted projects.
        for dev_server in payload.dev_servers {
            let dev_server: DevServer = dev_server.into();
            if let Some(old_server) = self.dev_servers.insert(dev_server.id, dev_server.clone())
            {
                self.channel_states
                    .entry(old_server.channel_id)
                    .or_default()
                    .remove_dev_server(old_server.id);
            }
            self.channel_states
                .entry(dev_server.channel_id)
                .or_default()
                .add_dev_server(dev_server.id);
        }

        // Remove deleted dev servers from the store and their channel state.
        for dev_server_id in payload.deleted_dev_servers {
            let dev_server_id = DevServerId(dev_server_id);

            if let Some(old_server) = self.dev_servers.remove(&dev_server_id) {
                self.channel_states
                    .entry(old_server.channel_id)
                    .or_default()
                    .remove_dev_server(old_server.id);
            }
        }
    }

    // Observers are notified even when only invitations changed.
    cx.notify();
    if payload.channel_participants.is_empty() {
        return None;
    }

    // Collect the distinct participant user ids across all entries, kept in
    // ascending order by inserting each new id at its sorted position.
    let mut all_user_ids = Vec::new();
    let channel_participants = payload.channel_participants;
    for entry in &channel_participants {
        for user_id in entry.participant_user_ids.iter() {
            if let Err(ix) = all_user_ids.binary_search(user_id) {
                all_user_ids.insert(ix, *user_id);
            }
        }
    }

    let users = self
        .user_store
        .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
    Some(cx.spawn(|this, mut cx| async move {
        let users = users.await?;

        this.update(&mut cx, |this, cx| {
            for entry in &channel_participants {
                // Ids without a matching user in `users` are skipped.
                // NOTE(review): the binary search relies on `users` being
                // ordered by id; confirm `get_users` returns them in the
                // (sorted) order of the ids it was given.
                let mut participants: Vec<_> = entry
                    .participant_user_ids
                    .iter()
                    .filter_map(|user_id| {
                        users
                            .binary_search_by_key(&user_id, |user| &user.id)
                            .ok()
                            .map(|ix| users[ix].clone())
                    })
                    .collect();

                participants.sort_by_key(|u| u.id);

                // Replaces whatever participant list was stored for the channel.
                this.channel_participants
                    .insert(ChannelId(entry.channel_id), participants);
            }

            cx.notify();
        })
    }))
}
1416}
1417
1418impl ChannelState {
1419 fn set_role(&mut self, role: ChannelRole) {
1420 self.role = Some(role);
1421 }
1422
1423 fn has_channel_buffer_changed(&self) -> bool {
1424 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1425 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1426 && self
1427 .latest_notes_version
1428 .version
1429 .changed_since(&self.observed_notes_version.version))
1430 }
1431
1432 fn has_new_messages(&self) -> bool {
1433 let latest_message_id = self.latest_chat_message;
1434 let observed_message_id = self.observed_chat_message;
1435
1436 latest_message_id.is_some_and(|latest_message_id| {
1437 latest_message_id > observed_message_id.unwrap_or_default()
1438 })
1439 }
1440
1441 fn last_acknowledged_message_id(&self) -> Option<u64> {
1442 self.observed_chat_message
1443 }
1444
1445 fn acknowledge_message_id(&mut self, message_id: u64) {
1446 let observed = self.observed_chat_message.get_or_insert(message_id);
1447 *observed = (*observed).max(message_id);
1448 }
1449
1450 fn update_latest_message_id(&mut self, message_id: u64) {
1451 self.latest_chat_message =
1452 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1453 }
1454
1455 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1456 if self.observed_notes_version.epoch == epoch {
1457 self.observed_notes_version.version.join(version);
1458 } else {
1459 self.observed_notes_version = NotesVersion {
1460 epoch,
1461 version: version.clone(),
1462 };
1463 }
1464 }
1465
1466 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1467 if self.latest_notes_version.epoch == epoch {
1468 self.latest_notes_version.version.join(version);
1469 } else {
1470 self.latest_notes_version = NotesVersion {
1471 epoch,
1472 version: version.clone(),
1473 };
1474 }
1475 }
1476
1477 fn add_hosted_project(&mut self, project_id: ProjectId) {
1478 self.projects.insert(project_id);
1479 }
1480
1481 fn remove_hosted_project(&mut self, project_id: ProjectId) {
1482 self.projects.remove(&project_id);
1483 }
1484
1485 fn add_remote_project(&mut self, remote_project_id: RemoteProjectId) {
1486 self.remote_projects.insert(remote_project_id);
1487 }
1488
1489 fn remove_remote_project(&mut self, remote_project_id: RemoteProjectId) {
1490 self.remote_projects.remove(&remote_project_id);
1491 }
1492
1493 fn add_dev_server(&mut self, dev_server_id: DevServerId) {
1494 self.dev_servers.insert(dev_server_id);
1495 }
1496
1497 fn remove_dev_server(&mut self, dev_server_id: DevServerId) {
1498 self.dev_servers.remove(&dev_server_id);
1499 }
1500}