buffers.rs

  1use super::*;
  2use prost::Message;
  3use text::{EditOperation, UndoOperation};
  4
  5impl Database {
  6    pub async fn join_channel_buffer(
  7        &self,
  8        channel_id: ChannelId,
  9        user_id: UserId,
 10        connection: ConnectionId,
 11    ) -> Result<proto::JoinChannelBufferResponse> {
 12        self.transaction(|tx| async move {
 13            self.check_user_is_channel_member(channel_id, user_id, &tx)
 14                .await?;
 15
 16            let buffer = channel::Model {
 17                id: channel_id,
 18                ..Default::default()
 19            }
 20            .find_related(buffer::Entity)
 21            .one(&*tx)
 22            .await?;
 23
 24            let buffer = if let Some(buffer) = buffer {
 25                buffer
 26            } else {
 27                let buffer = buffer::ActiveModel {
 28                    channel_id: ActiveValue::Set(channel_id),
 29                    ..Default::default()
 30                }
 31                .insert(&*tx)
 32                .await?;
 33                buffer_snapshot::ActiveModel {
 34                    buffer_id: ActiveValue::Set(buffer.id),
 35                    epoch: ActiveValue::Set(0),
 36                    text: ActiveValue::Set(String::new()),
 37                    operation_serialization_version: ActiveValue::Set(
 38                        storage::SERIALIZATION_VERSION,
 39                    ),
 40                }
 41                .insert(&*tx)
 42                .await?;
 43                buffer
 44            };
 45
 46            // Join the collaborators
 47            let mut collaborators = channel_buffer_collaborator::Entity::find()
 48                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 49                .all(&*tx)
 50                .await?;
 51            let replica_ids = collaborators
 52                .iter()
 53                .map(|c| c.replica_id)
 54                .collect::<HashSet<_>>();
 55            let mut replica_id = ReplicaId(0);
 56            while replica_ids.contains(&replica_id) {
 57                replica_id.0 += 1;
 58            }
 59            let collaborator = channel_buffer_collaborator::ActiveModel {
 60                channel_id: ActiveValue::Set(channel_id),
 61                connection_id: ActiveValue::Set(connection.id as i32),
 62                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
 63                user_id: ActiveValue::Set(user_id),
 64                replica_id: ActiveValue::Set(replica_id),
 65                ..Default::default()
 66            }
 67            .insert(&*tx)
 68            .await?;
 69            collaborators.push(collaborator);
 70
 71            let (base_text, operations) = self.get_buffer_state(&buffer, &tx).await?;
 72
 73            Ok(proto::JoinChannelBufferResponse {
 74                buffer_id: buffer.id.to_proto(),
 75                replica_id: replica_id.to_proto() as u32,
 76                base_text,
 77                operations,
 78                epoch: buffer.epoch as u64,
 79                collaborators: collaborators
 80                    .into_iter()
 81                    .map(|collaborator| proto::Collaborator {
 82                        peer_id: Some(collaborator.connection().into()),
 83                        user_id: collaborator.user_id.to_proto(),
 84                        replica_id: collaborator.replica_id.0 as u32,
 85                    })
 86                    .collect(),
 87            })
 88        })
 89        .await
 90    }
 91
 92    pub async fn rejoin_channel_buffers(
 93        &self,
 94        buffers: &[proto::ChannelBufferVersion],
 95        user_id: UserId,
 96        connection_id: ConnectionId,
 97    ) -> Result<Vec<RejoinedChannelBuffer>> {
 98        self.transaction(|tx| async move {
 99            let mut results = Vec::new();
100            for client_buffer in buffers {
101                let channel_id = ChannelId::from_proto(client_buffer.channel_id);
102                if self
103                    .check_user_is_channel_member(channel_id, user_id, &*tx)
104                    .await
105                    .is_err()
106                {
107                    log::info!("user is not a member of channel");
108                    continue;
109                }
110
111                let buffer = self.get_channel_buffer(channel_id, &*tx).await?;
112                let mut collaborators = channel_buffer_collaborator::Entity::find()
113                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
114                    .all(&*tx)
115                    .await?;
116
117                // If the buffer epoch hasn't changed since the client lost
118                // connection, then the client's buffer can be syncronized with
119                // the server's buffer.
120                if buffer.epoch as u64 != client_buffer.epoch {
121                    log::info!("can't rejoin buffer, epoch has changed");
122                    continue;
123                }
124
125                // Find the collaborator record for this user's previous lost
126                // connection. Update it with the new connection id.
127                let server_id = ServerId(connection_id.owner_id as i32);
128                let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
129                    c.user_id == user_id
130                        && (c.connection_lost || c.connection_server_id != server_id)
131                }) else {
132                    log::info!("can't rejoin buffer, no previous collaborator found");
133                    continue;
134                };
135                let old_connection_id = self_collaborator.connection();
136                *self_collaborator = channel_buffer_collaborator::ActiveModel {
137                    id: ActiveValue::Unchanged(self_collaborator.id),
138                    connection_id: ActiveValue::Set(connection_id.id as i32),
139                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
140                    connection_lost: ActiveValue::Set(false),
141                    ..Default::default()
142                }
143                .update(&*tx)
144                .await?;
145
146                let client_version = version_from_wire(&client_buffer.version);
147                let serialization_version = self
148                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
149                    .await?;
150
151                let mut rows = buffer_operation::Entity::find()
152                    .filter(
153                        buffer_operation::Column::BufferId
154                            .eq(buffer.id)
155                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
156                    )
157                    .stream(&*tx)
158                    .await?;
159
160                // Find the server's version vector and any operations
161                // that the client has not seen.
162                let mut server_version = clock::Global::new();
163                let mut operations = Vec::new();
164                while let Some(row) = rows.next().await {
165                    let row = row?;
166                    let timestamp = clock::Lamport {
167                        replica_id: row.replica_id as u16,
168                        value: row.lamport_timestamp as u32,
169                    };
170                    server_version.observe(timestamp);
171                    if !client_version.observed(timestamp) {
172                        operations.push(proto::Operation {
173                            variant: Some(operation_from_storage(row, serialization_version)?),
174                        })
175                    }
176                }
177
178                results.push(RejoinedChannelBuffer {
179                    old_connection_id,
180                    buffer: proto::RejoinedChannelBuffer {
181                        channel_id: client_buffer.channel_id,
182                        version: version_to_wire(&server_version),
183                        operations,
184                        collaborators: collaborators
185                            .into_iter()
186                            .map(|collaborator| proto::Collaborator {
187                                peer_id: Some(collaborator.connection().into()),
188                                user_id: collaborator.user_id.to_proto(),
189                                replica_id: collaborator.replica_id.0 as u32,
190                            })
191                            .collect(),
192                    },
193                });
194            }
195
196            Ok(results)
197        })
198        .await
199    }
200
201    pub async fn clear_stale_channel_buffer_collaborators(
202        &self,
203        channel_id: ChannelId,
204        server_id: ServerId,
205    ) -> Result<RefreshedChannelBuffer> {
206        self.transaction(|tx| async move {
207            let collaborators = channel_buffer_collaborator::Entity::find()
208                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
209                .all(&*tx)
210                .await?;
211
212            let mut connection_ids = Vec::new();
213            let mut removed_collaborators = Vec::new();
214            let mut collaborator_ids_to_remove = Vec::new();
215            for collaborator in &collaborators {
216                if !collaborator.connection_lost && collaborator.connection_server_id == server_id {
217                    connection_ids.push(collaborator.connection());
218                } else {
219                    removed_collaborators.push(proto::RemoveChannelBufferCollaborator {
220                        channel_id: channel_id.to_proto(),
221                        peer_id: Some(collaborator.connection().into()),
222                    });
223                    collaborator_ids_to_remove.push(collaborator.id);
224                }
225            }
226
227            channel_buffer_collaborator::Entity::delete_many()
228                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
229                .exec(&*tx)
230                .await?;
231
232            Ok(RefreshedChannelBuffer {
233                connection_ids,
234                removed_collaborators,
235            })
236        })
237        .await
238    }
239
240    pub async fn leave_channel_buffer(
241        &self,
242        channel_id: ChannelId,
243        connection: ConnectionId,
244    ) -> Result<Vec<ConnectionId>> {
245        self.transaction(|tx| async move {
246            self.leave_channel_buffer_internal(channel_id, connection, &*tx)
247                .await
248        })
249        .await
250    }
251
252    pub async fn channel_buffer_connection_lost(
253        &self,
254        connection: ConnectionId,
255        tx: &DatabaseTransaction,
256    ) -> Result<()> {
257        channel_buffer_collaborator::Entity::update_many()
258            .filter(
259                Condition::all()
260                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
261                    .add(
262                        channel_buffer_collaborator::Column::ConnectionServerId
263                            .eq(connection.owner_id as i32),
264                    ),
265            )
266            .set(channel_buffer_collaborator::ActiveModel {
267                connection_lost: ActiveValue::set(true),
268                ..Default::default()
269            })
270            .exec(&*tx)
271            .await?;
272        Ok(())
273    }
274
275    pub async fn leave_channel_buffers(
276        &self,
277        connection: ConnectionId,
278    ) -> Result<Vec<(ChannelId, Vec<ConnectionId>)>> {
279        self.transaction(|tx| async move {
280            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
281            enum QueryChannelIds {
282                ChannelId,
283            }
284
285            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
286                .select_only()
287                .column(channel_buffer_collaborator::Column::ChannelId)
288                .filter(Condition::all().add(
289                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
290                ))
291                .into_values::<_, QueryChannelIds>()
292                .all(&*tx)
293                .await?;
294
295            let mut result = Vec::new();
296            for channel_id in channel_ids {
297                let collaborators = self
298                    .leave_channel_buffer_internal(channel_id, connection, &*tx)
299                    .await?;
300                result.push((channel_id, collaborators));
301            }
302
303            Ok(result)
304        })
305        .await
306    }
307
308    pub async fn leave_channel_buffer_internal(
309        &self,
310        channel_id: ChannelId,
311        connection: ConnectionId,
312        tx: &DatabaseTransaction,
313    ) -> Result<Vec<ConnectionId>> {
314        let result = channel_buffer_collaborator::Entity::delete_many()
315            .filter(
316                Condition::all()
317                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
318                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
319                    .add(
320                        channel_buffer_collaborator::Column::ConnectionServerId
321                            .eq(connection.owner_id as i32),
322                    ),
323            )
324            .exec(&*tx)
325            .await?;
326        if result.rows_affected == 0 {
327            Err(anyhow!("not a collaborator on this project"))?;
328        }
329
330        let mut connections = Vec::new();
331        let mut rows = channel_buffer_collaborator::Entity::find()
332            .filter(
333                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
334            )
335            .stream(&*tx)
336            .await?;
337        while let Some(row) = rows.next().await {
338            let row = row?;
339            connections.push(ConnectionId {
340                id: row.connection_id as u32,
341                owner_id: row.connection_server_id.0 as u32,
342            });
343        }
344
345        drop(rows);
346
347        if connections.is_empty() {
348            self.snapshot_channel_buffer(channel_id, &tx).await?;
349        }
350
351        Ok(connections)
352    }
353
354    pub async fn get_channel_buffer_collaborators(
355        &self,
356        channel_id: ChannelId,
357    ) -> Result<Vec<UserId>> {
358        self.transaction(|tx| async move {
359            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
360            enum QueryUserIds {
361                UserId,
362            }
363
364            let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
365                .select_only()
366                .column(channel_buffer_collaborator::Column::UserId)
367                .filter(
368                    Condition::all()
369                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
370                )
371                .into_values::<_, QueryUserIds>()
372                .all(&*tx)
373                .await?;
374
375            Ok(users)
376        })
377        .await
378    }
379
380    pub async fn update_channel_buffer(
381        &self,
382        channel_id: ChannelId,
383        user: UserId,
384        operations: &[proto::Operation],
385    ) -> Result<Vec<ConnectionId>> {
386        self.transaction(move |tx| async move {
387            self.check_user_is_channel_member(channel_id, user, &*tx)
388                .await?;
389
390            let buffer = buffer::Entity::find()
391                .filter(buffer::Column::ChannelId.eq(channel_id))
392                .one(&*tx)
393                .await?
394                .ok_or_else(|| anyhow!("no such buffer"))?;
395
396            let serialization_version = self
397                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
398                .await?;
399
400            let operations = operations
401                .iter()
402                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
403                .collect::<Vec<_>>();
404            if !operations.is_empty() {
405                buffer_operation::Entity::insert_many(operations)
406                    .on_conflict(
407                        OnConflict::columns([
408                            buffer_operation::Column::BufferId,
409                            buffer_operation::Column::Epoch,
410                            buffer_operation::Column::LamportTimestamp,
411                            buffer_operation::Column::ReplicaId,
412                        ])
413                        .do_nothing()
414                        .to_owned(),
415                    )
416                    .exec(&*tx)
417                    .await?;
418            }
419
420            let mut connections = Vec::new();
421            let mut rows = channel_buffer_collaborator::Entity::find()
422                .filter(
423                    Condition::all()
424                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
425                )
426                .stream(&*tx)
427                .await?;
428            while let Some(row) = rows.next().await {
429                let row = row?;
430                connections.push(ConnectionId {
431                    id: row.connection_id as u32,
432                    owner_id: row.connection_server_id.0 as u32,
433                });
434            }
435
436            Ok(connections)
437        })
438        .await
439    }
440
441    async fn get_buffer_operation_serialization_version(
442        &self,
443        buffer_id: BufferId,
444        epoch: i32,
445        tx: &DatabaseTransaction,
446    ) -> Result<i32> {
447        Ok(buffer_snapshot::Entity::find()
448            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
449            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
450            .select_only()
451            .column(buffer_snapshot::Column::OperationSerializationVersion)
452            .into_values::<_, QueryOperationSerializationVersion>()
453            .one(&*tx)
454            .await?
455            .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
456    }
457
458    async fn get_channel_buffer(
459        &self,
460        channel_id: ChannelId,
461        tx: &DatabaseTransaction,
462    ) -> Result<buffer::Model> {
463        Ok(channel::Model {
464            id: channel_id,
465            ..Default::default()
466        }
467        .find_related(buffer::Entity)
468        .one(&*tx)
469        .await?
470        .ok_or_else(|| anyhow!("no such buffer"))?)
471    }
472
473    async fn get_buffer_state(
474        &self,
475        buffer: &buffer::Model,
476        tx: &DatabaseTransaction,
477    ) -> Result<(String, Vec<proto::Operation>)> {
478        let id = buffer.id;
479        let (base_text, version) = if buffer.epoch > 0 {
480            let snapshot = buffer_snapshot::Entity::find()
481                .filter(
482                    buffer_snapshot::Column::BufferId
483                        .eq(id)
484                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
485                )
486                .one(&*tx)
487                .await?
488                .ok_or_else(|| anyhow!("no such snapshot"))?;
489
490            let version = snapshot.operation_serialization_version;
491            (snapshot.text, version)
492        } else {
493            (String::new(), storage::SERIALIZATION_VERSION)
494        };
495
496        let mut rows = buffer_operation::Entity::find()
497            .filter(
498                buffer_operation::Column::BufferId
499                    .eq(id)
500                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
501            )
502            .stream(&*tx)
503            .await?;
504        let mut operations = Vec::new();
505        while let Some(row) = rows.next().await {
506            operations.push(proto::Operation {
507                variant: Some(operation_from_storage(row?, version)?),
508            })
509        }
510
511        Ok((base_text, operations))
512    }
513
514    async fn snapshot_channel_buffer(
515        &self,
516        channel_id: ChannelId,
517        tx: &DatabaseTransaction,
518    ) -> Result<()> {
519        let buffer = self.get_channel_buffer(channel_id, tx).await?;
520        let (base_text, operations) = self.get_buffer_state(&buffer, tx).await?;
521        if operations.is_empty() {
522            return Ok(());
523        }
524
525        let mut text_buffer = text::Buffer::new(0, 0, base_text);
526        text_buffer
527            .apply_ops(operations.into_iter().filter_map(operation_from_wire))
528            .unwrap();
529
530        let base_text = text_buffer.text();
531        let epoch = buffer.epoch + 1;
532
533        buffer_snapshot::Model {
534            buffer_id: buffer.id,
535            epoch,
536            text: base_text,
537            operation_serialization_version: storage::SERIALIZATION_VERSION,
538        }
539        .into_active_model()
540        .insert(tx)
541        .await?;
542
543        buffer::ActiveModel {
544            id: ActiveValue::Unchanged(buffer.id),
545            epoch: ActiveValue::Set(epoch),
546            ..Default::default()
547        }
548        .save(tx)
549        .await?;
550
551        Ok(())
552    }
553}
554
555fn operation_to_storage(
556    operation: &proto::Operation,
557    buffer: &buffer::Model,
558    _format: i32,
559) -> Option<buffer_operation::ActiveModel> {
560    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
561        proto::operation::Variant::Edit(operation) => (
562            operation.replica_id,
563            operation.lamport_timestamp,
564            storage::Operation {
565                version: version_to_storage(&operation.version),
566                is_undo: false,
567                edit_ranges: operation
568                    .ranges
569                    .iter()
570                    .map(|range| storage::Range {
571                        start: range.start,
572                        end: range.end,
573                    })
574                    .collect(),
575                edit_texts: operation.new_text.clone(),
576                undo_counts: Vec::new(),
577            },
578        ),
579        proto::operation::Variant::Undo(operation) => (
580            operation.replica_id,
581            operation.lamport_timestamp,
582            storage::Operation {
583                version: version_to_storage(&operation.version),
584                is_undo: true,
585                edit_ranges: Vec::new(),
586                edit_texts: Vec::new(),
587                undo_counts: operation
588                    .counts
589                    .iter()
590                    .map(|entry| storage::UndoCount {
591                        replica_id: entry.replica_id,
592                        lamport_timestamp: entry.lamport_timestamp,
593                        count: entry.count,
594                    })
595                    .collect(),
596            },
597        ),
598        _ => None?,
599    };
600
601    Some(buffer_operation::ActiveModel {
602        buffer_id: ActiveValue::Set(buffer.id),
603        epoch: ActiveValue::Set(buffer.epoch),
604        replica_id: ActiveValue::Set(replica_id as i32),
605        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
606        value: ActiveValue::Set(value.encode_to_vec()),
607    })
608}
609
610fn operation_from_storage(
611    row: buffer_operation::Model,
612    _format_version: i32,
613) -> Result<proto::operation::Variant, Error> {
614    let operation =
615        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
616    let version = version_from_storage(&operation.version);
617    Ok(if operation.is_undo {
618        proto::operation::Variant::Undo(proto::operation::Undo {
619            replica_id: row.replica_id as u32,
620            lamport_timestamp: row.lamport_timestamp as u32,
621            version,
622            counts: operation
623                .undo_counts
624                .iter()
625                .map(|entry| proto::UndoCount {
626                    replica_id: entry.replica_id,
627                    lamport_timestamp: entry.lamport_timestamp,
628                    count: entry.count,
629                })
630                .collect(),
631        })
632    } else {
633        proto::operation::Variant::Edit(proto::operation::Edit {
634            replica_id: row.replica_id as u32,
635            lamport_timestamp: row.lamport_timestamp as u32,
636            version,
637            ranges: operation
638                .edit_ranges
639                .into_iter()
640                .map(|range| proto::Range {
641                    start: range.start,
642                    end: range.end,
643                })
644                .collect(),
645            new_text: operation.edit_texts,
646        })
647    })
648}
649
650fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
651    version
652        .iter()
653        .map(|entry| storage::VectorClockEntry {
654            replica_id: entry.replica_id,
655            timestamp: entry.timestamp,
656        })
657        .collect()
658}
659
660fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
661    version
662        .iter()
663        .map(|entry| proto::VectorClockEntry {
664            replica_id: entry.replica_id,
665            timestamp: entry.timestamp,
666        })
667        .collect()
668}
669
670// This is currently a manual copy of the deserialization code in the client's langauge crate
671pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
672    match operation.variant? {
673        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
674            timestamp: clock::Lamport {
675                replica_id: edit.replica_id as text::ReplicaId,
676                value: edit.lamport_timestamp,
677            },
678            version: version_from_wire(&edit.version),
679            ranges: edit
680                .ranges
681                .into_iter()
682                .map(|range| {
683                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
684                })
685                .collect(),
686            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
687        })),
688        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
689            timestamp: clock::Lamport {
690                replica_id: undo.replica_id as text::ReplicaId,
691                value: undo.lamport_timestamp,
692            },
693            version: version_from_wire(&undo.version),
694            counts: undo
695                .counts
696                .into_iter()
697                .map(|c| {
698                    (
699                        clock::Lamport {
700                            replica_id: c.replica_id as text::ReplicaId,
701                            value: c.lamport_timestamp,
702                        },
703                        c.count,
704                    )
705                })
706                .collect(),
707        })),
708        _ => None,
709    }
710}
711
712fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
713    let mut version = clock::Global::new();
714    for entry in message {
715        version.observe(clock::Lamport {
716            replica_id: entry.replica_id as text::ReplicaId,
717            value: entry.timestamp,
718        });
719    }
720    version
721}
722
723fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
724    let mut message = Vec::new();
725    for entry in version.iter() {
726        message.push(proto::VectorClockEntry {
727            replica_id: entry.replica_id as u32,
728            timestamp: entry.value,
729        });
730    }
731    message
732}
733
/// Column selector passed to `into_values` by
/// `get_buffer_operation_serialization_version`, which selects only the
/// snapshot's `OperationSerializationVersion` column.
#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
enum QueryOperationSerializationVersion {
    OperationSerializationVersion,
}
738
/// Prost message definitions for the bytes written to and read from the
/// `value` column of `buffer_operation` rows (see `operation_to_storage` and
/// `operation_from_storage`).
mod storage {
    // NOTE(review): presumably silences naming warnings from the prost derive
    // output — confirm before removing.
    #![allow(non_snake_case)]
    use prost::Message;
    /// Version recorded in `operation_serialization_version` of the snapshots
    /// written by this file.
    pub const SERIALIZATION_VERSION: i32 = 1;

