db.rs

  1mod ids;
  2mod queries;
  3mod tables;
  4#[cfg(test)]
  5pub mod tests;
  6
  7use crate::{executor::Executor, Error, Result};
  8use anyhow::anyhow;
  9use collections::{BTreeMap, HashMap, HashSet};
 10use dashmap::DashMap;
 11use futures::StreamExt;
 12use rand::{prelude::StdRng, Rng, SeedableRng};
 13use rpc::{
 14    proto::{self},
 15    ConnectionId,
 16};
 17use sea_orm::{
 18    entity::prelude::*,
 19    sea_query::{Alias, Expr, OnConflict},
 20    ActiveValue, Condition, ConnectionTrait, DatabaseConnection, DatabaseTransaction, DbErr,
 21    FromQueryResult, IntoActiveModel, IsolationLevel, JoinType, QueryOrder, QuerySelect, Statement,
 22    TransactionTrait,
 23};
 24use serde::{ser::Error as _, Deserialize, Serialize, Serializer};
 25use sqlx::{
 26    migrate::{Migrate, Migration, MigrationSource},
 27    Connection,
 28};
 29use std::{
 30    fmt::Write as _,
 31    future::Future,
 32    marker::PhantomData,
 33    ops::{Deref, DerefMut},
 34    path::Path,
 35    rc::Rc,
 36    sync::Arc,
 37    time::Duration,
 38};
 39use time::{format_description::well_known::iso8601, PrimitiveDateTime};
 40use tokio::sync::{Mutex, OwnedMutexGuard};
 41
 42#[cfg(test)]
 43pub use tests::TestDb;
 44
 45pub use ids::*;
 46pub use queries::contributors::ContributorSelector;
 47pub use sea_orm::ConnectOptions;
 48pub use tables::user::Model as User;
 49pub use tables::*;
 50
