channels.rs

  1use super::*;
  2
  3type ChannelDescendants = HashMap<ChannelId, HashSet<ChannelId>>;
  4
  5impl Database {
  6    #[cfg(test)]
  7    pub async fn all_channels(&self) -> Result<Vec<(ChannelId, String)>> {
  8        self.transaction(move |tx| async move {
  9            let mut channels = Vec::new();
 10            let mut rows = channel::Entity::find().stream(&*tx).await?;
 11            while let Some(row) = rows.next().await {
 12                let row = row?;
 13                channels.push((row.id, row.name));
 14            }
 15            Ok(channels)
 16        })
 17        .await
 18    }
 19
 20    pub async fn create_root_channel(
 21        &self,
 22        name: &str,
 23        live_kit_room: &str,
 24        creator_id: UserId,
 25    ) -> Result<ChannelId> {
 26        self.create_channel(name, None, live_kit_room, creator_id)
 27            .await
 28    }
 29
 30    pub async fn create_channel(
 31        &self,
 32        name: &str,
 33        parent: Option<ChannelId>,
 34        live_kit_room: &str,
 35        creator_id: UserId,
 36    ) -> Result<ChannelId> {
 37        let name = Self::sanitize_channel_name(name)?;
 38        self.transaction(move |tx| async move {
 39            if let Some(parent) = parent {
 40                self.check_user_is_channel_admin(parent, creator_id, &*tx)
 41                    .await?;
 42            }
 43
 44            let channel = channel::ActiveModel {
 45                name: ActiveValue::Set(name.to_string()),
 46                ..Default::default()
 47            }
 48            .insert(&*tx)
 49            .await?;
 50
 51            let channel_paths_stmt;
 52            if let Some(parent) = parent {
 53                let sql = r#"
 54                    INSERT INTO channel_paths
 55                    (id_path, channel_id)
 56                    SELECT
 57                        id_path || $1 || '/', $2
 58                    FROM
 59                        channel_paths
 60                    WHERE
 61                        channel_id = $3
 62                "#;
 63                channel_paths_stmt = Statement::from_sql_and_values(
 64                    self.pool.get_database_backend(),
 65                    sql,
 66                    [
 67                        channel.id.to_proto().into(),
 68                        channel.id.to_proto().into(),
 69                        parent.to_proto().into(),
 70                    ],
 71                );
 72                tx.execute(channel_paths_stmt).await?;
 73            } else {
 74                channel_path::Entity::insert(channel_path::ActiveModel {
 75                    channel_id: ActiveValue::Set(channel.id),
 76                    id_path: ActiveValue::Set(format!("/{}/", channel.id)),
 77                })
 78                .exec(&*tx)
 79                .await?;
 80            }
 81
 82            channel_member::ActiveModel {
 83                channel_id: ActiveValue::Set(channel.id),
 84                user_id: ActiveValue::Set(creator_id),
 85                accepted: ActiveValue::Set(true),
 86                admin: ActiveValue::Set(true),
 87                ..Default::default()
 88            }
 89            .insert(&*tx)
 90            .await?;
 91
 92            room::ActiveModel {
 93                channel_id: ActiveValue::Set(Some(channel.id)),
 94                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
 95                ..Default::default()
 96            }
 97            .insert(&*tx)
 98            .await?;
 99
100            Ok(channel.id)
101        })
102        .await
103    }
104
105    pub async fn delete_channel(
106        &self,
107        channel_id: ChannelId,
108        user_id: UserId,
109    ) -> Result<(Vec<ChannelId>, Vec<UserId>)> {
110        self.transaction(move |tx| async move {
111            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
112                .await?;
113
114            // Don't remove descendant channels that have additional parents.
115            let mut channels_to_remove = self.get_channel_descendants([channel_id], &*tx).await?;
116            {
117                let mut channels_to_keep = channel_path::Entity::find()
118                    .filter(
119                        channel_path::Column::ChannelId
120                            .is_in(
121                                channels_to_remove
122                                    .keys()
123                                    .copied()
124                                    .filter(|&id| id != channel_id),
125                            )
126                            .and(
127                                channel_path::Column::IdPath
128                                    .not_like(&format!("%/{}/%", channel_id)),
129                            ),
130                    )
131                    .stream(&*tx)
132                    .await?;
133                while let Some(row) = channels_to_keep.next().await {
134                    let row = row?;
135                    channels_to_remove.remove(&row.channel_id);
136                }
137            }
138
139            let channel_ancestors = self.get_channel_ancestors(channel_id, &*tx).await?;
140            let members_to_notify: Vec<UserId> = channel_member::Entity::find()
141                .filter(channel_member::Column::ChannelId.is_in(channel_ancestors))
142                .select_only()
143                .column(channel_member::Column::UserId)
144                .distinct()
145                .into_values::<_, QueryUserIds>()
146                .all(&*tx)
147                .await?;
148
149            channel::Entity::delete_many()
150                .filter(channel::Column::Id.is_in(channels_to_remove.keys().copied()))
151                .exec(&*tx)
152                .await?;
153
154            // Delete any other paths that incldue this channel
155            let sql = r#"
156                    DELETE FROM channel_paths
157                    WHERE
158                        id_path LIKE '%' || $1 || '%'
159                "#;
160            let channel_paths_stmt = Statement::from_sql_and_values(
161                self.pool.get_database_backend(),
162                sql,
163                [channel_id.to_proto().into()],
164            );
165            tx.execute(channel_paths_stmt).await?;
166
167            Ok((channels_to_remove.into_keys().collect(), members_to_notify))
168        })
169        .await
170    }
171
172    pub async fn invite_channel_member(
173        &self,
174        channel_id: ChannelId,
175        invitee_id: UserId,
176        inviter_id: UserId,
177        is_admin: bool,
178    ) -> Result<()> {
179        self.transaction(move |tx| async move {
180            self.check_user_is_channel_admin(channel_id, inviter_id, &*tx)
181                .await?;
182
183            channel_member::ActiveModel {
184                channel_id: ActiveValue::Set(channel_id),
185                user_id: ActiveValue::Set(invitee_id),
186                accepted: ActiveValue::Set(false),
187                admin: ActiveValue::Set(is_admin),
188                ..Default::default()
189            }
190            .insert(&*tx)
191            .await?;
192
193            Ok(())
194        })
195        .await
196    }
197
198    fn sanitize_channel_name(name: &str) -> Result<&str> {
199        let new_name = name.trim().trim_start_matches('#');
200        if new_name == "" {
201            Err(anyhow!("channel name can't be blank"))?;
202        }
203        Ok(new_name)
204    }
205
206    pub async fn rename_channel(
207        &self,
208        channel_id: ChannelId,
209        user_id: UserId,
210        new_name: &str,
211    ) -> Result<String> {
212        self.transaction(move |tx| async move {
213            let new_name = Self::sanitize_channel_name(new_name)?.to_string();
214
215            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
216                .await?;
217
218            channel::ActiveModel {
219                id: ActiveValue::Unchanged(channel_id),
220                name: ActiveValue::Set(new_name.clone()),
221                ..Default::default()
222            }
223            .update(&*tx)
224            .await?;
225
226            Ok(new_name)
227        })
228        .await
229    }
230
231    pub async fn respond_to_channel_invite(
232        &self,
233        channel_id: ChannelId,
234        user_id: UserId,
235        accept: bool,
236    ) -> Result<()> {
237        self.transaction(move |tx| async move {
238            let rows_affected = if accept {
239                channel_member::Entity::update_many()
240                    .set(channel_member::ActiveModel {
241                        accepted: ActiveValue::Set(accept),
242                        ..Default::default()
243                    })
244                    .filter(
245                        channel_member::Column::ChannelId
246                            .eq(channel_id)
247                            .and(channel_member::Column::UserId.eq(user_id))
248                            .and(channel_member::Column::Accepted.eq(false)),
249                    )
250                    .exec(&*tx)
251                    .await?
252                    .rows_affected
253            } else {
254                channel_member::ActiveModel {
255                    channel_id: ActiveValue::Unchanged(channel_id),
256                    user_id: ActiveValue::Unchanged(user_id),
257                    ..Default::default()
258                }
259                .delete(&*tx)
260                .await?
261                .rows_affected
262            };
263
264            if rows_affected == 0 {
265                Err(anyhow!("no such invitation"))?;
266            }
267
268            Ok(())
269        })
270        .await
271    }
272
273    pub async fn remove_channel_member(
274        &self,
275        channel_id: ChannelId,
276        member_id: UserId,
277        remover_id: UserId,
278    ) -> Result<()> {
279        self.transaction(|tx| async move {
280            self.check_user_is_channel_admin(channel_id, remover_id, &*tx)
281                .await?;
282
283            let result = channel_member::Entity::delete_many()
284                .filter(
285                    channel_member::Column::ChannelId
286                        .eq(channel_id)
287                        .and(channel_member::Column::UserId.eq(member_id)),
288                )
289                .exec(&*tx)
290                .await?;
291
292            if result.rows_affected == 0 {
293                Err(anyhow!("no such member"))?;
294            }
295
296            Ok(())
297        })
298        .await
299    }
300
301    pub async fn get_channel_invites_for_user(&self, user_id: UserId) -> Result<Vec<Channel>> {
302        self.transaction(|tx| async move {
303            let channel_invites = channel_member::Entity::find()
304                .filter(
305                    channel_member::Column::UserId
306                        .eq(user_id)
307                        .and(channel_member::Column::Accepted.eq(false)),
308                )
309                .all(&*tx)
310                .await?;
311
312            let channels = channel::Entity::find()
313                .filter(
314                    channel::Column::Id.is_in(
315                        channel_invites
316                            .into_iter()
317                            .map(|channel_member| channel_member.channel_id),
318                    ),
319                )
320                .all(&*tx)
321                .await?;
322
323            let channels = channels
324                .into_iter()
325                .map(|channel| Channel {
326                    id: channel.id,
327                    name: channel.name,
328                    parent_id: None,
329                })
330                .collect();
331
332            Ok(channels)
333        })
334        .await
335    }
336
337    async fn get_all_channels(
338        &self,
339        parents_by_child_id: ChannelDescendants,
340        tx: &DatabaseTransaction,
341    ) -> Result<Vec<Channel>> {
342        let mut channels = Vec::with_capacity(parents_by_child_id.len());
343        {
344            let mut rows = channel::Entity::find()
345                .filter(channel::Column::Id.is_in(parents_by_child_id.keys().copied()))
346                .stream(&*tx)
347                .await?;
348            while let Some(row) = rows.next().await {
349                let row = row?;
350
351                // As these rows are pulled from the map's keys, this unwrap is safe.
352                let parents = parents_by_child_id.get(&row.id).unwrap();
353                if parents.len() > 0 {
354                    for parent in parents {
355                        channels.push(Channel {
356                            id: row.id,
357                            name: row.name.clone(),
358                            parent_id: Some(*parent),
359                        });
360                    }
361                } else {
362                    channels.push(Channel {
363                        id: row.id,
364                        name: row.name,
365                        parent_id: None,
366                    });
367                }
368            }
369        }
370
371        Ok(channels)
372    }
373
374    pub async fn get_channels_for_user(&self, user_id: UserId) -> Result<ChannelsForUser> {
375        self.transaction(|tx| async move {
376            let tx = tx;
377
378            let channel_memberships = channel_member::Entity::find()
379                .filter(
380                    channel_member::Column::UserId
381                        .eq(user_id)
382                        .and(channel_member::Column::Accepted.eq(true)),
383                )
384                .all(&*tx)
385                .await?;
386
387            let parents_by_child_id = self
388                .get_channel_descendants(channel_memberships.iter().map(|m| m.channel_id), &*tx)
389                .await?;
390
391            let channels_with_admin_privileges = channel_memberships
392                .iter()
393                .filter_map(|membership| membership.admin.then_some(membership.channel_id))
394                .collect();
395
396            let channels = self.get_all_channels(parents_by_child_id, &tx).await?;
397
398            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
399            enum QueryUserIdsAndChannelIds {
400                ChannelId,
401                UserId,
402            }
403
404            let mut channel_participants: HashMap<ChannelId, Vec<UserId>> = HashMap::default();
405            {
406                let mut rows = room_participant::Entity::find()
407                    .inner_join(room::Entity)
408                    .filter(room::Column::ChannelId.is_in(channels.iter().map(|c| c.id)))
409                    .select_only()
410                    .column(room::Column::ChannelId)
411                    .column(room_participant::Column::UserId)
412                    .into_values::<_, QueryUserIdsAndChannelIds>()
413                    .stream(&*tx)
414                    .await?;
415                while let Some(row) = rows.next().await {
416                    let row: (ChannelId, UserId) = row?;
417                    channel_participants.entry(row.0).or_default().push(row.1)
418                }
419            }
420
421            Ok(ChannelsForUser {
422                channels,
423                channel_participants,
424                channels_with_admin_privileges,
425            })
426        })
427        .await
428    }
429
430    pub async fn get_channel_members(&self, id: ChannelId) -> Result<Vec<UserId>> {
431        self.transaction(|tx| async move { self.get_channel_members_internal(id, &*tx).await })
432            .await
433    }
434
435    pub async fn set_channel_member_admin(
436        &self,
437        channel_id: ChannelId,
438        from: UserId,
439        for_user: UserId,
440        admin: bool,
441    ) -> Result<()> {
442        self.transaction(|tx| async move {
443            self.check_user_is_channel_admin(channel_id, from, &*tx)
444                .await?;
445
446            let result = channel_member::Entity::update_many()
447                .filter(
448                    channel_member::Column::ChannelId
449                        .eq(channel_id)
450                        .and(channel_member::Column::UserId.eq(for_user)),
451                )
452                .set(channel_member::ActiveModel {
453                    admin: ActiveValue::set(admin),
454                    ..Default::default()
455                })
456                .exec(&*tx)
457                .await?;
458
459            if result.rows_affected == 0 {
460                Err(anyhow!("no such member"))?;
461            }
462
463            Ok(())
464        })
465        .await
466    }
467
468    pub async fn get_channel_member_details(
469        &self,
470        channel_id: ChannelId,
471        user_id: UserId,
472    ) -> Result<Vec<proto::ChannelMember>> {
473        self.transaction(|tx| async move {
474            self.check_user_is_channel_admin(channel_id, user_id, &*tx)
475                .await?;
476
477            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
478            enum QueryMemberDetails {
479                UserId,
480                Admin,
481                IsDirectMember,
482                Accepted,
483            }
484
485            let tx = tx;
486            let ancestor_ids = self.get_channel_ancestors(channel_id, &*tx).await?;
487            let mut stream = channel_member::Entity::find()
488                .distinct()
489                .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied()))
490                .select_only()
491                .column(channel_member::Column::UserId)
492                .column(channel_member::Column::Admin)
493                .column_as(
494                    channel_member::Column::ChannelId.eq(channel_id),
495                    QueryMemberDetails::IsDirectMember,
496                )
497                .column(channel_member::Column::Accepted)
498                .order_by_asc(channel_member::Column::UserId)
499                .into_values::<_, QueryMemberDetails>()
500                .stream(&*tx)
501                .await?;
502
503            let mut rows = Vec::<proto::ChannelMember>::new();
504            while let Some(row) = stream.next().await {
505                let (user_id, is_admin, is_direct_member, is_invite_accepted): (
506                    UserId,
507                    bool,
508                    bool,
509                    bool,
510                ) = row?;
511                let kind = match (is_direct_member, is_invite_accepted) {
512                    (true, true) => proto::channel_member::Kind::Member,
513                    (true, false) => proto::channel_member::Kind::Invitee,
514                    (false, true) => proto::channel_member::Kind::AncestorMember,
515                    (false, false) => continue,
516                };
517                let user_id = user_id.to_proto();
518                let kind = kind.into();
519                if let Some(last_row) = rows.last_mut() {
520                    if last_row.user_id == user_id {
521                        if is_direct_member {
522                            last_row.kind = kind;
523                            last_row.admin = is_admin;
524                        }
525                        continue;
526                    }
527                }
528                rows.push(proto::ChannelMember {
529                    user_id,
530                    kind,
531                    admin: is_admin,
532                });
533            }
534
535            Ok(rows)
536        })
537        .await
538    }
539
540    pub async fn get_channel_members_internal(
541        &self,
542        id: ChannelId,
543        tx: &DatabaseTransaction,
544    ) -> Result<Vec<UserId>> {
545        let ancestor_ids = self.get_channel_ancestors(id, tx).await?;
546        let user_ids = channel_member::Entity::find()
547            .distinct()
548            .filter(
549                channel_member::Column::ChannelId
550                    .is_in(ancestor_ids.iter().copied())
551                    .and(channel_member::Column::Accepted.eq(true)),
552            )
553            .select_only()
554            .column(channel_member::Column::UserId)
555            .into_values::<_, QueryUserIds>()
556            .all(&*tx)
557            .await?;
558        Ok(user_ids)
559    }
560
561    pub async fn check_user_is_channel_member(
562        &self,
563        channel_id: ChannelId,
564        user_id: UserId,
565        tx: &DatabaseTransaction,
566    ) -> Result<()> {
567        let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
568        channel_member::Entity::find()
569            .filter(
570                channel_member::Column::ChannelId
571                    .is_in(channel_ids)
572                    .and(channel_member::Column::UserId.eq(user_id)),
573            )
574            .one(&*tx)
575            .await?
576            .ok_or_else(|| anyhow!("user is not a channel member or channel does not exist"))?;
577        Ok(())
578    }
579
580    pub async fn check_user_is_channel_admin(
581        &self,
582        channel_id: ChannelId,
583        user_id: UserId,
584        tx: &DatabaseTransaction,
585    ) -> Result<()> {
586        let channel_ids = self.get_channel_ancestors(channel_id, tx).await?;
587        channel_member::Entity::find()
588            .filter(
589                channel_member::Column::ChannelId
590                    .is_in(channel_ids)
591                    .and(channel_member::Column::UserId.eq(user_id))
592                    .and(channel_member::Column::Admin.eq(true)),
593            )
594            .one(&*tx)
595            .await?
596            .ok_or_else(|| anyhow!("user is not a channel admin or channel does not exist"))?;
597        Ok(())
598    }
599
600    /// Returns the channel ancestors, deepest first
601    pub async fn get_channel_ancestors(
602        &self,
603        channel_id: ChannelId,
604        tx: &DatabaseTransaction,
605    ) -> Result<Vec<ChannelId>> {
606        let paths = channel_path::Entity::find()
607            .filter(channel_path::Column::ChannelId.eq(channel_id))
608            .order_by(channel_path::Column::IdPath, sea_query::Order::Desc)
609            .all(tx)
610            .await?;
611        let mut channel_ids = Vec::new();
612        for path in paths {
613            for id in path.id_path.trim_matches('/').split('/') {
614                if let Ok(id) = id.parse() {
615                    let id = ChannelId::from_proto(id);
616                    if let Err(ix) = channel_ids.binary_search(&id) {
617                        channel_ids.insert(ix, id);
618                    }
619                }
620            }
621        }
622        Ok(channel_ids)
623    }
624
625    /// Returns the channel descendants,
626    /// Structured as a map from child ids to their parent ids
627    /// For example, the descendants of 'a' in this DAG:
628    ///
629    ///   /- b -\
630    /// a -- c -- d
631    ///
632    /// would be:
633    /// {
634    ///     a: [],
635    ///     b: [a],
636    ///     c: [a],
637    ///     d: [a, c],
638    /// }
639    async fn get_channel_descendants(
640        &self,
641        channel_ids: impl IntoIterator<Item = ChannelId>,
642        tx: &DatabaseTransaction,
643    ) -> Result<ChannelDescendants> {
644        let mut values = String::new();
645        for id in channel_ids {
646            if !values.is_empty() {
647                values.push_str(", ");
648            }
649            write!(&mut values, "({})", id).unwrap();
650        }
651
652        if values.is_empty() {
653            return Ok(HashMap::default());
654        }
655
656        let sql = format!(
657            r#"
658            SELECT
659                descendant_paths.*
660            FROM
661                channel_paths parent_paths, channel_paths descendant_paths
662            WHERE
663                parent_paths.channel_id IN ({values}) AND
664                descendant_paths.id_path LIKE (parent_paths.id_path || '%')
665        "#
666        );
667
668        let stmt = Statement::from_string(self.pool.get_database_backend(), sql);
669
670        let mut parents_by_child_id: ChannelDescendants = HashMap::default();
671        let mut paths = channel_path::Entity::find()
672            .from_raw_sql(stmt)
673            .stream(tx)
674            .await?;
675
676        while let Some(path) = paths.next().await {
677            let path = path?;
678            let ids = path.id_path.trim_matches('/').split('/');
679            let mut parent_id = None;
680            for id in ids {
681                if let Ok(id) = id.parse() {
682                    let id = ChannelId::from_proto(id);
683                    if id == path.channel_id {
684                        break;
685                    }
686                    parent_id = Some(id);
687                }
688            }
689            let entry = parents_by_child_id.entry(path.channel_id).or_default();
690            if let Some(parent_id) = parent_id {
691                entry.insert(parent_id);
692            }
693        }
694
695        Ok(parents_by_child_id)
696    }
697
698    /// Returns the channel with the given ID and:
699    /// - true if the user is a member
700    /// - false if the user hasn't accepted the invitation yet
701    pub async fn get_channel(
702        &self,
703        channel_id: ChannelId,
704        user_id: UserId,
705    ) -> Result<Option<(Channel, bool)>> {
706        self.transaction(|tx| async move {
707            let tx = tx;
708
709            let channel = channel::Entity::find_by_id(channel_id).one(&*tx).await?;
710
711            if let Some(channel) = channel {
712                if self
713                    .check_user_is_channel_member(channel_id, user_id, &*tx)
714                    .await
715                    .is_err()
716                {
717                    return Ok(None);
718                }
719
720                let channel_membership = channel_member::Entity::find()
721                    .filter(
722                        channel_member::Column::ChannelId
723                            .eq(channel_id)
724                            .and(channel_member::Column::UserId.eq(user_id)),
725                    )
726                    .one(&*tx)
727                    .await?;
728
729                let is_accepted = channel_membership
730                    .map(|membership| membership.accepted)
731                    .unwrap_or(false);
732
733                Ok(Some((
734                    Channel {
735                        id: channel.id,
736                        name: channel.name,
737                        parent_id: None,
738                    },
739                    is_accepted,
740                )))
741            } else {
742                Ok(None)
743            }
744        })
745        .await
746    }
747
748    pub async fn room_id_for_channel(&self, channel_id: ChannelId) -> Result<RoomId> {
749        self.transaction(|tx| async move {
750            let tx = tx;
751            let room = channel::Model {
752                id: channel_id,
753                ..Default::default()
754            }
755            .find_related(room::Entity)
756            .one(&*tx)
757            .await?
758            .ok_or_else(|| anyhow!("invalid channel"))?;
759            Ok(room.id)
760        })
761        .await
762    }
763
764    async fn link_channel(
765        &self,
766        from: ChannelId,
767        to: ChannelId,
768        tx: &DatabaseTransaction,
769    ) -> Result<ChannelDescendants> {
770        let to_ancestors = self.get_channel_ancestors(to, &*tx).await?;
771        let from_descendants = self.get_channel_descendants([from], &*tx).await?;
772        for ancestor in to_ancestors {
773            if from_descendants.contains_key(&ancestor) {
774                return Err(anyhow!("Cannot create a channel cycle").into());
775            }
776        }
777
778        let sql = r#"
779                INSERT INTO channel_paths
780                (id_path, channel_id)
781                SELECT
782                    id_path || $1 || '/', $2
783                FROM
784                    channel_paths
785                WHERE
786                    channel_id = $3
787                ON CONFLICT (id_path) DO NOTHING;
788            "#;
789        let channel_paths_stmt = Statement::from_sql_and_values(
790            self.pool.get_database_backend(),
791            sql,
792            [
793                from.to_proto().into(),
794                from.to_proto().into(),
795                to.to_proto().into(),
796            ],
797        );
798        tx.execute(channel_paths_stmt).await?;
799
800        for (from_id, to_ids) in from_descendants.iter().filter(|(id, _)| id != &&from) {
801            for to_id in to_ids {
802                let channel_paths_stmt = Statement::from_sql_and_values(
803                    self.pool.get_database_backend(),
804                    sql,
805                    [
806                        from_id.to_proto().into(),
807                        from_id.to_proto().into(),
808                        to_id.to_proto().into(),
809                    ],
810                );
811                tx.execute(channel_paths_stmt).await?;
812            }
813        }
814
815        Ok(from_descendants)
816    }
817
818    async fn remove_channel_from_parent(
819        &self,
820        from: ChannelId,
821        parent: ChannelId,
822        tx: &DatabaseTransaction,
823    ) -> Result<()> {
824        let sql = r#"
825                DELETE FROM channel_paths
826                WHERE
827                    id_path LIKE '%' || $1 || '/' || $2 || '%'
828            "#;
829        let channel_paths_stmt = Statement::from_sql_and_values(
830            self.pool.get_database_backend(),
831            sql,
832            [parent.to_proto().into(), from.to_proto().into()],
833        );
834        tx.execute(channel_paths_stmt).await?;
835
836        Ok(())
837    }
838
839    /// Move a channel from one parent to another.
840    /// Note that this requires a valid parent_id in the 'from_parent' field.
841    /// As channels are a DAG, we need to know which parent to remove the channel from.
842    /// Here's a list of the parameters to this function and their behavior:
843    ///
844    /// - (`None`, `None`) No op
845    /// - (`None`, `Some(id)`) Link the channel without removing it from any of it's parents
846    /// - (`Some(id)`, `None`) Remove a channel from a given parent, and leave other parents
847    /// - (`Some(id)`, `Some(id)`) Move channel from one parent to another, leaving other parents
848    ///
849    /// Returns the channel that was moved + it's sub channels for use
850    /// by the members for `to`
851    pub async fn move_channel(
852        &self,
853        user: UserId,
854        from: ChannelId,
855        from_parent: Option<ChannelId>,
856        to: Option<ChannelId>,
857    ) -> Result<Vec<Channel>> {
858        self.transaction(|tx| async move {
859            // Note that even with these maxed permissions, this linking operation
860            // is still insecure because you can't remove someone's permissions to a
861            // channel if they've linked the channel to one where they're an admin.
862            self.check_user_is_channel_admin(from, user, &*tx).await?;
863
864            let mut channel_descendants = None;
865
866            // Note that we have to do the linking before the removal, so that we
867            // can leave the channel_path table in a consistent state.
868            if let Some(to) = to {
869                self.check_user_is_channel_admin(to, user, &*tx).await?;
870
871                channel_descendants = Some(self.link_channel(from, to, &*tx).await?);
872            }
873
874            let mut channel_descendants = match channel_descendants {
875                Some(channel_descendants) => channel_descendants,
876                None => self.get_channel_descendants([from], &*tx).await?,
877            };
878
879            if let Some(from_parent) = from_parent {
880                self.check_user_is_channel_admin(from_parent, user, &*tx)
881                    .await?;
882
883                self.remove_channel_from_parent(from, from_parent, &*tx)
884                    .await?;
885            }
886
887            let channels;
888            if let Some(to) = to {
889                if let Some(channel) = channel_descendants.get_mut(&from) {
890                    // Remove the other parents
891                    channel.clear();
892                    channel.insert(to);
893                }
894
895                 channels = self
896                    .get_all_channels(channel_descendants, &*tx)
897                    .await?;
898            } else {
899                channels = vec![];
900            }
901
902            Ok(channels)
903        })
904        .await
905    }
906}
907
908#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
909enum QueryUserIds {
910    UserId,
911}