1use crate::channel_buffer::ChannelBuffer;
2use anyhow::{anyhow, Result};
3use client::{Client, Status, Subscription, User, UserId, UserStore};
4use collections::{hash_map, HashMap, HashSet};
5use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
6use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
7use rpc::{proto, TypedEnvelope};
8use std::sync::Arc;
9use util::ResultExt;
10
/// Numeric identifier used to key channels throughout this module.
pub type ChannelId = u64;
12
13pub struct ChannelStore {
14 channels_by_id: HashMap<ChannelId, Arc<Channel>>,
15 channel_paths: Vec<Vec<ChannelId>>,
16 channel_invitations: Vec<Arc<Channel>>,
17 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
18 channels_with_admin_privileges: HashSet<ChannelId>,
19 outgoing_invites: HashSet<(ChannelId, UserId)>,
20 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
21 opened_buffers: HashMap<ChannelId, OpenedChannelBuffer>,
22 client: Arc<Client>,
23 user_store: ModelHandle<UserStore>,
24 _rpc_subscription: Subscription,
25 _watch_connection_status: Task<()>,
26 _update_channels: Task<()>,
27}
28
29#[derive(Clone, Debug, PartialEq)]
30pub struct Channel {
31 pub id: ChannelId,
32 pub name: String,
33}
34
35pub struct ChannelMembership {
36 pub user: Arc<User>,
37 pub kind: proto::channel_member::Kind,
38 pub admin: bool,
39}
40
41pub enum ChannelEvent {
42 ChannelCreated(ChannelId),
43 ChannelRenamed(ChannelId),
44}
45
46impl Entity for ChannelStore {
47 type Event = ChannelEvent;
48}
49
/// Relationship of a user to a channel.
///
/// NOTE(review): not referenced by any other item in this file; the variant
/// descriptions below are read off the names only.
pub enum ChannelMemberStatus {
    /// The user has a pending invitation.
    Invited,
    /// The user is a member.
    Member,
    /// The user is neither invited nor a member.
    NotMember,
}
55
56enum OpenedChannelBuffer {
57 Open(WeakModelHandle<ChannelBuffer>),
58 Loading(Shared<Task<Result<ModelHandle<ChannelBuffer>, Arc<anyhow::Error>>>>),
59}
60
61impl ChannelStore {
62 pub fn new(
63 client: Arc<Client>,
64 user_store: ModelHandle<UserStore>,
65 cx: &mut ModelContext<Self>,
66 ) -> Self {
67 let rpc_subscription =
68 client.add_message_handler(cx.handle(), Self::handle_update_channels);
69
70 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
71 let mut connection_status = client.status();
72 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
73 while let Some(status) = connection_status.next().await {
74 if !status.is_connected() {
75 if let Some(this) = this.upgrade(&cx) {
76 this.update(&mut cx, |this, cx| {
77 if matches!(status, Status::ConnectionLost | Status::SignedOut) {
78 this.handle_disconnect(cx);
79 } else {
80 this.disconnect_buffers(cx);
81 }
82 });
83 } else {
84 break;
85 }
86 }
87 }
88 });
89
90 Self {
91 channels_by_id: HashMap::default(),
92 channel_invitations: Vec::default(),
93 channel_paths: Vec::default(),
94 channel_participants: Default::default(),
95 channels_with_admin_privileges: Default::default(),
96 outgoing_invites: Default::default(),
97 opened_buffers: Default::default(),
98 update_channels_tx,
99 client,
100 user_store,
101 _rpc_subscription: rpc_subscription,
102 _watch_connection_status: watch_connection_status,
103 _update_channels: cx.spawn_weak(|this, mut cx| async move {
104 while let Some(update_channels) = update_channels_rx.next().await {
105 if let Some(this) = this.upgrade(&cx) {
106 let update_task = this.update(&mut cx, |this, cx| {
107 this.update_channels(update_channels, cx)
108 });
109 if let Some(update_task) = update_task {
110 update_task.await.log_err();
111 }
112 }
113 }
114 }),
115 }
116 }
117
118 pub fn has_children(&self, channel_id: ChannelId) -> bool {
119 self.channel_paths.iter().any(|path| {
120 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
121 path.len() > ix + 1
122 } else {
123 false
124 }
125 })
126 }
127
128 pub fn channel_count(&self) -> usize {
129 self.channel_paths.len()
130 }
131
132 pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
133 self.channel_paths.iter().map(move |path| {
134 let id = path.last().unwrap();
135 let channel = self.channel_for_id(*id).unwrap();
136 (path.len() - 1, channel)
137 })
138 }
139
140 pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
141 let path = self.channel_paths.get(ix)?;
142 let id = path.last().unwrap();
143 let channel = self.channel_for_id(*id).unwrap();
144 Some((path.len() - 1, channel))
145 }
146
147 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
148 &self.channel_invitations
149 }
150
151 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
152 self.channels_by_id.get(&channel_id)
153 }
154
155 pub fn open_channel_buffer(
156 &mut self,
157 channel_id: ChannelId,
158 cx: &mut ModelContext<Self>,
159 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
160 // Make sure that a given channel buffer is only opened once per
161 // app instance, even if this method is called multiple times
162 // with the same channel id while the first task is still running.
163 let task = loop {
164 match self.opened_buffers.entry(channel_id) {
165 hash_map::Entry::Occupied(e) => match e.get() {
166 OpenedChannelBuffer::Open(buffer) => {
167 if let Some(buffer) = buffer.upgrade(cx) {
168 break Task::ready(Ok(buffer)).shared();
169 } else {
170 self.opened_buffers.remove(&channel_id);
171 continue;
172 }
173 }
174 OpenedChannelBuffer::Loading(task) => break task.clone(),
175 },
176 hash_map::Entry::Vacant(e) => {
177 let client = self.client.clone();
178 let task = cx
179 .spawn(|this, cx| async move {
180 let channel = this.read_with(&cx, |this, _| {
181 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
182 Arc::new(anyhow!("no channel for id: {}", channel_id))
183 })
184 })?;
185
186 ChannelBuffer::new(channel, client, cx)
187 .await
188 .map_err(Arc::new)
189 })
190 .shared();
191 e.insert(OpenedChannelBuffer::Loading(task.clone()));
192 cx.spawn({
193 let task = task.clone();
194 |this, mut cx| async move {
195 let result = task.await;
196 this.update(&mut cx, |this, cx| match result {
197 Ok(buffer) => {
198 cx.observe_release(&buffer, move |this, _, _| {
199 this.opened_buffers.remove(&channel_id);
200 })
201 .detach();
202 this.opened_buffers.insert(
203 channel_id,
204 OpenedChannelBuffer::Open(buffer.downgrade()),
205 );
206 }
207 Err(error) => {
208 log::error!("failed to open channel buffer {error:?}");
209 this.opened_buffers.remove(&channel_id);
210 }
211 });
212 }
213 })
214 .detach();
215 break task;
216 }
217 }
218 };
219 cx.foreground()
220 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
221 }
222
223 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
224 self.channel_paths.iter().any(|path| {
225 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
226 path[..=ix]
227 .iter()
228 .any(|id| self.channels_with_admin_privileges.contains(id))
229 } else {
230 false
231 }
232 })
233 }
234
235 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
236 self.channel_participants
237 .get(&channel_id)
238 .map_or(&[], |v| v.as_slice())
239 }
240
241 pub fn create_channel(
242 &self,
243 name: &str,
244 parent_id: Option<ChannelId>,
245 cx: &mut ModelContext<Self>,
246 ) -> Task<Result<ChannelId>> {
247 let client = self.client.clone();
248 let name = name.trim_start_matches("#").to_owned();
249 cx.spawn(|this, mut cx| async move {
250 let channel = client
251 .request(proto::CreateChannel { name, parent_id })
252 .await?
253 .channel
254 .ok_or_else(|| anyhow!("missing channel in response"))?;
255
256 let channel_id = channel.id;
257
258 this.update(&mut cx, |this, cx| {
259 let task = this.update_channels(
260 proto::UpdateChannels {
261 channels: vec![channel],
262 ..Default::default()
263 },
264 cx,
265 );
266 assert!(task.is_none());
267
268 // This event is emitted because the collab panel wants to clear the pending edit state
269 // before this frame is rendered. But we can't guarantee that the collab panel's future
270 // will resolve before this flush_effects finishes. Synchronously emitting this event
271 // ensures that the collab panel will observe this creation before the frame completes
272 cx.emit(ChannelEvent::ChannelCreated(channel_id));
273 });
274
275 Ok(channel_id)
276 })
277 }
278
279 pub fn invite_member(
280 &mut self,
281 channel_id: ChannelId,
282 user_id: UserId,
283 admin: bool,
284 cx: &mut ModelContext<Self>,
285 ) -> Task<Result<()>> {
286 if !self.outgoing_invites.insert((channel_id, user_id)) {
287 return Task::ready(Err(anyhow!("invite request already in progress")));
288 }
289
290 cx.notify();
291 let client = self.client.clone();
292 cx.spawn(|this, mut cx| async move {
293 let result = client
294 .request(proto::InviteChannelMember {
295 channel_id,
296 user_id,
297 admin,
298 })
299 .await;
300
301 this.update(&mut cx, |this, cx| {
302 this.outgoing_invites.remove(&(channel_id, user_id));
303 cx.notify();
304 });
305
306 result?;
307
308 Ok(())
309 })
310 }
311
312 pub fn remove_member(
313 &mut self,
314 channel_id: ChannelId,
315 user_id: u64,
316 cx: &mut ModelContext<Self>,
317 ) -> Task<Result<()>> {
318 if !self.outgoing_invites.insert((channel_id, user_id)) {
319 return Task::ready(Err(anyhow!("invite request already in progress")));
320 }
321
322 cx.notify();
323 let client = self.client.clone();
324 cx.spawn(|this, mut cx| async move {
325 let result = client
326 .request(proto::RemoveChannelMember {
327 channel_id,
328 user_id,
329 })
330 .await;
331
332 this.update(&mut cx, |this, cx| {
333 this.outgoing_invites.remove(&(channel_id, user_id));
334 cx.notify();
335 });
336 result?;
337 Ok(())
338 })
339 }
340
341 pub fn set_member_admin(
342 &mut self,
343 channel_id: ChannelId,
344 user_id: UserId,
345 admin: bool,
346 cx: &mut ModelContext<Self>,
347 ) -> Task<Result<()>> {
348 if !self.outgoing_invites.insert((channel_id, user_id)) {
349 return Task::ready(Err(anyhow!("member request already in progress")));
350 }
351
352 cx.notify();
353 let client = self.client.clone();
354 cx.spawn(|this, mut cx| async move {
355 let result = client
356 .request(proto::SetChannelMemberAdmin {
357 channel_id,
358 user_id,
359 admin,
360 })
361 .await;
362
363 this.update(&mut cx, |this, cx| {
364 this.outgoing_invites.remove(&(channel_id, user_id));
365 cx.notify();
366 });
367
368 result?;
369 Ok(())
370 })
371 }
372
373 pub fn rename(
374 &mut self,
375 channel_id: ChannelId,
376 new_name: &str,
377 cx: &mut ModelContext<Self>,
378 ) -> Task<Result<()>> {
379 let client = self.client.clone();
380 let name = new_name.to_string();
381 cx.spawn(|this, mut cx| async move {
382 let channel = client
383 .request(proto::RenameChannel { channel_id, name })
384 .await?
385 .channel
386 .ok_or_else(|| anyhow!("missing channel in response"))?;
387 this.update(&mut cx, |this, cx| {
388 let task = this.update_channels(
389 proto::UpdateChannels {
390 channels: vec![channel],
391 ..Default::default()
392 },
393 cx,
394 );
395 assert!(task.is_none());
396
397 // This event is emitted because the collab panel wants to clear the pending edit state
398 // before this frame is rendered. But we can't guarantee that the collab panel's future
399 // will resolve before this flush_effects finishes. Synchronously emitting this event
400 // ensures that the collab panel will observe this creation before the frame complete
401 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
402 });
403 Ok(())
404 })
405 }
406
407 pub fn respond_to_channel_invite(
408 &mut self,
409 channel_id: ChannelId,
410 accept: bool,
411 ) -> impl Future<Output = Result<()>> {
412 let client = self.client.clone();
413 async move {
414 client
415 .request(proto::RespondToChannelInvite { channel_id, accept })
416 .await?;
417 Ok(())
418 }
419 }
420
421 pub fn get_channel_member_details(
422 &self,
423 channel_id: ChannelId,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<Vec<ChannelMembership>>> {
426 let client = self.client.clone();
427 let user_store = self.user_store.downgrade();
428 cx.spawn(|_, mut cx| async move {
429 let response = client
430 .request(proto::GetChannelMembers { channel_id })
431 .await?;
432
433 let user_ids = response.members.iter().map(|m| m.user_id).collect();
434 let user_store = user_store
435 .upgrade(&cx)
436 .ok_or_else(|| anyhow!("user store dropped"))?;
437 let users = user_store
438 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
439 .await?;
440
441 Ok(users
442 .into_iter()
443 .zip(response.members)
444 .filter_map(|(user, member)| {
445 Some(ChannelMembership {
446 user,
447 admin: member.admin,
448 kind: proto::channel_member::Kind::from_i32(member.kind)?,
449 })
450 })
451 .collect())
452 })
453 }
454
455 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
456 let client = self.client.clone();
457 async move {
458 client.request(proto::RemoveChannel { channel_id }).await?;
459 Ok(())
460 }
461 }
462
463 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
464 false
465 }
466
467 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
468 self.outgoing_invites.contains(&(channel_id, user_id))
469 }
470
471 async fn handle_update_channels(
472 this: ModelHandle<Self>,
473 message: TypedEnvelope<proto::UpdateChannels>,
474 _: Arc<Client>,
475 mut cx: AsyncAppContext,
476 ) -> Result<()> {
477 this.update(&mut cx, |this, _| {
478 this.update_channels_tx
479 .unbounded_send(message.payload)
480 .unwrap();
481 });
482 Ok(())
483 }
484
485 fn handle_disconnect(&mut self, cx: &mut ModelContext<'_, ChannelStore>) {
486 self.disconnect_buffers(cx);
487 self.channels_by_id.clear();
488 self.channel_invitations.clear();
489 self.channel_participants.clear();
490 self.channels_with_admin_privileges.clear();
491 self.channel_paths.clear();
492 self.outgoing_invites.clear();
493 cx.notify();
494 }
495
496 fn disconnect_buffers(&mut self, cx: &mut ModelContext<ChannelStore>) {
497 for (_, buffer) in self.opened_buffers.drain() {
498 if let OpenedChannelBuffer::Open(buffer) = buffer {
499 if let Some(buffer) = buffer.upgrade(cx) {
500 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
501 }
502 }
503 }
504 }
505
506 pub(crate) fn update_channels(
507 &mut self,
508 payload: proto::UpdateChannels,
509 cx: &mut ModelContext<ChannelStore>,
510 ) -> Option<Task<Result<()>>> {
511 if !payload.remove_channel_invitations.is_empty() {
512 self.channel_invitations
513 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
514 }
515 for channel in payload.channel_invitations {
516 match self
517 .channel_invitations
518 .binary_search_by_key(&channel.id, |c| c.id)
519 {
520 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
521 Err(ix) => self.channel_invitations.insert(
522 ix,
523 Arc::new(Channel {
524 id: channel.id,
525 name: channel.name,
526 }),
527 ),
528 }
529 }
530
531 let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
532 if channels_changed {
533 if !payload.remove_channels.is_empty() {
534 self.channels_by_id
535 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
536 self.channel_participants
537 .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
538 self.channels_with_admin_privileges
539 .retain(|channel_id| !payload.remove_channels.contains(channel_id));
540
541 for channel_id in &payload.remove_channels {
542 let channel_id = *channel_id;
543 if let Some(OpenedChannelBuffer::Open(buffer)) =
544 self.opened_buffers.remove(&channel_id)
545 {
546 if let Some(buffer) = buffer.upgrade(cx) {
547 buffer.update(cx, ChannelBuffer::disconnect);
548 }
549 }
550 }
551 }
552
553 for channel_proto in payload.channels {
554 if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
555 Arc::make_mut(existing_channel).name = channel_proto.name;
556 } else {
557 let channel = Arc::new(Channel {
558 id: channel_proto.id,
559 name: channel_proto.name,
560 });
561 self.channels_by_id.insert(channel.id, channel.clone());
562
563 if let Some(parent_id) = channel_proto.parent_id {
564 let mut ix = 0;
565 while ix < self.channel_paths.len() {
566 let path = &self.channel_paths[ix];
567 if path.ends_with(&[parent_id]) {
568 let mut new_path = path.clone();
569 new_path.push(channel.id);
570 self.channel_paths.insert(ix + 1, new_path);
571 ix += 1;
572 }
573 ix += 1;
574 }
575 } else {
576 self.channel_paths.push(vec![channel.id]);
577 }
578 }
579 }
580
581 self.channel_paths.sort_by(|a, b| {
582 let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
583 let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
584 a.cmp(b)
585 });
586 self.channel_paths.dedup();
587 self.channel_paths.retain(|path| {
588 path.iter()
589 .all(|channel_id| self.channels_by_id.contains_key(channel_id))
590 });
591 }
592
593 for permission in payload.channel_permissions {
594 if permission.is_admin {
595 self.channels_with_admin_privileges
596 .insert(permission.channel_id);
597 } else {
598 self.channels_with_admin_privileges
599 .remove(&permission.channel_id);
600 }
601 }
602
603 cx.notify();
604 if payload.channel_participants.is_empty() {
605 return None;
606 }
607
608 let mut all_user_ids = Vec::new();
609 let channel_participants = payload.channel_participants;
610 for entry in &channel_participants {
611 for user_id in entry.participant_user_ids.iter() {
612 if let Err(ix) = all_user_ids.binary_search(user_id) {
613 all_user_ids.insert(ix, *user_id);
614 }
615 }
616 }
617
618 let users = self
619 .user_store
620 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
621 Some(cx.spawn(|this, mut cx| async move {
622 let users = users.await?;
623
624 this.update(&mut cx, |this, cx| {
625 for entry in &channel_participants {
626 let mut participants: Vec<_> = entry
627 .participant_user_ids
628 .iter()
629 .filter_map(|user_id| {
630 users
631 .binary_search_by_key(&user_id, |user| &user.id)
632 .ok()
633 .map(|ix| users[ix].clone())
634 })
635 .collect();
636
637 participants.sort_by_key(|u| u.id);
638
639 this.channel_participants
640 .insert(entry.channel_id, participants);
641 }
642
643 cx.notify();
644 });
645 anyhow::Ok(())
646 }))
647 }
648
649 fn channel_path_sorting_key<'a>(
650 path: &'a [ChannelId],
651 channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
652 ) -> impl 'a + Iterator<Item = Option<&'a str>> {
653 path.iter()
654 .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
655 }
656}