db.rs

  1mod ids;
  2mod queries;
  3mod tables;
  4#[cfg(test)]
  5pub mod tests;
  6
  7use crate::{Error, Result, executor::Executor};
  8use anyhow::anyhow;
  9use collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 10use dashmap::DashMap;
 11use futures::StreamExt;
 12use project_repository_statuses::StatusKind;
 13use rand::{Rng, SeedableRng, prelude::StdRng};
 14use rpc::ExtensionProvides;
 15use rpc::{
 16    ConnectionId, ExtensionMetadata,
 17    proto::{self},
 18};
 19use sea_orm::{
 20    ActiveValue, Condition, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbErr,
 21    FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect, Statement,
 22    TransactionTrait,
 23    entity::prelude::*,
 24    sea_query::{Alias, Expr, OnConflict},
 25};
 26use semantic_version::SemanticVersion;
 27use serde::{Deserialize, Serialize};
 28use std::ops::RangeInclusive;
 29use std::{
 30    fmt::Write as _,
 31    future::Future,
 32    marker::PhantomData,
 33    ops::{Deref, DerefMut},
 34    rc::Rc,
 35    sync::Arc,
 36    time::Duration,
 37};
 38use time::PrimitiveDateTime;
 39use tokio::sync::{Mutex, OwnedMutexGuard};
 40use worktree_settings_file::LocalSettingsKind;
 41
 42#[cfg(test)]
 43pub use tests::TestDb;
 44
 45pub use ids::*;
 46pub use queries::billing_customers::{CreateBillingCustomerParams, UpdateBillingCustomerParams};
 47pub use queries::billing_preferences::{
 48    CreateBillingPreferencesParams, UpdateBillingPreferencesParams,
 49};
 50pub use queries::billing_subscriptions::{
 51    CreateBillingSubscriptionParams, UpdateBillingSubscriptionParams,
 52};
 53pub use queries::contributors::ContributorSelector;
 54pub use queries::processed_stripe_events::CreateProcessedStripeEventParams;
 55pub use sea_orm::ConnectOptions;
 56pub use tables::user::Model as User;
 57pub use tables::*;
 58
/// Test-only settings attached to a [`Database`] through its `test_options` field.
#[cfg(test)]
pub struct DatabaseTestOptions {
    /// Tokio runtime on which `Database::run` drives database futures via `block_on`.
    pub runtime: tokio::runtime::Runtime,
    /// Probability with which `Database::run` returns a "simulated query failure" when the
    /// database uses a deterministic executor.
    pub query_failure_probability: parking_lot::Mutex<f64>,
}
 64
/// Database gives you a handle that lets you access the database.
/// It handles pooling internally.
pub struct Database {
    /// Options this handle was created with; returned by `options()`.
    options: ConnectOptions,
    /// Connection on which every transaction is begun.
    pool: DatabaseConnection,
    /// One async mutex per room, locked by the room- and project-scoped transaction helpers.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// One async mutex per project, used by `project_transaction` when the project has no room.
    projects: DashMap<ProjectId, Arc<Mutex<()>>>,
    /// Random source for the jitter applied to retry delays after serialization errors.
    rng: Mutex<StdRng>,
    /// Executor used to sleep between retries; in tests a deterministic executor also
    /// injects delays and failures (see `run`).
    executor: Executor,
    /// Notification kind lookup by id. `new` leaves this empty; presumably it is filled in
    /// elsewhere — TODO confirm.
    notification_kinds_by_id: HashMap<NotificationKindId, &'static str>,
    /// Notification kind lookup by name. `new` leaves this empty; presumably it is filled in
    /// elsewhere — TODO confirm.
    notification_kinds_by_name: HashMap<String, NotificationKindId>,
    /// Test-only options. `run` unwraps this in test builds, so it has to be set before any
    /// query is executed there.
    #[cfg(test)]
    test_options: Option<DatabaseTestOptions>,
}
 79
 80// The `Database` type has so many methods that its impl blocks are split into
 81// separate files in the `queries` folder.
 82impl Database {
 83    /// Connects to the database with the given options
 84    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
 85        sqlx::any::install_default_drivers();
 86        Ok(Self {
 87            options: options.clone(),
 88            pool: sea_orm::Database::connect(options).await?,
 89            rooms: DashMap::with_capacity(16384),
 90            projects: DashMap::with_capacity(16384),
 91            rng: Mutex::new(StdRng::seed_from_u64(0)),
 92            notification_kinds_by_id: HashMap::default(),
 93            notification_kinds_by_name: HashMap::default(),
 94            executor,
 95            #[cfg(test)]
 96            test_options: None,
 97        })
 98    }
 99
