buffers.rs

   1use super::*;
   2use anyhow::Context as _;
   3use prost::Message;
   4use text::{EditOperation, UndoOperation};
   5
   6pub struct LeftChannelBuffer {
   7    pub channel_id: ChannelId,
   8    pub collaborators: Vec<proto::Collaborator>,
   9    pub connections: Vec<ConnectionId>,
  10}
  11
  12impl Database {
  13    /// Open a channel buffer. Returns the current contents, and adds you to the list of people
  14    /// to notify on changes.
  15    pub async fn join_channel_buffer(
  16        &self,
  17        channel_id: ChannelId,
  18        user_id: UserId,
  19        connection: ConnectionId,
  20    ) -> Result<proto::JoinChannelBufferResponse> {
  21        self.transaction(|tx| async move {
  22            let channel = self.get_channel_internal(channel_id, &tx).await?;
  23            self.check_user_is_channel_participant(&channel, user_id, &tx)
  24                .await?;
  25
  26            let buffer = channel::Model {
  27                id: channel_id,
  28                ..Default::default()
  29            }
  30            .find_related(buffer::Entity)
  31            .one(&*tx)
  32            .await?;
  33
  34            let buffer = if let Some(buffer) = buffer {
  35                buffer
  36            } else {
  37                let buffer = buffer::ActiveModel {
  38                    channel_id: ActiveValue::Set(channel_id),
  39                    ..Default::default()
  40                }
  41                .insert(&*tx)
  42                .await?;
  43                buffer_snapshot::ActiveModel {
  44                    buffer_id: ActiveValue::Set(buffer.id),
  45                    epoch: ActiveValue::Set(0),
  46                    text: ActiveValue::Set(String::new()),
  47                    operation_serialization_version: ActiveValue::Set(
  48                        storage::SERIALIZATION_VERSION,
  49                    ),
  50                }
  51                .insert(&*tx)
  52                .await?;
  53                buffer
  54            };
  55
  56            // Join the collaborators
  57            let mut collaborators = channel_buffer_collaborator::Entity::find()
  58                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
  59                .all(&*tx)
  60                .await?;
  61            let replica_ids = collaborators
  62                .iter()
  63                .map(|c| c.replica_id)
  64                .collect::<HashSet<_>>();
  65            let mut replica_id = ReplicaId(0);
  66            while replica_ids.contains(&replica_id) {
  67                replica_id.0 += 1;
  68            }
  69            let collaborator = channel_buffer_collaborator::ActiveModel {
  70                channel_id: ActiveValue::Set(channel_id),
  71                connection_id: ActiveValue::Set(connection.id as i32),
  72                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
  73                user_id: ActiveValue::Set(user_id),
  74                replica_id: ActiveValue::Set(replica_id),
  75                ..Default::default()
  76            }
  77            .insert(&*tx)
  78            .await?;
  79            collaborators.push(collaborator);
  80
  81            let (base_text, operations, max_operation) =
  82                self.get_buffer_state(&buffer, &tx).await?;
  83
  84            // Save the last observed operation
  85            if let Some(op) = max_operation {
  86                observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
  87                    user_id: ActiveValue::Set(user_id),
  88                    buffer_id: ActiveValue::Set(buffer.id),
  89                    epoch: ActiveValue::Set(op.epoch),
  90                    lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
  91                    replica_id: ActiveValue::Set(op.replica_id),
  92                })
  93                .on_conflict(
  94                    OnConflict::columns([
  95                        observed_buffer_edits::Column::UserId,
  96                        observed_buffer_edits::Column::BufferId,
  97                    ])
  98                    .update_columns([
  99                        observed_buffer_edits::Column::Epoch,
 100                        observed_buffer_edits::Column::LamportTimestamp,
 101                    ])
 102                    .to_owned(),
 103                )
 104                .exec(&*tx)
 105                .await?;
 106            }
 107
 108            Ok(proto::JoinChannelBufferResponse {
 109                buffer_id: buffer.id.to_proto(),
 110                replica_id: replica_id.to_proto() as u32,
 111                base_text,
 112                operations,
 113                epoch: buffer.epoch as u64,
 114                collaborators: collaborators
 115                    .into_iter()
 116                    .map(|collaborator| proto::Collaborator {
 117                        peer_id: Some(collaborator.connection().into()),
 118                        user_id: collaborator.user_id.to_proto(),
 119                        replica_id: collaborator.replica_id.0 as u32,
 120                        is_host: false,
 121                    })
 122                    .collect(),
 123            })
 124        })
 125        .await
 126    }
 127
 128    /// Rejoin a channel buffer (after a connection interruption)
 129    pub async fn rejoin_channel_buffers(
 130        &self,
 131        buffers: &[proto::ChannelBufferVersion],
 132        user_id: UserId,
 133        connection_id: ConnectionId,
 134    ) -> Result<Vec<RejoinedChannelBuffer>> {
 135        self.transaction(|tx| async move {
 136            let mut results = Vec::new();
 137            for client_buffer in buffers {
 138                let channel = self
 139                    .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
 140                    .await?;
 141                if self
 142                    .check_user_is_channel_participant(&channel, user_id, &tx)
 143                    .await
 144                    .is_err()
 145                {
 146                    log::info!("user is not a member of channel");
 147                    continue;
 148                }
 149
 150                let buffer = self.get_channel_buffer(channel.id, &tx).await?;
 151                let mut collaborators = channel_buffer_collaborator::Entity::find()
 152                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
 153                    .all(&*tx)
 154                    .await?;
 155
 156                // If the buffer epoch hasn't changed since the client lost
 157                // connection, then the client's buffer can be synchronized with
 158                // the server's buffer.
 159                if buffer.epoch as u64 != client_buffer.epoch {
 160                    log::info!("can't rejoin buffer, epoch has changed");
 161                    continue;
 162                }
 163
 164                // Find the collaborator record for this user's previous lost
 165                // connection. Update it with the new connection id.
 166                let Some(self_collaborator) =
 167                    collaborators.iter_mut().find(|c| c.user_id == user_id)
 168                else {
 169                    log::info!("can't rejoin buffer, no previous collaborator found");
 170                    continue;
 171                };
 172                let old_connection_id = self_collaborator.connection();
 173                *self_collaborator = channel_buffer_collaborator::ActiveModel {
 174                    id: ActiveValue::Unchanged(self_collaborator.id),
 175                    connection_id: ActiveValue::Set(connection_id.id as i32),
 176                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
 177                    connection_lost: ActiveValue::Set(false),
 178                    ..Default::default()
 179                }
 180                .update(&*tx)
 181                .await?;
 182
 183                let client_version = version_from_wire(&client_buffer.version);
 184                let serialization_version = self
 185                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 186                    .await?;
 187
 188                let mut rows = buffer_operation::Entity::find()
 189                    .filter(
 190                        buffer_operation::Column::BufferId
 191                            .eq(buffer.id)
 192                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 193                    )
 194                    .stream(&*tx)
 195                    .await?;
 196
 197                // Find the server's version vector and any operations
 198                // that the client has not seen.
 199                let mut server_version = clock::Global::new();
 200                let mut operations = Vec::new();
 201                while let Some(row) = rows.next().await {
 202                    let row = row?;
 203                    let timestamp = clock::Lamport {
 204                        replica_id: row.replica_id as u16,
 205                        value: row.lamport_timestamp as u32,
 206                    };
 207                    server_version.observe(timestamp);
 208                    if !client_version.observed(timestamp) {
 209                        operations.push(proto::Operation {
 210                            variant: Some(operation_from_storage(row, serialization_version)?),
 211                        })
 212                    }
 213                }
 214
 215                results.push(RejoinedChannelBuffer {
 216                    old_connection_id,
 217                    buffer: proto::RejoinedChannelBuffer {
 218                        channel_id: client_buffer.channel_id,
 219                        version: version_to_wire(&server_version),
 220                        operations,
 221                        collaborators: collaborators
 222                            .into_iter()
 223                            .map(|collaborator| proto::Collaborator {
 224                                peer_id: Some(collaborator.connection().into()),
 225                                user_id: collaborator.user_id.to_proto(),
 226                                replica_id: collaborator.replica_id.0 as u32,
 227                                is_host: false,
 228                            })
 229                            .collect(),
 230                    },
 231                });
 232            }
 233
 234            Ok(results)
 235        })
 236        .await
 237    }
 238
 239    /// Clear out any buffer collaborators who are no longer collaborating.
 240    pub async fn clear_stale_channel_buffer_collaborators(
 241        &self,
 242        channel_id: ChannelId,
 243        server_id: ServerId,
 244    ) -> Result<RefreshedChannelBuffer> {
 245        self.transaction(|tx| async move {
 246            let db_collaborators = channel_buffer_collaborator::Entity::find()
 247                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 248                .all(&*tx)
 249                .await?;
 250
 251            let mut connection_ids = Vec::new();
 252            let mut collaborators = Vec::new();
 253            let mut collaborator_ids_to_remove = Vec::new();
 254            for db_collaborator in &db_collaborators {
 255                if !db_collaborator.connection_lost
 256                    && db_collaborator.connection_server_id == server_id
 257                {
 258                    connection_ids.push(db_collaborator.connection());
 259                    collaborators.push(proto::Collaborator {
 260                        peer_id: Some(db_collaborator.connection().into()),
 261                        replica_id: db_collaborator.replica_id.0 as u32,
 262                        user_id: db_collaborator.user_id.to_proto(),
 263                        is_host: false,
 264                    })
 265                } else {
 266                    collaborator_ids_to_remove.push(db_collaborator.id);
 267                }
 268            }
 269
 270            channel_buffer_collaborator::Entity::delete_many()
 271                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
 272                .exec(&*tx)
 273                .await?;
 274
 275            Ok(RefreshedChannelBuffer {
 276                connection_ids,
 277                collaborators,
 278            })
 279        })
 280        .await
 281    }
 282
 283    /// Close the channel buffer, and stop receiving updates for it.
 284    pub async fn leave_channel_buffer(
 285        &self,
 286        channel_id: ChannelId,
 287        connection: ConnectionId,
 288    ) -> Result<LeftChannelBuffer> {
 289        self.transaction(|tx| async move {
 290            self.leave_channel_buffer_internal(channel_id, connection, &tx)
 291                .await
 292        })
 293        .await
 294    }
 295
 296    /// Close the channel buffer, and stop receiving updates for it.
 297    pub async fn channel_buffer_connection_lost(
 298        &self,
 299        connection: ConnectionId,
 300        tx: &DatabaseTransaction,
 301    ) -> Result<()> {
 302        channel_buffer_collaborator::Entity::update_many()
 303            .filter(
 304                Condition::all()
 305                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 306                    .add(
 307                        channel_buffer_collaborator::Column::ConnectionServerId
 308                            .eq(connection.owner_id as i32),
 309                    ),
 310            )
 311            .set(channel_buffer_collaborator::ActiveModel {
 312                connection_lost: ActiveValue::set(true),
 313                ..Default::default()
 314            })
 315            .exec(tx)
 316            .await?;
 317        Ok(())
 318    }
 319
 320    /// Close all open channel buffers
 321    pub async fn leave_channel_buffers(
 322        &self,
 323        connection: ConnectionId,
 324    ) -> Result<Vec<LeftChannelBuffer>> {
 325        self.transaction(|tx| async move {
 326            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 327            enum QueryChannelIds {
 328                ChannelId,
 329            }
 330
 331            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
 332                .select_only()
 333                .column(channel_buffer_collaborator::Column::ChannelId)
 334                .filter(Condition::all().add(
 335                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
 336                ))
 337                .into_values::<_, QueryChannelIds>()
 338                .all(&*tx)
 339                .await?;
 340
 341            let mut result = Vec::new();
 342            for channel_id in channel_ids {
 343                let left_channel_buffer = self
 344                    .leave_channel_buffer_internal(channel_id, connection, &tx)
 345                    .await?;
 346                result.push(left_channel_buffer);
 347            }
 348
 349            Ok(result)
 350        })
 351        .await
 352    }
 353
 354    async fn leave_channel_buffer_internal(
 355        &self,
 356        channel_id: ChannelId,
 357        connection: ConnectionId,
 358        tx: &DatabaseTransaction,
 359    ) -> Result<LeftChannelBuffer> {
 360        let result = channel_buffer_collaborator::Entity::delete_many()
 361            .filter(
 362                Condition::all()
 363                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 364                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 365                    .add(
 366                        channel_buffer_collaborator::Column::ConnectionServerId
 367                            .eq(connection.owner_id as i32),
 368                    ),
 369            )
 370            .exec(tx)
 371            .await?;
 372        if result.rows_affected == 0 {
 373            Err(anyhow!("not a collaborator on this project"))?;
 374        }
 375
 376        let mut collaborators = Vec::new();
 377        let mut connections = Vec::new();
 378        let mut rows = channel_buffer_collaborator::Entity::find()
 379            .filter(
 380                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 381            )
 382            .stream(tx)
 383            .await?;
 384        while let Some(row) = rows.next().await {
 385            let row = row?;
 386            let connection = row.connection();
 387            connections.push(connection);
 388            collaborators.push(proto::Collaborator {
 389                peer_id: Some(connection.into()),
 390                replica_id: row.replica_id.0 as u32,
 391                user_id: row.user_id.to_proto(),
 392                is_host: false,
 393            });
 394        }
 395
 396        drop(rows);
 397
 398        if collaborators.is_empty() {
 399            self.snapshot_channel_buffer(channel_id, tx).await?;
 400        }
 401
 402        Ok(LeftChannelBuffer {
 403            channel_id,
 404            collaborators,
 405            connections,
 406        })
 407    }
 408
 409    pub async fn get_channel_buffer_collaborators(
 410        &self,
 411        channel_id: ChannelId,
 412    ) -> Result<Vec<UserId>> {
 413        self.transaction(|tx| async move {
 414            self.get_channel_buffer_collaborators_internal(channel_id, &tx)
 415                .await
 416        })
 417        .await
 418    }
 419
 420    async fn get_channel_buffer_collaborators_internal(
 421        &self,
 422        channel_id: ChannelId,
 423        tx: &DatabaseTransaction,
 424    ) -> Result<Vec<UserId>> {
 425        #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 426        enum QueryUserIds {
 427            UserId,
 428        }
 429
 430        let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
 431            .select_only()
 432            .column(channel_buffer_collaborator::Column::UserId)
 433            .filter(
 434                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 435            )
 436            .into_values::<_, QueryUserIds>()
 437            .all(tx)
 438            .await?;
 439
 440        Ok(users)
 441    }
 442
 443    pub async fn update_channel_buffer(
 444        &self,
 445        channel_id: ChannelId,
 446        user: UserId,
 447        operations: &[proto::Operation],
 448    ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
 449        self.transaction(move |tx| async move {
 450            let channel = self.get_channel_internal(channel_id, &tx).await?;
 451
 452            let mut requires_write_permission = false;
 453            for op in operations.iter() {
 454                match op.variant {
 455                    None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
 456                    Some(_) => requires_write_permission = true,
 457                }
 458            }
 459            if requires_write_permission {
 460                self.check_user_is_channel_member(&channel, user, &tx)
 461                    .await?;
 462            } else {
 463                self.check_user_is_channel_participant(&channel, user, &tx)
 464                    .await?;
 465            }
 466
 467            let buffer = buffer::Entity::find()
 468                .filter(buffer::Column::ChannelId.eq(channel_id))
 469                .one(&*tx)
 470                .await?
 471                .context("no such buffer")?;
 472
 473            let serialization_version = self
 474                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 475                .await?;
 476
 477            let operations = operations
 478                .iter()
 479                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
 480                .collect::<Vec<_>>();
 481
 482            let max_version;
 483
 484            if !operations.is_empty() {
 485                let max_operation = operations
 486                    .iter()
 487                    .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
 488                    .unwrap();
 489
 490                max_version = vec![proto::VectorClockEntry {
 491                    replica_id: *max_operation.replica_id.as_ref() as u32,
 492                    timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
 493                }];
 494
 495                // get current channel participants and save the max operation above
 496                self.save_max_operation(
 497                    user,
 498                    buffer.id,
 499                    buffer.epoch,
 500                    *max_operation.replica_id.as_ref(),
 501                    *max_operation.lamport_timestamp.as_ref(),
 502                    &tx,
 503                )
 504                .await?;
 505
 506                buffer_operation::Entity::insert_many(operations)
 507                    .on_conflict(
 508                        OnConflict::columns([
 509                            buffer_operation::Column::BufferId,
 510                            buffer_operation::Column::Epoch,
 511                            buffer_operation::Column::LamportTimestamp,
 512                            buffer_operation::Column::ReplicaId,
 513                        ])
 514                        .do_nothing()
 515                        .to_owned(),
 516                    )
 517                    .exec(&*tx)
 518                    .await?;
 519            } else {
 520                max_version = Vec::new();
 521            }
 522
 523            let mut connections = HashSet::default();
 524            let mut rows = channel_buffer_collaborator::Entity::find()
 525                .filter(
 526                    Condition::all()
 527                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 528                )
 529                .stream(&*tx)
 530                .await?;
 531            while let Some(row) = rows.next().await {
 532                let row = row?;
 533                connections.insert(ConnectionId {
 534                    id: row.connection_id as u32,
 535                    owner_id: row.connection_server_id.0 as u32,
 536                });
 537            }
 538
 539            Ok((connections, buffer.epoch, max_version))
 540        })
 541        .await
 542    }
 543
 544    async fn save_max_operation(
 545        &self,
 546        user_id: UserId,
 547        buffer_id: BufferId,
 548        epoch: i32,
 549        replica_id: i32,
 550        lamport_timestamp: i32,
 551        tx: &DatabaseTransaction,
 552    ) -> Result<()> {
 553        buffer::Entity::update(buffer::ActiveModel {
 554            id: ActiveValue::Unchanged(buffer_id),
 555            epoch: ActiveValue::Unchanged(epoch),
 556            latest_operation_epoch: ActiveValue::Set(Some(epoch)),
 557            latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
 558            latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
 559            channel_id: ActiveValue::NotSet,
 560        })
 561        .exec(tx)
 562        .await?;
 563
 564        use observed_buffer_edits::Column;
 565        observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
 566            user_id: ActiveValue::Set(user_id),
 567            buffer_id: ActiveValue::Set(buffer_id),
 568            epoch: ActiveValue::Set(epoch),
 569            replica_id: ActiveValue::Set(replica_id),
 570            lamport_timestamp: ActiveValue::Set(lamport_timestamp),
 571        })
 572        .on_conflict(
 573            OnConflict::columns([Column::UserId, Column::BufferId])
 574                .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
 575                .action_cond_where(
 576                    Condition::any().add(Column::Epoch.lt(epoch)).add(
 577                        Condition::all().add(Column::Epoch.eq(epoch)).add(
 578                            Condition::any()
 579                                .add(Column::LamportTimestamp.lt(lamport_timestamp))
 580                                .add(
 581                                    Column::LamportTimestamp
 582                                        .eq(lamport_timestamp)
 583                                        .and(Column::ReplicaId.lt(replica_id)),
 584                                ),
 585                        ),
 586                    ),
 587                )
 588                .to_owned(),
 589        )
 590        .exec_without_returning(tx)
 591        .await?;
 592
 593        Ok(())
 594    }
 595
 596    async fn get_buffer_operation_serialization_version(
 597        &self,
 598        buffer_id: BufferId,
 599        epoch: i32,
 600        tx: &DatabaseTransaction,
 601    ) -> Result<i32> {
 602        Ok(buffer_snapshot::Entity::find()
 603            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
 604            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
 605            .select_only()
 606            .column(buffer_snapshot::Column::OperationSerializationVersion)
 607            .into_values::<_, QueryOperationSerializationVersion>()
 608            .one(tx)
 609            .await?
 610            .context("missing buffer snapshot")?)
 611    }
 612
 613    pub async fn get_channel_buffer(
 614        &self,
 615        channel_id: ChannelId,
 616        tx: &DatabaseTransaction,
 617    ) -> Result<buffer::Model> {
 618        Ok(channel::Model {
 619            id: channel_id,
 620            ..Default::default()
 621        }
 622        .find_related(buffer::Entity)
 623        .one(tx)
 624        .await?
 625        .context("no such buffer")?)
 626    }
 627
 628    async fn get_buffer_state(
 629        &self,
 630        buffer: &buffer::Model,
 631        tx: &DatabaseTransaction,
 632    ) -> Result<(
 633        String,
 634        Vec<proto::Operation>,
 635        Option<buffer_operation::Model>,
 636    )> {
 637        let id = buffer.id;
 638        let (base_text, version) = if buffer.epoch > 0 {
 639            let snapshot = buffer_snapshot::Entity::find()
 640                .filter(
 641                    buffer_snapshot::Column::BufferId
 642                        .eq(id)
 643                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
 644                )
 645                .one(tx)
 646                .await?
 647                .context("no such snapshot")?;
 648
 649            let version = snapshot.operation_serialization_version;
 650            (snapshot.text, version)
 651        } else {
 652            (String::new(), storage::SERIALIZATION_VERSION)
 653        };
 654
 655        let mut rows = buffer_operation::Entity::find()
 656            .filter(
 657                buffer_operation::Column::BufferId
 658                    .eq(id)
 659                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 660            )
 661            .order_by_asc(buffer_operation::Column::LamportTimestamp)
 662            .order_by_asc(buffer_operation::Column::ReplicaId)
 663            .stream(tx)
 664            .await?;
 665
 666        let mut operations = Vec::new();
 667        let mut last_row = None;
 668        while let Some(row) = rows.next().await {
 669            let row = row?;
 670            last_row = Some(buffer_operation::Model {
 671                buffer_id: row.buffer_id,
 672                epoch: row.epoch,
 673                lamport_timestamp: row.lamport_timestamp,
 674                replica_id: row.replica_id,
 675                value: Default::default(),
 676            });
 677            operations.push(proto::Operation {
 678                variant: Some(operation_from_storage(row, version)?),
 679            });
 680        }
 681
 682        Ok((base_text, operations, last_row))
 683    }
 684
 685    async fn snapshot_channel_buffer(
 686        &self,
 687        channel_id: ChannelId,
 688        tx: &DatabaseTransaction,
 689    ) -> Result<()> {
 690        let buffer = self.get_channel_buffer(channel_id, tx).await?;
 691        let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
 692        if operations.is_empty() {
 693            return Ok(());
 694        }
 695
 696        let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
 697        text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
 698
 699        let base_text = text_buffer.text();
 700        let epoch = buffer.epoch + 1;
 701
 702        buffer_snapshot::Model {
 703            buffer_id: buffer.id,
 704            epoch,
 705            text: base_text,
 706            operation_serialization_version: storage::SERIALIZATION_VERSION,
 707        }
 708        .into_active_model()
 709        .insert(tx)
 710        .await?;
 711
 712        buffer::ActiveModel {
 713            id: ActiveValue::Unchanged(buffer.id),
 714            epoch: ActiveValue::Set(epoch),
 715            latest_operation_epoch: ActiveValue::NotSet,
 716            latest_operation_replica_id: ActiveValue::NotSet,
 717            latest_operation_lamport_timestamp: ActiveValue::NotSet,
 718            channel_id: ActiveValue::NotSet,
 719        }
 720        .save(tx)
 721        .await?;
 722
 723        Ok(())
 724    }
 725
 726    pub async fn observe_buffer_version(
 727        &self,
 728        buffer_id: BufferId,
 729        user_id: UserId,
 730        epoch: i32,
 731        version: &[proto::VectorClockEntry],
 732    ) -> Result<()> {
 733        self.transaction(|tx| async move {
 734            // For now, combine concurrent operations.
 735            let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
 736                return Ok(());
 737            };
 738            self.save_max_operation(
 739                user_id,
 740                buffer_id,
 741                epoch,
 742                component.replica_id as i32,
 743                component.timestamp as i32,
 744                &tx,
 745            )
 746            .await?;
 747            Ok(())
 748        })
 749        .await
 750    }
 751
 752    pub async fn observed_channel_buffer_changes(
 753        &self,
 754        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
 755        user_id: UserId,
 756        tx: &DatabaseTransaction,
 757    ) -> Result<Vec<proto::ChannelBufferVersion>> {
 758        let observed_operations = observed_buffer_edits::Entity::find()
 759            .filter(observed_buffer_edits::Column::UserId.eq(user_id))
 760            .filter(
 761                observed_buffer_edits::Column::BufferId
 762                    .is_in(channel_ids_by_buffer_id.keys().copied()),
 763            )
 764            .all(tx)
 765            .await?;
 766
 767        Ok(observed_operations
 768            .iter()
 769            .flat_map(|op| {
 770                Some(proto::ChannelBufferVersion {
 771                    channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
 772                    epoch: op.epoch as u64,
 773                    version: vec![proto::VectorClockEntry {
 774                        replica_id: op.replica_id as u32,
 775                        timestamp: op.lamport_timestamp as u32,
 776                    }],
 777                })
 778            })
 779            .collect())
 780    }
 781}
 782
 783fn operation_to_storage(
 784    operation: &proto::Operation,
 785    buffer: &buffer::Model,
 786    _format: i32,
 787) -> Option<buffer_operation::ActiveModel> {
 788    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
 789        proto::operation::Variant::Edit(operation) => (
 790            operation.replica_id,
 791            operation.lamport_timestamp,
 792            storage::Operation {
 793                version: version_to_storage(&operation.version),
 794                is_undo: false,
 795                edit_ranges: operation
 796                    .ranges
 797                    .iter()
 798                    .map(|range| storage::Range {
 799                        start: range.start,
 800                        end: range.end,
 801                    })
 802                    .collect(),
 803                edit_texts: operation.new_text.clone(),
 804                undo_counts: Vec::new(),
 805            },
 806        ),
 807        proto::operation::Variant::Undo(operation) => (
 808            operation.replica_id,
 809            operation.lamport_timestamp,
 810            storage::Operation {
 811                version: version_to_storage(&operation.version),
 812                is_undo: true,
 813                edit_ranges: Vec::new(),
 814                edit_texts: Vec::new(),
 815                undo_counts: operation
 816                    .counts
 817                    .iter()
 818                    .map(|entry| storage::UndoCount {
 819                        replica_id: entry.replica_id,
 820                        lamport_timestamp: entry.lamport_timestamp,
 821                        count: entry.count,
 822                    })
 823                    .collect(),
 824            },
 825        ),
 826        _ => None?,
 827    };
 828
 829    Some(buffer_operation::ActiveModel {
 830        buffer_id: ActiveValue::Set(buffer.id),
 831        epoch: ActiveValue::Set(buffer.epoch),
 832        replica_id: ActiveValue::Set(replica_id as i32),
 833        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
 834        value: ActiveValue::Set(value.encode_to_vec()),
 835    })
 836}
 837
 838fn operation_from_storage(
 839    row: buffer_operation::Model,
 840    _format_version: i32,
 841) -> Result<proto::operation::Variant, Error> {
 842    let operation =
 843        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{error}"))?;
 844    let version = version_from_storage(&operation.version);
 845    Ok(if operation.is_undo {
 846        proto::operation::Variant::Undo(proto::operation::Undo {
 847            replica_id: row.replica_id as u32,
 848            lamport_timestamp: row.lamport_timestamp as u32,
 849            version,
 850            counts: operation
 851                .undo_counts
 852                .iter()
 853                .map(|entry| proto::UndoCount {
 854                    replica_id: entry.replica_id,
 855                    lamport_timestamp: entry.lamport_timestamp,
 856                    count: entry.count,
 857                })
 858                .collect(),
 859        })
 860    } else {
 861        proto::operation::Variant::Edit(proto::operation::Edit {
 862            replica_id: row.replica_id as u32,
 863            lamport_timestamp: row.lamport_timestamp as u32,
 864            version,
 865            ranges: operation
 866                .edit_ranges
 867                .into_iter()
 868                .map(|range| proto::Range {
 869                    start: range.start,
 870                    end: range.end,
 871                })
 872                .collect(),
 873            new_text: operation.edit_texts,
 874        })
 875    })
 876}
 877
 878fn version_to_storage(version: &[proto::VectorClockEntry]) -> Vec<storage::VectorClockEntry> {
 879    version
 880        .iter()
 881        .map(|entry| storage::VectorClockEntry {
 882            replica_id: entry.replica_id,
 883            timestamp: entry.timestamp,
 884        })
 885        .collect()
 886}
 887
 888fn version_from_storage(version: &[storage::VectorClockEntry]) -> Vec<proto::VectorClockEntry> {
 889    version
 890        .iter()
 891        .map(|entry| proto::VectorClockEntry {
 892            replica_id: entry.replica_id,
 893            timestamp: entry.timestamp,
 894        })
 895        .collect()
 896}
 897
 898// This is currently a manual copy of the deserialization code in the client's language crate
 899pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
 900    match operation.variant? {
 901        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
 902            timestamp: clock::Lamport {
 903                replica_id: edit.replica_id as text::ReplicaId,
 904                value: edit.lamport_timestamp,
 905            },
 906            version: version_from_wire(&edit.version),
 907            ranges: edit
 908                .ranges
 909                .into_iter()
 910                .map(|range| {
 911                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
 912                })
 913                .collect(),
 914            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
 915        })),
 916        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
 917            timestamp: clock::Lamport {
 918                replica_id: undo.replica_id as text::ReplicaId,
 919                value: undo.lamport_timestamp,
 920            },
 921            version: version_from_wire(&undo.version),
 922            counts: undo
 923                .counts
 924                .into_iter()
 925                .map(|c| {
 926                    (
 927                        clock::Lamport {
 928                            replica_id: c.replica_id as text::ReplicaId,
 929                            value: c.lamport_timestamp,
 930                        },
 931                        c.count,
 932                    )
 933                })
 934                .collect(),
 935        })),
 936        _ => None,
 937    }
 938}
 939
 940fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
 941    let mut version = clock::Global::new();
 942    for entry in message {
 943        version.observe(clock::Lamport {
 944            replica_id: entry.replica_id as text::ReplicaId,
 945            value: entry.timestamp,
 946        });
 947    }
 948    version
 949}
 950
 951fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
 952    let mut message = Vec::new();
 953    for entry in version.iter() {
 954        message.push(proto::VectorClockEntry {
 955            replica_id: entry.replica_id as u32,
 956            timestamp: entry.value,
 957        });
 958    }
 959    message
 960}
 961