    /// A stored buffer operation: an edit when `is_undo` is false, an undo
    /// otherwise. Field tags start at 2; tag 1 is not used here.
    #[derive(Message)]
    pub struct Operation {
        /// Version vector attached to the operation.
        #[prost(message, repeated, tag = "2")]
        pub version: Vec<VectorClockEntry>,
        /// Selects which of the edit or undo fields below are populated.
        #[prost(bool, tag = "3")]
        pub is_undo: bool,
        /// Ranges of an edit; left empty for undo operations.
        #[prost(message, repeated, tag = "4")]
        pub edit_ranges: Vec<Range>,
        /// New texts of an edit; left empty for undo operations.
        #[prost(string, repeated, tag = "5")]
        pub edit_texts: Vec<String>,
        /// Undo counts of an undo; left empty for edit operations.
        #[prost(message, repeated, tag = "6")]
        pub undo_counts: Vec<UndoCount>,
    }

    /// One entry of a version vector: a replica id and its timestamp.
    #[derive(Message)]
    pub struct VectorClockEntry {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub timestamp: u32,
    }

    /// A `start..end` range of an edit, copied verbatim from the wire range.
    #[derive(Message)]
    pub struct Range {
        #[prost(uint64, tag = "1")]
        pub start: u64,
        #[prost(uint64, tag = "2")]
        pub end: u64,
    }

    /// The undo count recorded for the operation identified by
    /// (`replica_id`, `lamport_timestamp`).
    #[derive(Message)]
    pub struct UndoCount {
        #[prost(uint32, tag = "1")]
        pub replica_id: u32,
        #[prost(uint32, tag = "2")]
        pub lamport_timestamp: u32,
        #[prost(uint32, tag = "3")]
        pub count: u32,
    }
}