100    pub fn options(&self) -> &ConnectOptions {
101        &self.options
102    }
103
104    #[cfg(test)]
105    pub fn reset(&self) {
106        self.rooms.clear();
107        self.projects.clear();
108    }
109
110    /// Transaction runs things in a transaction. If you want to call other methods
111    /// and pass the transaction around you need to reborrow the transaction at each
112    /// call site with: `&*tx`.
113    pub async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
114    where
115        F: Send + Fn(TransactionHandle) -> Fut,
116        Fut: Send + Future<Output = Result<T>>,
117    {
118        let body = async {
119            let mut i = 0;
120            loop {
121                let (tx, result) = self.with_transaction(&f).await?;
122                match result {
123                    Ok(result) => match tx.commit().await.map_err(Into::into) {
124                        Ok(()) => return Ok(result),
125                        Err(error) => {
126                            if !self.retry_on_serialization_error(&error, i).await {
127                                return Err(error);
128                            }
129                        }
130                    },
131                    Err(error) => {
132                        tx.rollback().await?;
133                        if !self.retry_on_serialization_error(&error, i).await {
134                            return Err(error);
135                        }
136                    }
137                }
138                i += 1;
139            }
140        };
141
142        self.run(body).await
143    }
144
145    pub async fn weak_transaction<F, Fut, T>(&self, f: F) -> Result<T>
146    where
147        F: Send + Fn(TransactionHandle) -> Fut,
148        Fut: Send + Future<Output = Result<T>>,
149    {
150        let body = async {
151            let (tx, result) = self.with_weak_transaction(&f).await?;
152            match result {
153                Ok(result) => match tx.commit().await.map_err(Into::into) {
154                    Ok(()) => Ok(result),
155                    Err(error) => Err(error),
156                },
157                Err(error) => {
158                    tx.rollback().await?;
159                    Err(error)
160                }
161            }
162        };
163
164        self.run(body).await
165    }
166
167    /// The same as room_transaction, but if you need to only optionally return a Room.
168    async fn optional_room_transaction<F, Fut, T>(
169        &self,
170        f: F,
171    ) -> Result<Option<TransactionGuard<T>>>
172    where
173        F: Send + Fn(TransactionHandle) -> Fut,
174        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
175    {
176        let body = async {
177            let mut i = 0;
178            loop {
179                let (tx, result) = self.with_transaction(&f).await?;
180                match result {
181                    Ok(Some((room_id, data))) => {
182                        let lock = self.rooms.entry(room_id).or_default().clone();
183                        let _guard = lock.lock_owned().await;
184                        match tx.commit().await.map_err(Into::into) {
185                            Ok(()) => {
186                                return Ok(Some(TransactionGuard {
187                                    data,
188                                    _guard,
189                                    _not_send: PhantomData,
190                                }));
191                            }
192                            Err(error) => {
193                                if !self.retry_on_serialization_error(&error, i).await {
194                                    return Err(error);
195                                }
196                            }
197                        }
198                    }
199                    Ok(None) => match tx.commit().await.map_err(Into::into) {
200                        Ok(()) => return Ok(None),
201                        Err(error) => {
202                            if !self.retry_on_serialization_error(&error, i).await {
203                                return Err(error);
204                            }
205                        }
206                    },
207                    Err(error) => {
208                        tx.rollback().await?;
209                        if !self.retry_on_serialization_error(&error, i).await {
210                            return Err(error);
211                        }
212                    }
213                }
214                i += 1;
215            }
216        };
217
218        self.run(body).await
219    }
220
221    async fn project_transaction<F, Fut, T>(
222        &self,
223        project_id: ProjectId,
224        f: F,
225    ) -> Result<TransactionGuard<T>>
226    where
227        F: Send + Fn(TransactionHandle) -> Fut,
228        Fut: Send + Future<Output = Result<T>>,
229    {
230        let room_id = Database::room_id_for_project(self, project_id).await?;
231        let body = async {
232            let mut i = 0;
233            loop {
234                let lock = if let Some(room_id) = room_id {
235                    self.rooms.entry(room_id).or_default().clone()
236                } else {
237                    self.projects.entry(project_id).or_default().clone()
238                };
239                let _guard = lock.lock_owned().await;
240                let (tx, result) = self.with_transaction(&f).await?;
241                match result {
242                    Ok(data) => match tx.commit().await.map_err(Into::into) {
243                        Ok(()) => {
244                            return Ok(TransactionGuard {
245                                data,
246                                _guard,
247                                _not_send: PhantomData,
248                            });
249                        }
250                        Err(error) => {
251                            if !self.retry_on_serialization_error(&error, i).await {
252                                return Err(error);
253                            }
254                        }
255                    },
256                    Err(error) => {
257                        tx.rollback().await?;
258                        if !self.retry_on_serialization_error(&error, i).await {
259                            return Err(error);
260                        }
261                    }
262                }
263                i += 1;
264            }
265        };
266
267        self.run(body).await
268    }
269
270    /// room_transaction runs the block in a transaction. It returns a RoomGuard, that keeps
271    /// the database locked until it is dropped. This ensures that updates sent to clients are
272    /// properly serialized with respect to database changes.
273    async fn room_transaction<F, Fut, T>(
274        &self,
275        room_id: RoomId,
276        f: F,
277    ) -> Result<TransactionGuard<T>>
278    where
279        F: Send + Fn(TransactionHandle) -> Fut,
280        Fut: Send + Future<Output = Result<T>>,
281    {
282        let body = async {
283            let mut i = 0;
284            loop {
285                let lock = self.rooms.entry(room_id).or_default().clone();
286                let _guard = lock.lock_owned().await;
287                let (tx, result) = self.with_transaction(&f).await?;
288                match result {
289                    Ok(data) => match tx.commit().await.map_err(Into::into) {
290                        Ok(()) => {
291                            return Ok(TransactionGuard {
292                                data,
293                                _guard,
294                                _not_send: PhantomData,
295                            });
296                        }
297                        Err(error) => {
298                            if !self.retry_on_serialization_error(&error, i).await {
299                                return Err(error);
300                            }
301                        }
302                    },
303                    Err(error) => {
304                        tx.rollback().await?;
305                        if !self.retry_on_serialization_error(&error, i).await {
306                            return Err(error);
307                        }
308                    }
309                }
310                i += 1;
311            }
312        };
313
314        self.run(body).await
315    }
316
317    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
318    where
319        F: Send + Fn(TransactionHandle) -> Fut,
320        Fut: Send + Future<Output = Result<T>>,
321    {
322        let tx = self
323            .pool
324            .begin_with_config(Some(IsolationLevel::Serializable), None)
325            .await?;
326
327        let mut tx = Arc::new(Some(tx));
328        let result = f(TransactionHandle(tx.clone())).await;
329        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
330            return Err(anyhow!(
331                "couldn't complete transaction because it's still in use"
332            ))?;
333        };
334
335        Ok((tx, result))
336    }
337
338    async fn with_weak_transaction<F, Fut, T>(
339        &self,
340        f: &F,
341    ) -> Result<(DatabaseTransaction, Result<T>)>
342    where
343        F: Send + Fn(TransactionHandle) -> Fut,
344        Fut: Send + Future<Output = Result<T>>,
345    {
346        let tx = self
347            .pool
348            .begin_with_config(Some(IsolationLevel::ReadCommitted), None)
349            .await?;
350
351        let mut tx = Arc::new(Some(tx));
352        let result = f(TransactionHandle(tx.clone())).await;
353        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
354            return Err(anyhow!(
355                "couldn't complete transaction because it's still in use"
356            ))?;
357        };
358
359        Ok((tx, result))
360    }
361
362    async fn run<F, T>(&self, future: F) -> Result<T>
363    where
364        F: Future<Output = Result<T>>,
365    {
366        #[cfg(test)]
367        {
368            let test_options = self.test_options.as_ref().unwrap();
369            if let Executor::Deterministic(executor) = &self.executor {
370                executor.simulate_random_delay().await;
371                let fail_probability = *test_options.query_failure_probability.lock();
372                if executor.rng().gen_bool(fail_probability) {
373                    return Err(anyhow!("simulated query failure"))?;
374                }
375            }
376
377            test_options.runtime.block_on(future)
378        }
379
380        #[cfg(not(test))]
381        {
382            future.await
383        }
384    }
385
386    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: usize) -> bool {
387        // If the error is due to a failure to serialize concurrent transactions, then retry
388        // this transaction after a delay. With each subsequent retry, double the delay duration.
389        // Also vary the delay randomly in order to ensure different database connections retry
390        // at different times.
391        const SLEEPS: [f32; 10] = [10., 20., 40., 80., 160., 320., 640., 1280., 2560., 5120.];
392        if is_serialization_error(error) && prev_attempt_count < SLEEPS.len() {
393            let base_delay = SLEEPS[prev_attempt_count];
394            let randomized_delay = base_delay * self.rng.lock().await.gen_range(0.5..=2.0);
395            log::warn!(
396                "retrying transaction after serialization error. delay: {} ms.",
397                randomized_delay
398            );
399            self.executor
400                .sleep(Duration::from_millis(randomized_delay as u64))
401                .await;
402            true
403        } else {
404            false
405        }
406    }
407}
408
409fn is_serialization_error(error: &Error) -> bool {
410    const SERIALIZATION_FAILURE_CODE: &str = "40001";
411    match error {
412        Error::Database(
413            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
414            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
415        ) if error
416            .as_database_error()
417            .and_then(|error| error.code())
418            .as_deref()
419            == Some(SERIALIZATION_FAILURE_CODE) =>
420        {
421            true
422        }
423        _ => false,
424    }
425}
426
427/// A handle to a [`DatabaseTransaction`].
428pub struct TransactionHandle(pub(crate) Arc<Option<DatabaseTransaction>>);
429
430impl Deref for TransactionHandle {
431    type Target = DatabaseTransaction;
432
433    fn deref(&self) -> &Self::Target {
434        self.0.as_ref().as_ref().unwrap()
435    }
436}
437
438/// [`TransactionGuard`] keeps a database transaction alive until it is dropped.
439/// It wraps data that depends on the state of the database and prevents an additional
440/// transaction from starting that would invalidate that data.
441pub struct TransactionGuard<T> {
442    data: T,
443    _guard: OwnedMutexGuard<()>,
444    _not_send: PhantomData<Rc<()>>,
445}
446
447impl<T> Deref for TransactionGuard<T> {
448    type Target = T;
449
450    fn deref(&self) -> &T {
451        &self.data
452    }
453}
454
455impl<T> DerefMut for TransactionGuard<T> {
456    fn deref_mut(&mut self) -> &mut T {
457        &mut self.data
458    }
459}
460
461impl<T> TransactionGuard<T> {
462    /// Returns the inner value of the guard.
463    pub fn into_inner(self) -> T {
464        self.data
465    }
466}
467
468#[derive(Clone, Debug, PartialEq, Eq)]
469pub enum Contact {
470    Accepted { user_id: UserId, busy: bool },
471    Outgoing { user_id: UserId },
472    Incoming { user_id: UserId },
473}
474
475impl Contact {
476    pub fn user_id(&self) -> UserId {
477        match self {
478            Contact::Accepted { user_id, .. } => *user_id,
479            Contact::Outgoing { user_id } => *user_id,
480            Contact::Incoming { user_id, .. } => *user_id,
481        }
482    }
483}
484
/// A list of notifications, each paired with a user id (presumably the recipient — TODO
/// confirm against callers).
pub type NotificationBatch = Vec<(UserId, proto::Notification)>;
486
/// Data describing a created channel message.
/// NOTE(review): produced by query code outside this chunk; field meanings are inferred
/// from their names and types.
pub struct CreatedChannelMessage {
    pub message_id: MessageId,
    // A set, unlike the `Vec` used by `UpdatedChannelMessage`.
    pub participant_connection_ids: HashSet<ConnectionId>,
    pub notifications: NotificationBatch,
}
492
/// Data describing an updated channel message.
/// NOTE(review): produced by query code outside this chunk; field meanings are inferred
/// from their names and types.
pub struct UpdatedChannelMessage {
    pub message_id: MessageId,
    pub participant_connection_ids: Vec<ConnectionId>,
    pub notifications: NotificationBatch,
    pub reply_to_message_id: Option<MessageId>,
    pub timestamp: PrimitiveDateTime,
    pub deleted_mention_notification_ids: Vec<NotificationId>,
    pub updated_mention_notifications: Vec<rpc::proto::Notification>,
}
502
/// An e-mail address together with its confirmation code. Derives `FromQueryResult`, so it
/// can be loaded straight from a query row, and is (de)serializable with serde.
#[derive(Clone, Debug, PartialEq, Eq, FromQueryResult, Serialize, Deserialize)]
pub struct Invite {
    pub email_address: String,
    pub email_confirmation_code: String,
}
508
/// Deserializable description of a new signup.
/// NOTE(review): field meanings are inferred from their names; the consumer is not visible
/// in this chunk.
#[derive(Clone, Debug, Deserialize)]
pub struct NewSignup {
    pub email_address: String,
    pub platform_mac: bool,
    pub platform_windows: bool,
    pub platform_linux: bool,
    pub editor_features: Vec<String>,
    pub programming_languages: Vec<String>,
    pub device_id: Option<String>,
    pub added_to_mailing_list: bool,
    pub created_at: Option<DateTime>,
}
521
/// A set of counters readable from a single query row (derives `FromQueryResult`).
/// NOTE(review): presumably a total plus per-platform waitlist counts — confirm against
/// the query that fills it.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromQueryResult)]
pub struct WaitlistSummary {
    pub count: i64,
    pub linux_count: i64,
    pub mac_count: i64,
    pub windows_count: i64,
    pub unknown_count: i64,
}
530
/// The parameters to create a new user: the GitHub login and the numeric GitHub user id.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    pub github_login: String,
    pub github_user_id: i32,
}
537
/// The result of creating a new user.
/// NOTE(review): the optional fields are presumably absent when the user was not invited
/// or did not come from a signup — confirm against the producing query.
#[derive(Debug)]
pub struct NewUserResult {
    pub user_id: UserId,
    pub metrics_id: String,
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
546
/// The result of updating a channel membership: the affected channel, the channel data now
/// attached to the user, and the ids of channels that were removed.
/// NOTE(review): exact semantics of `new_channels`/`removed_channels` are defined by the
/// producing queries, which are not visible here.
#[derive(Debug)]
pub struct MembershipUpdated {
    pub channel_id: ChannelId,
    pub new_channels: ChannelsForUser,
    pub removed_channels: Vec<ChannelId>,
}
554
/// The result of setting a member's role: either an updated invite (carrying the channel)
/// or an updated membership.
#[derive(Debug)]
// The variants differ noticeably in size; the lint is silenced rather than boxing one.
#[allow(clippy::large_enum_variant)]
pub enum SetMemberRoleResult {
    InviteUpdated(Channel),
    MembershipUpdated(MembershipUpdated),
}
562
/// The result of inviting a member to a channel: the channel itself plus the batch of
/// notifications that goes with the invite.
#[derive(Debug)]
pub struct InviteMemberResult {
    pub channel: Channel,
    pub notifications: NotificationBatch,
}
569
/// Outcome of responding to a channel invite: an optional membership update plus a batch
/// of notifications.
/// NOTE(review): `membership_update` is presumably `None` when the invite was declined —
/// confirm against the producing query.
#[derive(Debug)]
pub struct RespondToChannelInvite {
    pub membership_update: Option<MembershipUpdated>,
    pub notifications: NotificationBatch,
}
575
/// Outcome of removing a channel member: the resulting membership update and, optionally,
/// the id of a notification (presumably one that was removed along with the member — TODO
/// confirm).
#[derive(Debug)]
pub struct RemoveChannelMemberResult {
    pub membership_update: MembershipUpdated,
    pub notification_id: Option<NotificationId>,
}
581
582#[derive(Debug, PartialEq, Eq, Hash)]
583pub struct Channel {
584    pub id: ChannelId,
585    pub name: String,
586    pub visibility: ChannelVisibility,
587    /// parent_path is the channel ids from the root to this one (not including this one)
588    pub parent_path: Vec<ChannelId>,
589}
590
591impl Channel {
592    pub fn from_model(value: channel::Model) -> Self {
593        Channel {
594            id: value.id,
595            visibility: value.visibility,
596            name: value.clone().name,
597            parent_path: value.ancestors().collect(),
598        }
599    }
600
601    pub fn to_proto(&self) -> proto::Channel {
602        proto::Channel {
603            id: self.id.to_proto(),
604            name: self.name.clone(),
605            visibility: self.visibility.into(),
606            parent_path: self.parent_path.iter().map(|c| c.to_proto()).collect(),
607        }
608    }
609}
610
611#[derive(Debug, PartialEq, Eq, Hash)]
612pub struct ChannelMember {
613    pub role: ChannelRole,
614    pub user_id: UserId,
615    pub kind: proto::channel_member::Kind,
616}
617
618impl ChannelMember {
619    pub fn to_proto(&self) -> proto::ChannelMember {
620        proto::ChannelMember {
621            role: self.role.into(),
622            user_id: self.user_id.to_proto(),
623            kind: self.kind.into(),
624        }
625    }
626}
627
/// Channel-related data gathered for one user.
/// NOTE(review): assembled by query code outside this chunk; the meaning of the
/// "observed" versus "latest" lists is inferred from their names.
#[derive(Debug, PartialEq)]
pub struct ChannelsForUser {
    pub channels: Vec<Channel>,
    pub channel_memberships: Vec<channel_member::Model>,
    pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
    pub invited_channels: Vec<Channel>,