/// Single-variant column enum naming the `OperationSerializationVersion` column.
// NOTE(review): presumably used to select only that column in a sea-orm query; its
// call site is outside this part of the file — confirm against the callers.
#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
enum QueryOperationSerializationVersion {
    OperationSerializationVersion,
}
 966
// Message definitions for the bytes stored in a `buffer_operation` row's `value`
// column. `operation_to_storage` encodes these and `operation_from_storage` decodes
// them, so field tags and types must stay compatible with rows already written.
mod storage {
    #![allow(non_snake_case)]
    use prost::Message;
    /// Version number recorded in `operation_serialization_version` when a buffer
    /// snapshot row is created.
    pub const SERIALIZATION_VERSION: i32 = 1;

    /// Stored payload of one buffer operation, either an edit or an undo.
    ///
    /// The operation's own replica id and lamport timestamp are not part of this
    /// message; they are stored as separate row columns. No field here uses tag 1.
    #[derive(Message)]
    pub struct Operation {
        /// Vector clock attached to the operation.
        #[prost(message, repeated, tag = "2")]
        pub version: Vec<VectorClockEntry>,
        /// `true` for an undo operation, `false` for an edit.
        #[prost(bool, tag = "3")]
        pub is_undo: bool,
        /// Ranges of an edit; left empty when storing an undo.
        #[prost(message, repeated, tag = "4")]
        pub edit_ranges: Vec<Range>,
        /// New texts of an edit; left empty when storing an undo.
        #[prost(string, repeated, tag = "5")]
        pub edit_texts: Vec<String>,
        /// Undo counts of an undo; left empty when storing an edit.
        #[prost(message, repeated, tag = "6")]
        pub undo_counts: Vec<UndoCount>,
    }

    /// One `(replica_id, timestamp)` entry of a stored vector clock.
    #[derive(Message)]
    pub struct VectorClockEntry {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub timestamp: u32,
    }

    /// Start and end of one edit range, copied verbatim from the wire `Range`.
    #[derive(Message)]
    pub struct Range {
        #[prost(uint64, tag = "1")]
        pub start: u64,
        #[prost(uint64, tag = "2")]
        pub end: u64,
    }

    /// One entry of an undo operation: a `(replica_id, lamport_timestamp)` pair and
    /// the count associated with it.
    #[derive(Message)]
    pub struct UndoCount {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub lamport_timestamp: u32,
        #[prost(uint32, tag = "3")]
        pub count: u32,
    }
}