/// Database gives you a handle that lets you access the database.
/// It handles pooling internally.
pub struct Database {
    /// Connection options; kept so that `migrate` can open its own connection to the same URL.
    options: ConnectOptions,
    /// Connection on which every transaction in this file is started.
    pool: DatabaseConnection,
    /// One async mutex per room, created on demand by the room transaction helpers.
    rooms: DashMap<RoomId, Arc<Mutex<()>>>,
    /// Random source for the jitter applied to retry delays after serialization failures.
    rng: Mutex<StdRng>,
    /// Executor used to sleep between retries.
    executor: Executor,
    // Lookup tables between notification kind ids and names. `new` leaves both empty;
    // presumably they are filled in elsewhere (not visible in this file) — TODO confirm.
    notification_kinds_by_id: HashMap<NotificationKindId, &'static str>,
    notification_kinds_by_name: HashMap<String, NotificationKindId>,
    /// Test-only runtime that `run` blocks on while driving a query future.
    #[cfg(test)]
    runtime: Option<tokio::runtime::Runtime>,
}
 64
 65// The `Database` type has so many methods that its impl blocks are split into
 66// separate files in the `queries` folder.
 67impl Database {
 68    /// Connects to the database with the given options
 69    pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
 70        sqlx::any::install_default_drivers();
 71        Ok(Self {
 72            options: options.clone(),
 73            pool: sea_orm::Database::connect(options).await?,
 74            rooms: DashMap::with_capacity(16384),
 75            rng: Mutex::new(StdRng::seed_from_u64(0)),
 76            notification_kinds_by_id: HashMap::default(),
 77            notification_kinds_by_name: HashMap::default(),
 78            executor,
 79            #[cfg(test)]
 80            runtime: None,
 81        })
 82    }
 83
    /// Removes every per-room lock entry. Only compiled in test builds.
    #[cfg(test)]
    pub fn reset(&self) {
        self.rooms.clear();
    }
 88
 89    /// Runs the database migrations.
 90    pub async fn migrate(
 91        &self,
 92        migrations_path: &Path,
 93        ignore_checksum_mismatch: bool,
 94    ) -> anyhow::Result<Vec<(Migration, Duration)>> {
 95        let migrations = MigrationSource::resolve(migrations_path)
 96            .await
 97            .map_err(|err| anyhow!("failed to load migrations: {err:?}"))?;
 98
 99        let mut connection = sqlx::AnyConnection::connect(self.options.get_url()).await?;
100
101        connection.ensure_migrations_table().await?;
102        let applied_migrations: HashMap<_, _> = connection
103            .list_applied_migrations()
104            .await?
105            .into_iter()
106            .map(|m| (m.version, m))
107            .collect();
108
109        let mut new_migrations = Vec::new();
110        for migration in migrations {
111            match applied_migrations.get(&migration.version) {
112                Some(applied_migration) => {
113                    if migration.checksum != applied_migration.checksum && !ignore_checksum_mismatch
114                    {
115                        Err(anyhow!(
116                            "checksum mismatch for applied migration {}",
117                            migration.description
118                        ))?;
119                    }
120                }
121                None => {
122                    let elapsed = connection.apply(&migration).await?;
123                    new_migrations.push((migration, elapsed));
124                }
125            }
126        }
127
128        Ok(new_migrations)
129    }
130
131    /// Transaction runs things in a transaction. If you want to call other methods
132    /// and pass the transaction around you need to reborrow the transaction at each
133    /// call site with: `&*tx`.
134    pub async fn transaction<F, Fut, T>(&self, f: F) -> Result<T>
135    where
136        F: Send + Fn(TransactionHandle) -> Fut,
137        Fut: Send + Future<Output = Result<T>>,
138    {
139        let body = async {
140            let mut i = 0;
141            loop {
142                let (tx, result) = self.with_transaction(&f).await?;
143                match result {
144                    Ok(result) => match tx.commit().await.map_err(Into::into) {
145                        Ok(()) => return Ok(result),
146                        Err(error) => {
147                            if !self.retry_on_serialization_error(&error, i).await {
148                                return Err(error);
149                            }
150                        }
151                    },
152                    Err(error) => {
153                        tx.rollback().await?;
154                        if !self.retry_on_serialization_error(&error, i).await {
155                            return Err(error);
156                        }
157                    }
158                }
159                i += 1;
160            }
161        };
162
163        self.run(body).await
164    }
165
166    pub async fn weak_transaction<F, Fut, T>(&self, f: F) -> Result<T>
167    where
168        F: Send + Fn(TransactionHandle) -> Fut,
169        Fut: Send + Future<Output = Result<T>>,
170    {
171        let body = async {
172            let (tx, result) = self.with_weak_transaction(&f).await?;
173            match result {
174                Ok(result) => match tx.commit().await.map_err(Into::into) {
175                    Ok(()) => return Ok(result),
176                    Err(error) => {
177                        return Err(error);
178                    }
179                },
180                Err(error) => {
181                    tx.rollback().await?;
182                    return Err(error);
183                }
184            }
185        };
186
187        self.run(body).await
188    }
189
    /// The same as room_transaction, but if you need to only optionally return a Room.
    ///
    /// The closure decides which room (if any) is affected: it returns
    /// `Some((room_id, data))` to have that room's lock taken before the commit and handed
    /// back inside a [`RoomGuard`], or `None` to commit without locking any room.
    /// Serialization errors from the closure or from the commit cause the closure to be
    /// re-run, as long as `retry_on_serialization_error` allows.
    async fn optional_room_transaction<F, Fut, T>(&self, f: F) -> Result<Option<RoomGuard<T>>>
    where
        F: Send + Fn(TransactionHandle) -> Fut,
        Fut: Send + Future<Output = Result<Option<(RoomId, T)>>>,
    {
        let body = async {
            // Number of attempts made so far; selects the retry delay.
            let mut i = 0;
            loop {
                let (tx, result) = self.with_transaction(&f).await?;
                match result {
                    Ok(Some((room_id, data))) => {
                        // The room is only known once the closure has run, so its lock is
                        // acquired here, after the closure and before the commit.
                        let lock = self.rooms.entry(room_id).or_default().clone();
                        let _guard = lock.lock_owned().await;
                        match tx.commit().await.map_err(Into::into) {
                            Ok(()) => {
                                return Ok(Some(RoomGuard {
                                    data,
                                    _guard,
                                    _not_send: PhantomData,
                                }));
                            }
                            Err(error) => {
                                // `_guard` is still in scope here, so the room lock stays
                                // held while the retry delay elapses.
                                if !self.retry_on_serialization_error(&error, i).await {
                                    return Err(error);
                                }
                            }
                        }
                    }
                    Ok(None) => match tx.commit().await.map_err(Into::into) {
                        Ok(()) => return Ok(None),
                        Err(error) => {
                            if !self.retry_on_serialization_error(&error, i).await {
                                return Err(error);
                            }
                        }
                    },
                    Err(error) => {
                        // The closure failed: roll back, then retry only for
                        // serialization errors.
                        tx.rollback().await?;
                        if !self.retry_on_serialization_error(&error, i).await {
                            return Err(error);
                        }
                    }
                }
                i += 1;
            }
        };

        self.run(body).await
    }