    pub observed_buffer_versions: Vec<proto::ChannelBufferVersion>,
    pub observed_channel_messages: Vec<proto::ChannelMessageId>,
    pub latest_buffer_versions: Vec<proto::ChannelBufferVersion>,
    pub latest_channel_messages: Vec<proto::ChannelMessageId>,
}
640
/// A rejoined channel buffer in protobuf form, together with a connection id (presumably
/// the connection that held the buffer before the rejoin — TODO confirm).
#[derive(Debug)]
pub struct RejoinedChannelBuffer {
    pub buffer: proto::RejoinedChannelBuffer,
    pub old_connection_id: ConnectionId,
}
646
/// A room in protobuf form plus an optional channel row (presumably the channel the room
/// belongs to, when there is one — TODO confirm).
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
}
652
/// Data describing a rejoined room: the room in protobuf form, the projects that were
/// rejoined and reshared, and an optional channel row.
/// NOTE(review): produced by query code outside this chunk.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel: Option<channel::Model>,
}
659
/// A project listed in `RejoinedRoom::reshared_projects`, with its collaborators and
/// worktree metadata.
/// NOTE(review): `old_connection_id` is presumably the host's previous connection —
/// confirm against the producing query.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
666
667pub struct RejoinedProject {
668    pub id: ProjectId,
669    pub old_connection_id: ConnectionId,
670    pub collaborators: Vec<ProjectCollaborator>,
671    pub worktrees: Vec<RejoinedWorktree>,
672    pub updated_repositories: Vec<proto::UpdateRepository>,
673    pub removed_repositories: Vec<u64>,
674    pub language_servers: Vec<proto::LanguageServer>,
675}
676
677impl RejoinedProject {
678    pub fn to_proto(&self) -> proto::RejoinedProject {
679        proto::RejoinedProject {
680            id: self.id.to_proto(),
681            worktrees: self
682                .worktrees
683                .iter()
684                .map(|worktree| proto::WorktreeMetadata {
685                    id: worktree.id,
686                    root_name: worktree.root_name.clone(),
687                    visible: worktree.visible,
688                    abs_path: worktree.abs_path.clone(),
689                })
690                .collect(),
691            collaborators: self
692                .collaborators
693                .iter()
694                .map(|collaborator| collaborator.to_proto())
695                .collect(),
696            language_servers: self.language_servers.clone(),
697        }
698    }
699}
700
/// A worktree of a rejoined project: its identifying metadata plus lists of updated and
/// removed entries and repositories, diagnostic summaries, settings files and scan ids.
/// NOTE(review): the update/removal lists are presumably relative to what the rejoining
/// client already had — confirm against the producing query.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
716
/// Data describing a room that was left: the room in protobuf form, an optional channel
/// row, the projects that were left (keyed by project id), user ids whose calls were
/// canceled, and a `deleted` flag.
/// NOTE(review): `deleted` presumably reports whether the room itself was removed —
/// confirm against the producing query.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
724
/// Data describing a refreshed room: the room in protobuf form, an optional channel row,
/// and two lists of user ids (stale participants and users whose calls were canceled).
/// NOTE(review): produced by query code outside this chunk.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
731
/// Data describing a refreshed channel buffer: a list of connection ids and the buffer's
/// collaborators in protobuf form.
/// NOTE(review): produced by query code outside this chunk.
pub struct RefreshedChannelBuffer {
    pub connection_ids: Vec<ConnectionId>,
    pub collaborators: Vec<proto::Collaborator>,
}
736
/// A project with a role, its collaborators, its worktrees keyed by a `u64` (presumably
/// the worktree id — TODO confirm), repositories and language servers.
/// NOTE(review): `role` is presumably the requesting user's role in the project — confirm.
pub struct Project {
    pub id: ProjectId,
    pub role: ChannelRole,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: BTreeMap<u64, Worktree>,
    pub repositories: Vec<proto::UpdateRepository>,
    pub language_servers: Vec<proto::LanguageServer>,
}
745
746pub struct ProjectCollaborator {
747    pub connection_id: ConnectionId,
748    pub user_id: UserId,
749    pub replica_id: ReplicaId,
750    pub is_host: bool,
751}
752
753impl ProjectCollaborator {
754    pub fn to_proto(&self) -> proto::Collaborator {
755        proto::Collaborator {
756            peer_id: Some(self.connection_id.into()),
757            replica_id: self.replica_id.0 as u32,
758            user_id: self.user_id.to_proto(),
759            is_host: self.is_host,
760        }
761    }
762}
763
/// A project that was left, as stored in `LeftRoom::left_projects`.
/// NOTE(review): `should_unshare` and `connection_ids` are presumably instructions for the
/// caller (whether to unshare, and which connections to notify) — confirm.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub should_unshare: bool,
    pub connection_ids: Vec<ConnectionId>,
}
770
/// A worktree of a [`Project`]: identifying metadata, its entries, legacy repository
/// entries keyed by a `u64`, diagnostic summaries, settings files and scan ids.
/// NOTE(review): the key of `legacy_repository_entries` is presumably a work-directory
/// entry id — confirm against the code that fills it.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    pub legacy_repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
783
/// A settings file belonging to a worktree: its path, its content, and which kind of local
/// settings it holds.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
    pub kind: LocalSettingsKind,
}
790
/// Metadata for a new version of an extension.
/// NOTE(review): consumed by query code outside this chunk; field meanings are inferred
/// from their names and types.
pub struct NewExtensionVersion {
    pub name: String,
    pub version: semver::Version,
    pub description: String,
    pub authors: Vec<String>,
    pub repository: String,
    pub schema_version: i32,
    pub wasm_api_version: Option<String>,
    pub provides: BTreeSet<ExtensionProvides>,
    pub published_at: PrimitiveDateTime,
}
802
/// Inclusive ranges of schema versions and wasm API versions (presumably used to filter
/// extension versions — TODO confirm against the queries that take it).
pub struct ExtensionVersionConstraints {
    pub schema_versions: RangeInclusive<i32>,
    pub wasm_api_versions: RangeInclusive<SemanticVersion>,
}
807
808impl LocalSettingsKind {
809    pub fn from_proto(proto_kind: proto::LocalSettingsKind) -> Self {
810        match proto_kind {
811            proto::LocalSettingsKind::Settings => Self::Settings,
812            proto::LocalSettingsKind::Tasks => Self::Tasks,
813            proto::LocalSettingsKind::Editorconfig => Self::Editorconfig,
814            proto::LocalSettingsKind::Debug => Self::Debug,
815        }
816    }
817
818    pub fn to_proto(&self) -> proto::LocalSettingsKind {
819        match self {
820            Self::Settings => proto::LocalSettingsKind::Settings,
821            Self::Tasks => proto::LocalSettingsKind::Tasks,
822            Self::Editorconfig => proto::LocalSettingsKind::Editorconfig,
823            Self::Debug => proto::LocalSettingsKind::Debug,
824        }
825    }
826}
827
828fn db_status_to_proto(
829    entry: project_repository_statuses::Model,
830) -> anyhow::Result<proto::StatusEntry> {
831    use proto::git_file_status::{Tracked, Unmerged, Variant};
832
833    let (simple_status, variant) =
834        match (entry.status_kind, entry.first_status, entry.second_status) {
835            (StatusKind::Untracked, None, None) => (
836                proto::GitStatus::Added as i32,
837                Variant::Untracked(Default::default()),
838            ),
839            (StatusKind::Ignored, None, None) => (
840                proto::GitStatus::Added as i32,
841                Variant::Ignored(Default::default()),
842            ),
843            (StatusKind::Unmerged, Some(first_head), Some(second_head)) => (
844                proto::GitStatus::Conflict as i32,
845                Variant::Unmerged(Unmerged {
846                    first_head,
847                    second_head,
848                }),
849            ),
850            (StatusKind::Tracked, Some(index_status), Some(worktree_status)) => {
851                let simple_status = if worktree_status != proto::GitStatus::Unmodified as i32 {
852                    worktree_status
853                } else if index_status != proto::GitStatus::Unmodified as i32 {
854                    index_status
855                } else {
856                    proto::GitStatus::Unmodified as i32
857                };
858                (
859                    simple_status,
860                    Variant::Tracked(Tracked {
861                        index_status,
862                        worktree_status,
863                    }),
864                )
865            }
866            _ => {
867                return Err(anyhow!(
868                    "Unexpected combination of status fields: {entry:?}"
869                ));
870            }
871        };
872    Ok(proto::StatusEntry {
873        repo_path: entry.repo_path,
874        simple_status,
875        status: Some(proto::GitFileStatus {
876            variant: Some(variant),
877        }),
878    })
879}
880
881fn proto_status_to_db(
882    status_entry: proto::StatusEntry,
883) -> (String, StatusKind, Option<i32>, Option<i32>) {
884    use proto::git_file_status::{Tracked, Unmerged, Variant};
885
886    let (status_kind, first_status, second_status) = status_entry
887        .status
888        .clone()
889        .and_then(|status| status.variant)
890        .map_or(
891            (StatusKind::Untracked, None, None),
892            |variant| match variant {
893                Variant::Untracked(_) => (StatusKind::Untracked, None, None),
894                Variant::Ignored(_) => (StatusKind::Ignored, None, None),
895                Variant::Unmerged(Unmerged {
896                    first_head,
897                    second_head,
898                }) => (StatusKind::Unmerged, Some(first_head), Some(second_head)),
899                Variant::Tracked(Tracked {
900                    index_status,
901                    worktree_status,
902                }) => (
903                    StatusKind::Tracked,
904                    Some(index_status),
905                    Some(worktree_status),
906                ),
907            },
908        );
909    (
910        status_entry.repo_path,
911        status_kind,
912        first_status,
913        second_status,
914    )
915}