1use super::*;
2use prost::Message;
3use text::{EditOperation, UndoOperation};
4
5pub struct LeftChannelBuffer {
6 pub channel_id: ChannelId,
7 pub collaborators: Vec<proto::Collaborator>,
8 pub connections: Vec<ConnectionId>,
9}
10
11impl Database {
12 pub async fn join_channel_buffer(
13 &self,
14 channel_id: ChannelId,
15 user_id: UserId,
16 connection: ConnectionId,
17 ) -> Result<proto::JoinChannelBufferResponse> {
18 self.transaction(|tx| async move {
19 self.check_user_is_channel_member(channel_id, user_id, &tx)
20 .await?;
21
22 let buffer = channel::Model {
23 id: channel_id,
24 ..Default::default()
25 }
26 .find_related(buffer::Entity)
27 .one(&*tx)
28 .await?;
29
30 let buffer = if let Some(buffer) = buffer {
31 buffer
32 } else {
33 let buffer = buffer::ActiveModel {
34 channel_id: ActiveValue::Set(channel_id),
35 ..Default::default()
36 }
37 .insert(&*tx)
38 .await?;
39 buffer_snapshot::ActiveModel {
40 buffer_id: ActiveValue::Set(buffer.id),
41 epoch: ActiveValue::Set(0),
42 text: ActiveValue::Set(String::new()),
43 operation_serialization_version: ActiveValue::Set(
44 storage::SERIALIZATION_VERSION,
45 ),
46 }
47 .insert(&*tx)
48 .await?;
49 buffer
50 };
51
52 // Join the collaborators
53 let mut collaborators = channel_buffer_collaborator::Entity::find()
54 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
55 .all(&*tx)
56 .await?;
57 let replica_ids = collaborators
58 .iter()
59 .map(|c| c.replica_id)
60 .collect::<HashSet<_>>();
61 let mut replica_id = ReplicaId(0);
62 while replica_ids.contains(&replica_id) {
63 replica_id.0 += 1;
64 }
65 let collaborator = channel_buffer_collaborator::ActiveModel {
66 channel_id: ActiveValue::Set(channel_id),
67 connection_id: ActiveValue::Set(connection.id as i32),
68 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
69 user_id: ActiveValue::Set(user_id),
70 replica_id: ActiveValue::Set(replica_id),
71 ..Default::default()
72 }
73 .insert(&*tx)
74 .await?;
75 collaborators.push(collaborator);
76
77 let (base_text, operations) = self.get_buffer_state(&buffer, &tx).await?;
78
79 Ok(proto::JoinChannelBufferResponse {
80 buffer_id: buffer.id.to_proto(),
81 replica_id: replica_id.to_proto() as u32,
82 base_text,
83 operations,
84 epoch: buffer.epoch as u64,
85 collaborators: collaborators
86 .into_iter()
87 .map(|collaborator| proto::Collaborator {
88 peer_id: Some(collaborator.connection().into()),
89 user_id: collaborator.user_id.to_proto(),
90 replica_id: collaborator.replica_id.0 as u32,
91 })
92 .collect(),
93 })
94 })
95 .await
96 }
97
98 pub async fn rejoin_channel_buffers(
99 &self,
100 buffers: &[proto::ChannelBufferVersion],
101 user_id: UserId,
102 connection_id: ConnectionId,
103 ) -> Result<Vec<RejoinedChannelBuffer>> {
104 self.transaction(|tx| async move {
105 let mut results = Vec::new();
106 for client_buffer in buffers {
107 let channel_id = ChannelId::from_proto(client_buffer.channel_id);
108 if self
109 .check_user_is_channel_member(channel_id, user_id, &*tx)
110 .await
111 .is_err()
112 {
113 log::info!("user is not a member of channel");
114 continue;
115 }
116
117 let buffer = self.get_channel_buffer(channel_id, &*tx).await?;
118 let mut collaborators = channel_buffer_collaborator::Entity::find()
119 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
120 .all(&*tx)
121 .await?;
122
123 // If the buffer epoch hasn't changed since the client lost
124 // connection, then the client's buffer can be syncronized with
125 // the server's buffer.
126 if buffer.epoch as u64 != client_buffer.epoch {
127 log::info!("can't rejoin buffer, epoch has changed");
128 continue;
129 }
130
131 // Find the collaborator record for this user's previous lost
132 // connection. Update it with the new connection id.
133 let server_id = ServerId(connection_id.owner_id as i32);
134 let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
135 c.user_id == user_id
136 && (c.connection_lost || c.connection_server_id != server_id)
137 }) else {
138 log::info!("can't rejoin buffer, no previous collaborator found");
139 continue;
140 };
141 let old_connection_id = self_collaborator.connection();
142 *self_collaborator = channel_buffer_collaborator::ActiveModel {
143 id: ActiveValue::Unchanged(self_collaborator.id),
144 connection_id: ActiveValue::Set(connection_id.id as i32),
145 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
146 connection_lost: ActiveValue::Set(false),
147 ..Default::default()
148 }
149 .update(&*tx)
150 .await?;
151
152 let client_version = version_from_wire(&client_buffer.version);
153 let serialization_version = self
154 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
155 .await?;
156
157 let mut rows = buffer_operation::Entity::find()
158 .filter(
159 buffer_operation::Column::BufferId
160 .eq(buffer.id)
161 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
162 )
163 .stream(&*tx)
164 .await?;
165
166 // Find the server's version vector and any operations
167 // that the client has not seen.
168 let mut server_version = clock::Global::new();
169 let mut operations = Vec::new();
170 while let Some(row) = rows.next().await {
171 let row = row?;
172 let timestamp = clock::Lamport {
173 replica_id: row.replica_id as u16,
174 value: row.lamport_timestamp as u32,
175 };
176 server_version.observe(timestamp);
177 if !client_version.observed(timestamp) {
178 operations.push(proto::Operation {
179 variant: Some(operation_from_storage(row, serialization_version)?),
180 })
181 }
182 }
183
184 results.push(RejoinedChannelBuffer {
185 old_connection_id,
186 buffer: proto::RejoinedChannelBuffer {
187 channel_id: client_buffer.channel_id,
188 version: version_to_wire(&server_version),
189 operations,
190 collaborators: collaborators
191 .into_iter()
192 .map(|collaborator| proto::Collaborator {
193 peer_id: Some(collaborator.connection().into()),
194 user_id: collaborator.user_id.to_proto(),
195 replica_id: collaborator.replica_id.0 as u32,
196 })
197 .collect(),
198 },
199 });
200 }
201
202 Ok(results)
203 })
204 .await
205 }
206
207 pub async fn clear_stale_channel_buffer_collaborators(
208 &self,
209 channel_id: ChannelId,
210 server_id: ServerId,
211 ) -> Result<RefreshedChannelBuffer> {
212 self.transaction(|tx| async move {
213 let db_collaborators = channel_buffer_collaborator::Entity::find()
214 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
215 .all(&*tx)
216 .await?;
217
218 let mut connection_ids = Vec::new();
219 let mut collaborators = Vec::new();
220 let mut collaborator_ids_to_remove = Vec::new();
221 for db_collaborator in &db_collaborators {
222 if !db_collaborator.connection_lost
223 && db_collaborator.connection_server_id == server_id
224 {
225 connection_ids.push(db_collaborator.connection());
226 collaborators.push(proto::Collaborator {
227 peer_id: Some(db_collaborator.connection().into()),
228 replica_id: db_collaborator.replica_id.0 as u32,
229 user_id: db_collaborator.user_id.to_proto(),
230 })
231 } else {
232 collaborator_ids_to_remove.push(db_collaborator.id);
233 }
234 }
235
236 channel_buffer_collaborator::Entity::delete_many()
237 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
238 .exec(&*tx)
239 .await?;
240
241 Ok(RefreshedChannelBuffer {
242 connection_ids,
243 collaborators,
244 })
245 })
246 .await
247 }
248
249 pub async fn leave_channel_buffer(
250 &self,
251 channel_id: ChannelId,
252 connection: ConnectionId,
253 ) -> Result<LeftChannelBuffer> {
254 self.transaction(|tx| async move {
255 self.leave_channel_buffer_internal(channel_id, connection, &*tx)
256 .await
257 })
258 .await
259 }
260
261 pub async fn channel_buffer_connection_lost(
262 &self,
263 connection: ConnectionId,
264 tx: &DatabaseTransaction,
265 ) -> Result<()> {
266 channel_buffer_collaborator::Entity::update_many()
267 .filter(
268 Condition::all()
269 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
270 .add(
271 channel_buffer_collaborator::Column::ConnectionServerId
272 .eq(connection.owner_id as i32),
273 ),
274 )
275 .set(channel_buffer_collaborator::ActiveModel {
276 connection_lost: ActiveValue::set(true),
277 ..Default::default()
278 })
279 .exec(&*tx)
280 .await?;
281 Ok(())
282 }
283
284 pub async fn leave_channel_buffers(
285 &self,
286 connection: ConnectionId,
287 ) -> Result<Vec<LeftChannelBuffer>> {
288 self.transaction(|tx| async move {
289 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
290 enum QueryChannelIds {
291 ChannelId,
292 }
293
294 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
295 .select_only()
296 .column(channel_buffer_collaborator::Column::ChannelId)
297 .filter(Condition::all().add(
298 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
299 ))
300 .into_values::<_, QueryChannelIds>()
301 .all(&*tx)
302 .await?;
303
304 let mut result = Vec::new();
305 for channel_id in channel_ids {
306 let left_channel_buffer = self
307 .leave_channel_buffer_internal(channel_id, connection, &*tx)
308 .await?;
309 result.push(left_channel_buffer);
310 }
311
312 Ok(result)
313 })
314 .await
315 }
316
317 pub async fn leave_channel_buffer_internal(
318 &self,
319 channel_id: ChannelId,
320 connection: ConnectionId,
321 tx: &DatabaseTransaction,
322 ) -> Result<LeftChannelBuffer> {
323 let result = channel_buffer_collaborator::Entity::delete_many()
324 .filter(
325 Condition::all()
326 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
327 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
328 .add(
329 channel_buffer_collaborator::Column::ConnectionServerId
330 .eq(connection.owner_id as i32),
331 ),
332 )
333 .exec(&*tx)
334 .await?;
335 if result.rows_affected == 0 {
336 Err(anyhow!("not a collaborator on this project"))?;
337 }
338
339 let mut collaborators = Vec::new();
340 let mut connections = Vec::new();
341 let mut rows = channel_buffer_collaborator::Entity::find()
342 .filter(
343 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
344 )
345 .stream(&*tx)
346 .await?;
347 while let Some(row) = rows.next().await {
348 let row = row?;
349 let connection = row.connection();
350 connections.push(connection);
351 collaborators.push(proto::Collaborator {
352 peer_id: Some(connection.into()),
353 replica_id: row.replica_id.0 as u32,
354 user_id: row.user_id.to_proto(),
355 });
356 }
357
358 drop(rows);
359
360 if collaborators.is_empty() {
361 self.snapshot_channel_buffer(channel_id, &tx).await?;
362 }
363
364 Ok(LeftChannelBuffer {
365 channel_id,
366 collaborators,
367 connections,
368 })
369 }
370
371 pub async fn get_channel_buffer_collaborators(
372 &self,
373 channel_id: ChannelId,
374 ) -> Result<Vec<UserId>> {
375 self.transaction(|tx| async move {
376 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
377 enum QueryUserIds {
378 UserId,
379 }
380
381 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
382 .select_only()
383 .column(channel_buffer_collaborator::Column::UserId)
384 .filter(
385 Condition::all()
386 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
387 )
388 .into_values::<_, QueryUserIds>()
389 .all(&*tx)
390 .await?;
391
392 Ok(users)
393 })
394 .await
395 }
396
397 pub async fn update_channel_buffer(
398 &self,
399 channel_id: ChannelId,
400 user: UserId,
401 operations: &[proto::Operation],
402 ) -> Result<Vec<ConnectionId>> {
403 self.transaction(move |tx| async move {
404 self.check_user_is_channel_member(channel_id, user, &*tx)
405 .await?;
406
407 let buffer = buffer::Entity::find()
408 .filter(buffer::Column::ChannelId.eq(channel_id))
409 .one(&*tx)
410 .await?
411 .ok_or_else(|| anyhow!("no such buffer"))?;
412
413 let serialization_version = self
414 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
415 .await?;
416
417 let operations = operations
418 .iter()
419 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
420 .collect::<Vec<_>>();
421 if !operations.is_empty() {
422 buffer_operation::Entity::insert_many(operations)
423 .on_conflict(
424 OnConflict::columns([
425 buffer_operation::Column::BufferId,
426 buffer_operation::Column::Epoch,
427 buffer_operation::Column::LamportTimestamp,
428 buffer_operation::Column::ReplicaId,
429 ])
430 .do_nothing()
431 .to_owned(),
432 )
433 .exec(&*tx)
434 .await?;
435 }
436
437 let mut connections = Vec::new();
438 let mut rows = channel_buffer_collaborator::Entity::find()
439 .filter(
440 Condition::all()
441 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
442 )
443 .stream(&*tx)
444 .await?;
445 while let Some(row) = rows.next().await {
446 let row = row?;
447 connections.push(ConnectionId {
448 id: row.connection_id as u32,
449 owner_id: row.connection_server_id.0 as u32,
450 });
451 }
452
453 Ok(connections)
454 })
455 .await
456 }
457
458 async fn get_buffer_operation_serialization_version(
459 &self,
460 buffer_id: BufferId,
461 epoch: i32,
462 tx: &DatabaseTransaction,
463 ) -> Result<i32> {
464 Ok(buffer_snapshot::Entity::find()
465 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
466 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
467 .select_only()
468 .column(buffer_snapshot::Column::OperationSerializationVersion)
469 .into_values::<_, QueryOperationSerializationVersion>()
470 .one(&*tx)
471 .await?
472 .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
473 }
474
475 async fn get_channel_buffer(
476 &self,
477 channel_id: ChannelId,
478 tx: &DatabaseTransaction,
479 ) -> Result<buffer::Model> {
480 Ok(channel::Model {
481 id: channel_id,
482 ..Default::default()
483 }
484 .find_related(buffer::Entity)
485 .one(&*tx)
486 .await?
487 .ok_or_else(|| anyhow!("no such buffer"))?)
488 }
489
490 async fn get_buffer_state(
491 &self,
492 buffer: &buffer::Model,
493 tx: &DatabaseTransaction,
494 ) -> Result<(String, Vec<proto::Operation>)> {
495 let id = buffer.id;
496 let (base_text, version) = if buffer.epoch > 0 {
497 let snapshot = buffer_snapshot::Entity::find()
498 .filter(
499 buffer_snapshot::Column::BufferId
500 .eq(id)
501 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
502 )
503 .one(&*tx)
504 .await?
505 .ok_or_else(|| anyhow!("no such snapshot"))?;
506
507 let version = snapshot.operation_serialization_version;
508 (snapshot.text, version)
509 } else {
510 (String::new(), storage::SERIALIZATION_VERSION)
511 };
512
513 let mut rows = buffer_operation::Entity::find()
514 .filter(
515 buffer_operation::Column::BufferId
516 .eq(id)
517 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
518 )
519 .stream(&*tx)
520 .await?;
521 let mut operations = Vec::new();
522 while let Some(row) = rows.next().await {
523 operations.push(proto::Operation {
524 variant: Some(operation_from_storage(row?, version)?),
525 })
526 }
527
528 Ok((base_text, operations))
529 }
530
531 async fn snapshot_channel_buffer(
532 &self,
533 channel_id: ChannelId,
534 tx: &DatabaseTransaction,
535 ) -> Result<()> {
536 let buffer = self.get_channel_buffer(channel_id, tx).await?;
537 let (base_text, operations) = self.get_buffer_state(&buffer, tx).await?;
538 if operations.is_empty() {
539 return Ok(());
540 }
541
542 let mut text_buffer = text::Buffer::new(0, 0, base_text);
543 text_buffer
544 .apply_ops(operations.into_iter().filter_map(operation_from_wire))
545 .unwrap();
546
547 let base_text = text_buffer.text();
548 let epoch = buffer.epoch + 1;
549
550 buffer_snapshot::Model {
551 buffer_id: buffer.id,
552 epoch,
553 text: base_text,
554 operation_serialization_version: storage::SERIALIZATION_VERSION,
555 }
556 .into_active_model()
557 .insert(tx)
558 .await?;
559
560 buffer::ActiveModel {
561 id: ActiveValue::Unchanged(buffer.id),
562 epoch: ActiveValue::Set(epoch),
563 ..Default::default()
564 }
565 .save(tx)
566 .await?;
567
568 Ok(())
569 }
570}
571
572fn operation_to_storage(
573 operation: &proto::Operation,
574 buffer: &buffer::Model,
575 _format: i32,
576) -> Option<buffer_operation::ActiveModel> {
577 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
578 proto::operation::Variant::Edit(operation) => (
579 operation.replica_id,
580 operation.lamport_timestamp,
581 storage::Operation {
582 version: version_to_storage(&operation.version),
583 is_undo: false,
584 edit_ranges: operation
585 .ranges
586 .iter()
587 .map(|range| storage::Range {
588 start: range.start,
589 end: range.end,
590 })
591 .collect(),
592 edit_texts: operation.new_text.clone(),
593 undo_counts: Vec::new(),
594 },
595 ),
596 proto::operation::Variant::Undo(operation) => (
597 operation.replica_id,
598 operation.lamport_timestamp,
599 storage::Operation {
600 version: version_to_storage(&operation.version),
601 is_undo: true,
602 edit_ranges: Vec::new(),
603 edit_texts: Vec::new(),
604 undo_counts: operation
605 .counts
606 .iter()
607 .map(|entry| storage::UndoCount {
608 replica_id: entry.replica_id,
609 lamport_timestamp: entry.lamport_timestamp,
610 count: entry.count,
611 })
612 .collect(),
613 },
614 ),
615 _ => None?,
616 };
617
618 Some(buffer_operation::ActiveModel {
619 buffer_id: ActiveValue::Set(buffer.id),
620 epoch: ActiveValue::Set(buffer.epoch),
621 replica_id: ActiveValue::Set(replica_id as i32),
622 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
623 value: ActiveValue::Set(value.encode_to_vec()),
624 })
625}
626
627fn operation_from_storage(
628 row: buffer_operation::Model,
629 _format_version: i32,
630) -> Result<proto::operation::Variant, Error> {
631 let operation =
632 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
633 let version = version_from_storage(&operation.version);
634 Ok(if operation.is_undo {
635 proto::operation::Variant::Undo(proto::operation::Undo {
636 replica_id: row.replica_id as u32,
637 lamport_timestamp: row.lamport_timestamp as u32,
638 version,
639 counts: operation
640 .undo_counts
641 .iter()
642 .map(|entry| proto::UndoCount {
643 replica_id: entry.replica_id,
644 lamport_timestamp: entry.lamport_timestamp,
645 count: entry.count,
646 })
647 .collect(),
648 })
649 } else {
650 proto::operation::Variant::Edit(proto::operation::Edit {
651 replica_id: row.replica_id as u32,
652 lamport_timestamp: row.lamport_timestamp as u32,
653 version,
654 ranges: operation
655 .edit_ranges
656 .into_iter()
657 .map(|range| proto::Range {
658 start: range.start,
659 end: range.end,
660 })
661 .collect(),
662 new_text: operation.edit_texts,
663 })
664 })
665}
666
667fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
668 version
669 .iter()
670 .map(|entry| storage::VectorClockEntry {
671 replica_id: entry.replica_id,
672 timestamp: entry.timestamp,
673 })
674 .collect()
675}
676
677fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
678 version
679 .iter()
680 .map(|entry| proto::VectorClockEntry {
681 replica_id: entry.replica_id,
682 timestamp: entry.timestamp,
683 })
684 .collect()
685}
686
687// This is currently a manual copy of the deserialization code in the client's langauge crate
688pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
689 match operation.variant? {
690 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
691 timestamp: clock::Lamport {
692 replica_id: edit.replica_id as text::ReplicaId,
693 value: edit.lamport_timestamp,
694 },
695 version: version_from_wire(&edit.version),
696 ranges: edit
697 .ranges
698 .into_iter()
699 .map(|range| {
700 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
701 })
702 .collect(),
703 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
704 })),
705 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
706 timestamp: clock::Lamport {
707 replica_id: undo.replica_id as text::ReplicaId,
708 value: undo.lamport_timestamp,
709 },
710 version: version_from_wire(&undo.version),
711 counts: undo
712 .counts
713 .into_iter()
714 .map(|c| {
715 (
716 clock::Lamport {
717 replica_id: c.replica_id as text::ReplicaId,
718 value: c.lamport_timestamp,
719 },
720 c.count,
721 )
722 })
723 .collect(),
724 })),
725 _ => None,
726 }
727}
728
729fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
730 let mut version = clock::Global::new();
731 for entry in message {
732 version.observe(clock::Lamport {
733 replica_id: entry.replica_id as text::ReplicaId,
734 value: entry.timestamp,
735 });
736 }
737 version
738}
739
740fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
741 let mut message = Vec::new();
742 for entry in version.iter() {
743 message.push(proto::VectorClockEntry {
744 replica_id: entry.replica_id as u32,
745 timestamp: entry.value,
746 });
747 }
748 message
749}
750
751#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
752enum QueryOperationSerializationVersion {
753 OperationSerializationVersion,
754}
755
756mod storage {
757 #![allow(non_snake_case)]
758 use prost::Message;
759 pub const SERIALIZATION_VERSION: i32 = 1;
760
761 #[derive(Message)]
762 pub struct Operation {
763 #[prost(message, repeated, tag = "2")]
764 pub version: Vec<VectorClockEntry>,
765 #[prost(bool, tag = "3")]
766 pub is_undo: bool,
767 #[prost(message, repeated, tag = "4")]
768 pub edit_ranges: Vec<Range>,
769 #[prost(string, repeated, tag = "5")]
770 pub edit_texts: Vec<String>,
771 #[prost(message, repeated, tag = "6")]
772 pub undo_counts: Vec<UndoCount>,
773 }
774
775 #[derive(Message)]
776 pub struct VectorClockEntry {
777 #[prost(uint32, tag = "1")]
778 pub replica_id: u32,
779 #[prost(uint32, tag = "2")]
780 pub timestamp: u32,
781 }
782
783 #[derive(Message)]
784 pub struct Range {
785 #[prost(uint64, tag = "1")]
786 pub start: u64,
787 #[prost(uint64, tag = "2")]
788 pub end: u64,
789 }
790
791 #[derive(Message)]
792 pub struct UndoCount {
793 #[prost(uint32, tag = "1")]
794 pub replica_id: u32,
795 #[prost(uint32, tag = "2")]
796 pub lamport_timestamp: u32,
797 #[prost(uint32, tag = "3")]
798 pub count: u32,
799 }
800}