240
241    /// room_transaction runs the block in a transaction. It returns a RoomGuard, that keeps
242    /// the database locked until it is dropped. This ensures that updates sent to clients are
243    /// properly serialized with respect to database changes.
244    async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
245    where
246        F: Send + Fn(TransactionHandle) -> Fut,
247        Fut: Send + Future<Output = Result<T>>,
248    {
249        let body = async {
250            let mut i = 0;
251            loop {
252                let lock = self.rooms.entry(room_id).or_default().clone();
253                let _guard = lock.lock_owned().await;
254                let (tx, result) = self.with_transaction(&f).await?;
255                match result {
256                    Ok(data) => match tx.commit().await.map_err(Into::into) {
257                        Ok(()) => {
258                            return Ok(RoomGuard {
259                                data,
260                                _guard,
261                                _not_send: PhantomData,
262                            });
263                        }
264                        Err(error) => {
265                            if !self.retry_on_serialization_error(&error, i).await {
266                                return Err(error);
267                            }
268                        }
269                    },
270                    Err(error) => {
271                        tx.rollback().await?;
272                        if !self.retry_on_serialization_error(&error, i).await {
273                            return Err(error);
274                        }
275                    }
276                }
277                i += 1;
278            }
279        };
280
281        self.run(body).await
282    }
283
284    async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
285    where
286        F: Send + Fn(TransactionHandle) -> Fut,
287        Fut: Send + Future<Output = Result<T>>,
288    {
289        let tx = self
290            .pool
291            .begin_with_config(Some(IsolationLevel::Serializable), None)
292            .await?;
293
294        let mut tx = Arc::new(Some(tx));
295        let result = f(TransactionHandle(tx.clone())).await;
296        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
297            return Err(anyhow!(
298                "couldn't complete transaction because it's still in use"
299            ))?;
300        };
301
302        Ok((tx, result))
303    }
304
305    async fn with_weak_transaction<F, Fut, T>(
306        &self,
307        f: &F,
308    ) -> Result<(DatabaseTransaction, Result<T>)>
309    where
310        F: Send + Fn(TransactionHandle) -> Fut,
311        Fut: Send + Future<Output = Result<T>>,
312    {
313        let tx = self
314            .pool
315            .begin_with_config(Some(IsolationLevel::ReadCommitted), None)
316            .await?;
317
318        let mut tx = Arc::new(Some(tx));
319        let result = f(TransactionHandle(tx.clone())).await;
320        let Some(tx) = Arc::get_mut(&mut tx).and_then(|tx| tx.take()) else {
321            return Err(anyhow!(
322                "couldn't complete transaction because it's still in use"
323            ))?;
324        };
325
326        Ok((tx, result))
327    }
328
    /// Drives `future` to completion and returns its result.
    ///
    /// Outside of tests this simply awaits the future. In test builds it first lets a
    /// deterministic executor (if that is the configured executor) inject a random delay,
    /// and then blocks on the future using the test runtime.
    async fn run<F, T>(&self, future: F) -> Result<T>
    where
        F: Future<Output = Result<T>>,
    {
        #[cfg(test)]
        {
            if let Executor::Deterministic(executor) = &self.executor {
                executor.simulate_random_delay().await;
            }

            // Panics if no test runtime has been set on this `Database`.
            self.runtime.as_ref().unwrap().block_on(future)
        }

        #[cfg(not(test))]
        {
            future.await
        }
    }
