1use super::*;
2use prost::Message;
3use text::{EditOperation, UndoOperation};
4
5impl Database {
6 pub async fn join_channel_buffer(
7 &self,
8 channel_id: ChannelId,
9 user_id: UserId,
10 connection: ConnectionId,
11 ) -> Result<proto::JoinChannelBufferResponse> {
12 self.transaction(|tx| async move {
13 self.check_user_is_channel_member(channel_id, user_id, &tx)
14 .await?;
15
16 let buffer = channel::Model {
17 id: channel_id,
18 ..Default::default()
19 }
20 .find_related(buffer::Entity)
21 .one(&*tx)
22 .await?;
23
24 let buffer = if let Some(buffer) = buffer {
25 buffer
26 } else {
27 let buffer = buffer::ActiveModel {
28 channel_id: ActiveValue::Set(channel_id),
29 ..Default::default()
30 }
31 .insert(&*tx)
32 .await?;
33 buffer_snapshot::ActiveModel {
34 buffer_id: ActiveValue::Set(buffer.id),
35 epoch: ActiveValue::Set(0),
36 text: ActiveValue::Set(String::new()),
37 operation_serialization_version: ActiveValue::Set(
38 storage::SERIALIZATION_VERSION,
39 ),
40 }
41 .insert(&*tx)
42 .await?;
43 buffer
44 };
45
46 // Join the collaborators
47 let mut collaborators = channel_buffer_collaborator::Entity::find()
48 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
49 .all(&*tx)
50 .await?;
51 let replica_ids = collaborators
52 .iter()
53 .map(|c| c.replica_id)
54 .collect::<HashSet<_>>();
55 let mut replica_id = ReplicaId(0);
56 while replica_ids.contains(&replica_id) {
57 replica_id.0 += 1;
58 }
59 let collaborator = channel_buffer_collaborator::ActiveModel {
60 channel_id: ActiveValue::Set(channel_id),
61 connection_id: ActiveValue::Set(connection.id as i32),
62 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
63 user_id: ActiveValue::Set(user_id),
64 replica_id: ActiveValue::Set(replica_id),
65 ..Default::default()
66 }
67 .insert(&*tx)
68 .await?;
69 collaborators.push(collaborator);
70
71 let (base_text, operations) = self.get_buffer_state(&buffer, &tx).await?;
72
73 Ok(proto::JoinChannelBufferResponse {
74 buffer_id: buffer.id.to_proto(),
75 replica_id: replica_id.to_proto() as u32,
76 base_text,
77 operations,
78 epoch: buffer.epoch as u64,
79 collaborators: collaborators
80 .into_iter()
81 .map(|collaborator| proto::Collaborator {
82 peer_id: Some(collaborator.connection().into()),
83 user_id: collaborator.user_id.to_proto(),
84 replica_id: collaborator.replica_id.0 as u32,
85 })
86 .collect(),
87 })
88 })
89 .await
90 }
91
92 pub async fn rejoin_channel_buffers(
93 &self,
94 buffers: &[proto::ChannelBufferVersion],
95 user_id: UserId,
96 connection_id: ConnectionId,
97 ) -> Result<Vec<RejoinedChannelBuffer>> {
98 self.transaction(|tx| async move {
99 let mut results = Vec::new();
100 for client_buffer in buffers {
101 let channel_id = ChannelId::from_proto(client_buffer.channel_id);
102 if self
103 .check_user_is_channel_member(channel_id, user_id, &*tx)
104 .await
105 .is_err()
106 {
107 log::info!("user is not a member of channel");
108 continue;
109 }
110
111 let buffer = self.get_channel_buffer(channel_id, &*tx).await?;
112 let mut collaborators = channel_buffer_collaborator::Entity::find()
113 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
114 .all(&*tx)
115 .await?;
116
117 // If the buffer epoch hasn't changed since the client lost
118 // connection, then the client's buffer can be syncronized with
119 // the server's buffer.
120 if buffer.epoch as u64 != client_buffer.epoch {
121 log::info!("can't rejoin buffer, epoch has changed");
122 continue;
123 }
124
125 // Find the collaborator record for this user's previous lost
126 // connection. Update it with the new connection id.
127 let server_id = ServerId(connection_id.owner_id as i32);
128 let Some(self_collaborator) = collaborators.iter_mut().find(|c| {
129 c.user_id == user_id
130 && (c.connection_lost || c.connection_server_id != server_id)
131 }) else {
132 log::info!("can't rejoin buffer, no previous collaborator found");
133 continue;
134 };
135 let old_connection_id = self_collaborator.connection();
136 *self_collaborator = channel_buffer_collaborator::ActiveModel {
137 id: ActiveValue::Unchanged(self_collaborator.id),
138 connection_id: ActiveValue::Set(connection_id.id as i32),
139 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
140 connection_lost: ActiveValue::Set(false),
141 ..Default::default()
142 }
143 .update(&*tx)
144 .await?;
145
146 let client_version = version_from_wire(&client_buffer.version);
147 let serialization_version = self
148 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
149 .await?;
150
151 let mut rows = buffer_operation::Entity::find()
152 .filter(
153 buffer_operation::Column::BufferId
154 .eq(buffer.id)
155 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
156 )
157 .stream(&*tx)
158 .await?;
159
160 // Find the server's version vector and any operations
161 // that the client has not seen.
162 let mut server_version = clock::Global::new();
163 let mut operations = Vec::new();
164 while let Some(row) = rows.next().await {
165 let row = row?;
166 let timestamp = clock::Lamport {
167 replica_id: row.replica_id as u16,
168 value: row.lamport_timestamp as u32,
169 };
170 server_version.observe(timestamp);
171 if !client_version.observed(timestamp) {
172 operations.push(proto::Operation {
173 variant: Some(operation_from_storage(row, serialization_version)?),
174 })
175 }
176 }
177
178 results.push(RejoinedChannelBuffer {
179 old_connection_id,
180 buffer: proto::RejoinedChannelBuffer {
181 channel_id: client_buffer.channel_id,
182 version: version_to_wire(&server_version),
183 operations,
184 collaborators: collaborators
185 .into_iter()
186 .map(|collaborator| proto::Collaborator {
187 peer_id: Some(collaborator.connection().into()),
188 user_id: collaborator.user_id.to_proto(),
189 replica_id: collaborator.replica_id.0 as u32,
190 })
191 .collect(),
192 },
193 });
194 }
195
196 Ok(results)
197 })
198 .await
199 }
200
201 pub async fn clear_stale_channel_buffer_collaborators(
202 &self,
203 channel_id: ChannelId,
204 server_id: ServerId,
205 ) -> Result<RefreshedChannelBuffer> {
206 self.transaction(|tx| async move {
207 let collaborators = channel_buffer_collaborator::Entity::find()
208 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
209 .all(&*tx)
210 .await?;
211
212 let mut connection_ids = Vec::new();
213 let mut removed_collaborators = Vec::new();
214 let mut collaborator_ids_to_remove = Vec::new();
215 for collaborator in &collaborators {
216 if !collaborator.connection_lost && collaborator.connection_server_id == server_id {
217 connection_ids.push(collaborator.connection());
218 } else {
219 removed_collaborators.push(proto::RemoveChannelBufferCollaborator {
220 channel_id: channel_id.to_proto(),
221 peer_id: Some(collaborator.connection().into()),
222 });
223 collaborator_ids_to_remove.push(collaborator.id);
224 }
225 }
226
227 channel_buffer_collaborator::Entity::delete_many()
228 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
229 .exec(&*tx)
230 .await?;
231
232 Ok(RefreshedChannelBuffer {
233 connection_ids,
234 removed_collaborators,
235 })
236 })
237 .await
238 }
239
240 pub async fn leave_channel_buffer(
241 &self,
242 channel_id: ChannelId,
243 connection: ConnectionId,
244 ) -> Result<Vec<ConnectionId>> {
245 self.transaction(|tx| async move {
246 self.leave_channel_buffer_internal(channel_id, connection, &*tx)
247 .await
248 })
249 .await
250 }
251
252 pub async fn channel_buffer_connection_lost(
253 &self,
254 connection: ConnectionId,
255 tx: &DatabaseTransaction,
256 ) -> Result<()> {
257 channel_buffer_collaborator::Entity::update_many()
258 .filter(
259 Condition::all()
260 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
261 .add(
262 channel_buffer_collaborator::Column::ConnectionServerId
263 .eq(connection.owner_id as i32),
264 ),
265 )
266 .set(channel_buffer_collaborator::ActiveModel {
267 connection_lost: ActiveValue::set(true),
268 ..Default::default()
269 })
270 .exec(&*tx)
271 .await?;
272 Ok(())
273 }
274
275 pub async fn leave_channel_buffers(
276 &self,
277 connection: ConnectionId,
278 ) -> Result<Vec<(ChannelId, Vec<ConnectionId>)>> {
279 self.transaction(|tx| async move {
280 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
281 enum QueryChannelIds {
282 ChannelId,
283 }
284
285 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
286 .select_only()
287 .column(channel_buffer_collaborator::Column::ChannelId)
288 .filter(Condition::all().add(
289 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
290 ))
291 .into_values::<_, QueryChannelIds>()
292 .all(&*tx)
293 .await?;
294
295 let mut result = Vec::new();
296 for channel_id in channel_ids {
297 let collaborators = self
298 .leave_channel_buffer_internal(channel_id, connection, &*tx)
299 .await?;
300 result.push((channel_id, collaborators));
301 }
302
303 Ok(result)
304 })
305 .await
306 }
307
308 pub async fn leave_channel_buffer_internal(
309 &self,
310 channel_id: ChannelId,
311 connection: ConnectionId,
312 tx: &DatabaseTransaction,
313 ) -> Result<Vec<ConnectionId>> {
314 let result = channel_buffer_collaborator::Entity::delete_many()
315 .filter(
316 Condition::all()
317 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
318 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
319 .add(
320 channel_buffer_collaborator::Column::ConnectionServerId
321 .eq(connection.owner_id as i32),
322 ),
323 )
324 .exec(&*tx)
325 .await?;
326 if result.rows_affected == 0 {
327 Err(anyhow!("not a collaborator on this project"))?;
328 }
329
330 let mut connections = Vec::new();
331 let mut rows = channel_buffer_collaborator::Entity::find()
332 .filter(
333 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
334 )
335 .stream(&*tx)
336 .await?;
337 while let Some(row) = rows.next().await {
338 let row = row?;
339 connections.push(ConnectionId {
340 id: row.connection_id as u32,
341 owner_id: row.connection_server_id.0 as u32,
342 });
343 }
344
345 drop(rows);
346
347 if connections.is_empty() {
348 self.snapshot_channel_buffer(channel_id, &tx).await?;
349 }
350
351 Ok(connections)
352 }
353
354 pub async fn get_channel_buffer_collaborators(
355 &self,
356 channel_id: ChannelId,
357 ) -> Result<Vec<UserId>> {
358 self.transaction(|tx| async move {
359 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
360 enum QueryUserIds {
361 UserId,
362 }
363
364 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
365 .select_only()
366 .column(channel_buffer_collaborator::Column::UserId)
367 .filter(
368 Condition::all()
369 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
370 )
371 .into_values::<_, QueryUserIds>()
372 .all(&*tx)
373 .await?;
374
375 Ok(users)
376 })
377 .await
378 }
379
380 pub async fn update_channel_buffer(
381 &self,
382 channel_id: ChannelId,
383 user: UserId,
384 operations: &[proto::Operation],
385 ) -> Result<Vec<ConnectionId>> {
386 self.transaction(move |tx| async move {
387 self.check_user_is_channel_member(channel_id, user, &*tx)
388 .await?;
389
390 let buffer = buffer::Entity::find()
391 .filter(buffer::Column::ChannelId.eq(channel_id))
392 .one(&*tx)
393 .await?
394 .ok_or_else(|| anyhow!("no such buffer"))?;
395
396 let serialization_version = self
397 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &*tx)
398 .await?;
399
400 let operations = operations
401 .iter()
402 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
403 .collect::<Vec<_>>();
404 if !operations.is_empty() {
405 buffer_operation::Entity::insert_many(operations)
406 .on_conflict(
407 OnConflict::columns([
408 buffer_operation::Column::BufferId,
409 buffer_operation::Column::Epoch,
410 buffer_operation::Column::LamportTimestamp,
411 buffer_operation::Column::ReplicaId,
412 ])
413 .do_nothing()
414 .to_owned(),
415 )
416 .exec(&*tx)
417 .await?;
418 }
419
420 let mut connections = Vec::new();
421 let mut rows = channel_buffer_collaborator::Entity::find()
422 .filter(
423 Condition::all()
424 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
425 )
426 .stream(&*tx)
427 .await?;
428 while let Some(row) = rows.next().await {
429 let row = row?;
430 connections.push(ConnectionId {
431 id: row.connection_id as u32,
432 owner_id: row.connection_server_id.0 as u32,
433 });
434 }
435
436 Ok(connections)
437 })
438 .await
439 }
440
441 async fn get_buffer_operation_serialization_version(
442 &self,
443 buffer_id: BufferId,
444 epoch: i32,
445 tx: &DatabaseTransaction,
446 ) -> Result<i32> {
447 Ok(buffer_snapshot::Entity::find()
448 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
449 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
450 .select_only()
451 .column(buffer_snapshot::Column::OperationSerializationVersion)
452 .into_values::<_, QueryOperationSerializationVersion>()
453 .one(&*tx)
454 .await?
455 .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
456 }
457
458 async fn get_channel_buffer(
459 &self,
460 channel_id: ChannelId,
461 tx: &DatabaseTransaction,
462 ) -> Result<buffer::Model> {
463 Ok(channel::Model {
464 id: channel_id,
465 ..Default::default()
466 }
467 .find_related(buffer::Entity)
468 .one(&*tx)
469 .await?
470 .ok_or_else(|| anyhow!("no such buffer"))?)
471 }
472
473 async fn get_buffer_state(
474 &self,
475 buffer: &buffer::Model,
476 tx: &DatabaseTransaction,
477 ) -> Result<(String, Vec<proto::Operation>)> {
478 let id = buffer.id;
479 let (base_text, version) = if buffer.epoch > 0 {
480 let snapshot = buffer_snapshot::Entity::find()
481 .filter(
482 buffer_snapshot::Column::BufferId
483 .eq(id)
484 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
485 )
486 .one(&*tx)
487 .await?
488 .ok_or_else(|| anyhow!("no such snapshot"))?;
489
490 let version = snapshot.operation_serialization_version;
491 (snapshot.text, version)
492 } else {
493 (String::new(), storage::SERIALIZATION_VERSION)
494 };
495
496 let mut rows = buffer_operation::Entity::find()
497 .filter(
498 buffer_operation::Column::BufferId
499 .eq(id)
500 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
501 )
502 .stream(&*tx)
503 .await?;
504 let mut operations = Vec::new();
505 while let Some(row) = rows.next().await {
506 operations.push(proto::Operation {
507 variant: Some(operation_from_storage(row?, version)?),
508 })
509 }
510
511 Ok((base_text, operations))
512 }
513
514 async fn snapshot_channel_buffer(
515 &self,
516 channel_id: ChannelId,
517 tx: &DatabaseTransaction,
518 ) -> Result<()> {
519 let buffer = self.get_channel_buffer(channel_id, tx).await?;
520 let (base_text, operations) = self.get_buffer_state(&buffer, tx).await?;
521 if operations.is_empty() {
522 return Ok(());
523 }
524
525 let mut text_buffer = text::Buffer::new(0, 0, base_text);
526 text_buffer
527 .apply_ops(operations.into_iter().filter_map(operation_from_wire))
528 .unwrap();
529
530 let base_text = text_buffer.text();
531 let epoch = buffer.epoch + 1;
532
533 buffer_snapshot::Model {
534 buffer_id: buffer.id,
535 epoch,
536 text: base_text,
537 operation_serialization_version: storage::SERIALIZATION_VERSION,
538 }
539 .into_active_model()
540 .insert(tx)
541 .await?;
542
543 buffer::ActiveModel {
544 id: ActiveValue::Unchanged(buffer.id),
545 epoch: ActiveValue::Set(epoch),
546 ..Default::default()
547 }
548 .save(tx)
549 .await?;
550
551 Ok(())
552 }
553}
554
555fn operation_to_storage(
556 operation: &proto::Operation,
557 buffer: &buffer::Model,
558 _format: i32,
559) -> Option<buffer_operation::ActiveModel> {
560 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
561 proto::operation::Variant::Edit(operation) => (
562 operation.replica_id,
563 operation.lamport_timestamp,
564 storage::Operation {
565 version: version_to_storage(&operation.version),
566 is_undo: false,
567 edit_ranges: operation
568 .ranges
569 .iter()
570 .map(|range| storage::Range {
571 start: range.start,
572 end: range.end,
573 })
574 .collect(),
575 edit_texts: operation.new_text.clone(),
576 undo_counts: Vec::new(),
577 },
578 ),
579 proto::operation::Variant::Undo(operation) => (
580 operation.replica_id,
581 operation.lamport_timestamp,
582 storage::Operation {
583 version: version_to_storage(&operation.version),
584 is_undo: true,
585 edit_ranges: Vec::new(),
586 edit_texts: Vec::new(),
587 undo_counts: operation
588 .counts
589 .iter()
590 .map(|entry| storage::UndoCount {
591 replica_id: entry.replica_id,
592 lamport_timestamp: entry.lamport_timestamp,
593 count: entry.count,
594 })
595 .collect(),
596 },
597 ),
598 _ => None?,
599 };
600
601 Some(buffer_operation::ActiveModel {
602 buffer_id: ActiveValue::Set(buffer.id),
603 epoch: ActiveValue::Set(buffer.epoch),
604 replica_id: ActiveValue::Set(replica_id as i32),
605 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
606 value: ActiveValue::Set(value.encode_to_vec()),
607 })
608}
609
610fn operation_from_storage(
611 row: buffer_operation::Model,
612 _format_version: i32,
613) -> Result<proto::operation::Variant, Error> {
614 let operation =
615 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
616 let version = version_from_storage(&operation.version);
617 Ok(if operation.is_undo {
618 proto::operation::Variant::Undo(proto::operation::Undo {
619 replica_id: row.replica_id as u32,
620 lamport_timestamp: row.lamport_timestamp as u32,
621 version,
622 counts: operation
623 .undo_counts
624 .iter()
625 .map(|entry| proto::UndoCount {
626 replica_id: entry.replica_id,
627 lamport_timestamp: entry.lamport_timestamp,
628 count: entry.count,
629 })
630 .collect(),
631 })
632 } else {
633 proto::operation::Variant::Edit(proto::operation::Edit {
634 replica_id: row.replica_id as u32,
635 lamport_timestamp: row.lamport_timestamp as u32,
636 version,
637 ranges: operation
638 .edit_ranges
639 .into_iter()
640 .map(|range| proto::Range {
641 start: range.start,
642 end: range.end,
643 })
644 .collect(),
645 new_text: operation.edit_texts,
646 })
647 })
648}
649
650fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
651 version
652 .iter()
653 .map(|entry| storage::VectorClockEntry {
654 replica_id: entry.replica_id,
655 timestamp: entry.timestamp,
656 })
657 .collect()
658}
659
660fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
661 version
662 .iter()
663 .map(|entry| proto::VectorClockEntry {
664 replica_id: entry.replica_id,
665 timestamp: entry.timestamp,
666 })
667 .collect()
668}
669
670// This is currently a manual copy of the deserialization code in the client's langauge crate
671pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
672 match operation.variant? {
673 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
674 timestamp: clock::Lamport {
675 replica_id: edit.replica_id as text::ReplicaId,
676 value: edit.lamport_timestamp,
677 },
678 version: version_from_wire(&edit.version),
679 ranges: edit
680 .ranges
681 .into_iter()
682 .map(|range| {
683 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
684 })
685 .collect(),
686 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
687 })),
688 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
689 timestamp: clock::Lamport {
690 replica_id: undo.replica_id as text::ReplicaId,
691 value: undo.lamport_timestamp,
692 },
693 version: version_from_wire(&undo.version),
694 counts: undo
695 .counts
696 .into_iter()
697 .map(|c| {
698 (
699 clock::Lamport {
700 replica_id: c.replica_id as text::ReplicaId,
701 value: c.lamport_timestamp,
702 },
703 c.count,
704 )
705 })
706 .collect(),
707 })),
708 _ => None,
709 }
710}
711
712fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
713 let mut version = clock::Global::new();
714 for entry in message {
715 version.observe(clock::Lamport {
716 replica_id: entry.replica_id as text::ReplicaId,
717 value: entry.timestamp,
718 });
719 }
720 version
721}
722
723fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
724 let mut message = Vec::new();
725 for entry in version.iter() {
726 message.push(proto::VectorClockEntry {
727 replica_id: entry.replica_id as u32,
728 timestamp: entry.value,
729 });
730 }
731 message
732}
733
734#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
735enum QueryOperationSerializationVersion {
736 OperationSerializationVersion,
737}
738
739mod storage {
740 #![allow(non_snake_case)]
741 use prost::Message;
742 pub const SERIALIZATION_VERSION: i32 = 1;
743
744 #[derive(Message)]
745 pub struct Operation {
746 #[prost(message, repeated, tag = "2")]
747 pub version: Vec<VectorClockEntry>,
748 #[prost(bool, tag = "3")]
749 pub is_undo: bool,
750 #[prost(message, repeated, tag = "4")]
751 pub edit_ranges: Vec<Range>,
752 #[prost(string, repeated, tag = "5")]
753 pub edit_texts: Vec<String>,
754 #[prost(message, repeated, tag = "6")]
755 pub undo_counts: Vec<UndoCount>,
756 }
757
758 #[derive(Message)]
759 pub struct VectorClockEntry {
760 #[prost(uint32, tag = "1")]
761 pub replica_id: u32,
762 #[prost(uint32, tag = "2")]
763 pub timestamp: u32,
764 }
765
766 #[derive(Message)]
767 pub struct Range {
768 #[prost(uint64, tag = "1")]
769 pub start: u64,
770 #[prost(uint64, tag = "2")]
771 pub end: u64,
772 }
773
774 #[derive(Message)]
775 pub struct UndoCount {
776 #[prost(uint32, tag = "1")]
777 pub replica_id: u32,
778 #[prost(uint32, tag = "2")]
779 pub lamport_timestamp: u32,
780 #[prost(uint32, tag = "3")]
781 pub count: u32,
782 }
783}