buffers.rs

   1use super::*;
   2use prost::Message;
   3use text::{EditOperation, UndoOperation};
   4
   5pub struct LeftChannelBuffer {
   6    pub channel_id: ChannelId,
   7    pub collaborators: Vec<proto::Collaborator>,
   8    pub connections: Vec<ConnectionId>,
   9}
  10
  11impl Database {
  12    /// Open a channel buffer. Returns the current contents, and adds you to the list of people
  13    /// to notify on changes.
  14    pub async fn join_channel_buffer(
  15        &self,
  16        channel_id: ChannelId,
  17        user_id: UserId,
  18        connection: ConnectionId,
  19    ) -> Result<proto::JoinChannelBufferResponse> {
  20        self.transaction(|tx| async move {
  21            let channel = self.get_channel_internal(channel_id, &tx).await?;
  22            self.check_user_is_channel_participant(&channel, user_id, &tx)
  23                .await?;
  24
  25            let buffer = channel::Model {
  26                id: channel_id,
  27                ..Default::default()
  28            }
  29            .find_related(buffer::Entity)
  30            .one(&*tx)
  31            .await?;
  32
  33            let buffer = if let Some(buffer) = buffer {
  34                buffer
  35            } else {
  36                let buffer = buffer::ActiveModel {
  37                    channel_id: ActiveValue::Set(channel_id),
  38                    ..Default::default()
  39                }
  40                .insert(&*tx)
  41                .await?;
  42                buffer_snapshot::ActiveModel {
  43                    buffer_id: ActiveValue::Set(buffer.id),
  44                    epoch: ActiveValue::Set(0),
  45                    text: ActiveValue::Set(String::new()),
  46                    operation_serialization_version: ActiveValue::Set(
  47                        storage::SERIALIZATION_VERSION,
  48                    ),
  49                }
  50                .insert(&*tx)
  51                .await?;
  52                buffer
  53            };
  54
  55            // Join the collaborators
  56            let mut collaborators = channel_buffer_collaborator::Entity::find()
  57                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
  58                .all(&*tx)
  59                .await?;
  60            let replica_ids = collaborators
  61                .iter()
  62                .map(|c| c.replica_id)
  63                .collect::<HashSet<_>>();
  64            let mut replica_id = ReplicaId(0);
  65            while replica_ids.contains(&replica_id) {
  66                replica_id.0 += 1;
  67            }
  68            let collaborator = channel_buffer_collaborator::ActiveModel {
  69                channel_id: ActiveValue::Set(channel_id),
  70                connection_id: ActiveValue::Set(connection.id as i32),
  71                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
  72                user_id: ActiveValue::Set(user_id),
  73                replica_id: ActiveValue::Set(replica_id),
  74                ..Default::default()
  75            }
  76            .insert(&*tx)
  77            .await?;
  78            collaborators.push(collaborator);
  79
  80            let (base_text, operations, max_operation) =
  81                self.get_buffer_state(&buffer, &tx).await?;
  82
  83            // Save the last observed operation
  84            if let Some(op) = max_operation {
  85                observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
  86                    user_id: ActiveValue::Set(user_id),
  87                    buffer_id: ActiveValue::Set(buffer.id),
  88                    epoch: ActiveValue::Set(op.epoch),
  89                    lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
  90                    replica_id: ActiveValue::Set(op.replica_id),
  91                })
  92                .on_conflict(
  93                    OnConflict::columns([
  94                        observed_buffer_edits::Column::UserId,
  95                        observed_buffer_edits::Column::BufferId,
  96                    ])
  97                    .update_columns([
  98                        observed_buffer_edits::Column::Epoch,
  99                        observed_buffer_edits::Column::LamportTimestamp,
 100                    ])
 101                    .to_owned(),
 102                )
 103                .exec(&*tx)
 104                .await?;
 105            }
 106
 107            Ok(proto::JoinChannelBufferResponse {
 108                buffer_id: buffer.id.to_proto(),
 109                replica_id: replica_id.to_proto() as u32,
 110                base_text,
 111                operations,
 112                epoch: buffer.epoch as u64,
 113                collaborators: collaborators
 114                    .into_iter()
 115                    .map(|collaborator| proto::Collaborator {
 116                        peer_id: Some(collaborator.connection().into()),
 117                        user_id: collaborator.user_id.to_proto(),
 118                        replica_id: collaborator.replica_id.0 as u32,
 119                    })
 120                    .collect(),
 121            })
 122        })
 123        .await
 124    }
 125
 126    /// Rejoin a channel buffer (after a connection interruption)
 127    pub async fn rejoin_channel_buffers(
 128        &self,
 129        buffers: &[proto::ChannelBufferVersion],
 130        user_id: UserId,
 131        connection_id: ConnectionId,
 132    ) -> Result<Vec<RejoinedChannelBuffer>> {
 133        self.transaction(|tx| async move {
 134            let mut results = Vec::new();
 135            for client_buffer in buffers {
 136                let channel = self
 137                    .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
 138                    .await?;
 139                if self
 140                    .check_user_is_channel_participant(&channel, user_id, &tx)
 141                    .await
 142                    .is_err()
 143                {
 144                    log::info!("user is not a member of channel");
 145                    continue;
 146                }
 147
 148                let buffer = self.get_channel_buffer(channel.id, &tx).await?;
 149                let mut collaborators = channel_buffer_collaborator::Entity::find()
 150                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
 151                    .all(&*tx)
 152                    .await?;
 153
 154                // If the buffer epoch hasn't changed since the client lost
 155                // connection, then the client's buffer can be synchronized with
 156                // the server's buffer.
 157                if buffer.epoch as u64 != client_buffer.epoch {
 158                    log::info!("can't rejoin buffer, epoch has changed");
 159                    continue;
 160                }
 161
 162                // Find the collaborator record for this user's previous lost
 163                // connection. Update it with the new connection id.
 164                let Some(self_collaborator) =
 165                    collaborators.iter_mut().find(|c| c.user_id == user_id)
 166                else {
 167                    log::info!("can't rejoin buffer, no previous collaborator found");
 168                    continue;
 169                };
 170                let old_connection_id = self_collaborator.connection();
 171                *self_collaborator = channel_buffer_collaborator::ActiveModel {
 172                    id: ActiveValue::Unchanged(self_collaborator.id),
 173                    connection_id: ActiveValue::Set(connection_id.id as i32),
 174                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
 175                    connection_lost: ActiveValue::Set(false),
 176                    ..Default::default()
 177                }
 178                .update(&*tx)
 179                .await?;
 180
 181                let client_version = version_from_wire(&client_buffer.version);
 182                let serialization_version = self
 183                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 184                    .await?;
 185
 186                let mut rows = buffer_operation::Entity::find()
 187                    .filter(
 188                        buffer_operation::Column::BufferId
 189                            .eq(buffer.id)
 190                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 191                    )
 192                    .stream(&*tx)
 193                    .await?;
 194
 195                // Find the server's version vector and any operations
 196                // that the client has not seen.
 197                let mut server_version = clock::Global::new();
 198                let mut operations = Vec::new();
 199                while let Some(row) = rows.next().await {
 200                    let row = row?;
 201                    let timestamp = clock::Lamport {
 202                        replica_id: row.replica_id as u16,
 203                        value: row.lamport_timestamp as u32,
 204                    };
 205                    server_version.observe(timestamp);
 206                    if !client_version.observed(timestamp) {
 207                        operations.push(proto::Operation {
 208                            variant: Some(operation_from_storage(row, serialization_version)?),
 209                        })
 210                    }
 211                }
 212
 213                results.push(RejoinedChannelBuffer {
 214                    old_connection_id,
 215                    buffer: proto::RejoinedChannelBuffer {
 216                        channel_id: client_buffer.channel_id,
 217                        version: version_to_wire(&server_version),
 218                        operations,
 219                        collaborators: collaborators
 220                            .into_iter()
 221                            .map(|collaborator| proto::Collaborator {
 222                                peer_id: Some(collaborator.connection().into()),
 223                                user_id: collaborator.user_id.to_proto(),
 224                                replica_id: collaborator.replica_id.0 as u32,
 225                            })
 226                            .collect(),
 227                    },
 228                });
 229            }
 230
 231            Ok(results)
 232        })
 233        .await
 234    }
 235
 236    /// Clear out any buffer collaborators who are no longer collaborating.
 237    pub async fn clear_stale_channel_buffer_collaborators(
 238        &self,
 239        channel_id: ChannelId,
 240        server_id: ServerId,
 241    ) -> Result<RefreshedChannelBuffer> {
 242        self.transaction(|tx| async move {
 243            let db_collaborators = channel_buffer_collaborator::Entity::find()
 244                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 245                .all(&*tx)
 246                .await?;
 247
 248            let mut connection_ids = Vec::new();
 249            let mut collaborators = Vec::new();
 250            let mut collaborator_ids_to_remove = Vec::new();
 251            for db_collaborator in &db_collaborators {
 252                if !db_collaborator.connection_lost
 253                    && db_collaborator.connection_server_id == server_id
 254                {
 255                    connection_ids.push(db_collaborator.connection());
 256                    collaborators.push(proto::Collaborator {
 257                        peer_id: Some(db_collaborator.connection().into()),
 258                        replica_id: db_collaborator.replica_id.0 as u32,
 259                        user_id: db_collaborator.user_id.to_proto(),
 260                    })
 261                } else {
 262                    collaborator_ids_to_remove.push(db_collaborator.id);
 263                }
 264            }
 265
 266            channel_buffer_collaborator::Entity::delete_many()
 267                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
 268                .exec(&*tx)
 269                .await?;
 270
 271            Ok(RefreshedChannelBuffer {
 272                connection_ids,
 273                collaborators,
 274            })
 275        })
 276        .await
 277    }
 278
 279    /// Close the channel buffer, and stop receiving updates for it.
 280    pub async fn leave_channel_buffer(
 281        &self,
 282        channel_id: ChannelId,
 283        connection: ConnectionId,
 284    ) -> Result<LeftChannelBuffer> {
 285        self.transaction(|tx| async move {
 286            self.leave_channel_buffer_internal(channel_id, connection, &tx)
 287                .await
 288        })
 289        .await
 290    }
 291
 292    /// Close the channel buffer, and stop receiving updates for it.
 293    pub async fn channel_buffer_connection_lost(
 294        &self,
 295        connection: ConnectionId,
 296        tx: &DatabaseTransaction,
 297    ) -> Result<()> {
 298        channel_buffer_collaborator::Entity::update_many()
 299            .filter(
 300                Condition::all()
 301                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 302                    .add(
 303                        channel_buffer_collaborator::Column::ConnectionServerId
 304                            .eq(connection.owner_id as i32),
 305                    ),
 306            )
 307            .set(channel_buffer_collaborator::ActiveModel {
 308                connection_lost: ActiveValue::set(true),
 309                ..Default::default()
 310            })
 311            .exec(tx)
 312            .await?;
 313        Ok(())
 314    }
 315
 316    /// Close all open channel buffers
 317    pub async fn leave_channel_buffers(
 318        &self,
 319        connection: ConnectionId,
 320    ) -> Result<Vec<LeftChannelBuffer>> {
 321        self.transaction(|tx| async move {
 322            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 323            enum QueryChannelIds {
 324                ChannelId,
 325            }
 326
 327            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
 328                .select_only()
 329                .column(channel_buffer_collaborator::Column::ChannelId)
 330                .filter(Condition::all().add(
 331                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
 332                ))
 333                .into_values::<_, QueryChannelIds>()
 334                .all(&*tx)
 335                .await?;
 336
 337            let mut result = Vec::new();
 338            for channel_id in channel_ids {
 339                let left_channel_buffer = self
 340                    .leave_channel_buffer_internal(channel_id, connection, &tx)
 341                    .await?;
 342                result.push(left_channel_buffer);
 343            }
 344
 345            Ok(result)
 346        })
 347        .await
 348    }
 349
 350    async fn leave_channel_buffer_internal(
 351        &self,
 352        channel_id: ChannelId,
 353        connection: ConnectionId,
 354        tx: &DatabaseTransaction,
 355    ) -> Result<LeftChannelBuffer> {
 356        let result = channel_buffer_collaborator::Entity::delete_many()
 357            .filter(
 358                Condition::all()
 359                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 360                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 361                    .add(
 362                        channel_buffer_collaborator::Column::ConnectionServerId
 363                            .eq(connection.owner_id as i32),
 364                    ),
 365            )
 366            .exec(tx)
 367            .await?;
 368        if result.rows_affected == 0 {
 369            Err(anyhow!("not a collaborator on this project"))?;
 370        }
 371
 372        let mut collaborators = Vec::new();
 373        let mut connections = Vec::new();
 374        let mut rows = channel_buffer_collaborator::Entity::find()
 375            .filter(
 376                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 377            )
 378            .stream(tx)
 379            .await?;
 380        while let Some(row) = rows.next().await {
 381            let row = row?;
 382            let connection = row.connection();
 383            connections.push(connection);
 384            collaborators.push(proto::Collaborator {
 385                peer_id: Some(connection.into()),
 386                replica_id: row.replica_id.0 as u32,
 387                user_id: row.user_id.to_proto(),
 388            });
 389        }
 390
 391        drop(rows);
 392
 393        if collaborators.is_empty() {
 394            self.snapshot_channel_buffer(channel_id, &tx).await?;
 395        }
 396
 397        Ok(LeftChannelBuffer {
 398            channel_id,
 399            collaborators,
 400            connections,
 401        })
 402    }
 403
 404    pub async fn get_channel_buffer_collaborators(
 405        &self,
 406        channel_id: ChannelId,
 407    ) -> Result<Vec<UserId>> {
 408        self.transaction(|tx| async move {
 409            self.get_channel_buffer_collaborators_internal(channel_id, &tx)
 410                .await
 411        })
 412        .await
 413    }
 414
 415    async fn get_channel_buffer_collaborators_internal(
 416        &self,
 417        channel_id: ChannelId,
 418        tx: &DatabaseTransaction,
 419    ) -> Result<Vec<UserId>> {
 420        #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 421        enum QueryUserIds {
 422            UserId,
 423        }
 424
 425        let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
 426            .select_only()
 427            .column(channel_buffer_collaborator::Column::UserId)
 428            .filter(
 429                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 430            )
 431            .into_values::<_, QueryUserIds>()
 432            .all(tx)
 433            .await?;
 434
 435        Ok(users)
 436    }
 437
 438    pub async fn update_channel_buffer(
 439        &self,
 440        channel_id: ChannelId,
 441        user: UserId,
 442        operations: &[proto::Operation],
 443    ) -> Result<(
 444        Vec<ConnectionId>,
 445        Vec<UserId>,
 446        i32,
 447        Vec<proto::VectorClockEntry>,
 448    )> {
 449        self.transaction(move |tx| async move {
 450            let channel = self.get_channel_internal(channel_id, &tx).await?;
 451
 452            let mut requires_write_permission = false;
 453            for op in operations.iter() {
 454                match op.variant {
 455                    None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
 456                    Some(_) => requires_write_permission = true,
 457                }
 458            }
 459            if requires_write_permission {
 460                self.check_user_is_channel_member(&channel, user, &tx)
 461                    .await?;
 462            } else {
 463                self.check_user_is_channel_participant(&channel, user, &tx)
 464                    .await?;
 465            }
 466
 467            let buffer = buffer::Entity::find()
 468                .filter(buffer::Column::ChannelId.eq(channel_id))
 469                .one(&*tx)
 470                .await?
 471                .ok_or_else(|| anyhow!("no such buffer"))?;
 472
 473            let serialization_version = self
 474                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 475                .await?;
 476
 477            let operations = operations
 478                .iter()
 479                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
 480                .collect::<Vec<_>>();
 481
 482            let mut channel_members;
 483            let max_version;
 484
 485            if !operations.is_empty() {
 486                let max_operation = operations
 487                    .iter()
 488                    .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
 489                    .unwrap();
 490
 491                max_version = vec![proto::VectorClockEntry {
 492                    replica_id: *max_operation.replica_id.as_ref() as u32,
 493                    timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
 494                }];
 495
 496                // get current channel participants and save the max operation above
 497                self.save_max_operation(
 498                    user,
 499                    buffer.id,
 500                    buffer.epoch,
 501                    *max_operation.replica_id.as_ref(),
 502                    *max_operation.lamport_timestamp.as_ref(),
 503                    &tx,
 504                )
 505                .await?;
 506
 507                channel_members = self.get_channel_participants(&channel, &tx).await?;
 508                let collaborators = self
 509                    .get_channel_buffer_collaborators_internal(channel_id, &tx)
 510                    .await?;
 511                channel_members.retain(|member| !collaborators.contains(member));
 512
 513                buffer_operation::Entity::insert_many(operations)
 514                    .on_conflict(
 515                        OnConflict::columns([
 516                            buffer_operation::Column::BufferId,
 517                            buffer_operation::Column::Epoch,
 518                            buffer_operation::Column::LamportTimestamp,
 519                            buffer_operation::Column::ReplicaId,
 520                        ])
 521                        .do_nothing()
 522                        .to_owned(),
 523                    )
 524                    .exec(&*tx)
 525                    .await?;
 526            } else {
 527                channel_members = Vec::new();
 528                max_version = Vec::new();
 529            }
 530
 531            let mut connections = Vec::new();
 532            let mut rows = channel_buffer_collaborator::Entity::find()
 533                .filter(
 534                    Condition::all()
 535                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 536                )
 537                .stream(&*tx)
 538                .await?;
 539            while let Some(row) = rows.next().await {
 540                let row = row?;
 541                connections.push(ConnectionId {
 542                    id: row.connection_id as u32,
 543                    owner_id: row.connection_server_id.0 as u32,
 544                });
 545            }
 546
 547            Ok((connections, channel_members, buffer.epoch, max_version))
 548        })
 549        .await
 550    }
 551
 552    async fn save_max_operation(
 553        &self,
 554        user_id: UserId,
 555        buffer_id: BufferId,
 556        epoch: i32,
 557        replica_id: i32,
 558        lamport_timestamp: i32,
 559        tx: &DatabaseTransaction,
 560    ) -> Result<()> {
 561        buffer::Entity::update(buffer::ActiveModel {
 562            id: ActiveValue::Unchanged(buffer_id),
 563            epoch: ActiveValue::Unchanged(epoch),
 564            latest_operation_epoch: ActiveValue::Set(Some(epoch)),
 565            latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
 566            latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
 567            channel_id: ActiveValue::NotSet,
 568        })
 569        .exec(tx)
 570        .await?;
 571
 572        use observed_buffer_edits::Column;
 573        observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
 574            user_id: ActiveValue::Set(user_id),
 575            buffer_id: ActiveValue::Set(buffer_id),
 576            epoch: ActiveValue::Set(epoch),
 577            replica_id: ActiveValue::Set(replica_id),
 578            lamport_timestamp: ActiveValue::Set(lamport_timestamp),
 579        })
 580        .on_conflict(
 581            OnConflict::columns([Column::UserId, Column::BufferId])
 582                .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
 583                .action_cond_where(
 584                    Condition::any().add(Column::Epoch.lt(epoch)).add(
 585                        Condition::all().add(Column::Epoch.eq(epoch)).add(
 586                            Condition::any()
 587                                .add(Column::LamportTimestamp.lt(lamport_timestamp))
 588                                .add(
 589                                    Column::LamportTimestamp
 590                                        .eq(lamport_timestamp)
 591                                        .and(Column::ReplicaId.lt(replica_id)),
 592                                ),
 593                        ),
 594                    ),
 595                )
 596                .to_owned(),
 597        )
 598        .exec_without_returning(tx)
 599        .await?;
 600
 601        Ok(())
 602    }
 603
 604    async fn get_buffer_operation_serialization_version(
 605        &self,
 606        buffer_id: BufferId,
 607        epoch: i32,
 608        tx: &DatabaseTransaction,
 609    ) -> Result<i32> {
 610        Ok(buffer_snapshot::Entity::find()
 611            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
 612            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
 613            .select_only()
 614            .column(buffer_snapshot::Column::OperationSerializationVersion)
 615            .into_values::<_, QueryOperationSerializationVersion>()
 616            .one(tx)
 617            .await?
 618            .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
 619    }
 620
 621    pub async fn get_channel_buffer(
 622        &self,
 623        channel_id: ChannelId,
 624        tx: &DatabaseTransaction,
 625    ) -> Result<buffer::Model> {
 626        Ok(channel::Model {
 627            id: channel_id,
 628            ..Default::default()
 629        }
 630        .find_related(buffer::Entity)
 631        .one(tx)
 632        .await?
 633        .ok_or_else(|| anyhow!("no such buffer"))?)
 634    }
 635
 636    async fn get_buffer_state(
 637        &self,
 638        buffer: &buffer::Model,
 639        tx: &DatabaseTransaction,
 640    ) -> Result<(
 641        String,
 642        Vec<proto::Operation>,
 643        Option<buffer_operation::Model>,
 644    )> {
 645        let id = buffer.id;
 646        let (base_text, version) = if buffer.epoch > 0 {
 647            let snapshot = buffer_snapshot::Entity::find()
 648                .filter(
 649                    buffer_snapshot::Column::BufferId
 650                        .eq(id)
 651                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
 652                )
 653                .one(tx)
 654                .await?
 655                .ok_or_else(|| anyhow!("no such snapshot"))?;
 656
 657            let version = snapshot.operation_serialization_version;
 658            (snapshot.text, version)
 659        } else {
 660            (String::new(), storage::SERIALIZATION_VERSION)
 661        };
 662
 663        let mut rows = buffer_operation::Entity::find()
 664            .filter(
 665                buffer_operation::Column::BufferId
 666                    .eq(id)
 667                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 668            )
 669            .order_by_asc(buffer_operation::Column::LamportTimestamp)
 670            .order_by_asc(buffer_operation::Column::ReplicaId)
 671            .stream(tx)
 672            .await?;
 673
 674        let mut operations = Vec::new();
 675        let mut last_row = None;
 676        while let Some(row) = rows.next().await {
 677            let row = row?;
 678            last_row = Some(buffer_operation::Model {
 679                buffer_id: row.buffer_id,
 680                epoch: row.epoch,
 681                lamport_timestamp: row.lamport_timestamp,
 682                replica_id: row.replica_id,
 683                value: Default::default(),
 684            });
 685            operations.push(proto::Operation {
 686                variant: Some(operation_from_storage(row, version)?),
 687            });
 688        }
 689
 690        Ok((base_text, operations, last_row))
 691    }
 692
 693    async fn snapshot_channel_buffer(
 694        &self,
 695        channel_id: ChannelId,
 696        tx: &DatabaseTransaction,
 697    ) -> Result<()> {
 698        let buffer = self.get_channel_buffer(channel_id, tx).await?;
 699        let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
 700        if operations.is_empty() {
 701            return Ok(());
 702        }
 703
 704        let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
 705        text_buffer
 706            .apply_ops(operations.into_iter().filter_map(operation_from_wire))
 707            .unwrap();
 708
 709        let base_text = text_buffer.text();
 710        let epoch = buffer.epoch + 1;
 711
 712        buffer_snapshot::Model {
 713            buffer_id: buffer.id,
 714            epoch,
 715            text: base_text,
 716            operation_serialization_version: storage::SERIALIZATION_VERSION,
 717        }
 718        .into_active_model()
 719        .insert(tx)
 720        .await?;
 721
 722        buffer::ActiveModel {
 723            id: ActiveValue::Unchanged(buffer.id),
 724            epoch: ActiveValue::Set(epoch),
 725            latest_operation_epoch: ActiveValue::NotSet,
 726            latest_operation_replica_id: ActiveValue::NotSet,
 727            latest_operation_lamport_timestamp: ActiveValue::NotSet,
 728            channel_id: ActiveValue::NotSet,
 729        }
 730        .save(tx)
 731        .await?;
 732
 733        Ok(())
 734    }
 735
 736    pub async fn observe_buffer_version(
 737        &self,
 738        buffer_id: BufferId,
 739        user_id: UserId,
 740        epoch: i32,
 741        version: &[proto::VectorClockEntry],
 742    ) -> Result<()> {
 743        self.transaction(|tx| async move {
 744            // For now, combine concurrent operations.
 745            let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
 746                return Ok(());
 747            };
 748            self.save_max_operation(
 749                user_id,
 750                buffer_id,
 751                epoch,
 752                component.replica_id as i32,
 753                component.timestamp as i32,
 754                &tx,
 755            )
 756            .await?;
 757            Ok(())
 758        })
 759        .await
 760    }
 761
 762    pub async fn observed_channel_buffer_changes(
 763        &self,
 764        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
 765        user_id: UserId,
 766        tx: &DatabaseTransaction,
 767    ) -> Result<Vec<proto::ChannelBufferVersion>> {
 768        let observed_operations = observed_buffer_edits::Entity::find()
 769            .filter(observed_buffer_edits::Column::UserId.eq(user_id))
 770            .filter(
 771                observed_buffer_edits::Column::BufferId
 772                    .is_in(channel_ids_by_buffer_id.keys().copied()),
 773            )
 774            .all(tx)
 775            .await?;
 776
 777        Ok(observed_operations
 778            .iter()
 779            .flat_map(|op| {
 780                Some(proto::ChannelBufferVersion {
 781                    channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
 782                    epoch: op.epoch as u64,
 783                    version: vec![proto::VectorClockEntry {
 784                        replica_id: op.replica_id as u32,
 785                        timestamp: op.lamport_timestamp as u32,
 786                    }],
 787                })
 788            })
 789            .collect())
 790    }
 791}
 792
 793fn operation_to_storage(
 794    operation: &proto::Operation,
 795    buffer: &buffer::Model,
 796    _format: i32,
 797) -> Option<buffer_operation::ActiveModel> {
 798    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
 799        proto::operation::Variant::Edit(operation) => (
 800            operation.replica_id,
 801            operation.lamport_timestamp,
 802            storage::Operation {
 803                version: version_to_storage(&operation.version),
 804                is_undo: false,
 805                edit_ranges: operation
 806                    .ranges
 807                    .iter()
 808                    .map(|range| storage::Range {
 809                        start: range.start,
 810                        end: range.end,
 811                    })
 812                    .collect(),
 813                edit_texts: operation.new_text.clone(),
 814                undo_counts: Vec::new(),
 815            },
 816        ),
 817        proto::operation::Variant::Undo(operation) => (
 818            operation.replica_id,
 819            operation.lamport_timestamp,
 820            storage::Operation {
 821                version: version_to_storage(&operation.version),
 822                is_undo: true,
 823                edit_ranges: Vec::new(),
 824                edit_texts: Vec::new(),
 825                undo_counts: operation
 826                    .counts
 827                    .iter()
 828                    .map(|entry| storage::UndoCount {
 829                        replica_id: entry.replica_id,
 830                        lamport_timestamp: entry.lamport_timestamp,
 831                        count: entry.count,
 832                    })
 833                    .collect(),
 834            },
 835        ),
 836        _ => None?,
 837    };
 838
 839    Some(buffer_operation::ActiveModel {
 840        buffer_id: ActiveValue::Set(buffer.id),
 841        epoch: ActiveValue::Set(buffer.epoch),
 842        replica_id: ActiveValue::Set(replica_id as i32),
 843        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
 844        value: ActiveValue::Set(value.encode_to_vec()),
 845    })
 846}
 847
 848fn operation_from_storage(
 849    row: buffer_operation::Model,
 850    _format_version: i32,
 851) -> Result<proto::operation::Variant, Error> {
 852    let operation =
 853        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
 854    let version = version_from_storage(&operation.version);
 855    Ok(if operation.is_undo {
 856        proto::operation::Variant::Undo(proto::operation::Undo {
 857            replica_id: row.replica_id as u32,
 858            lamport_timestamp: row.lamport_timestamp as u32,
 859            version,
 860            counts: operation
 861                .undo_counts
 862                .iter()
 863                .map(|entry| proto::UndoCount {
 864                    replica_id: entry.replica_id,
 865                    lamport_timestamp: entry.lamport_timestamp,
 866                    count: entry.count,
 867                })
 868                .collect(),
 869        })
 870    } else {
 871        proto::operation::Variant::Edit(proto::operation::Edit {
 872            replica_id: row.replica_id as u32,
 873            lamport_timestamp: row.lamport_timestamp as u32,
 874            version,
 875            ranges: operation
 876                .edit_ranges
 877                .into_iter()
 878                .map(|range| proto::Range {
 879                    start: range.start,
 880                    end: range.end,
 881                })
 882                .collect(),
 883            new_text: operation.edit_texts,
 884        })
 885    })
 886}
 887
 888fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
 889    version
 890        .iter()
 891        .map(|entry| storage::VectorClockEntry {
 892            replica_id: entry.replica_id,
 893            timestamp: entry.timestamp,
 894        })
 895        .collect()
 896}
 897
 898fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
 899    version
 900        .iter()
 901        .map(|entry| proto::VectorClockEntry {
 902            replica_id: entry.replica_id,
 903            timestamp: entry.timestamp,
 904        })
 905        .collect()
 906}
 907
 908// This is currently a manual copy of the deserialization code in the client's language crate
 909pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
 910    match operation.variant? {
 911        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
 912            timestamp: clock::Lamport {
 913                replica_id: edit.replica_id as text::ReplicaId,
 914                value: edit.lamport_timestamp,
 915            },
 916            version: version_from_wire(&edit.version),
 917            ranges: edit
 918                .ranges
 919                .into_iter()
 920                .map(|range| {
 921                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
 922                })
 923                .collect(),
 924            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
 925        })),
 926        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
 927            timestamp: clock::Lamport {
 928                replica_id: undo.replica_id as text::ReplicaId,
 929                value: undo.lamport_timestamp,
 930            },
 931            version: version_from_wire(&undo.version),
 932            counts: undo
 933                .counts
 934                .into_iter()
 935                .map(|c| {
 936                    (
 937                        clock::Lamport {
 938                            replica_id: c.replica_id as text::ReplicaId,
 939                            value: c.lamport_timestamp,
 940                        },
 941                        c.count,
 942                    )
 943                })
 944                .collect(),
 945        })),
 946        _ => None,
 947    }
 948}
 949
 950fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
 951    let mut version = clock::Global::new();
 952    for entry in message {
 953        version.observe(clock::Lamport {
 954            replica_id: entry.replica_id as text::ReplicaId,
 955            value: entry.timestamp,
 956        });
 957    }
 958    version
 959}
 960
 961fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
 962    let mut message = Vec::new();
 963    for entry in version.iter() {
 964        message.push(proto::VectorClockEntry {
 965            replica_id: entry.replica_id as u32,
 966            timestamp: entry.value,
 967        });
 968    }
 969    message
 970}
 971