347
348    async fn retry_on_serialization_error(&self, error: &Error, prev_attempt_count: usize) -> bool {
349        // If the error is due to a failure to serialize concurrent transactions, then retry
350        // this transaction after a delay. With each subsequent retry, double the delay duration.
351        // Also vary the delay randomly in order to ensure different database connections retry
352        // at different times.
353        const SLEEPS: [f32; 10] = [10., 20., 40., 80., 160., 320., 640., 1280., 2560., 5120.];
354        if is_serialization_error(error) && prev_attempt_count < SLEEPS.len() {
355            let base_delay = SLEEPS[prev_attempt_count];
356            let randomized_delay = base_delay * self.rng.lock().await.gen_range(0.5..=2.0);
357            log::info!(
358                "retrying transaction after serialization error. delay: {} ms.",
359                randomized_delay
360            );
361            self.executor
362                .sleep(Duration::from_millis(randomized_delay as u64))
363                .await;
364            true
365        } else {
366            false
367        }
368    }
369}
370
371fn is_serialization_error(error: &Error) -> bool {
372    const SERIALIZATION_FAILURE_CODE: &str = "40001";
373    match error {
374        Error::Database(
375            DbErr::Exec(sea_orm::RuntimeErr::SqlxError(error))
376            | DbErr::Query(sea_orm::RuntimeErr::SqlxError(error)),
377        ) if error
378            .as_database_error()
379            .and_then(|error| error.code())
380            .as_deref()
381            == Some(SERIALIZATION_FAILURE_CODE) =>
382        {
383            true
384        }
385        _ => false,
386    }
387}
388
389/// A handle to a [`DatabaseTransaction`].
390pub struct TransactionHandle(Arc<Option<DatabaseTransaction>>);
391
392impl Deref for TransactionHandle {
393    type Target = DatabaseTransaction;
394
395    fn deref(&self) -> &Self::Target {
396        self.0.as_ref().as_ref().unwrap()
397    }
398}
399
/// [`RoomGuard`] keeps a room's lock held until it is dropped, so that updates to
/// rooms are serialized. The transaction that produced `data` has already been
/// committed by the time a guard is handed out.
pub struct RoomGuard<T> {
    /// The value produced by the transaction closure.
    data: T,
    /// Owned guard for the room's mutex; released when the `RoomGuard` is dropped.
    _guard: OwnedMutexGuard<()>,
    /// `Rc` is not `Send`, so this marker makes `RoomGuard` not `Send` either.
    _not_send: PhantomData<Rc<()>>,
}

impl<T> Deref for RoomGuard<T> {
    type Target = T;

    /// Borrows the guarded value.
    fn deref(&self) -> &T {
        &self.data
    }
}

impl<T> DerefMut for RoomGuard<T> {
    /// Mutably borrows the guarded value.
    fn deref_mut(&mut self) -> &mut T {
        &mut self.data
    }
}

