1use super::*;
2use prost::Message;
3use text::{EditOperation, UndoOperation};
4
5pub struct LeftChannelBuffer {
6 pub channel_id: ChannelId,
7 pub collaborators: Vec<proto::Collaborator>,
8 pub connections: Vec<ConnectionId>,
9}
10
11impl Database {
12 /// Open a channel buffer. Returns the current contents, and adds you to the list of people
13 /// to notify on changes.
14 pub async fn join_channel_buffer(
15 &self,
16 channel_id: ChannelId,
17 user_id: UserId,
18 connection: ConnectionId,
19 ) -> Result<proto::JoinChannelBufferResponse> {
20 self.transaction(|tx| async move {
21 let channel = self.get_channel_internal(channel_id, &tx).await?;
22 self.check_user_is_channel_participant(&channel, user_id, &tx)
23 .await?;
24
25 let buffer = channel::Model {
26 id: channel_id,
27 ..Default::default()
28 }
29 .find_related(buffer::Entity)
30 .one(&*tx)
31 .await?;
32
33 let buffer = if let Some(buffer) = buffer {
34 buffer
35 } else {
36 let buffer = buffer::ActiveModel {
37 channel_id: ActiveValue::Set(channel_id),
38 ..Default::default()
39 }
40 .insert(&*tx)
41 .await?;
42 buffer_snapshot::ActiveModel {
43 buffer_id: ActiveValue::Set(buffer.id),
44 epoch: ActiveValue::Set(0),
45 text: ActiveValue::Set(String::new()),
46 operation_serialization_version: ActiveValue::Set(
47 storage::SERIALIZATION_VERSION,
48 ),
49 }
50 .insert(&*tx)
51 .await?;
52 buffer
53 };
54
55 // Join the collaborators
56 let mut collaborators = channel_buffer_collaborator::Entity::find()
57 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
58 .all(&*tx)
59 .await?;
60 let replica_ids = collaborators
61 .iter()
62 .map(|c| c.replica_id)
63 .collect::<HashSet<_>>();
64 let mut replica_id = ReplicaId(0);
65 while replica_ids.contains(&replica_id) {
66 replica_id.0 += 1;
67 }
68 let collaborator = channel_buffer_collaborator::ActiveModel {
69 channel_id: ActiveValue::Set(channel_id),
70 connection_id: ActiveValue::Set(connection.id as i32),
71 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
72 user_id: ActiveValue::Set(user_id),
73 replica_id: ActiveValue::Set(replica_id),
74 ..Default::default()
75 }
76 .insert(&*tx)
77 .await?;
78 collaborators.push(collaborator);
79
80 let (base_text, operations, max_operation) =
81 self.get_buffer_state(&buffer, &tx).await?;
82
83 // Save the last observed operation
84 if let Some(op) = max_operation {
85 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
86 user_id: ActiveValue::Set(user_id),
87 buffer_id: ActiveValue::Set(buffer.id),
88 epoch: ActiveValue::Set(op.epoch),
89 lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
90 replica_id: ActiveValue::Set(op.replica_id),
91 })
92 .on_conflict(
93 OnConflict::columns([
94 observed_buffer_edits::Column::UserId,
95 observed_buffer_edits::Column::BufferId,
96 ])
97 .update_columns([
98 observed_buffer_edits::Column::Epoch,
99 observed_buffer_edits::Column::LamportTimestamp,
100 ])
101 .to_owned(),
102 )
103 .exec(&*tx)
104 .await?;
105 }
106
107 Ok(proto::JoinChannelBufferResponse {
108 buffer_id: buffer.id.to_proto(),
109 replica_id: replica_id.to_proto() as u32,
110 base_text,
111 operations,
112 epoch: buffer.epoch as u64,
113 collaborators: collaborators
114 .into_iter()
115 .map(|collaborator| proto::Collaborator {
116 peer_id: Some(collaborator.connection().into()),
117 user_id: collaborator.user_id.to_proto(),
118 replica_id: collaborator.replica_id.0 as u32,
119 })
120 .collect(),
121 })
122 })
123 .await
124 }
125
126 /// Rejoin a channel buffer (after a connection interruption)
127 pub async fn rejoin_channel_buffers(
128 &self,
129 buffers: &[proto::ChannelBufferVersion],
130 user_id: UserId,
131 connection_id: ConnectionId,
132 ) -> Result<Vec<RejoinedChannelBuffer>> {
133 self.transaction(|tx| async move {
134 let mut results = Vec::new();
135 for client_buffer in buffers {
136 let channel = self
137 .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
138 .await?;
139 if self
140 .check_user_is_channel_participant(&channel, user_id, &tx)
141 .await
142 .is_err()
143 {
144 log::info!("user is not a member of channel");
145 continue;
146 }
147
148 let buffer = self.get_channel_buffer(channel.id, &tx).await?;
149 let mut collaborators = channel_buffer_collaborator::Entity::find()
150 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
151 .all(&*tx)
152 .await?;
153
154 // If the buffer epoch hasn't changed since the client lost
155 // connection, then the client's buffer can be synchronized with
156 // the server's buffer.
157 if buffer.epoch as u64 != client_buffer.epoch {
158 log::info!("can't rejoin buffer, epoch has changed");
159 continue;
160 }
161
162 // Find the collaborator record for this user's previous lost
163 // connection. Update it with the new connection id.
164 let Some(self_collaborator) =
165 collaborators.iter_mut().find(|c| c.user_id == user_id)
166 else {
167 log::info!("can't rejoin buffer, no previous collaborator found");
168 continue;
169 };
170 let old_connection_id = self_collaborator.connection();
171 *self_collaborator = channel_buffer_collaborator::ActiveModel {
172 id: ActiveValue::Unchanged(self_collaborator.id),
173 connection_id: ActiveValue::Set(connection_id.id as i32),
174 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
175 connection_lost: ActiveValue::Set(false),
176 ..Default::default()
177 }
178 .update(&*tx)
179 .await?;
180
181 let client_version = version_from_wire(&client_buffer.version);
182 let serialization_version = self
183 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
184 .await?;
185
186 let mut rows = buffer_operation::Entity::find()
187 .filter(
188 buffer_operation::Column::BufferId
189 .eq(buffer.id)
190 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
191 )
192 .stream(&*tx)
193 .await?;
194
195 // Find the server's version vector and any operations
196 // that the client has not seen.
197 let mut server_version = clock::Global::new();
198 let mut operations = Vec::new();
199 while let Some(row) = rows.next().await {
200 let row = row?;
201 let timestamp = clock::Lamport {
202 replica_id: row.replica_id as u16,
203 value: row.lamport_timestamp as u32,
204 };
205 server_version.observe(timestamp);
206 if !client_version.observed(timestamp) {
207 operations.push(proto::Operation {
208 variant: Some(operation_from_storage(row, serialization_version)?),
209 })
210 }
211 }
212
213 results.push(RejoinedChannelBuffer {
214 old_connection_id,
215 buffer: proto::RejoinedChannelBuffer {
216 channel_id: client_buffer.channel_id,
217 version: version_to_wire(&server_version),
218 operations,
219 collaborators: collaborators
220 .into_iter()
221 .map(|collaborator| proto::Collaborator {
222 peer_id: Some(collaborator.connection().into()),
223 user_id: collaborator.user_id.to_proto(),
224 replica_id: collaborator.replica_id.0 as u32,
225 })
226 .collect(),
227 },
228 });
229 }
230
231 Ok(results)
232 })
233 .await
234 }
235
236 /// Clear out any buffer collaborators who are no longer collaborating.
237 pub async fn clear_stale_channel_buffer_collaborators(
238 &self,
239 channel_id: ChannelId,
240 server_id: ServerId,
241 ) -> Result<RefreshedChannelBuffer> {
242 self.transaction(|tx| async move {
243 let db_collaborators = channel_buffer_collaborator::Entity::find()
244 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
245 .all(&*tx)
246 .await?;
247
248 let mut connection_ids = Vec::new();
249 let mut collaborators = Vec::new();
250 let mut collaborator_ids_to_remove = Vec::new();
251 for db_collaborator in &db_collaborators {
252 if !db_collaborator.connection_lost
253 && db_collaborator.connection_server_id == server_id
254 {
255 connection_ids.push(db_collaborator.connection());
256 collaborators.push(proto::Collaborator {
257 peer_id: Some(db_collaborator.connection().into()),
258 replica_id: db_collaborator.replica_id.0 as u32,
259 user_id: db_collaborator.user_id.to_proto(),
260 })
261 } else {
262 collaborator_ids_to_remove.push(db_collaborator.id);
263 }
264 }
265
266 channel_buffer_collaborator::Entity::delete_many()
267 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
268 .exec(&*tx)
269 .await?;
270
271 Ok(RefreshedChannelBuffer {
272 connection_ids,
273 collaborators,
274 })
275 })
276 .await
277 }
278
279 /// Close the channel buffer, and stop receiving updates for it.
280 pub async fn leave_channel_buffer(
281 &self,
282 channel_id: ChannelId,
283 connection: ConnectionId,
284 ) -> Result<LeftChannelBuffer> {
285 self.transaction(|tx| async move {
286 self.leave_channel_buffer_internal(channel_id, connection, &tx)
287 .await
288 })
289 .await
290 }
291
292 /// Close the channel buffer, and stop receiving updates for it.
293 pub async fn channel_buffer_connection_lost(
294 &self,
295 connection: ConnectionId,
296 tx: &DatabaseTransaction,
297 ) -> Result<()> {
298 channel_buffer_collaborator::Entity::update_many()
299 .filter(
300 Condition::all()
301 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
302 .add(
303 channel_buffer_collaborator::Column::ConnectionServerId
304 .eq(connection.owner_id as i32),
305 ),
306 )
307 .set(channel_buffer_collaborator::ActiveModel {
308 connection_lost: ActiveValue::set(true),
309 ..Default::default()
310 })
311 .exec(tx)
312 .await?;
313 Ok(())
314 }
315
316 /// Close all open channel buffers
317 pub async fn leave_channel_buffers(
318 &self,
319 connection: ConnectionId,
320 ) -> Result<Vec<LeftChannelBuffer>> {
321 self.transaction(|tx| async move {
322 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
323 enum QueryChannelIds {
324 ChannelId,
325 }
326
327 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
328 .select_only()
329 .column(channel_buffer_collaborator::Column::ChannelId)
330 .filter(Condition::all().add(
331 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
332 ))
333 .into_values::<_, QueryChannelIds>()
334 .all(&*tx)
335 .await?;
336
337 let mut result = Vec::new();
338 for channel_id in channel_ids {
339 let left_channel_buffer = self
340 .leave_channel_buffer_internal(channel_id, connection, &tx)
341 .await?;
342 result.push(left_channel_buffer);
343 }
344
345 Ok(result)
346 })
347 .await
348 }
349
350 async fn leave_channel_buffer_internal(
351 &self,
352 channel_id: ChannelId,
353 connection: ConnectionId,
354 tx: &DatabaseTransaction,
355 ) -> Result<LeftChannelBuffer> {
356 let result = channel_buffer_collaborator::Entity::delete_many()
357 .filter(
358 Condition::all()
359 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
360 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
361 .add(
362 channel_buffer_collaborator::Column::ConnectionServerId
363 .eq(connection.owner_id as i32),
364 ),
365 )
366 .exec(tx)
367 .await?;
368 if result.rows_affected == 0 {
369 Err(anyhow!("not a collaborator on this project"))?;
370 }
371
372 let mut collaborators = Vec::new();
373 let mut connections = Vec::new();
374 let mut rows = channel_buffer_collaborator::Entity::find()
375 .filter(
376 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
377 )
378 .stream(tx)
379 .await?;
380 while let Some(row) = rows.next().await {
381 let row = row?;
382 let connection = row.connection();
383 connections.push(connection);
384 collaborators.push(proto::Collaborator {
385 peer_id: Some(connection.into()),
386 replica_id: row.replica_id.0 as u32,
387 user_id: row.user_id.to_proto(),
388 });
389 }
390
391 drop(rows);
392
393 if collaborators.is_empty() {
394 self.snapshot_channel_buffer(channel_id, &tx).await?;
395 }
396
397 Ok(LeftChannelBuffer {
398 channel_id,
399 collaborators,
400 connections,
401 })
402 }
403
404 pub async fn get_channel_buffer_collaborators(
405 &self,
406 channel_id: ChannelId,
407 ) -> Result<Vec<UserId>> {
408 self.transaction(|tx| async move {
409 self.get_channel_buffer_collaborators_internal(channel_id, &tx)
410 .await
411 })
412 .await
413 }
414
415 async fn get_channel_buffer_collaborators_internal(
416 &self,
417 channel_id: ChannelId,
418 tx: &DatabaseTransaction,
419 ) -> Result<Vec<UserId>> {
420 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
421 enum QueryUserIds {
422 UserId,
423 }
424
425 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
426 .select_only()
427 .column(channel_buffer_collaborator::Column::UserId)
428 .filter(
429 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
430 )
431 .into_values::<_, QueryUserIds>()
432 .all(tx)
433 .await?;
434
435 Ok(users)
436 }
437
438 pub async fn update_channel_buffer(
439 &self,
440 channel_id: ChannelId,
441 user: UserId,
442 operations: &[proto::Operation],
443 ) -> Result<(
444 Vec<ConnectionId>,
445 Vec<UserId>,
446 i32,
447 Vec<proto::VectorClockEntry>,
448 )> {
449 self.transaction(move |tx| async move {
450 let channel = self.get_channel_internal(channel_id, &tx).await?;
451
452 let mut requires_write_permission = false;
453 for op in operations.iter() {
454 match op.variant {
455 None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
456 Some(_) => requires_write_permission = true,
457 }
458 }
459 if requires_write_permission {
460 self.check_user_is_channel_member(&channel, user, &tx)
461 .await?;
462 } else {
463 self.check_user_is_channel_participant(&channel, user, &tx)
464 .await?;
465 }
466
467 let buffer = buffer::Entity::find()
468 .filter(buffer::Column::ChannelId.eq(channel_id))
469 .one(&*tx)
470 .await?
471 .ok_or_else(|| anyhow!("no such buffer"))?;
472
473 let serialization_version = self
474 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
475 .await?;
476
477 let operations = operations
478 .iter()
479 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
480 .collect::<Vec<_>>();
481
482 let mut channel_members;
483 let max_version;
484
485 if !operations.is_empty() {
486 let max_operation = operations
487 .iter()
488 .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
489 .unwrap();
490
491 max_version = vec![proto::VectorClockEntry {
492 replica_id: *max_operation.replica_id.as_ref() as u32,
493 timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
494 }];
495
496 // get current channel participants and save the max operation above
497 self.save_max_operation(
498 user,
499 buffer.id,
500 buffer.epoch,
501 *max_operation.replica_id.as_ref(),
502 *max_operation.lamport_timestamp.as_ref(),
503 &tx,
504 )
505 .await?;
506
507 channel_members = self.get_channel_participants(&channel, &tx).await?;
508 let collaborators = self
509 .get_channel_buffer_collaborators_internal(channel_id, &tx)
510 .await?;
511 channel_members.retain(|member| !collaborators.contains(member));
512
513 buffer_operation::Entity::insert_many(operations)
514 .on_conflict(
515 OnConflict::columns([
516 buffer_operation::Column::BufferId,
517 buffer_operation::Column::Epoch,
518 buffer_operation::Column::LamportTimestamp,
519 buffer_operation::Column::ReplicaId,
520 ])
521 .do_nothing()
522 .to_owned(),
523 )
524 .exec(&*tx)
525 .await?;
526 } else {
527 channel_members = Vec::new();
528 max_version = Vec::new();
529 }
530
531 let mut connections = Vec::new();
532 let mut rows = channel_buffer_collaborator::Entity::find()
533 .filter(
534 Condition::all()
535 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
536 )
537 .stream(&*tx)
538 .await?;
539 while let Some(row) = rows.next().await {
540 let row = row?;
541 connections.push(ConnectionId {
542 id: row.connection_id as u32,
543 owner_id: row.connection_server_id.0 as u32,
544 });
545 }
546
547 Ok((connections, channel_members, buffer.epoch, max_version))
548 })
549 .await
550 }
551
552 async fn save_max_operation(
553 &self,
554 user_id: UserId,
555 buffer_id: BufferId,
556 epoch: i32,
557 replica_id: i32,
558 lamport_timestamp: i32,
559 tx: &DatabaseTransaction,
560 ) -> Result<()> {
561 buffer::Entity::update(buffer::ActiveModel {
562 id: ActiveValue::Unchanged(buffer_id),
563 epoch: ActiveValue::Unchanged(epoch),
564 latest_operation_epoch: ActiveValue::Set(Some(epoch)),
565 latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
566 latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
567 channel_id: ActiveValue::NotSet,
568 })
569 .exec(tx)
570 .await?;
571
572 use observed_buffer_edits::Column;
573 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
574 user_id: ActiveValue::Set(user_id),
575 buffer_id: ActiveValue::Set(buffer_id),
576 epoch: ActiveValue::Set(epoch),
577 replica_id: ActiveValue::Set(replica_id),
578 lamport_timestamp: ActiveValue::Set(lamport_timestamp),
579 })
580 .on_conflict(
581 OnConflict::columns([Column::UserId, Column::BufferId])
582 .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
583 .action_cond_where(
584 Condition::any().add(Column::Epoch.lt(epoch)).add(
585 Condition::all().add(Column::Epoch.eq(epoch)).add(
586 Condition::any()
587 .add(Column::LamportTimestamp.lt(lamport_timestamp))
588 .add(
589 Column::LamportTimestamp
590 .eq(lamport_timestamp)
591 .and(Column::ReplicaId.lt(replica_id)),
592 ),
593 ),
594 ),
595 )
596 .to_owned(),
597 )
598 .exec_without_returning(tx)
599 .await?;
600
601 Ok(())
602 }
603
604 async fn get_buffer_operation_serialization_version(
605 &self,
606 buffer_id: BufferId,
607 epoch: i32,
608 tx: &DatabaseTransaction,
609 ) -> Result<i32> {
610 Ok(buffer_snapshot::Entity::find()
611 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
612 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
613 .select_only()
614 .column(buffer_snapshot::Column::OperationSerializationVersion)
615 .into_values::<_, QueryOperationSerializationVersion>()
616 .one(tx)
617 .await?
618 .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
619 }
620
621 pub async fn get_channel_buffer(
622 &self,
623 channel_id: ChannelId,
624 tx: &DatabaseTransaction,
625 ) -> Result<buffer::Model> {
626 Ok(channel::Model {
627 id: channel_id,
628 ..Default::default()
629 }
630 .find_related(buffer::Entity)
631 .one(tx)
632 .await?
633 .ok_or_else(|| anyhow!("no such buffer"))?)
634 }
635
636 async fn get_buffer_state(
637 &self,
638 buffer: &buffer::Model,
639 tx: &DatabaseTransaction,
640 ) -> Result<(
641 String,
642 Vec<proto::Operation>,
643 Option<buffer_operation::Model>,
644 )> {
645 let id = buffer.id;
646 let (base_text, version) = if buffer.epoch > 0 {
647 let snapshot = buffer_snapshot::Entity::find()
648 .filter(
649 buffer_snapshot::Column::BufferId
650 .eq(id)
651 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
652 )
653 .one(tx)
654 .await?
655 .ok_or_else(|| anyhow!("no such snapshot"))?;
656
657 let version = snapshot.operation_serialization_version;
658 (snapshot.text, version)
659 } else {
660 (String::new(), storage::SERIALIZATION_VERSION)
661 };
662
663 let mut rows = buffer_operation::Entity::find()
664 .filter(
665 buffer_operation::Column::BufferId
666 .eq(id)
667 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
668 )
669 .order_by_asc(buffer_operation::Column::LamportTimestamp)
670 .order_by_asc(buffer_operation::Column::ReplicaId)
671 .stream(tx)
672 .await?;
673
674 let mut operations = Vec::new();
675 let mut last_row = None;
676 while let Some(row) = rows.next().await {
677 let row = row?;
678 last_row = Some(buffer_operation::Model {
679 buffer_id: row.buffer_id,
680 epoch: row.epoch,
681 lamport_timestamp: row.lamport_timestamp,
682 replica_id: row.replica_id,
683 value: Default::default(),
684 });
685 operations.push(proto::Operation {
686 variant: Some(operation_from_storage(row, version)?),
687 });
688 }
689
690 Ok((base_text, operations, last_row))
691 }
692
693 async fn snapshot_channel_buffer(
694 &self,
695 channel_id: ChannelId,
696 tx: &DatabaseTransaction,
697 ) -> Result<()> {
698 let buffer = self.get_channel_buffer(channel_id, tx).await?;
699 let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
700 if operations.is_empty() {
701 return Ok(());
702 }
703
704 let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
705 text_buffer
706 .apply_ops(operations.into_iter().filter_map(operation_from_wire))
707 .unwrap();
708
709 let base_text = text_buffer.text();
710 let epoch = buffer.epoch + 1;
711
712 buffer_snapshot::Model {
713 buffer_id: buffer.id,
714 epoch,
715 text: base_text,
716 operation_serialization_version: storage::SERIALIZATION_VERSION,
717 }
718 .into_active_model()
719 .insert(tx)
720 .await?;
721
722 buffer::ActiveModel {
723 id: ActiveValue::Unchanged(buffer.id),
724 epoch: ActiveValue::Set(epoch),
725 latest_operation_epoch: ActiveValue::NotSet,
726 latest_operation_replica_id: ActiveValue::NotSet,
727 latest_operation_lamport_timestamp: ActiveValue::NotSet,
728 channel_id: ActiveValue::NotSet,
729 }
730 .save(tx)
731 .await?;
732
733 Ok(())
734 }
735
736 pub async fn observe_buffer_version(
737 &self,
738 buffer_id: BufferId,
739 user_id: UserId,
740 epoch: i32,
741 version: &[proto::VectorClockEntry],
742 ) -> Result<()> {
743 self.transaction(|tx| async move {
744 // For now, combine concurrent operations.
745 let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
746 return Ok(());
747 };
748 self.save_max_operation(
749 user_id,
750 buffer_id,
751 epoch,
752 component.replica_id as i32,
753 component.timestamp as i32,
754 &tx,
755 )
756 .await?;
757 Ok(())
758 })
759 .await
760 }
761
762 pub async fn observed_channel_buffer_changes(
763 &self,
764 channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
765 user_id: UserId,
766 tx: &DatabaseTransaction,
767 ) -> Result<Vec<proto::ChannelBufferVersion>> {
768 let observed_operations = observed_buffer_edits::Entity::find()
769 .filter(observed_buffer_edits::Column::UserId.eq(user_id))
770 .filter(
771 observed_buffer_edits::Column::BufferId
772 .is_in(channel_ids_by_buffer_id.keys().copied()),
773 )
774 .all(tx)
775 .await?;
776
777 Ok(observed_operations
778 .iter()
779 .flat_map(|op| {
780 Some(proto::ChannelBufferVersion {
781 channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
782 epoch: op.epoch as u64,
783 version: vec![proto::VectorClockEntry {
784 replica_id: op.replica_id as u32,
785 timestamp: op.lamport_timestamp as u32,
786 }],
787 })
788 })
789 .collect())
790 }
791}
792
793fn operation_to_storage(
794 operation: &proto::Operation,
795 buffer: &buffer::Model,
796 _format: i32,
797) -> Option<buffer_operation::ActiveModel> {
798 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
799 proto::operation::Variant::Edit(operation) => (
800 operation.replica_id,
801 operation.lamport_timestamp,
802 storage::Operation {
803 version: version_to_storage(&operation.version),
804 is_undo: false,
805 edit_ranges: operation
806 .ranges
807 .iter()
808 .map(|range| storage::Range {
809 start: range.start,
810 end: range.end,
811 })
812 .collect(),
813 edit_texts: operation.new_text.clone(),
814 undo_counts: Vec::new(),
815 },
816 ),
817 proto::operation::Variant::Undo(operation) => (
818 operation.replica_id,
819 operation.lamport_timestamp,
820 storage::Operation {
821 version: version_to_storage(&operation.version),
822 is_undo: true,
823 edit_ranges: Vec::new(),
824 edit_texts: Vec::new(),
825 undo_counts: operation
826 .counts
827 .iter()
828 .map(|entry| storage::UndoCount {
829 replica_id: entry.replica_id,
830 lamport_timestamp: entry.lamport_timestamp,
831 count: entry.count,
832 })
833 .collect(),
834 },
835 ),
836 _ => None?,
837 };
838
839 Some(buffer_operation::ActiveModel {
840 buffer_id: ActiveValue::Set(buffer.id),
841 epoch: ActiveValue::Set(buffer.epoch),
842 replica_id: ActiveValue::Set(replica_id as i32),
843 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
844 value: ActiveValue::Set(value.encode_to_vec()),
845 })
846}
847
848fn operation_from_storage(
849 row: buffer_operation::Model,
850 _format_version: i32,
851) -> Result<proto::operation::Variant, Error> {
852 let operation =
853 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
854 let version = version_from_storage(&operation.version);
855 Ok(if operation.is_undo {
856 proto::operation::Variant::Undo(proto::operation::Undo {
857 replica_id: row.replica_id as u32,
858 lamport_timestamp: row.lamport_timestamp as u32,
859 version,
860 counts: operation
861 .undo_counts
862 .iter()
863 .map(|entry| proto::UndoCount {
864 replica_id: entry.replica_id,
865 lamport_timestamp: entry.lamport_timestamp,
866 count: entry.count,
867 })
868 .collect(),
869 })
870 } else {
871 proto::operation::Variant::Edit(proto::operation::Edit {
872 replica_id: row.replica_id as u32,
873 lamport_timestamp: row.lamport_timestamp as u32,
874 version,
875 ranges: operation
876 .edit_ranges
877 .into_iter()
878 .map(|range| proto::Range {
879 start: range.start,
880 end: range.end,
881 })
882 .collect(),
883 new_text: operation.edit_texts,
884 })
885 })
886}
887
888fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
889 version
890 .iter()
891 .map(|entry| storage::VectorClockEntry {
892 replica_id: entry.replica_id,
893 timestamp: entry.timestamp,
894 })
895 .collect()
896}
897
898fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
899 version
900 .iter()
901 .map(|entry| proto::VectorClockEntry {
902 replica_id: entry.replica_id,
903 timestamp: entry.timestamp,
904 })
905 .collect()
906}
907
908// This is currently a manual copy of the deserialization code in the client's language crate
909pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
910 match operation.variant? {
911 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
912 timestamp: clock::Lamport {
913 replica_id: edit.replica_id as text::ReplicaId,
914 value: edit.lamport_timestamp,
915 },
916 version: version_from_wire(&edit.version),
917 ranges: edit
918 .ranges
919 .into_iter()
920 .map(|range| {
921 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
922 })
923 .collect(),
924 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
925 })),
926 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
927 timestamp: clock::Lamport {
928 replica_id: undo.replica_id as text::ReplicaId,
929 value: undo.lamport_timestamp,
930 },
931 version: version_from_wire(&undo.version),
932 counts: undo
933 .counts
934 .into_iter()
935 .map(|c| {
936 (
937 clock::Lamport {
938 replica_id: c.replica_id as text::ReplicaId,
939 value: c.lamport_timestamp,
940 },
941 c.count,
942 )
943 })
944 .collect(),
945 })),
946 _ => None,
947 }
948}
949
950fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
951 let mut version = clock::Global::new();
952 for entry in message {
953 version.observe(clock::Lamport {
954 replica_id: entry.replica_id as text::ReplicaId,
955 value: entry.timestamp,
956 });
957 }
958 version
959}
960
961fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
962 let mut message = Vec::new();
963 for entry in version.iter() {
964 message.push(proto::VectorClockEntry {
965 replica_id: entry.replica_id as u32,
966 timestamp: entry.value,
967 });
968 }
969 message
970}
971
972#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
973enum QueryOperationSerializationVersion {
974 OperationSerializationVersion,
975}
976
977mod storage {
978 #![allow(non_snake_case)]
979 use prost::Message;
980 pub const SERIALIZATION_VERSION: i32 = 1;
981
982 #[derive(Message)]
983 pub struct Operation {
984 #[prost(message, repeated, tag = "2")]
985 pub version: Vec<VectorClockEntry>,
986 #[prost(bool, tag = "3")]
987 pub is_undo: bool,
988 #[prost(message, repeated, tag = "4")]
989 pub edit_ranges: Vec<Range>,
990 #[prost(string, repeated, tag = "5")]
991 pub edit_texts: Vec<String>,
992 #[prost(message, repeated, tag = "6")]
993 pub undo_counts: Vec<UndoCount>,
994 }
995
996 #[derive(Message)]
997 pub struct VectorClockEntry {
998 #[prost(uint32, tag = "1")]
999 pub replica_id: u32,
1000 #[prost(uint32, tag = "2")]
1001 pub timestamp: u32,
1002 }
1003
1004 #[derive(Message)]
1005 pub struct Range {
1006 #[prost(uint64, tag = "1")]
1007 pub start: u64,
1008 #[prost(uint64, tag = "2")]
1009 pub end: u64,
1010 }
1011
1012 #[derive(Message)]
1013 pub struct UndoCount {
1014 #[prost(uint32, tag = "1")]
1015 pub replica_id: u32,
1016 #[prost(uint32, tag = "2")]
1017 pub lamport_timestamp: u32,
1018 #[prost(uint32, tag = "3")]
1019 pub count: u32,
1020 }
1021}