buffers.rs

  1use super::*;
  2use prost::Message;
  3use text::{EditOperation, UndoOperation};
  4
  5pub struct LeftChannelBuffer {
  6    pub channel_id: ChannelId,
  7    pub collaborators: Vec<proto::Collaborator>,
  8    pub connections: Vec<ConnectionId>,
  9}
 10
 11impl Database {
 12    pub async fn join_channel_buffer(
 13        &self,
 14        channel_id: ChannelId,
 15        user_id: UserId,
 16        connection: ConnectionId,
 17    ) -> Result<proto::JoinChannelBufferResponse> {
 18        self.transaction(|tx| async move {
 19            self.check_user_is_channel_member(channel_id, user_id, &tx)
 20                .await?;
 21
 22            let buffer = channel::Model {
 23                id: channel_id,
 24                ..Default::default()
 25            }
 26            .find_related(buffer::Entity)
 27            .one(&*tx)
 28            .await?;
 29
 30            let buffer = if let Some(buffer) = buffer {
 31                buffer
 32            } else {
 33                let buffer = buffer::ActiveModel {
 34                    channel_id: ActiveValue::Set(channel_id),
 35                    ..Default::default()
 36                }
 37                .insert(&*tx)
 38                .await?;
 39                buffer_snapshot::ActiveModel {
 40                    buffer_id: ActiveValue::Set(buffer.id),
 41                    epoch: ActiveValue::Set(0),
 42                    text: ActiveValue::Set(String::new()),
 43                    operation_serialization_version: ActiveValue::Set(
 44                        storage::SERIALIZATION_VERSION,
 45                    ),
 46                }
 47                .insert(&*tx)
 48                .await?;
 49                buffer
 50            };
 51
 52            // Join the collaborators
 53            let mut collaborators = channel_buffer_collaborator::Entity::find()
 54                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
 55                .all(&*tx)
 56                .await?;
 57            let replica_ids = collaborators
 58                .iter()
 59                .map(|c| c.replica_id)
 60                .collect::<HashSet<_>>();
 61            let mut replica_id = ReplicaId(0);
 62            while replica_ids.contains(&replica_id) {
 63                replica_id.0 += 1;
 64            }
 65            let collaborator = channel_buffer_collaborator::ActiveModel {
 66                channel_id: ActiveValue::Set(channel_id),
 67                connection_id: ActiveValue::Set(connection.id as i32),
 68                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
 69                user_id: ActiveValue::Set(user_id),
 70                replica_id: ActiveValue::Set(replica_id),
 71                ..Default::default()
 72            }
 73            .insert(&*tx)
 74            .await?;
 75            collaborators.push(collaborator);
 76
 77            let (base_text, operations) = self.get_buffer_state(&buffer, &tx).await?;
 78
 79            Ok(proto::JoinChannelBufferResponse {
 80                buffer_id: buffer.id.to_proto(),
 81                replica_id: replica_id.to_proto() as u32,
 82                base_text,
 83                operations,
 84                epoch: buffer.epoch as u64,
 85                collaborators: collaborators
 86                    .into_iter()
 87                    .map(|collaborator| proto::Collaborator {
 88                        peer_id: Some(collaborator.connection().into()),
 89                        user_id: collaborator.user_id.to_proto(),
 90                        replica_id: collaborator.replica_id.0 as u32,
 91                    })
 92                    .collect(),
 93            })
 94        })
 95        .await
 96    }
 97
 98    pub async fn rejoin_channel_buffers(
 99        &self,
100        buffers: &[proto::ChannelBufferVersion],
101        user_id: UserId,
102        connection_id: ConnectionId,
103    ) -> Result<Vec<RejoinedChannelBuffer>> {
104        self.transaction(|tx| async move {
105            let mut results = Vec::new();
106            for client_buffer in buffers {
107                let channel_id = ChannelId::from_proto(client_buffer.channel_id);
108                if self
109                    .check_user_is_channel_member(channel_id, user_id, &*tx)
110                    .await
111                    .is_err()
112                {
113                    log::info!("user is not a member of channel");
114                    continue;
115                }
116
117                let buffer = self.get_channel_buffer(channel_id, &*tx).await?;
118                let mut collaborators = channel_buffer_collaborator::Entity::find()
119                    .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
120                    .all(&*tx)
121                    .await?;
122
123                // If the buffer epoch hasn't changed since the client lost
124                // connection, then the client's buffer can be syncronized with
125                // the server's buffer.
126                if buffer.epoch as u64 != client_buffer.epoch {
127                    log::info!("can't rejoin buffer, epoch has changed");
128                    continue;
129                }
130
131                // Find the collaborator record for this user's previous lost
132                // connection. Update it with the new connection id.
133                let server_id = ServerId(connection_id.owner_id as i32);
134                let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
135                    c.user_id == user_id
136                        && (c.connection_lost || c.connection_server_id != server_id)
137                }) else {
138                    log::info!("can't rejoin buffer, no previous collaborator found");
139                    continue;
140                };
141                let old_connection_id = self_collaborator.connection();
142                *self_collaborator = channel_buffer_collaborator::ActiveModel {
143                    id: ActiveValue::Unchanged(self_collaborator.id),
144                    connection_id: ActiveValue::Set(connection_id.id as i32),
145                    connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
146                    connection_lost: ActiveValue::Set(false),
147                    ..Default::default()
148                }
149                .update(&*tx)
150                .await?;
151
152                let client_version = version_from_wire(&client_buffer.version);
153                let serialization_version = self
154                    .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
155                    .await?;
156
157                let mut rows = buffer_operation::Entity::find()
158                    .filter(
159                        buffer_operation::Column::BufferId
160                            .eq(buffer.id)
161                            .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
162                    )
163                    .stream(&*tx)
164                    .await?;
165
166                // Find the server's version vector and any operations
167                // that the client has not seen.
168                let mut server_version = clock::Global::new();
169                let mut operations = Vec::new();
170                while let Some(row) = rows.next().await {
171                    let row = row?;
172                    let timestamp = clock::Lamport {
173                        replica_id: row.replica_id as u16,
174                        value: row.lamport_timestamp as u32,
175                    };
176                    server_version.observe(timestamp);
177                    if !client_version.observed(timestamp) {
178                        operations.push(proto::Operation {
179                            variant: Some(operation_from_storage(row, serialization_version)?),
180                        })
181                    }
182                }
183
184                results.push(RejoinedChannelBuffer {
185                    old_connection_id,
186                    buffer: proto::RejoinedChannelBuffer {
187                        channel_id: client_buffer.channel_id,
188                        version: version_to_wire(&server_version),
189                        operations,
190                        collaborators: collaborators
191                            .into_iter()
192                            .map(|collaborator| proto::Collaborator {
193                                peer_id: Some(collaborator.connection().into()),
194                                user_id: collaborator.user_id.to_proto(),
195                                replica_id: collaborator.replica_id.0 as u32,
196                            })
197                            .collect(),
198                    },
199                });
200            }
201
202            Ok(results)
203        })
204        .await
205    }
206
207    pub async fn clear_stale_channel_buffer_collaborators(
208        &self,
209        channel_id: ChannelId,
210        server_id: ServerId,
211    ) -> Result<RefreshedChannelBuffer> {
212        self.transaction(|tx| async move {
213            let db_collaborators = channel_buffer_collaborator::Entity::find()
214                .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
215                .all(&*tx)
216                .await?;
217
218            let mut connection_ids = Vec::new();
219            let mut collaborators = Vec::new();
220            let mut collaborator_ids_to_remove = Vec::new();
221            for db_collaborator in &db_collaborators {
222                if !db_collaborator.connection_lost
223                    && db_collaborator.connection_server_id == server_id
224                {
225                    connection_ids.push(db_collaborator.connection());
226                    collaborators.push(proto::Collaborator {
227                        peer_id: Some(db_collaborator.connection().into()),
228                        replica_id: db_collaborator.replica_id.0 as u32,
229                        user_id: db_collaborator.user_id.to_proto(),
230                    })
231                } else {
232                    collaborator_ids_to_remove.push(db_collaborator.id);
233                }
234            }
235
236            channel_buffer_collaborator::Entity::delete_many()
237                .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
238                .exec(&*tx)
239                .await?;
240
241            Ok(RefreshedChannelBuffer {
242                connection_ids,
243                collaborators,
244            })
245        })
246        .await
247    }
248
249    pub async fn leave_channel_buffer(
250        &self,
251        channel_id: ChannelId,
252        connection: ConnectionId,
253    ) -> Result<LeftChannelBuffer> {
254        self.transaction(|tx| async move {
255            self.leave_channel_buffer_internal(channel_id, connection, &*tx)
256                .await
257        })
258        .await
259    }
260
261    pub async fn channel_buffer_connection_lost(
262        &self,
263        connection: ConnectionId,
264        tx: &DatabaseTransaction,
265    ) -> Result<()> {
266        channel_buffer_collaborator::Entity::update_many()
267            .filter(
268                Condition::all()
269                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
270                    .add(
271                        channel_buffer_collaborator::Column::ConnectionServerId
272                            .eq(connection.owner_id as i32),
273                    ),
274            )
275            .set(channel_buffer_collaborator::ActiveModel {
276                connection_lost: ActiveValue::set(true),
277                ..Default::default()
278            })
279            .exec(&*tx)
280            .await?;
281        Ok(())
282    }
283
284    pub async fn leave_channel_buffers(
285        &self,
286        connection: ConnectionId,
287    ) -> Result<Vec<LeftChannelBuffer>> {
288        self.transaction(|tx| async move {
289            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
290            enum QueryChannelIds {
291                ChannelId,
292            }
293
294            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
295                .select_only()
296                .column(channel_buffer_collaborator::Column::ChannelId)
297                .filter(Condition::all().add(
298                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
299                ))
300                .into_values::<_, QueryChannelIds>()
301                .all(&*tx)
302                .await?;
303
304            let mut result = Vec::new();
305            for channel_id in channel_ids {
306                let left_channel_buffer = self
307                    .leave_channel_buffer_internal(channel_id, connection, &*tx)
308                    .await?;
309                result.push(left_channel_buffer);
310            }
311
312            Ok(result)
313        })
314        .await
315    }
316
317    pub async fn leave_channel_buffer_internal(
318        &self,
319        channel_id: ChannelId,
320        connection: ConnectionId,
321        tx: &DatabaseTransaction,
322    ) -> Result<LeftChannelBuffer> {
323        let result = channel_buffer_collaborator::Entity::delete_many()
324            .filter(
325                Condition::all()
326                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
327                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
328                    .add(
329                        channel_buffer_collaborator::Column::ConnectionServerId
330                            .eq(connection.owner_id as i32),
331                    ),
332            )
333            .exec(&*tx)
334            .await?;
335        if result.rows_affected == 0 {
336            Err(anyhow!("not a collaborator on this project"))?;
337        }
338
339        let mut collaborators = Vec::new();
340        let mut connections = Vec::new();
341        let mut rows = channel_buffer_collaborator::Entity::find()
342            .filter(
343                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
344            )
345            .stream(&*tx)
346            .await?;
347        while let Some(row) = rows.next().await {
348            let row = row?;
349            let connection = row.connection();
350            connections.push(connection);
351            collaborators.push(proto::Collaborator {
352                peer_id: Some(connection.into()),
353                replica_id: row.replica_id.0 as u32,
354                user_id: row.user_id.to_proto(),
355            });
356        }
357
358        drop(rows);
359
360        if collaborators.is_empty() {
361            self.snapshot_channel_buffer(channel_id, &tx).await?;
362        }
363
364        Ok(LeftChannelBuffer {
365            channel_id,
366            collaborators,
367            connections,
368        })
369    }
370
371    pub async fn get_channel_buffer_collaborators(
372        &self,
373        channel_id: ChannelId,
374    ) -> Result<Vec<UserId>> {
375        self.transaction(|tx| async move {
376            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
377            enum QueryUserIds {
378                UserId,
379            }
380
381            let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
382                .select_only()
383                .column(channel_buffer_collaborator::Column::UserId)
384                .filter(
385                    Condition::all()
386                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
387                )
388                .into_values::<_, QueryUserIds>()
389                .all(&*tx)
390                .await?;
391
392            Ok(users)
393        })
394        .await
395    }
396
397    pub async fn update_channel_buffer(
398        &self,
399        channel_id: ChannelId,
400        user: UserId,
401        operations: &[proto::Operation],
402    ) -> Result<Vec<ConnectionId>> {
403        self.transaction(move |tx| async move {
404            self.check_user_is_channel_member(channel_id, user, &*tx)
405                .await?;
406
407            let buffer = buffer::Entity::find()
408                .filter(buffer::Column::ChannelId.eq(channel_id))
409                .one(&*tx)
410                .await?
411                .ok_or_else(|| anyhow!("no such buffer"))?;
412
413            let serialization_version = self
414                .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
415                .await?;
416
417            let operations = operations
418                .iter()
419                .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
420                .collect::<Vec<_>>();
421            if !operations.is_empty() {
422                buffer_operation::Entity::insert_many(operations)
423                    .on_conflict(
424                        OnConflict::columns([
425                            buffer_operation::Column::BufferId,
426                            buffer_operation::Column::Epoch,
427                            buffer_operation::Column::LamportTimestamp,
428                            buffer_operation::Column::ReplicaId,
429                        ])
430                        .do_nothing()
431                        .to_owned(),
432                    )
433                    .exec(&*tx)
434                    .await?;
435            }
436
437            let mut connections = Vec::new();
438            let mut rows = channel_buffer_collaborator::Entity::find()
439                .filter(
440                    Condition::all()
441                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
442                )
443                .stream(&*tx)
444                .await?;
445            while let Some(row) = rows.next().await {
446                let row = row?;
447                connections.push(ConnectionId {
448                    id: row.connection_id as u32,
449                    owner_id: row.connection_server_id.0 as u32,
450                });
451            }
452
453            Ok(connections)
454        })
455        .await
456    }
457
458    async fn get_buffer_operation_serialization_version(
459        &self,
460        buffer_id: BufferId,
461        epoch: i32,
462        tx: &DatabaseTransaction,
463    ) -> Result<i32> {
464        Ok(buffer_snapshot::Entity::find()
465            .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
466            .filter(buffer_snapshot::Column::Epoch.eq(epoch))
467            .select_only()
468            .column(buffer_snapshot::Column::OperationSerializationVersion)
469            .into_values::<_, QueryOperationSerializationVersion>()
470            .one(&*tx)
471            .await?
472            .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
473    }
474
475    async fn get_channel_buffer(
476        &self,
477        channel_id: ChannelId,
478        tx: &DatabaseTransaction,
479    ) -> Result<buffer::Model> {
480        Ok(channel::Model {
481            id: channel_id,
482            ..Default::default()
483        }
484        .find_related(buffer::Entity)
485        .one(&*tx)
486        .await?
487        .ok_or_else(|| anyhow!("no such buffer"))?)
488    }
489
490    async fn get_buffer_state(
491        &self,
492        buffer: &buffer::Model,
493        tx: &DatabaseTransaction,
494    ) -> Result<(String, Vec<proto::Operation>)> {
495        let id = buffer.id;
496        let (base_text, version) = if buffer.epoch > 0 {
497            let snapshot = buffer_snapshot::Entity::find()
498                .filter(
499                    buffer_snapshot::Column::BufferId
500                        .eq(id)
501                        .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
502                )
503                .one(&*tx)
504                .await?
505                .ok_or_else(|| anyhow!("no such snapshot"))?;
506
507            let version = snapshot.operation_serialization_version;
508            (snapshot.text, version)
509        } else {
510            (String::new(), storage::SERIALIZATION_VERSION)
511        };
512
513        let mut rows = buffer_operation::Entity::find()
514            .filter(
515                buffer_operation::Column::BufferId
516                    .eq(id)
517                    .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
518            )
519            .stream(&*tx)
520            .await?;
521        let mut operations = Vec::new();
522        while let Some(row) = rows.next().await {
523            operations.push(proto::Operation {
524                variant: Some(operation_from_storage(row?, version)?),
525            })
526        }
527
528        Ok((base_text, operations))
529    }
530
531    async fn snapshot_channel_buffer(
532        &self,
533        channel_id: ChannelId,
534        tx: &DatabaseTransaction,
535    ) -> Result<()> {
536        let buffer = self.get_channel_buffer(channel_id, tx).await?;
537        let (base_text, operations) = self.get_buffer_state(&buffer, tx).await?;
538        if operations.is_empty() {
539            return Ok(());
540        }
541
542        let mut text_buffer = text::Buffer::new(0, 0, base_text);
543        text_buffer
544            .apply_ops(operations.into_iter().filter_map(operation_from_wire))
545            .unwrap();
546
547        let base_text = text_buffer.text();
548        let epoch = buffer.epoch + 1;
549
550        buffer_snapshot::Model {
551            buffer_id: buffer.id,
552            epoch,
553            text: base_text,
554            operation_serialization_version: storage::SERIALIZATION_VERSION,
555        }
556        .into_active_model()
557        .insert(tx)
558        .await?;
559
560        buffer::ActiveModel {
561            id: ActiveValue::Unchanged(buffer.id),
562            epoch: ActiveValue::Set(epoch),
563            ..Default::default()
564        }
565        .save(tx)
566        .await?;
567
568        Ok(())
569    }
570}
571
572fn operation_to_storage(
573    operation: &proto::Operation,
574    buffer: &buffer::Model,
575    _format: i32,
576) -> Option<buffer_operation::ActiveModel> {
577    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
578        proto::operation::Variant::Edit(operation) => (
579            operation.replica_id,
580            operation.lamport_timestamp,
581            storage::Operation {
582                version: version_to_storage(&operation.version),
583                is_undo: false,
584                edit_ranges: operation
585                    .ranges
586                    .iter()
587                    .map(|range| storage::Range {
588                        start: range.start,
589                        end: range.end,
590                    })
591                    .collect(),
592                edit_texts: operation.new_text.clone(),
593                undo_counts: Vec::new(),
594            },
595        ),
596        proto::operation::Variant::Undo(operation) => (
597            operation.replica_id,
598            operation.lamport_timestamp,
599            storage::Operation {
600                version: version_to_storage(&operation.version),
601                is_undo: true,
602                edit_ranges: Vec::new(),
603                edit_texts: Vec::new(),
604                undo_counts: operation
605                    .counts
606                    .iter()
607                    .map(|entry| storage::UndoCount {
608                        replica_id: entry.replica_id,
609                        lamport_timestamp: entry.lamport_timestamp,
610                        count: entry.count,
611                    })
612                    .collect(),
613            },
614        ),
615        _ => None?,
616    };
617
618    Some(buffer_operation::ActiveModel {
619        buffer_id: ActiveValue::Set(buffer.id),
620        epoch: ActiveValue::Set(buffer.epoch),
621        replica_id: ActiveValue::Set(replica_id as i32),
622        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
623        value: ActiveValue::Set(value.encode_to_vec()),
624    })
625}
626
627fn operation_from_storage(
628    row: buffer_operation::Model,
629    _format_version: i32,
630) -> Result<proto::operation::Variant, Error> {
631    let operation =
632        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
633    let version = version_from_storage(&operation.version);
634    Ok(if operation.is_undo {
635        proto::operation::Variant::Undo(proto::operation::Undo {
636            replica_id: row.replica_id as u32,
637            lamport_timestamp: row.lamport_timestamp as u32,
638            version,
639            counts: operation
640                .undo_counts
641                .iter()
642                .map(|entry| proto::UndoCount {
643                    replica_id: entry.replica_id,
644                    lamport_timestamp: entry.lamport_timestamp,
645                    count: entry.count,
646                })
647                .collect(),
648        })
649    } else {
650        proto::operation::Variant::Edit(proto::operation::Edit {
651            replica_id: row.replica_id as u32,
652            lamport_timestamp: row.lamport_timestamp as u32,
653            version,
654            ranges: operation
655                .edit_ranges
656                .into_iter()
657                .map(|range| proto::Range {
658                    start: range.start,
659                    end: range.end,
660                })
661                .collect(),
662            new_text: operation.edit_texts,
663        })
664    })
665}
666
667fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
668    version
669        .iter()
670        .map(|entry| storage::VectorClockEntry {
671            replica_id: entry.replica_id,
672            timestamp: entry.timestamp,
673        })
674        .collect()
675}
676
677fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
678    version
679        .iter()
680        .map(|entry| proto::VectorClockEntry {
681            replica_id: entry.replica_id,
682            timestamp: entry.timestamp,
683        })
684        .collect()
685}
686
687// This is currently a manual copy of the deserialization code in the client's langauge crate
688pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
689    match operation.variant? {
690        proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
691            timestamp: clock::Lamport {
692                replica_id: edit.replica_id as text::ReplicaId,
693                value: edit.lamport_timestamp,
694            },
695            version: version_from_wire(&edit.version),
696            ranges: edit
697                .ranges
698                .into_iter()
699                .map(|range| {
700                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
701                })
702                .collect(),
703            new_text: edit.new_text.into_iter().map(Arc::from).collect(),
704        })),
705        proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
706            timestamp: clock::Lamport {
707                replica_id: undo.replica_id as text::ReplicaId,
708                value: undo.lamport_timestamp,
709            },
710            version: version_from_wire(&undo.version),
711            counts: undo
712                .counts
713                .into_iter()
714                .map(|c| {
715                    (
716                        clock::Lamport {
717                            replica_id: c.replica_id as text::ReplicaId,
718                            value: c.lamport_timestamp,
719                        },
720                        c.count,
721                    )
722                })
723                .collect(),
724        })),
725        _ => None,
726    }
727}
728
729fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
730    let mut version = clock::Global::new();
731    for entry in message {
732        version.observe(clock::Lamport {
733            replica_id: entry.replica_id as text::ReplicaId,
734            value: entry.timestamp,
735        });
736    }
737    version
738}
739
740fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
741    let mut message = Vec::new();
742    for entry in version.iter() {
743        message.push(proto::VectorClockEntry {
744            replica_id: entry.replica_id as u32,
745            timestamp: entry.value,
746        });
747    }
748    message
749}
750
751#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
752enum QueryOperationSerializationVersion {
753    OperationSerializationVersion,
754}
755
756mod storage {
757    #![allow(non_snake_case)]
758    use prost::Message;
759    pub const SERIALIZATION_VERSION: i32 = 1;
760
761    #[derive(Message)]
762    pub struct Operation {
763        #[prost(message, repeated, tag = "2")]
764        pub version: Vec<VectorClockEntry>,
765        #[prost(bool, tag = "3")]
766        pub is_undo: bool,
767        #[prost(message, repeated, tag = "4")]
768        pub edit_ranges: Vec<Range>,
769        #[prost(string, repeated, tag = "5")]
770        pub edit_texts: Vec<String>,
771        #[prost(message, repeated, tag = "6")]
772        pub undo_counts: Vec<UndoCount>,
773    }
774
775    #[derive(Message)]
776    pub struct VectorClockEntry {
777        #[prost(uint32, tag = "1")]
778        pub replica_id: u32,
779        #[prost(uint32, tag = "2")]
780        pub timestamp: u32,
781    }
782
783    #[derive(Message)]
784    pub struct Range {
785        #[prost(uint64, tag = "1")]
786        pub start: u64,
787        #[prost(uint64, tag = "2")]
788        pub end: u64,
789    }
790
791    #[derive(Message)]
792    pub struct UndoCount {
793        #[prost(uint32, tag = "1")]
794        pub replica_id: u32,
795        #[prost(uint32, tag = "2")]
796        pub lamport_timestamp: u32,
797        #[prost(uint32, tag = "3")]
798        pub count: u32,
799    }
800}