impl<T> RoomGuard<T> {
    /// Returns the inner value of the guard.
    ///
    /// Consumes the guard, which drops the room lock it was holding.
    pub fn into_inner(self) -> T {
        self.data
    }
}
428
429#[derive(Clone, Debug, PartialEq, Eq)]
430pub enum Contact {
431    Accepted { user_id: UserId, busy: bool },
432    Outgoing { user_id: UserId },
433    Incoming { user_id: UserId },
434}
435
436impl Contact {
437    pub fn user_id(&self) -> UserId {
438        match self {
439            Contact::Accepted { user_id, .. } => *user_id,
440            Contact::Outgoing { user_id } => *user_id,
441            Contact::Incoming { user_id, .. } => *user_id,
442        }
443    }
444}
445
/// A list of notifications, each paired with a [`UserId`].
// NOTE(review): the id is presumably the recipient of the notification — confirm against callers.
pub type NotificationBatch = Vec<(UserId, proto::Notification)>;
447
/// Data describing a newly created channel message.
// NOTE(review): purpose inferred from the type name; the query that builds this value
// is not in this file.
pub struct CreatedChannelMessage {
    pub message_id: MessageId,
    pub participant_connection_ids: Vec<ConnectionId>,
    pub channel_members: Vec<UserId>,
    pub notifications: NotificationBatch,
}
454
/// Data describing an updated channel message.
// NOTE(review): purpose inferred from the type name; the query that builds this value
// is not in this file.
pub struct UpdatedChannelMessage {
    pub message_id: MessageId,
    pub participant_connection_ids: Vec<ConnectionId>,
    pub notifications: NotificationBatch,
    /// Set when the message is a reply to another message (inferred from the field name).
    pub reply_to_message_id: Option<MessageId>,
    pub timestamp: PrimitiveDateTime,
}
462
/// An email address together with its confirmation code.
///
/// Can be read directly from a query result row and (de)serialized with serde.
#[derive(Clone, Debug, PartialEq, Eq, FromQueryResult, Serialize, Deserialize)]
pub struct Invite {
    pub email_address: String,
    pub email_confirmation_code: String,
}
468
/// A deserializable signup record.
// NOTE(review): presumably the payload submitted when someone signs up — confirm
// against callers.
#[derive(Clone, Debug, Deserialize)]
pub struct NewSignup {
    pub email_address: String,
    // Platform flags; each is an independent boolean.
    pub platform_mac: bool,
    pub platform_windows: bool,
    pub platform_linux: bool,
    pub editor_features: Vec<String>,
    pub programming_languages: Vec<String>,
    pub device_id: Option<String>,
    pub added_to_mailing_list: bool,
    pub created_at: Option<DateTime>,
}
481
/// A set of counts that can be read directly from a query result row.
// NOTE(review): presumably `count` is the total and the others are per-platform
// breakdowns of the signup waitlist — confirm against the query that produces it.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, FromQueryResult)]
pub struct WaitlistSummary {
    pub count: i64,
    pub linux_count: i64,
    pub mac_count: i64,
    pub windows_count: i64,
    pub unknown_count: i64,
}
490
/// The parameters to create a new user.
#[derive(Debug, Serialize, Deserialize)]
pub struct NewUserParams {
    /// The user's GitHub login name.
    pub github_login: String,
    /// The user's numeric GitHub id.
    pub github_user_id: i32,
}
497
/// The result of creating a new user.
#[derive(Debug)]
pub struct NewUserResult {
    /// Id of the user.
    pub user_id: UserId,
    pub metrics_id: String,
    /// Present when another user invited this one (inferred from the field name).
    pub inviting_user_id: Option<UserId>,
    pub signup_device_id: Option<String>,
}
506
/// The result of updating a channel membership.
#[derive(Debug)]
pub struct MembershipUpdated {
    /// The channel whose membership was updated.
    pub channel_id: ChannelId,
    // NOTE(review): presumably the channels gained and lost by the affected user as a
    // result of the update — confirm against the producing query.
    pub new_channels: ChannelsForUser,
    pub removed_channels: Vec<ChannelId>,
}
514
/// The result of setting a member's role.
///
/// Carries either a [`Channel`] (`InviteUpdated`) or a [`MembershipUpdated`]
/// (`MembershipUpdated`).
#[derive(Debug)]
pub enum SetMemberRoleResult {
    InviteUpdated(Channel),
    MembershipUpdated(MembershipUpdated),
}
521
/// The result of inviting a member to a channel.
#[derive(Debug)]
pub struct InviteMemberResult {
    /// The channel involved in the invite.
    pub channel: Channel,
    /// Notifications accompanying the invite.
    pub notifications: NotificationBatch,
}
528
/// The outcome of responding to a channel invite.
// NOTE(review): purpose inferred from the type name; `membership_update` is presumably
// `None` when the response did not change any membership — confirm against callers.
#[derive(Debug)]
pub struct RespondToChannelInvite {
    pub membership_update: Option<MembershipUpdated>,
    pub notifications: NotificationBatch,
}
534
/// The outcome of removing a member from a channel.
// NOTE(review): purpose inferred from the type name; the producing query is not in
// this file.
#[derive(Debug)]
pub struct RemoveChannelMemberResult {
    pub membership_update: MembershipUpdated,
    pub notification_id: Option<NotificationId>,
}
540
541#[derive(Debug, PartialEq, Eq, Hash)]
542pub struct Channel {
543    pub id: ChannelId,
544    pub name: String,
545    pub visibility: ChannelVisibility,
546    /// parent_path is the channel ids from the root to this one (not including this one)
547    pub parent_path: Vec<ChannelId>,
548}
549
550impl Channel {
551    pub fn from_model(value: channel::Model) -> Self {
552        Channel {
553            id: value.id,
554            visibility: value.visibility,
555            name: value.clone().name,
556            parent_path: value.ancestors().collect(),
557        }
558    }
559
560    pub fn to_proto(&self) -> proto::Channel {
561        proto::Channel {
562            id: self.id.to_proto(),
563            name: self.name.clone(),
564            visibility: self.visibility.into(),
565            parent_path: self.parent_path.iter().map(|c| c.to_proto()).collect(),
566        }
567    }
568}
569
570#[derive(Debug, PartialEq, Eq, Hash)]
571pub struct ChannelMember {
572    pub role: ChannelRole,
573    pub user_id: UserId,
574    pub kind: proto::channel_member::Kind,
575}
576
577impl ChannelMember {
578    pub fn to_proto(&self) -> proto::ChannelMember {
579        proto::ChannelMember {
580            role: self.role.into(),
581            user_id: self.user_id.to_proto(),
582            kind: self.kind.into(),
583        }
584    }
585}
586
/// A collection of channel data for one user.
// NOTE(review): purpose inferred from the type name; the query that builds this value
// is not in this file.
#[derive(Debug, PartialEq)]
pub struct ChannelsForUser {
    pub channels: Vec<Channel>,
    pub channel_memberships: Vec<channel_member::Model>,
    /// User ids, keyed by channel id.
    pub channel_participants: HashMap<ChannelId, Vec<UserId>>,
    pub hosted_projects: Vec<proto::HostedProject>,

