buffers.rs

   1use super::*;
   2use prost::Message;
   3use text::{EditOperation, UndoOperation};
   4
   5pub struct LeftChannelBuffer {
   6    pub channel_id: ChannelId,
   7    pub collaborators: Vec<proto::Collaborator>,
   8    pub connections: Vec<ConnectionId>,
   9}
  10
  11impl Database {
  12    /// Open a channel buffer. Returns the current contents, and adds you to the list of people
  13    /// to notify on changes.
  14    pub async fn join_channel_buffer(
  15        &self,
  16        channel_id: ChannelId,
  17        user_id: UserId,
  18        connection: ConnectionId,
  19    ) -> Result<proto::JoinChannelBufferResponse> {
  20        self.transaction(|tx| async move {
  21            let channel = self.get_channel_internal(channel_id, &tx).await?;
  22            self.check_user_is_channel_participant(&channel, user_id, &tx)
  23                .await?;
  24
  25            let buffer = channel::Model {
  26                id: channel_id,
  27                ..Default::default()
  28            }
  29            .find_related(buffer::Entity)
  30            .one(&*tx)
  31            .await?;
  32
  33            let buffer = if let Some(buffer) = buffer {
  34                buffer
  35            } else {
  36                let buffer = buffer::ActiveModel {
  37                    channel_id: ActiveValue::Set(channel_id),
  38                    ..Default::default()
  39                }
  40                .insert(&*tx)
  41                .await?;
  42                buffer_snapshot::ActiveModel {
  43                    buffer_id: ActiveValue::Set(buffer.id),
  44                    epoch: ActiveValue::Set(0),
  45                    text: ActiveValue::Set(String::new()),
  46                    operation_serialization_version: ActiveValue::Set(
  47                        storage::SERIALIZATION_VERSION,
  48                    ),
  49                }
  50                .insert(&*tx)
  51                .await?;
  52                buffer
  53            };
  54
  55            // Join the collaborators
  56            let mut collaborators = channel_buffer_collaborator::Entity::find()
  57                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
  58                .all(&*tx)
  59                .await?;
  60            let replica_ids = collaborators
  61                .iter()
  62                .map(|c| c.replica_id)
  63                .collect::<HashSet<_>>();
  64            let mut replica_id = ReplicaId(0);
  65            while replica_ids.contains(&replica_id) {
  66                replica_id.0 += 1;
  67            }
  68            let collaborator = channel_buffer_collaborator::ActiveModel {
  69                channel_id: ActiveValue::Set(channel_id),
  70                connection_id: ActiveValue::Set(connection.id as i32),
  71                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
  72                user_id: ActiveValue::Set(user_id),
  73                replica_id: ActiveValue::Set(replica_id),
  74                ..Default::default()
  75            }
  76            .insert(&*tx)
  77            .await?;
  78            collaborators.push(collaborator);
  79
  80            let (base_text, operations, max_operation) =
  81                self.get_buffer_state(&buffer, &tx).await?;
  82
  83            // Save the last observed operation
  84            if let Some(op) = max_operation {
  85                observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
  86                    user_id: ActiveValue::Set(user_id),
  87                    buffer_id: ActiveValue::Set(buffer.id),
  88                    epoch: ActiveValue::Set(op.epoch),
  89                    lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
  90                    replica_id: ActiveValue::Set(op.replica_id),
  91                })
  92                .on_conflict(
  93                    OnConflict::columns([
  94                        observed_buffer_edits::Column::UserId,
  95                        observed_buffer_edits::Column::BufferId,
  96                    ])
  97                    .update_columns([
  98                        observed_buffer_edits::Column::Epoch,
  99                        observed_buffer_edits::Column::LamportTimestamp,
 100                    ])
 101                    .to_owned(),
 102                )
 103                .exec(&*tx)
 104                .await?;
 105            }
 106
 107            Ok(proto::JoinChannelBufferResponse {
 108                buffer_id: buffer.id.to_proto(),
 109                replica_id: replica_id.to_proto() as u32,
 110                base_text,
 111                operations,
 112                epoch: buffer.epoch as u64,
 113                collaborators: collaborators
 114                    .into_iter()
 115                    .map(|collaborator| proto::Collaborator {
 116                        peer_id: Some(collaborator.connection().into()),
 117                        user_id: collaborator.user_id.to_proto(),
 118                        replica_id: collaborator.replica_id.0 as u32,
 119                        is_host: false,
 120                    })
 121                    .collect(),
 122            })
 123        })
 124        .await
 125    }
 126
 127    /// Rejoin a channel buffer (after a connection interruption)
 128    pub async fn rejoin_channel_buffers(
 129        &self,
 130        buffers: &[proto::ChannelBufferVersion],
 131        user_id: UserId,
 132        connection_id: ConnectionId,
 133    ) -> Result<Vec<RejoinedChannelBuffer>> {
 134        self.transaction(|tx| async move {
 135            let mut results = Vec::new();
 136            for client_buffer in buffers {
 137                let channel = self
 138                    .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
 139                    .await?;
 140                if self
 141                    .check_user_is_channel_participant(&channel, user_id, &tx)
 142                    .await
 143                    .is_err()
 144                {
 145                    log::info!("user is not a member of channel");
 146                    continue;
 147                }
 148
 149                let buffer = self.get_channel_buffer(channel.id, &tx).await?;
 150                let mut collaborators = channel_buffer_collaborator::Entity::find()
 151                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
 152                    .all(&*tx)
 153                    .await?;
 154
 155                // If the buffer epoch hasn't changed since the client lost
 156                // connection, then the client's buffer can be synchronized with
 157                // the server's buffer.
 158                if buffer.epoch as u64 != client_buffer.epoch {
 159                    log::info!("can't rejoin buffer, epoch has changed");
 160                    continue;
 161                }
 162
 163                // Find the collaborator record for this user's previous lost
 164                // connection. Update it with the new connection id.
 165                let Some(self_collaborator) =
 166                    collaborators.iter_mut().find(|c| c.user_id == user_id)
 167                else {
 168                    log::info!("can't rejoin buffer, no previous collaborator found");
 169                    continue;
 170                };
 171                let old_connection_id = self_collaborator.connection();
 172                *self_collaborator = channel_buffer_collaborator::ActiveModel {
 173                    id: ActiveValue::Unchanged(self_collaborator.id),
 174                    connection_id: ActiveValue::Set(connection_id.id as i32),
 175                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
 176                    connection_lost: ActiveValue::Set(false),
 177                    ..Default::default()
 178                }
 179                .update(&*tx)
 180                .await?;
 181
 182                let client_version = version_from_wire(&client_buffer.version);
 183                let serialization_version = self
 184                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 185                    .await?;
 186
 187                let mut rows = buffer_operation::Entity::find()
 188                    .filter(
 189                        buffer_operation::Column::BufferId
 190                            .eq(buffer.id)
 191                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 192                    )
 193                    .stream(&*tx)
 194                    .await?;
 195
 196                // Find the server's version vector and any operations
 197                // that the client has not seen.
 198                let mut server_version = clock::Global::new();
 199                let mut operations = Vec::new();
 200                while let Some(row) = rows.next().await {
 201                    let row = row?;
 202                    let timestamp = clock::Lamport {
 203                        replica_id: row.replica_id as u16,
 204                        value: row.lamport_timestamp as u32,
 205                    };
 206                    server_version.observe(timestamp);
 207                    if !client_version.observed(timestamp) {
 208                        operations.push(proto::Operation {
 209                            variant: Some(operation_from_storage(row, serialization_version)?),
 210                        })
 211                    }
 212                }
 213
 214                results.push(RejoinedChannelBuffer {
 215                    old_connection_id,
 216                    buffer: proto::RejoinedChannelBuffer {
 217                        channel_id: client_buffer.channel_id,
 218                        version: version_to_wire(&server_version),
 219                        operations,
 220                        collaborators: collaborators
 221                            .into_iter()
 222                            .map(|collaborator| proto::Collaborator {
 223                                peer_id: Some(collaborator.connection().into()),
 224                                user_id: collaborator.user_id.to_proto(),
 225                                replica_id: collaborator.replica_id.0 as u32,
 226                                is_host: false,
 227                            })
 228                            .collect(),
 229                    },
 230                });
 231            }
 232
 233            Ok(results)
 234        })
 235        .await
 236    }
 237
 238    /// Clear out any buffer collaborators who are no longer collaborating.
 239    pub async fn clear_stale_channel_buffer_collaborators(
 240        &self,
 241        channel_id: ChannelId,
 242        server_id: ServerId,
 243    ) -> Result<RefreshedChannelBuffer> {
 244        self.transaction(|tx| async move {
 245            let db_collaborators = channel_buffer_collaborator::Entity::find()
 246                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 247                .all(&*tx)
 248                .await?;
 249
 250            let mut connection_ids = Vec::new();
 251            let mut collaborators = Vec::new();
 252            let mut collaborator_ids_to_remove = Vec::new();
 253            for db_collaborator in &db_collaborators {
 254                if !db_collaborator.connection_lost
 255                    && db_collaborator.connection_server_id == server_id
 256                {
 257                    connection_ids.push(db_collaborator.connection());
 258                    collaborators.push(proto::Collaborator {
 259                        peer_id: Some(db_collaborator.connection().into()),
 260                        replica_id: db_collaborator.replica_id.0 as u32,
 261                        user_id: db_collaborator.user_id.to_proto(),
 262                        is_host: false,
 263                    })
 264                } else {
 265                    collaborator_ids_to_remove.push(db_collaborator.id);
 266                }
 267            }
 268
 269            channel_buffer_collaborator::Entity::delete_many()
 270                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
 271                .exec(&*tx)
 272                .await?;
 273
 274            Ok(RefreshedChannelBuffer {
 275                connection_ids,
 276                collaborators,
 277            })
 278        })
 279        .await
 280    }
 281
 282    /// Close the channel buffer, and stop receiving updates for it.
 283    pub async fn leave_channel_buffer(
 284        &self,
 285        channel_id: ChannelId,
 286        connection: ConnectionId,
 287    ) -> Result<LeftChannelBuffer> {
 288        self.transaction(|tx| async move {
 289            self.leave_channel_buffer_internal(channel_id, connection, &tx)
 290                .await
 291        })
 292        .await
 293    }
 294
 295    /// Close the channel buffer, and stop receiving updates for it.
 296    pub async fn channel_buffer_connection_lost(
 297        &self,
 298        connection: ConnectionId,
 299        tx: &DatabaseTransaction,
 300    ) -> Result<()> {
 301        channel_buffer_collaborator::Entity::update_many()
 302            .filter(
 303                Condition::all()
 304                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 305                    .add(
 306                        channel_buffer_collaborator::Column::ConnectionServerId
 307                            .eq(connection.owner_id as i32),
 308                    ),
 309            )
 310            .set(channel_buffer_collaborator::ActiveModel {
 311                connection_lost: ActiveValue::set(true),
 312                ..Default::default()
 313            })
 314            .exec(tx)
 315            .await?;
 316        Ok(())
 317    }
 318
 319    /// Close all open channel buffers
 320    pub async fn leave_channel_buffers(
 321        &self,
 322        connection: ConnectionId,
 323    ) -> Result<Vec<LeftChannelBuffer>> {
 324        self.transaction(|tx| async move {
 325            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 326            enum QueryChannelIds {
 327                ChannelId,
 328            }
 329
 330            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
 331                .select_only()
 332                .column(channel_buffer_collaborator::Column::ChannelId)
 333                .filter(Condition::all().add(
 334                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
 335                ))
 336                .into_values::<_, QueryChannelIds>()
 337                .all(&*tx)
 338                .await?;
 339
 340            let mut result = Vec::new();
 341            for channel_id in channel_ids {
 342                let left_channel_buffer = self
 343                    .leave_channel_buffer_internal(channel_id, connection, &tx)
 344                    .await?;
 345                result.push(left_channel_buffer);
 346            }
 347
 348            Ok(result)
 349        })
 350        .await
 351    }
 352
 353    async fn leave_channel_buffer_internal(
 354        &self,
 355        channel_id: ChannelId,
 356        connection: ConnectionId,
 357        tx: &DatabaseTransaction,
 358    ) -> Result<LeftChannelBuffer> {
 359        let result = channel_buffer_collaborator::Entity::delete_many()
 360            .filter(
 361                Condition::all()
 362                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 363                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
 364                    .add(
 365                        channel_buffer_collaborator::Column::ConnectionServerId
 366                            .eq(connection.owner_id as i32),
 367                    ),
 368            )
 369            .exec(tx)
 370            .await?;
 371        if result.rows_affected == 0 {
 372            Err(anyhow!("not a collaborator on this project"))?;
 373        }
 374
 375        let mut collaborators = Vec::new();
 376        let mut connections = Vec::new();
 377        let mut rows = channel_buffer_collaborator::Entity::find()
 378            .filter(
 379                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 380            )
 381            .stream(tx)
 382            .await?;
 383        while let Some(row) = rows.next().await {
 384            let row = row?;
 385            let connection = row.connection();
 386            connections.push(connection);
 387            collaborators.push(proto::Collaborator {
 388                peer_id: Some(connection.into()),
 389                replica_id: row.replica_id.0 as u32,
 390                user_id: row.user_id.to_proto(),
 391                is_host: false,
 392            });
 393        }
 394
 395        drop(rows);
 396
 397        if collaborators.is_empty() {
 398            self.snapshot_channel_buffer(channel_id, tx).await?;
 399        }
 400
 401        Ok(LeftChannelBuffer {
 402            channel_id,
 403            collaborators,
 404            connections,
 405        })
 406    }
 407
 408    pub async fn get_channel_buffer_collaborators(
 409        &self,
 410        channel_id: ChannelId,
 411    ) -> Result<Vec<UserId>> {
 412        self.transaction(|tx| async move {
 413            self.get_channel_buffer_collaborators_internal(channel_id, &tx)
 414                .await
 415        })
 416        .await
 417    }
 418
 419    async fn get_channel_buffer_collaborators_internal(
 420        &self,
 421        channel_id: ChannelId,
 422        tx: &DatabaseTransaction,
 423    ) -> Result<Vec<UserId>> {
 424        #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 425        enum QueryUserIds {
 426            UserId,
 427        }
 428
 429        let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
 430            .select_only()
 431            .column(channel_buffer_collaborator::Column::UserId)
 432            .filter(
 433                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 434            )
 435            .into_values::<_, QueryUserIds>()
 436            .all(tx)
 437            .await?;
 438
 439        Ok(users)
 440    }
 441
 442    pub async fn update_channel_buffer(
 443        &self,
 444        channel_id: ChannelId,
 445        user: UserId,
 446        operations: &[proto::Operation],
 447    ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
 448        self.transaction(move |tx| async move {
 449            let channel = self.get_channel_internal(channel_id, &tx).await?;
 450
 451            let mut requires_write_permission = false;
 452            for op in operations.iter() {
 453                match op.variant {
 454                    None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
 455                    Some(_) => requires_write_permission = true,
 456                }
 457            }
 458            if requires_write_permission {
 459                self.check_user_is_channel_member(&channel, user, &tx)
 460                    .await?;
 461            } else {
 462                self.check_user_is_channel_participant(&channel, user, &tx)
 463                    .await?;
 464            }
 465
 466            let buffer = buffer::Entity::find()
 467                .filter(buffer::Column::ChannelId.eq(channel_id))
 468                .one(&*tx)
 469                .await?
 470                .ok_or_else(|| anyhow!("no such buffer"))?;
 471
 472            let serialization_version = self
 473                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
 474                .await?;
 475
 476            let operations = operations
 477                .iter()
 478                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
 479                .collect::<Vec<_>>();
 480
 481            let max_version;
 482
 483            if !operations.is_empty() {
 484                let max_operation = operations
 485                    .iter()
 486                    .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
 487                    .unwrap();
 488
 489                max_version = vec![proto::VectorClockEntry {
 490                    replica_id: *max_operation.replica_id.as_ref() as u32,
 491                    timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
 492                }];
 493
 494                // get current channel participants and save the max operation above
 495                self.save_max_operation(
 496                    user,
 497                    buffer.id,
 498                    buffer.epoch,
 499                    *max_operation.replica_id.as_ref(),
 500                    *max_operation.lamport_timestamp.as_ref(),
 501                    &tx,
 502                )
 503                .await?;
 504
 505                buffer_operation::Entity::insert_many(operations)
 506                    .on_conflict(
 507                        OnConflict::columns([
 508                            buffer_operation::Column::BufferId,
 509                            buffer_operation::Column::Epoch,
 510                            buffer_operation::Column::LamportTimestamp,
 511                            buffer_operation::Column::ReplicaId,
 512                        ])
 513                        .do_nothing()
 514                        .to_owned(),
 515                    )
 516                    .exec(&*tx)
 517                    .await?;
 518            } else {
 519                max_version = Vec::new();
 520            }
 521
 522            let mut connections = HashSet::default();
 523            let mut rows = channel_buffer_collaborator::Entity::find()
 524                .filter(
 525                    Condition::all()
 526                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
 527                )
 528                .stream(&*tx)
 529                .await?;
 530            while let Some(row) = rows.next().await {
 531                let row = row?;
 532                connections.insert(ConnectionId {
 533                    id: row.connection_id as u32,
 534                    owner_id: row.connection_server_id.0 as u32,
 535                });
 536            }
 537
 538            Ok((connections, buffer.epoch, max_version))
 539        })
 540        .await
 541    }
 542
 543    async fn save_max_operation(
 544        &self,
 545        user_id: UserId,
 546        buffer_id: BufferId,
 547        epoch: i32,
 548        replica_id: i32,
 549        lamport_timestamp: i32,
 550        tx: &DatabaseTransaction,
 551    ) -> Result<()> {
 552        buffer::Entity::update(buffer::ActiveModel {
 553            id: ActiveValue::Unchanged(buffer_id),
 554            epoch: ActiveValue::Unchanged(epoch),
 555            latest_operation_epoch: ActiveValue::Set(Some(epoch)),
 556            latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
 557            latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
 558            channel_id: ActiveValue::NotSet,
 559        })
 560        .exec(tx)
 561        .await?;
 562
 563        use observed_buffer_edits::Column;
 564        observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
 565            user_id: ActiveValue::Set(user_id),
 566            buffer_id: ActiveValue::Set(buffer_id),
 567            epoch: ActiveValue::Set(epoch),
 568            replica_id: ActiveValue::Set(replica_id),
 569            lamport_timestamp: ActiveValue::Set(lamport_timestamp),
 570        })
 571        .on_conflict(
 572            OnConflict::columns([Column::UserId, Column::BufferId])
 573                .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
 574                .action_cond_where(
 575                    Condition::any().add(Column::Epoch.lt(epoch)).add(
 576                        Condition::all().add(Column::Epoch.eq(epoch)).add(
 577                            Condition::any()
 578                                .add(Column::LamportTimestamp.lt(lamport_timestamp))
 579                                .add(
 580                                    Column::LamportTimestamp
 581                                        .eq(lamport_timestamp)
 582                                        .and(Column::ReplicaId.lt(replica_id)),
 583                                ),
 584                        ),
 585                    ),
 586                )
 587                .to_owned(),
 588        )
 589        .exec_without_returning(tx)
 590        .await?;
 591
 592        Ok(())
 593    }
 594
 595    async fn get_buffer_operation_serialization_version(
 596        &self,
 597        buffer_id: BufferId,
 598        epoch: i32,
 599        tx: &DatabaseTransaction,
 600    ) -> Result<i32> {
 601        Ok(buffer_snapshot::Entity::find()
 602            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
 603            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
 604            .select_only()
 605            .column(buffer_snapshot::Column::OperationSerializationVersion)
 606            .into_values::<_, QueryOperationSerializationVersion>()
 607            .one(tx)
 608            .await?
 609            .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
 610    }
 611
 612    pub async fn get_channel_buffer(
 613        &self,
 614        channel_id: ChannelId,
 615        tx: &DatabaseTransaction,
 616    ) -> Result<buffer::Model> {
 617        Ok(channel::Model {
 618            id: channel_id,
 619            ..Default::default()
 620        }
 621        .find_related(buffer::Entity)
 622        .one(tx)
 623        .await?
 624        .ok_or_else(|| anyhow!("no such buffer"))?)
 625    }
 626
 627    async fn get_buffer_state(
 628        &self,
 629        buffer: &buffer::Model,
 630        tx: &DatabaseTransaction,
 631    ) -> Result<(
 632        String,
 633        Vec<proto::Operation>,
 634        Option<buffer_operation::Model>,
 635    )> {
 636        let id = buffer.id;
 637        let (base_text, version) = if buffer.epoch > 0 {
 638            let snapshot = buffer_snapshot::Entity::find()
 639                .filter(
 640                    buffer_snapshot::Column::BufferId
 641                        .eq(id)
 642                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
 643                )
 644                .one(tx)
 645                .await?
 646                .ok_or_else(|| anyhow!("no such snapshot"))?;
 647
 648            let version = snapshot.operation_serialization_version;
 649            (snapshot.text, version)
 650        } else {
 651            (String::new(), storage::SERIALIZATION_VERSION)
 652        };
 653
 654        let mut rows = buffer_operation::Entity::find()
 655            .filter(
 656                buffer_operation::Column::BufferId
 657                    .eq(id)
 658                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 659            )
 660            .order_by_asc(buffer_operation::Column::LamportTimestamp)
 661            .order_by_asc(buffer_operation::Column::ReplicaId)
 662            .stream(tx)
 663            .await?;
 664
 665        let mut operations = Vec::new();
 666        let mut last_row = None;
 667        while let Some(row) = rows.next().await {
 668            let row = row?;
 669            last_row = Some(buffer_operation::Model {
 670                buffer_id: row.buffer_id,
 671                epoch: row.epoch,
 672                lamport_timestamp: row.lamport_timestamp,
 673                replica_id: row.replica_id,
 674                value: Default::default(),
 675            });
 676            operations.push(proto::Operation {
 677                variant: Some(operation_from_storage(row, version)?),
 678            });
 679        }
 680
 681        Ok((base_text, operations, last_row))
 682    }
 683
 684    async fn snapshot_channel_buffer(
 685        &self,
 686        channel_id: ChannelId,
 687        tx: &DatabaseTransaction,
 688    ) -> Result<()> {
 689        let buffer = self.get_channel_buffer(channel_id, tx).await?;
 690        let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
 691        if operations.is_empty() {
 692            return Ok(());
 693        }
 694
 695        let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
 696        text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
 697
 698        let base_text = text_buffer.text();
 699        let epoch = buffer.epoch + 1;
 700
 701        buffer_snapshot::Model {
 702            buffer_id: buffer.id,
 703            epoch,
 704            text: base_text,
 705            operation_serialization_version: storage::SERIALIZATION_VERSION,
 706        }
 707        .into_active_model()
 708        .insert(tx)
 709        .await?;
 710
 711        buffer::ActiveModel {
 712            id: ActiveValue::Unchanged(buffer.id),
 713            epoch: ActiveValue::Set(epoch),
 714            latest_operation_epoch: ActiveValue::NotSet,
 715            latest_operation_replica_id: ActiveValue::NotSet,
 716            latest_operation_lamport_timestamp: ActiveValue::NotSet,
 717            channel_id: ActiveValue::NotSet,
 718        }
 719        .save(tx)
 720        .await?;
 721
 722        Ok(())
 723    }
 724
 725    pub async fn observe_buffer_version(
 726        &self,
 727        buffer_id: BufferId,
 728        user_id: UserId,
 729        epoch: i32,
 730        version: &[proto::VectorClockEntry],
 731    ) -> Result<()> {
 732        self.transaction(|tx| async move {
 733            // For now, combine concurrent operations.
 734            let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
 735                return Ok(());
 736            };
 737            self.save_max_operation(
 738                user_id,
 739                buffer_id,
 740                epoch,
 741                component.replica_id as i32,
 742                component.timestamp as i32,
 743                &tx,
 744            )
 745            .await?;
 746            Ok(())
 747        })
 748        .await
 749    }
 750
 751    pub async fn observed_channel_buffer_changes(
 752        &self,
 753        channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
 754        user_id: UserId,
 755        tx: &DatabaseTransaction,
 756    ) -> Result<Vec<proto::ChannelBufferVersion>> {
 757        let observed_operations = observed_buffer_edits::Entity::find()
 758            .filter(observed_buffer_edits::Column::UserId.eq(user_id))
 759            .filter(
 760                observed_buffer_edits::Column::BufferId
 761                    .is_in(channel_ids_by_buffer_id.keys().copied()),
 762            )
 763            .all(tx)
 764            .await?;
 765
 766        Ok(observed_operations
 767            .iter()
 768            .flat_map(|op| {
 769                Some(proto::ChannelBufferVersion {
 770                    channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
 771                    epoch: op.epoch as u64,
 772                    version: vec![proto::VectorClockEntry {
 773                        replica_id: op.replica_id as u32,
 774                        timestamp: op.lamport_timestamp as u32,
 775                    }],
 776                })
 777            })
 778            .collect())
 779    }
 780}
 781
 782fn operation_to_storage(
 783    operation: &proto::Operation,
 784    buffer: &buffer::Model,
 785    _format: i32,
 786) -> Option<buffer_operation::ActiveModel> {
 787    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
 788        proto::operation::Variant::Edit(operation) => (
 789            operation.replica_id,
 790            operation.lamport_timestamp,
 791            storage::Operation {
 792                version: version_to_storage(&operation.version),
 793                is_undo: false,
 794                edit_ranges: operation
 795                    .ranges
 796                    .iter()
 797                    .map(|range| storage::Range {
 798                        start: range.start,
 799                        end: range.end,
 800                    })
 801                    .collect(),
 802                edit_texts: operation.new_text.clone(),
 803                undo_counts: Vec::new(),
 804            },
 805        ),
 806        proto::operation::Variant::Undo(operation) => (
 807            operation.replica_id,
 808            operation.lamport_timestamp,
 809            storage::Operation {
 810                version: version_to_storage(&operation.version),
 811                is_undo: true,
 812                edit_ranges: Vec::new(),
 813                edit_texts: Vec::new(),
 814                undo_counts: operation
 815                    .counts
 816                    .iter()
 817                    .map(|entry| storage::UndoCount {
 818                        replica_id: entry.replica_id,
 819                        lamport_timestamp: entry.lamport_timestamp,
 820                        count: entry.count,
 821                    })
 822                    .collect(),
 823            },
 824        ),
 825        _ => None?,
 826    };
 827
 828    Some(buffer_operation::ActiveModel {
 829        buffer_id: ActiveValue::Set(buffer.id),
 830        epoch: ActiveValue::Set(buffer.epoch),
 831        replica_id: ActiveValue::Set(replica_id as i32),
 832        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
 833        value: ActiveValue::Set(value.encode_to_vec()),
 834    })
 835}
 836
 837fn operation_from_storage(
 838    row: buffer_operation::Model,
 839    _format_version: i32,
 840) -> Result<proto::operation::Variant, Error> {
 841    let operation =
 842        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
 843    let version = version_from_storage(&operation.version);
 844    Ok(if operation.is_undo {
 845        proto::operation::Variant::Undo(proto::operation::Undo {
 846            replica_id: row.replica_id as u32,
 847            lamport_timestamp: row.lamport_timestamp as u32,
 848            version,
 849            counts: operation
 850                .undo_counts
 851                .iter()
 852                .map(|entry| proto::UndoCount {
 853                    replica_id: entry.replica_id,
 854                    lamport_timestamp: entry.lamport_timestamp,
 855                    count: entry.count,
 856                })
 857                .collect(),
 858        })
 859    } else {
 860        proto::operation::Variant::Edit(proto::operation::Edit {
 861            replica_id: row.replica_id as u32,
 862            lamport_timestamp: row.lamport_timestamp as u32,
 863            version,
 864            ranges: operation
 865                .edit_ranges
 866                .into_iter()
 867                .map(|range| proto::Range {
 868                    start: range.start,
 869                    end: range.end,
 870                })
 871                .collect(),
 872            new_text: operation.edit_texts,
 873        })
 874    })
 875}
 876
 877fn version_to_storage(version: &[proto::VectorClockEntry]) -> Vec<storage::VectorClockEntry> {
 878    version
 879        .iter()
 880        .map(|entry| storage::VectorClockEntry {
 881            replica_id: entry.replica_id,
 882            timestamp: entry.timestamp,
 883        })
 884        .collect()
 885}
 886
 887fn version_from_storage(version: &[storage::VectorClockEntry]) -> Vec<proto::VectorClockEntry> {
 888    version
 889        .iter()
 890        .map(|entry| proto::VectorClockEntry {
 891            replica_id: entry.replica_id,
 892            timestamp: entry.timestamp,
 893        })
 894        .collect()
 895}
 896
 897// This is currently a manual copy of the deserialization code in the client's language crate
 898pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
 899    match operation.variant? {
 900        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
 901            timestamp: clock::Lamport {
 902                replica_id: edit.replica_id as text::ReplicaId,
 903                value: edit.lamport_timestamp,
 904            },
 905            version: version_from_wire(&edit.version),
 906            ranges: edit
 907                .ranges
 908                .into_iter()
 909                .map(|range| {
 910                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
 911                })
 912                .collect(),
 913            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
 914        })),
 915        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
 916            timestamp: clock::Lamport {
 917                replica_id: undo.replica_id as text::ReplicaId,
 918                value: undo.lamport_timestamp,
 919            },
 920            version: version_from_wire(&undo.version),
 921            counts: undo
 922                .counts
 923                .into_iter()
 924                .map(|c| {
 925                    (
 926                        clock::Lamport {
 927                            replica_id: c.replica_id as text::ReplicaId,
 928                            value: c.lamport_timestamp,
 929                        },
 930                        c.count,
 931                    )
 932                })
 933                .collect(),
 934        })),
 935        _ => None,
 936    }
 937}
 938
 939fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
 940    let mut version = clock::Global::new();
 941    for entry in message {
 942        version.observe(clock::Lamport {
 943            replica_id: entry.replica_id as text::ReplicaId,
 944            value: entry.timestamp,
 945        });
 946    }
 947    version
 948}
 949
 950fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
 951    let mut message = Vec::new();
 952    for entry in version.iter() {
 953        message.push(proto::VectorClockEntry {
 954            replica_id: entry.replica_id as u32,
 955            timestamp: entry.value,
 956        });
 957    }
 958    message
 959}
 960
 961#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
 962enum QueryOperationSerializationVersion {
 963    OperationSerializationVersion,
 964}
 965
 966mod storage {
 967    #![allow(non_snake_case)]
 968    use prost::Message;
 969    pub const SERIALIZATION_VERSION: i32 = 1;
 970
 971    #[derive(Message)]
 972    pub struct Operation {
 973        #[prost(message, repeated, tag = "2")]
 974        pub version: Vec<VectorClockEntry>,
 975        #[prost(bool, tag = "3")]
 976        pub is_undo: bool,
 977        #[prost(message, repeated, tag = "4")]
 978        pub edit_ranges: Vec<Range>,
 979        #[prost(string, repeated, tag = "5")]
 980        pub edit_texts: Vec<String>,
 981        #[prost(message, repeated, tag = "6")]
 982        pub undo_counts: Vec<UndoCount>,
 983    }
 984
 985    #[derive(Message)]
 986    pub struct VectorClockEntry {
 987        #[prost(uint32, tag = "1")]
 988        pub replica_id: u32,
 989        #[prost(uint32, tag = "2")]
 990        pub timestamp: u32,
 991    }
 992
 993    #[derive(Message)]
 994    pub struct Range {
 995        #[prost(uint64, tag = "1")]
 996        pub start: u64,
 997        #[prost(uint64, tag = "2")]
 998        pub end: u64,
 999    }
1000
1001    #[derive(Message)]
1002    pub struct UndoCount {
1003        #[prost(uint32, tag = "1")]
1004        pub replica_id: u32,
1005        #[prost(uint32, tag = "2")]
1006        pub lamport_timestamp: u32,
1007        #[prost(uint32, tag = "3")]
1008        pub count: u32,
1009    }
1010}