buffers.rs

   1use super::*;
   2use prost::Message;
   3use text::{EditOperation, UndoOperation};
   4
   5pub struct LeftChannelBuffer {
   6    pub channel_id: ChannelId,
   7    pub collaborators: Vec<proto::Collaborator>,
   8    pub connections: Vec<ConnectionId>,
   9}
  10
  11impl Database {
  12    /// Open a channel buffer. Returns the current contents, and adds you to the list of people
  13    /// to notify on changes.
  14    pub async fn join_channel_buffer(
  15        &self,
  16        channel_id: ChannelId,
  17        user_id: UserId,
  18        connection: ConnectionId,
  19    ) -> Result<proto::JoinChannelBufferResponse> {
  20        self.transaction(|tx| async move {
  21            let channel = self.get_channel_internal(channel_id, &tx).await?;
  22            self.check_user_is_channel_participant(&channel, user_id, &tx)
  23                .await?;
  24
  25            let buffer = channel::Model {
  26                id: channel_id,
  27                ..Default::default()
  28            }
  29            .find_related(buffer::Entity)
  30            .one(&*tx)
  31            .await?;
  32
  33            let buffer = if let Some(buffer) = buffer {
  34                buffer
  35            } else {
  36                let buffer = buffer::ActiveModel {
  37                    channel_id: ActiveValue::Set(channel_id),
  38                    ..Default::default()
  39                }
  40                .insert(&*tx)
  41                .await?;
  42                buffer_snapshot::ActiveModel {
  43                    buffer_id: ActiveValue::Set(buffer.id),
  44                    epoch: ActiveValue::Set(0),
  45                    text: ActiveValue::Set(String::new()),
  46                    operation_serialization_version: ActiveValue::Set(
  47                        storage::SERIALIZATION_VERSION,
  48                    ),
  49                }
  50                .insert(&*tx)
  51                .await?;
  52                buffer
  53            };
  54
  55            // Join the collaborators
  56            let mut collaborators = channel_buffer_collaborator::Entity::find()
  57                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
  58                .all(&*tx)
  59                .await?;
  60            let replica_ids = collaborators
  61                .iter()
  62                .map(|c| c.replica_id)
  63                .collect::<HashSet<_>>();
  64            let mut replica_id = ReplicaId(0);
  65            while replica_ids.contains(&replica_id) {
  66                replica_id.0 += 1;
  67            }
  68            let collaborator = channel_buffer_collaborator::ActiveModel {
  69                channel_id: ActiveValue::Set(channel_id),
  70                connection_id: ActiveValue::Set(connection.id as i32),
  71                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
  72                user_id: ActiveValue::Set(user_id),
  73                replica_id: ActiveValue::Set(replica_id),
  74                ..Default::default()
  75            }
  76            .insert(&*tx)
  77            .await?;
  78            collaborators.push(collaborator);
  79
  80            let (base_text, operations, max_operation) =
  81                self.get_buffer_state(&buffer, &tx).await?;
  82
  83            // Save the last observed operation
  84            if let Some(op) = max_operation {
  85                observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
  86                    user_id: ActiveValue::Set(user_id),
  87                    buffer_id: ActiveValue::Set(buffer.id),
  88                    epoch: ActiveValue::Set(op.epoch),
  89                    lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
  90                    replica_id: ActiveValue::Set(op.replica_id),
  91                })
  92                .on_conflict(
  93                    OnConflict::columns([
  94                        observed_buffer_edits::Column::UserId,
  95                        observed_buffer_edits::Column::BufferId,
  96                    ])
  97                    .update_columns([
  98                        observed_buffer_edits::Column::Epoch,
  99                        observed_buffer_edits::Column::LamportTimestamp,
 100                    ])
 101                    .to_owned(),
 102                )
 103                .exec(&*tx)
 104                .await?;
 105            }
 106
 107            Ok(proto::JoinChannelBufferResponse {
 108                buffer_id: buffer.id.to_proto(),
 109                replica_id: replica_id.to_proto() as u32,
 110                base_text,
 111                operations,
 112                epoch: buffer.epoch as u64,
 113                collaborators: collaborators
 114                    .into_iter()
 115                    .map(|collaborator| proto::Collaborator {
 116                        peer_id: Some(collaborator.connection().into()),
 117                        user_id: collaborator.user_id.to_proto(),
 118                        replica_id: collaborator.replica_id.0 as u32,
 119                    })
 120                    .collect(),
 121            })
 122        })
 123        .await
 124    }
 125
 126    /// Rejoin a channel buffer (after a connection interruption)
 127    pub async fn rejoin_channel_buffers(
 128        &self,
 129        buffers: &[proto::ChannelBufferVersion],
 130        user_id: UserId,
 131        connection_id: ConnectionId,
 132    ) -> Result<Vec<RejoinedChannelBuffer>> {
 133        self.transaction(|tx| async move {
 134            let mut results = Vec::new();
 135            for client_buffer in buffers {
 136                let channel = self
 137                    .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
 138                    .await?;
 139                if self
 140                    .check_user_is_channel_participant(&channel, user_id, &tx)
 141                    .await
 142                    .is_err()
 143                {
 144                    log::info!("user is not a member of channel");
 145                    continue;
 146                }
 147
 148                let buffer = self.get_channel_buffer(channel.id, &tx).await?;
 149                let mut collaborators = channel_buffer_collaborator::Entity::find()
 150                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
 151                    .all(&*tx)
 152                    .await?;
 153
 154                // If the buffer epoch hasn't changed since the client lost
 155                // connection, then the client's buffer can be synchronized with
 156                // the server's buffer.
 157                if buffer.epoch as u64 != client_buffer.epoch {
 158                    log::info!("can't rejoin buffer, epoch has changed");
 159                    continue;
 160                }
 161
 162                // Find the collaborator record for this user's previous lost
 163                // connection. Update it with the new connection id.
 164                let Some(self_collaborator) =
 165                    collaborators.iter_mut().find(|c| c.user_id == user_id)
 166                else {
 167                    log::info!("can't rejoin buffer, no previous collaborator found");
 168                    continue;
 169                };
 170                let old_connection_id = self_collaborator.connection();
 171                *self_collaborator = channel_buffer_collaborator::ActiveModel {
 172                    id: ActiveValue::Unchanged(self_collaborator.id),
 173                    connection_id: ActiveValue::Set(connection_id.id as i32),
 174                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
 175                    connection_lost: ActiveValue::Set(false),
 176                    ..Default::default()
 177                }
 178                .update(&*tx)
 179                .await?;
 180
 181                let client_version = version_from_wire(&client_buffer.version);
 182                let serialization_version = self
 183                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 184                    .await?;
 185
 186                let mut rows = buffer_operation::Entity::find()
 187                    .filter(
 188                        buffer_operation::Column::BufferId
 189                            .eq(buffer.id)
 190                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 191                    )
 192                    .stream(&*tx)
 193                    .await?;
 194
 195                // Find the server's version vector and any operations
 196                // that the client has not seen.
 197                let mut server_version = clock::Global::new();
 198                let mut operations = Vec::new();
 199                while let Some(row) = rows.next().await {
 200                    let row = row?;
 201                    let timestamp = clock::Lamport {
 202                        replica_id: row.replica_id as u16,
 203                        value: row.lamport_timestamp as u32,
 204                    };
 205                    server_version.observe(timestamp);
 206                    if !client_version.observed(timestamp) {
 207                        operations.push(proto::Operation {
 208                            variant: Some(operation_from_storage(row, serialization_version)?),
 209                        })
 210                    }
 211                }
 212
 213                results.push(RejoinedChannelBuffer {
 214                    old_connection_id,
 215                    buffer: proto::RejoinedChannelBuffer {
 216                        channel_id: client_buffer.channel_id,
 217                        version: version_to_wire(&server_version),
 218                        operations,
 219                        collaborators: collaborators
 220                            .into_iter()
 221                            .map(|collaborator| proto::Collaborator {
 222                                peer_id: Some(collaborator.connection().into()),
 223                                user_id: collaborator.user_id.to_proto(),
 224                                replica_id: collaborator.replica_id.0 as u32,
 225                            })
 226                            .collect(),
 227                    },
 228                });
 229            }
 230
 231            Ok(results)
 232        })
 233        .await
 234    }
 235
 236    /// Clear out any buffer collaborators who are no longer collaborating.
 237    pub async fn clear_stale_channel_buffer_collaborators(
 238        &self,
 239        channel_id: ChannelId,
 240        server_id: ServerId,
 241    ) -> Result<RefreshedChannelBuffer> {
 242        self.transaction(|tx| async move {
 243            let db_collaborators = channel_buffer_collaborator::Entity::find()
 244                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 245                .all(&*tx)
 246                .await?;
 247
 248            let mut connection_ids = Vec::new();
 249            let mut collaborators = Vec::new();
 250            let mut collaborator_ids_to_remove = Vec::new();
 251            for db_collaborator in &db_collaborators {
 252                if !db_collaborator.connection_lost
 253                    && db_collaborator.connection_server_id == server_id
 254                {
 255                    connection_ids.push(db_collaborator.connection());
 256                    collaborators.push(proto::Collaborator {
 257                        peer_id: Some(db_collaborator.connection().into()),
 258                        replica_id: db_collaborator.replica_id.0 as u32,
 259                        user_id: db_collaborator.user_id.to_proto(),
 260                    })
 261                } else {
 262                    collaborator_ids_to_remove.push(db_collaborator.id);
 263                }
 264            }
 265
 266            channel_buffer_collaborator::Entity::delete_many()
 267                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
 268                .exec(&*tx)
 269                .await?;
 270
 271            Ok(RefreshedChannelBuffer {
 272                connection_ids,
 273                collaborators,
 274            })
 275        })
 276        .await
 277    }
 278
 279    /// Close the channel buffer, and stop receiving updates for it.
 280    pub async fn leave_channel_buffer(
 281        &self,
 282        channel_id: ChannelId,
 283        connection: ConnectionId,
 284    ) -> Result<LeftChannelBuffer> {
 285        self.transaction(|tx| async move {
 286            self.leave_channel_buffer_internal(channel_id, connection, &tx)
 287                .await
 288        })
 289        .await
 290    }
 291
 292    /// Close the channel buffer, and stop receiving updates for it.
 293    pub async fn channel_buffer_connection_lost(
 294        &self,
 295        connection: ConnectionId,
 296        tx: &DatabaseTransaction,
 297    ) -> Result<()> {
 298        channel_buffer_collaborator::Entity::update_many()
 299            .filter(
 300                Condition::all()
 301                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 302                    .add(
 303                        channel_buffer_collaborator::Column::ConnectionServerId
 304                            .eq(connection.owner_id as i32),
 305                    ),
 306            )
 307            .set(channel_buffer_collaborator::ActiveModel {
 308                connection_lost: ActiveValue::set(true),
 309                ..Default::default()
 310            })
 311            .exec(tx)
 312            .await?;
 313        Ok(())
 314    }
 315
 316    /// Close all open channel buffers
 317    pub async fn leave_channel_buffers(
 318        &self,
 319        connection: ConnectionId,
 320    ) -> Result<Vec<LeftChannelBuffer>> {
 321        self.transaction(|tx| async move {
 322            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 323            enum QueryChannelIds {
 324                ChannelId,
 325            }
 326
 327            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
 328                .select_only()
 329                .column(channel_buffer_collaborator::Column::ChannelId)
 330                .filter(Condition::all().add(
 331                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
 332                ))
 333                .into_values::<_, QueryChannelIds>()
 334                .all(&*tx)
 335                .await?;
 336
 337            let mut result = Vec::new();
 338            for channel_id in channel_ids {
 339                let left_channel_buffer = self
 340                    .leave_channel_buffer_internal(channel_id, connection, &tx)
 341                    .await?;
 342                result.push(left_channel_buffer);
 343            }
 344
 345            Ok(result)
 346        })
 347        .await
 348    }
 349
 350    async fn leave_channel_buffer_internal(
 351        &self,
 352        channel_id: ChannelId,
 353        connection: ConnectionId,
 354        tx: &DatabaseTransaction,
 355    ) -> Result<LeftChannelBuffer> {
 356        let result = channel_buffer_collaborator::Entity::delete_many()
 357            .filter(
 358                Condition::all()
 359                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 360                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 361                    .add(
 362                        channel_buffer_collaborator::Column::ConnectionServerId
 363                            .eq(connection.owner_id as i32),
 364                    ),
 365            )
 366            .exec(tx)
 367            .await?;
 368        if result.rows_affected == 0 {
 369            Err(anyhow!("not a collaborator on this project"))?;
 370        }
 371
 372        let mut collaborators = Vec::new();
 373        let mut connections = Vec::new();
 374        let mut rows = channel_buffer_collaborator::Entity::find()
 375            .filter(
 376                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 377            )
 378            .stream(tx)
 379            .await?;
 380        while let Some(row) = rows.next().await {
 381            let row = row?;
 382            let connection = row.connection();
 383            connections.push(connection);
 384            collaborators.push(proto::Collaborator {
 385                peer_id: Some(connection.into()),
 386                replica_id: row.replica_id.0 as u32,
 387                user_id: row.user_id.to_proto(),
 388            });
 389        }
 390
 391        drop(rows);
 392
 393        if collaborators.is_empty() {
 394            self.snapshot_channel_buffer(channel_id, &tx).await?;
 395        }
 396
 397        Ok(LeftChannelBuffer {
 398            channel_id,
 399            collaborators,
 400            connections,
 401        })
 402    }
 403
 404    pub async fn get_channel_buffer_collaborators(
 405        &self,
 406        channel_id: ChannelId,
 407    ) -> Result<Vec<UserId>> {
 408        self.transaction(|tx| async move {
 409            self.get_channel_buffer_collaborators_internal(channel_id, &tx)
 410                .await
 411        })
 412        .await
 413    }
 414
 415    async fn get_channel_buffer_collaborators_internal(
 416        &self,
 417        channel_id: ChannelId,
 418        tx: &DatabaseTransaction,
 419    ) -> Result<Vec<UserId>> {
 420        #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 421        enum QueryUserIds {
 422            UserId,
 423        }
 424
 425        let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
 426            .select_only()
 427            .column(channel_buffer_collaborator::Column::UserId)
 428            .filter(
 429                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 430            )
 431            .into_values::<_, QueryUserIds>()
 432            .all(tx)
 433            .await?;
 434
 435        Ok(users)
 436    }
 437
 438    pub async fn update_channel_buffer(
 439        &self,
 440        channel_id: ChannelId,
 441        user: UserId,
 442        operations: &[proto::Operation],
 443    ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
 444        self.transaction(move |tx| async move {
 445            let channel = self.get_channel_internal(channel_id, &tx).await?;
 446
 447            let mut requires_write_permission = false;
 448            for op in operations.iter() {
 449                match op.variant {
 450                    None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
 451                    Some(_) => requires_write_permission = true,
 452                }
 453            }
 454            if requires_write_permission {
 455                self.check_user_is_channel_member(&channel, user, &tx)
 456                    .await?;
 457            } else {
 458                self.check_user_is_channel_participant(&channel, user, &tx)
 459                    .await?;
 460            }
 461
 462            let buffer = buffer::Entity::find()
 463                .filter(buffer::Column::ChannelId.eq(channel_id))
 464                .one(&*tx)
 465                .await?
 466                .ok_or_else(|| anyhow!("no such buffer"))?;
 467
 468            let serialization_version = self
 469                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 470                .await?;
 471
 472            let operations = operations
 473                .iter()
 474                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
 475                .collect::<Vec<_>>();
 476
 477            let max_version;
 478
 479            if !operations.is_empty() {
 480                let max_operation = operations
 481                    .iter()
 482                    .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
 483                    .unwrap();
 484
 485                max_version = vec![proto::VectorClockEntry {
 486                    replica_id: *max_operation.replica_id.as_ref() as u32,
 487                    timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
 488                }];
 489
 490                // get current channel participants and save the max operation above
 491                self.save_max_operation(
 492                    user,
 493                    buffer.id,
 494                    buffer.epoch,
 495                    *max_operation.replica_id.as_ref(),
 496                    *max_operation.lamport_timestamp.as_ref(),
 497                    &tx,
 498                )
 499                .await?;
 500
 501                buffer_operation::Entity::insert_many(operations)
 502                    .on_conflict(
 503                        OnConflict::columns([
 504                            buffer_operation::Column::BufferId,
 505                            buffer_operation::Column::Epoch,
 506                            buffer_operation::Column::LamportTimestamp,
 507                            buffer_operation::Column::ReplicaId,
 508                        ])
 509                        .do_nothing()
 510                        .to_owned(),
 511                    )
 512                    .exec(&*tx)
 513                    .await?;
 514            } else {
 515                max_version = Vec::new();
 516            }
 517
 518            let mut connections = HashSet::default();
 519            let mut rows = channel_buffer_collaborator::Entity::find()
 520                .filter(
 521                    Condition::all()
 522                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 523                )
 524                .stream(&*tx)
 525                .await?;
 526            while let Some(row) = rows.next().await {
 527                let row = row?;
 528                connections.insert(ConnectionId {
 529                    id: row.connection_id as u32,
 530                    owner_id: row.connection_server_id.0 as u32,
 531                });
 532            }
 533
 534            Ok((connections, buffer.epoch, max_version))
 535        })
 536        .await
 537    }
 538
 539    async fn save_max_operation(
 540        &self,
 541        user_id: UserId,
 542        buffer_id: BufferId,
 543        epoch: i32,
 544        replica_id: i32,
 545        lamport_timestamp: i32,
 546        tx: &DatabaseTransaction,
 547    ) -> Result<()> {
 548        buffer::Entity::update(buffer::ActiveModel {
 549            id: ActiveValue::Unchanged(buffer_id),
 550            epoch: ActiveValue::Unchanged(epoch),
 551            latest_operation_epoch: ActiveValue::Set(Some(epoch)),
 552            latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
 553            latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
 554            channel_id: ActiveValue::NotSet,
 555        })
 556        .exec(tx)
 557        .await?;
 558
 559        use observed_buffer_edits::Column;
 560        observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
 561            user_id: ActiveValue::Set(user_id),
 562            buffer_id: ActiveValue::Set(buffer_id),
 563            epoch: ActiveValue::Set(epoch),
 564            replica_id: ActiveValue::Set(replica_id),
 565            lamport_timestamp: ActiveValue::Set(lamport_timestamp),
 566        })
 567        .on_conflict(
 568            OnConflict::columns([Column::UserId, Column::BufferId])
 569                .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
 570                .action_cond_where(
 571                    Condition::any().add(Column::Epoch.lt(epoch)).add(
 572                        Condition::all().add(Column::Epoch.eq(epoch)).add(
 573                            Condition::any()
 574                                .add(Column::LamportTimestamp.lt(lamport_timestamp))
 575                                .add(
 576                                    Column::LamportTimestamp
 577                                        .eq(lamport_timestamp)
 578                                        .and(Column::ReplicaId.lt(replica_id)),
 579                                ),
 580                        ),
 581                    ),
 582                )
 583                .to_owned(),
 584        )
 585        .exec_without_returning(tx)
 586        .await?;
 587
 588        Ok(())
 589    }
 590
 591    async fn get_buffer_operation_serialization_version(
 592        &self,
 593        buffer_id: BufferId,
 594        epoch: i32,
 595        tx: &DatabaseTransaction,
 596    ) -> Result<i32> {
 597        Ok(buffer_snapshot::Entity::find()
 598            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
 599            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
 600            .select_only()
 601            .column(buffer_snapshot::Column::OperationSerializationVersion)
 602            .into_values::<_, QueryOperationSerializationVersion>()
 603            .one(tx)
 604            .await?
 605            .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
 606    }
 607
 608    pub async fn get_channel_buffer(
 609        &self,
 610        channel_id: ChannelId,
 611        tx: &DatabaseTransaction,
 612    ) -> Result<buffer::Model> {
 613        Ok(channel::Model {
 614            id: channel_id,
 615            ..Default::default()
 616        }
 617        .find_related(buffer::Entity)
 618        .one(tx)
 619        .await?
 620        .ok_or_else(|| anyhow!("no such buffer"))?)
 621    }
 622
 623    async fn get_buffer_state(
 624        &self,
 625        buffer: &buffer::Model,
 626        tx: &DatabaseTransaction,
 627    ) -> Result<(
 628        String,
 629        Vec<proto::Operation>,
 630        Option<buffer_operation::Model>,
 631    )> {
 632        let id = buffer.id;
 633        let (base_text, version) = if buffer.epoch > 0 {
 634            let snapshot = buffer_snapshot::Entity::find()
 635                .filter(
 636                    buffer_snapshot::Column::BufferId
 637                        .eq(id)
 638                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
 639                )
 640                .one(tx)
 641                .await?
 642                .ok_or_else(|| anyhow!("no such snapshot"))?;
 643
 644            let version = snapshot.operation_serialization_version;
 645            (snapshot.text, version)
 646        } else {
 647            (String::new(), storage::SERIALIZATION_VERSION)
 648        };
 649
 650        let mut rows = buffer_operation::Entity::find()
 651            .filter(
 652                buffer_operation::Column::BufferId
 653                    .eq(id)
 654                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 655            )
 656            .order_by_asc(buffer_operation::Column::LamportTimestamp)
 657            .order_by_asc(buffer_operation::Column::ReplicaId)
 658            .stream(tx)
 659            .await?;
 660
 661        let mut operations = Vec::new();
 662        let mut last_row = None;
 663        while let Some(row) = rows.next().await {
 664            let row = row?;
 665            last_row = Some(buffer_operation::Model {
 666                buffer_id: row.buffer_id,
 667                epoch: row.epoch,
 668                lamport_timestamp: row.lamport_timestamp,
 669                replica_id: row.replica_id,
 670                value: Default::default(),
 671            });
 672            operations.push(proto::Operation {
 673                variant: Some(operation_from_storage(row, version)?),
 674            });
 675        }
 676
 677        Ok((base_text, operations, last_row))
 678    }
 679
 680    async fn snapshot_channel_buffer(
 681        &self,
 682        channel_id: ChannelId,
 683        tx: &DatabaseTransaction,
 684    ) -> Result<()> {
 685        let buffer = self.get_channel_buffer(channel_id, tx).await?;
 686        let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
 687        if operations.is_empty() {
 688            return Ok(());
 689        }
 690
 691        let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
 692        text_buffer
 693            .apply_ops(operations.into_iter().filter_map(operation_from_wire))
 694            .unwrap();
 695
 696        let base_text = text_buffer.text();
 697        let epoch = buffer.epoch + 1;
 698
 699        buffer_snapshot::Model {
 700            buffer_id: buffer.id,
 701            epoch,
 702            text: base_text,
 703            operation_serialization_version: storage::SERIALIZATION_VERSION,
 704        }
 705        .into_active_model()
 706        .insert(tx)
 707        .await?;
 708
 709        buffer::ActiveModel {
 710            id: ActiveValue::Unchanged(buffer.id),
 711            epoch: ActiveValue::Set(epoch),
 712            latest_operation_epoch: ActiveValue::NotSet,
 713            latest_operation_replica_id: ActiveValue::NotSet,
 714            latest_operation_lamport_timestamp: ActiveValue::NotSet,
 715            channel_id: ActiveValue::NotSet,
 716        }
 717        .save(tx)
 718        .await?;
 719
 720        Ok(())
 721    }
 722
 723    pub async fn observe_buffer_version(
 724        &self,
 725        buffer_id: BufferId,
 726        user_id: UserId,
 727        epoch: i32,
 728        version: &[proto::VectorClockEntry],
 729    ) -> Result<()> {
 730        self.transaction(|tx| async move {
 731            // For now, combine concurrent operations.
 732            let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
 733                return Ok(());
 734            };
 735            self.save_max_operation(
 736                user_id,
 737                buffer_id,
 738                epoch,
 739                component.replica_id as i32,
 740                component.timestamp as i32,
 741                &tx,
 742            )
 743            .await?;
 744            Ok(())
 745        })
 746        .await
 747    }
 748
 749    pub async fn observed_channel_buffer_changes(
 750        &self,
 751        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
 752        user_id: UserId,
 753        tx: &DatabaseTransaction,
 754    ) -> Result<Vec<proto::ChannelBufferVersion>> {
 755        let observed_operations = observed_buffer_edits::Entity::find()
 756            .filter(observed_buffer_edits::Column::UserId.eq(user_id))
 757            .filter(
 758                observed_buffer_edits::Column::BufferId
 759                    .is_in(channel_ids_by_buffer_id.keys().copied()),
 760            )
 761            .all(tx)
 762            .await?;
 763
 764        Ok(observed_operations
 765            .iter()
 766            .flat_map(|op| {
 767                Some(proto::ChannelBufferVersion {
 768                    channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
 769                    epoch: op.epoch as u64,
 770                    version: vec![proto::VectorClockEntry {
 771                        replica_id: op.replica_id as u32,
 772                        timestamp: op.lamport_timestamp as u32,
 773                    }],
 774                })
 775            })
 776            .collect())
 777    }
 778}
 779
 780fn operation_to_storage(
 781    operation: &proto::Operation,
 782    buffer: &buffer::Model,
 783    _format: i32,
 784) -> Option<buffer_operation::ActiveModel> {
 785    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
 786        proto::operation::Variant::Edit(operation) => (
 787            operation.replica_id,
 788            operation.lamport_timestamp,
 789            storage::Operation {
 790                version: version_to_storage(&operation.version),
 791                is_undo: false,
 792                edit_ranges: operation
 793                    .ranges
 794                    .iter()
 795                    .map(|range| storage::Range {
 796                        start: range.start,
 797                        end: range.end,
 798                    })
 799                    .collect(),
 800                edit_texts: operation.new_text.clone(),
 801                undo_counts: Vec::new(),
 802            },
 803        ),
 804        proto::operation::Variant::Undo(operation) => (
 805            operation.replica_id,
 806            operation.lamport_timestamp,
 807            storage::Operation {
 808                version: version_to_storage(&operation.version),
 809                is_undo: true,
 810                edit_ranges: Vec::new(),
 811                edit_texts: Vec::new(),
 812                undo_counts: operation
 813                    .counts
 814                    .iter()
 815                    .map(|entry| storage::UndoCount {
 816                        replica_id: entry.replica_id,
 817                        lamport_timestamp: entry.lamport_timestamp,
 818                        count: entry.count,
 819                    })
 820                    .collect(),
 821            },
 822        ),
 823        _ => None?,
 824    };
 825
 826    Some(buffer_operation::ActiveModel {
 827        buffer_id: ActiveValue::Set(buffer.id),
 828        epoch: ActiveValue::Set(buffer.epoch),
 829        replica_id: ActiveValue::Set(replica_id as i32),
 830        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
 831        value: ActiveValue::Set(value.encode_to_vec()),
 832    })
 833}
 834
 835fn operation_from_storage(
 836    row: buffer_operation::Model,
 837    _format_version: i32,
 838) -> Result<proto::operation::Variant, Error> {
 839    let operation =
 840        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
 841    let version = version_from_storage(&operation.version);
 842    Ok(if operation.is_undo {
 843        proto::operation::Variant::Undo(proto::operation::Undo {
 844            replica_id: row.replica_id as u32,
 845            lamport_timestamp: row.lamport_timestamp as u32,
 846            version,
 847            counts: operation
 848                .undo_counts
 849                .iter()
 850                .map(|entry| proto::UndoCount {
 851                    replica_id: entry.replica_id,
 852                    lamport_timestamp: entry.lamport_timestamp,
 853                    count: entry.count,
 854                })
 855                .collect(),
 856        })
 857    } else {
 858        proto::operation::Variant::Edit(proto::operation::Edit {
 859            replica_id: row.replica_id as u32,
 860            lamport_timestamp: row.lamport_timestamp as u32,
 861            version,
 862            ranges: operation
 863                .edit_ranges
 864                .into_iter()
 865                .map(|range| proto::Range {
 866                    start: range.start,
 867                    end: range.end,
 868                })
 869                .collect(),
 870            new_text: operation.edit_texts,
 871        })
 872    })
 873}
 874
 875fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
 876    version
 877        .iter()
 878        .map(|entry| storage::VectorClockEntry {
 879            replica_id: entry.replica_id,
 880            timestamp: entry.timestamp,
 881        })
 882        .collect()
 883}
 884
 885fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
 886    version
 887        .iter()
 888        .map(|entry| proto::VectorClockEntry {
 889            replica_id: entry.replica_id,
 890            timestamp: entry.timestamp,
 891        })
 892        .collect()
 893}
 894
 895// This is currently a manual copy of the deserialization code in the client's language crate
 896pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
 897    match operation.variant? {
 898        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
 899            timestamp: clock::Lamport {
 900                replica_id: edit.replica_id as text::ReplicaId,
 901                value: edit.lamport_timestamp,
 902            },
 903            version: version_from_wire(&edit.version),
 904            ranges: edit
 905                .ranges
 906                .into_iter()
 907                .map(|range| {
 908                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
 909                })
 910                .collect(),
 911            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
 912        })),
 913        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
 914            timestamp: clock::Lamport {
 915                replica_id: undo.replica_id as text::ReplicaId,
 916                value: undo.lamport_timestamp,
 917            },
 918            version: version_from_wire(&undo.version),
 919            counts: undo
 920                .counts
 921                .into_iter()
 922                .map(|c| {
 923                    (
 924                        clock::Lamport {
 925                            replica_id: c.replica_id as text::ReplicaId,
 926                            value: c.lamport_timestamp,
 927                        },
 928                        c.count,
 929                    )
 930                })
 931                .collect(),
 932        })),
 933        _ => None,
 934    }
 935}
 936
 937fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
 938    let mut version = clock::Global::new();
 939    for entry in message {
 940        version.observe(clock::Lamport {
 941            replica_id: entry.replica_id as text::ReplicaId,
 942            value: entry.timestamp,
 943        });
 944    }
 945    version
 946}
 947
 948fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
 949    let mut message = Vec::new();
 950    for entry in version.iter() {
 951        message.push(proto::VectorClockEntry {
 952            replica_id: entry.replica_id as u32,
 953            timestamp: entry.value,
 954        });
 955    }
 956    message
 957}
 958
 959#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 960enum QueryOperationSerializationVersion {
 961    OperationSerializationVersion,
 962}
 963
 964mod storage {
 965    #![allow(non_snake_case)]
 966    use prost::Message;
 967    pub const SERIALIZATION_VERSION: i32 = 1;
 968
 969    #[derive(Message)]
 970    pub struct Operation {
 971        #[prost(message, repeated, tag = "2")]
 972        pub version: Vec<VectorClockEntry>,
 973        #[prost(bool, tag = "3")]
 974        pub is_undo: bool,
 975        #[prost(message, repeated, tag = "4")]
 976        pub edit_ranges: Vec<Range>,
 977        #[prost(string, repeated, tag = "5")]
 978        pub edit_texts: Vec<String>,
 979        #[prost(message, repeated, tag = "6")]
 980        pub undo_counts: Vec<UndoCount>,
 981    }
 982
 983    #[derive(Message)]
 984    pub struct VectorClockEntry {
 985        #[prost(uint32, tag = "1")]
 986        pub replica_id: u32,
 987        #[prost(uint32, tag = "2")]
 988        pub timestamp: u32,
 989    }
 990
 991    #[derive(Message)]
 992    pub struct Range {
 993        #[prost(uint64, tag = "1")]
 994        pub start: u64,
 995        #[prost(uint64, tag = "2")]
 996        pub end: u64,
 997    }
 998
 999    #[derive(Message)]
1000    pub struct UndoCount {
1001        #[prost(uint32, tag = "1")]
1002        pub replica_id: u32,
1003        #[prost(uint32, tag = "2")]
1004        pub lamport_timestamp: u32,
1005        #[prost(uint32, tag = "3")]
1006        pub count: u32,
1007    }
1008}