    // Buffer versions and message ids in "observed" and "latest" flavours.
    // NOTE(review): presumably what the user has seen vs. the newest available — confirm.
    pub observed_buffer_versions: Vec<proto::ChannelBufferVersion>,
    pub observed_channel_messages: Vec<proto::ChannelMessageId>,
    pub latest_buffer_versions: Vec<proto::ChannelBufferVersion>,
    pub latest_channel_messages: Vec<proto::ChannelMessageId>,
}
599
/// A rejoined channel buffer together with a connection id.
// NOTE(review): `old_connection_id` is presumably the connection that was in the
// buffer before the rejoin — confirm against callers.
#[derive(Debug)]
pub struct RejoinedChannelBuffer {
    pub buffer: proto::RejoinedChannelBuffer,
    pub old_connection_id: ConnectionId,
}
605
/// A room and, optionally, a channel row.
// NOTE(review): presumably the result of joining a room, with `channel` set when the
// room belongs to a channel — confirm against callers.
#[derive(Clone)]
pub struct JoinRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
}
611
/// A room plus the projects that were rejoined and reshared in it.
// NOTE(review): purpose inferred from the type and field names; the producing query is
// not in this file.
pub struct RejoinedRoom {
    pub room: proto::Room,
    pub rejoined_projects: Vec<RejoinedProject>,
    pub reshared_projects: Vec<ResharedProject>,
    pub channel: Option<channel::Model>,
}
618
/// A reshared project: its id, a previous connection id, its collaborators and
/// worktree metadata.
// NOTE(review): purpose inferred from the type and field names — confirm against callers.
pub struct ResharedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<proto::WorktreeMetadata>,
}
625
/// A rejoined project: its id, a previous connection id, its collaborators,
/// per-worktree data and language servers.
// NOTE(review): purpose inferred from the type and field names — confirm against callers.
pub struct RejoinedProject {
    pub id: ProjectId,
    pub old_connection_id: ConnectionId,
    pub collaborators: Vec<ProjectCollaborator>,
    pub worktrees: Vec<RejoinedWorktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
633
/// Per-worktree data carried by a [`RejoinedProject`].
// NOTE(review): the updated/removed lists are presumably deltas relative to what the
// rejoining client already has — confirm against the producing query.
#[derive(Debug)]
pub struct RejoinedWorktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub updated_entries: Vec<proto::Entry>,
    pub removed_entries: Vec<u64>,
    pub updated_repositories: Vec<proto::RepositoryEntry>,
    pub removed_repositories: Vec<u64>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
649
/// Data describing a room that was left.
// NOTE(review): purpose inferred from the type name; `deleted` presumably reports
// whether the room itself was removed — confirm against callers.
pub struct LeftRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
    /// Projects that were left, keyed by project id.
    pub left_projects: HashMap<ProjectId, LeftProject>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
    pub deleted: bool,
}
657
/// Data describing a refreshed room.
// NOTE(review): purpose inferred from the type and field names; the producing query is
// not in this file.
pub struct RefreshedRoom {
    pub room: proto::Room,
    pub channel: Option<channel::Model>,
    pub stale_participant_user_ids: Vec<UserId>,
    pub canceled_calls_to_user_ids: Vec<UserId>,
}
664
/// Connection ids and collaborators associated with a refreshed channel buffer.
// NOTE(review): purpose inferred from the type name — confirm against callers.
pub struct RefreshedChannelBuffer {
    pub connection_ids: Vec<ConnectionId>,
    pub collaborators: Vec<proto::Collaborator>,
}
669
/// A project together with its collaborators, worktrees and language servers.
pub struct Project {
    pub id: ProjectId,
    // NOTE(review): presumably the role of the user this value was loaded for — confirm.
    pub role: ChannelRole,
    pub collaborators: Vec<ProjectCollaborator>,
    /// Worktrees in an ordered map with `u64` keys (presumably worktree ids — confirm).
    pub worktrees: BTreeMap<u64, Worktree>,
    pub language_servers: Vec<proto::LanguageServer>,
}
677
678pub struct ProjectCollaborator {
679    pub connection_id: ConnectionId,
680    pub user_id: UserId,
681    pub replica_id: ReplicaId,
682    pub is_host: bool,
683}
684
685impl ProjectCollaborator {
686    pub fn to_proto(&self) -> proto::Collaborator {
687        proto::Collaborator {
688            peer_id: Some(self.connection_id.into()),
689            replica_id: self.replica_id.0 as u32,
690            user_id: self.user_id.to_proto(),
691        }
692    }
693}
694
/// Data describing a project that was left.
// NOTE(review): purpose inferred from the type name; `connection_ids` presumably lists
// the connections to notify — confirm against callers.
#[derive(Debug)]
pub struct LeftProject {
    pub id: ProjectId,
    pub host_user_id: Option<UserId>,
    pub host_connection_id: Option<ConnectionId>,
    pub connection_ids: Vec<ConnectionId>,
}
702
/// A worktree together with its entries, repositories, diagnostics and settings files.
pub struct Worktree {
    pub id: u64,
    pub abs_path: String,
    pub root_name: String,
    pub visible: bool,
    pub entries: Vec<proto::Entry>,
    /// Repository entries in an ordered map with `u64` keys.
    pub repository_entries: BTreeMap<u64, proto::RepositoryEntry>,
    pub diagnostic_summaries: Vec<proto::DiagnosticSummary>,
    pub settings_files: Vec<WorktreeSettingsFile>,
    pub scan_id: u64,
    pub completed_scan_id: u64,
}
715
/// A settings file belonging to a worktree: its path and its content.
#[derive(Debug)]
pub struct WorktreeSettingsFile {
    pub path: String,
    pub content: String,
}
721
/// The data describing a new version of an extension.
// NOTE(review): purpose inferred from the type name; the query that consumes this value
// is not in this file.
pub struct NewExtensionVersion {
    pub name: String,
    /// Parsed semantic version.
    pub version: semver::Version,
    pub description: String,
    pub authors: Vec<String>,
    pub repository: String,
    pub schema_version: i32,
    pub published_at: PrimitiveDateTime,
}
731
/// Serializable metadata about an extension.
#[derive(Debug, Serialize, PartialEq)]
pub struct ExtensionMetadata {
    pub id: String,
    pub name: String,
    pub version: String,
    pub authors: Vec<String>,
    pub description: String,
    pub repository: String,
    /// Serialized through `serialize_iso8601` rather than the type's default serializer.
    #[serde(serialize_with = "serialize_iso8601")]
    pub published_at: PrimitiveDateTime,
    pub download_count: u64,
}
744
745pub fn serialize_iso8601<S: Serializer>(
746    datetime: &PrimitiveDateTime,
747    serializer: S,
748) -> Result<S::Ok, S::Error> {
749    const SERDE_CONFIG: iso8601::EncodedConfig = iso8601::Config::DEFAULT
750        .set_year_is_six_digits(false)
751        .set_time_precision(iso8601::TimePrecision::Second {
752            decimal_digits: None,
753        })
754        .encode();
755
756    datetime
757        .assume_utc()
758        .format(&time::format_description::well_known::Iso8601::<SERDE_CONFIG>)
759        .map_err(S::Error::custom)?
760        .serialize(serializer)
761}