/// Column selector with a single `OperationSerializationVersion` variant.
///
/// NOTE(review): presumably used as the target of a SeaORM `select_only` /
/// `into_values` query that reads just the `operation_serialization_version`
/// column — the use site is not visible from here, confirm there.
#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
enum QueryOperationSerializationVersion {
    OperationSerializationVersion,
}
 976
/// Protobuf message types used to persist buffer operations.
///
/// `Operation` is what gets encoded into (and decoded from) the `value` bytes
/// of a `buffer_operation` row. These definitions are the on-disk format:
/// changing a tag number or field type changes how stored bytes are read.
mod storage {
    // NOTE(review): presumably silences a naming lint triggered by code that
    // `#[derive(Message)]` generates — confirm before removing.
    #![allow(non_snake_case)]
    use prost::Message;
    /// Version number recorded as `operation_serialization_version` when a
    /// buffer snapshot is created.
    pub const SERIALIZATION_VERSION: i32 = 1;

    /// A stored buffer operation, covering both edits and undos.
    ///
    /// When `is_undo` is false the `edit_*` fields carry the payload and
    /// `undo_counts` is empty; when it is true only `undo_counts` is filled.
    /// Field tags start at 2; no field here uses tag 1.
    #[derive(Message)]
    pub struct Operation {
        /// Vector clock attached to the operation.
        #[prost(message, repeated, tag = "2")]
        pub version: Vec<VectorClockEntry>,
        /// Distinguishes undo operations from edits.
        #[prost(bool, tag = "3")]
        pub is_undo: bool,
        /// Ranges touched by an edit.
        #[prost(message, repeated, tag = "4")]
        pub edit_ranges: Vec<Range>,
        /// New text supplied by an edit.
        #[prost(string, repeated, tag = "5")]
        pub edit_texts: Vec<String>,
        /// Per-operation undo counts carried by an undo.
        #[prost(message, repeated, tag = "6")]
        pub undo_counts: Vec<UndoCount>,
    }

    /// One component of a vector clock: a replica and its timestamp.
    #[derive(Message)]
    pub struct VectorClockEntry {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub timestamp: u32,
    }

    /// A `start..end` pair of offsets describing an edited span.
    #[derive(Message)]
    pub struct Range {
        #[prost(uint64, tag = "1")]
        pub start: u64,
        #[prost(uint64, tag = "2")]
        pub end: u64,
    }

    /// The undo count for the operation identified by
    /// (`replica_id`, `lamport_timestamp`).
    #[derive(Message)]
    pub struct UndoCount {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub lamport_timestamp: u32,
        #[prost(uint32, tag = "3")]
        pub count: u32,
    }
}