1use super::*;
2use anyhow::Context as _;
3use prost::Message;
4use text::{EditOperation, UndoOperation};
5
6pub struct LeftChannelBuffer {
7 pub channel_id: ChannelId,
8 pub collaborators: Vec<proto::Collaborator>,
9 pub connections: Vec<ConnectionId>,
10}
11
12impl Database {
13 /// Open a channel buffer. Returns the current contents, and adds you to the list of people
14 /// to notify on changes.
15 pub async fn join_channel_buffer(
16 &self,
17 channel_id: ChannelId,
18 user_id: UserId,
19 connection: ConnectionId,
20 ) -> Result<proto::JoinChannelBufferResponse> {
21 self.transaction(|tx| async move {
22 let channel = self.get_channel_internal(channel_id, &tx).await?;
23 self.check_user_is_channel_participant(&channel, user_id, &tx)
24 .await?;
25
26 let buffer = channel::Model {
27 id: channel_id,
28 ..Default::default()
29 }
30 .find_related(buffer::Entity)
31 .one(&*tx)
32 .await?;
33
34 let buffer = if let Some(buffer) = buffer {
35 buffer
36 } else {
37 let buffer = buffer::ActiveModel {
38 channel_id: ActiveValue::Set(channel_id),
39 ..Default::default()
40 }
41 .insert(&*tx)
42 .await?;
43 buffer_snapshot::ActiveModel {
44 buffer_id: ActiveValue::Set(buffer.id),
45 epoch: ActiveValue::Set(0),
46 text: ActiveValue::Set(String::new()),
47 operation_serialization_version: ActiveValue::Set(
48 storage::SERIALIZATION_VERSION,
49 ),
50 }
51 .insert(&*tx)
52 .await?;
53 buffer
54 };
55
56 // Join the collaborators
57 let mut collaborators = channel_buffer_collaborator::Entity::find()
58 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
59 .all(&*tx)
60 .await?;
61 let replica_ids = collaborators
62 .iter()
63 .map(|c| c.replica_id)
64 .collect::<HashSet<_>>();
65 let mut replica_id = ReplicaId(0);
66 while replica_ids.contains(&replica_id) {
67 replica_id.0 += 1;
68 }
69 let collaborator = channel_buffer_collaborator::ActiveModel {
70 channel_id: ActiveValue::Set(channel_id),
71 connection_id: ActiveValue::Set(connection.id as i32),
72 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
73 user_id: ActiveValue::Set(user_id),
74 replica_id: ActiveValue::Set(replica_id),
75 ..Default::default()
76 }
77 .insert(&*tx)
78 .await?;
79 collaborators.push(collaborator);
80
81 let (base_text, operations, max_operation) =
82 self.get_buffer_state(&buffer, &tx).await?;
83
84 // Save the last observed operation
85 if let Some(op) = max_operation {
86 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
87 user_id: ActiveValue::Set(user_id),
88 buffer_id: ActiveValue::Set(buffer.id),
89 epoch: ActiveValue::Set(op.epoch),
90 lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
91 replica_id: ActiveValue::Set(op.replica_id),
92 })
93 .on_conflict(
94 OnConflict::columns([
95 observed_buffer_edits::Column::UserId,
96 observed_buffer_edits::Column::BufferId,
97 ])
98 .update_columns([
99 observed_buffer_edits::Column::Epoch,
100 observed_buffer_edits::Column::LamportTimestamp,
101 ])
102 .to_owned(),
103 )
104 .exec(&*tx)
105 .await?;
106 }
107
108 Ok(proto::JoinChannelBufferResponse {
109 buffer_id: buffer.id.to_proto(),
110 replica_id: replica_id.to_proto() as u32,
111 base_text,
112 operations,
113 epoch: buffer.epoch as u64,
114 collaborators: collaborators
115 .into_iter()
116 .map(|collaborator| proto::Collaborator {
117 peer_id: Some(collaborator.connection().into()),
118 user_id: collaborator.user_id.to_proto(),
119 replica_id: collaborator.replica_id.0 as u32,
120 is_host: false,
121 })
122 .collect(),
123 })
124 })
125 .await
126 }
127
128 /// Rejoin a channel buffer (after a connection interruption)
129 pub async fn rejoin_channel_buffers(
130 &self,
131 buffers: &[proto::ChannelBufferVersion],
132 user_id: UserId,
133 connection_id: ConnectionId,
134 ) -> Result<Vec<RejoinedChannelBuffer>> {
135 self.transaction(|tx| async move {
136 let mut results = Vec::new();
137 for client_buffer in buffers {
138 let channel = self
139 .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
140 .await?;
141 if self
142 .check_user_is_channel_participant(&channel, user_id, &tx)
143 .await
144 .is_err()
145 {
146 log::info!("user is not a member of channel");
147 continue;
148 }
149
150 let buffer = self.get_channel_buffer(channel.id, &tx).await?;
151 let mut collaborators = channel_buffer_collaborator::Entity::find()
152 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
153 .all(&*tx)
154 .await?;
155
156 // If the buffer epoch hasn't changed since the client lost
157 // connection, then the client's buffer can be synchronized with
158 // the server's buffer.
159 if buffer.epoch as u64 != client_buffer.epoch {
160 log::info!("can't rejoin buffer, epoch has changed");
161 continue;
162 }
163
164 // Find the collaborator record for this user's previous lost
165 // connection. Update it with the new connection id.
166 let Some(self_collaborator) =
167 collaborators.iter_mut().find(|c| c.user_id == user_id)
168 else {
169 log::info!("can't rejoin buffer, no previous collaborator found");
170 continue;
171 };
172 let old_connection_id = self_collaborator.connection();
173 *self_collaborator = channel_buffer_collaborator::ActiveModel {
174 id: ActiveValue::Unchanged(self_collaborator.id),
175 connection_id: ActiveValue::Set(connection_id.id as i32),
176 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
177 connection_lost: ActiveValue::Set(false),
178 ..Default::default()
179 }
180 .update(&*tx)
181 .await?;
182
183 let client_version = version_from_wire(&client_buffer.version);
184 let serialization_version = self
185 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
186 .await?;
187
188 let mut rows = buffer_operation::Entity::find()
189 .filter(
190 buffer_operation::Column::BufferId
191 .eq(buffer.id)
192 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
193 )
194 .stream(&*tx)
195 .await?;
196
197 // Find the server's version vector and any operations
198 // that the client has not seen.
199 let mut server_version = clock::Global::new();
200 let mut operations = Vec::new();
201 while let Some(row) = rows.next().await {
202 let row = row?;
203 let timestamp = clock::Lamport {
204 replica_id: row.replica_id as u16,
205 value: row.lamport_timestamp as u32,
206 };
207 server_version.observe(timestamp);
208 if !client_version.observed(timestamp) {
209 operations.push(proto::Operation {
210 variant: Some(operation_from_storage(row, serialization_version)?),
211 })
212 }
213 }
214
215 results.push(RejoinedChannelBuffer {
216 old_connection_id,
217 buffer: proto::RejoinedChannelBuffer {
218 channel_id: client_buffer.channel_id,
219 version: version_to_wire(&server_version),
220 operations,
221 collaborators: collaborators
222 .into_iter()
223 .map(|collaborator| proto::Collaborator {
224 peer_id: Some(collaborator.connection().into()),
225 user_id: collaborator.user_id.to_proto(),
226 replica_id: collaborator.replica_id.0 as u32,
227 is_host: false,
228 })
229 .collect(),
230 },
231 });
232 }
233
234 Ok(results)
235 })
236 .await
237 }
238
239 /// Clear out any buffer collaborators who are no longer collaborating.
240 pub async fn clear_stale_channel_buffer_collaborators(
241 &self,
242 channel_id: ChannelId,
243 server_id: ServerId,
244 ) -> Result<RefreshedChannelBuffer> {
245 self.transaction(|tx| async move {
246 let db_collaborators = channel_buffer_collaborator::Entity::find()
247 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
248 .all(&*tx)
249 .await?;
250
251 let mut connection_ids = Vec::new();
252 let mut collaborators = Vec::new();
253 let mut collaborator_ids_to_remove = Vec::new();
254 for db_collaborator in &db_collaborators {
255 if !db_collaborator.connection_lost
256 && db_collaborator.connection_server_id == server_id
257 {
258 connection_ids.push(db_collaborator.connection());
259 collaborators.push(proto::Collaborator {
260 peer_id: Some(db_collaborator.connection().into()),
261 replica_id: db_collaborator.replica_id.0 as u32,
262 user_id: db_collaborator.user_id.to_proto(),
263 is_host: false,
264 })
265 } else {
266 collaborator_ids_to_remove.push(db_collaborator.id);
267 }
268 }
269
270 channel_buffer_collaborator::Entity::delete_many()
271 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
272 .exec(&*tx)
273 .await?;
274
275 Ok(RefreshedChannelBuffer {
276 connection_ids,
277 collaborators,
278 })
279 })
280 .await
281 }
282
283 /// Close the channel buffer, and stop receiving updates for it.
284 pub async fn leave_channel_buffer(
285 &self,
286 channel_id: ChannelId,
287 connection: ConnectionId,
288 ) -> Result<LeftChannelBuffer> {
289 self.transaction(|tx| async move {
290 self.leave_channel_buffer_internal(channel_id, connection, &tx)
291 .await
292 })
293 .await
294 }
295
296 /// Close the channel buffer, and stop receiving updates for it.
297 pub async fn channel_buffer_connection_lost(
298 &self,
299 connection: ConnectionId,
300 tx: &DatabaseTransaction,
301 ) -> Result<()> {
302 channel_buffer_collaborator::Entity::update_many()
303 .filter(
304 Condition::all()
305 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
306 .add(
307 channel_buffer_collaborator::Column::ConnectionServerId
308 .eq(connection.owner_id as i32),
309 ),
310 )
311 .set(channel_buffer_collaborator::ActiveModel {
312 connection_lost: ActiveValue::set(true),
313 ..Default::default()
314 })
315 .exec(tx)
316 .await?;
317 Ok(())
318 }
319
320 /// Close all open channel buffers
321 pub async fn leave_channel_buffers(
322 &self,
323 connection: ConnectionId,
324 ) -> Result<Vec<LeftChannelBuffer>> {
325 self.transaction(|tx| async move {
326 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
327 enum QueryChannelIds {
328 ChannelId,
329 }
330
331 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
332 .select_only()
333 .column(channel_buffer_collaborator::Column::ChannelId)
334 .filter(Condition::all().add(
335 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
336 ))
337 .into_values::<_, QueryChannelIds>()
338 .all(&*tx)
339 .await?;
340
341 let mut result = Vec::new();
342 for channel_id in channel_ids {
343 let left_channel_buffer = self
344 .leave_channel_buffer_internal(channel_id, connection, &tx)
345 .await?;
346 result.push(left_channel_buffer);
347 }
348
349 Ok(result)
350 })
351 .await
352 }
353
354 async fn leave_channel_buffer_internal(
355 &self,
356 channel_id: ChannelId,
357 connection: ConnectionId,
358 tx: &DatabaseTransaction,
359 ) -> Result<LeftChannelBuffer> {
360 let result = channel_buffer_collaborator::Entity::delete_many()
361 .filter(
362 Condition::all()
363 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
364 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
365 .add(
366 channel_buffer_collaborator::Column::ConnectionServerId
367 .eq(connection.owner_id as i32),
368 ),
369 )
370 .exec(tx)
371 .await?;
372 if result.rows_affected == 0 {
373 Err(anyhow!("not a collaborator on this project"))?;
374 }
375
376 let mut collaborators = Vec::new();
377 let mut connections = Vec::new();
378 let mut rows = channel_buffer_collaborator::Entity::find()
379 .filter(
380 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
381 )
382 .stream(tx)
383 .await?;
384 while let Some(row) = rows.next().await {
385 let row = row?;
386 let connection = row.connection();
387 connections.push(connection);
388 collaborators.push(proto::Collaborator {
389 peer_id: Some(connection.into()),
390 replica_id: row.replica_id.0 as u32,
391 user_id: row.user_id.to_proto(),
392 is_host: false,
393 });
394 }
395
396 drop(rows);
397
398 if collaborators.is_empty() {
399 self.snapshot_channel_buffer(channel_id, tx).await?;
400 }
401
402 Ok(LeftChannelBuffer {
403 channel_id,
404 collaborators,
405 connections,
406 })
407 }
408
409 pub async fn get_channel_buffer_collaborators(
410 &self,
411 channel_id: ChannelId,
412 ) -> Result<Vec<UserId>> {
413 self.transaction(|tx| async move {
414 self.get_channel_buffer_collaborators_internal(channel_id, &tx)
415 .await
416 })
417 .await
418 }
419
420 async fn get_channel_buffer_collaborators_internal(
421 &self,
422 channel_id: ChannelId,
423 tx: &DatabaseTransaction,
424 ) -> Result<Vec<UserId>> {
425 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
426 enum QueryUserIds {
427 UserId,
428 }
429
430 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
431 .select_only()
432 .column(channel_buffer_collaborator::Column::UserId)
433 .filter(
434 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
435 )
436 .into_values::<_, QueryUserIds>()
437 .all(tx)
438 .await?;
439
440 Ok(users)
441 }
442
443 pub async fn update_channel_buffer(
444 &self,
445 channel_id: ChannelId,
446 user: UserId,
447 operations: &[proto::Operation],
448 ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
449 self.transaction(move |tx| async move {
450 let channel = self.get_channel_internal(channel_id, &tx).await?;
451
452 let mut requires_write_permission = false;
453 for op in operations.iter() {
454 match op.variant {
455 None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
456 Some(_) => requires_write_permission = true,
457 }
458 }
459 if requires_write_permission {
460 self.check_user_is_channel_member(&channel, user, &tx)
461 .await?;
462 } else {
463 self.check_user_is_channel_participant(&channel, user, &tx)
464 .await?;
465 }
466
467 let buffer = buffer::Entity::find()
468 .filter(buffer::Column::ChannelId.eq(channel_id))
469 .one(&*tx)
470 .await?
471 .context("no such buffer")?;
472
473 let serialization_version = self
474 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
475 .await?;
476
477 let operations = operations
478 .iter()
479 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
480 .collect::<Vec<_>>();
481
482 let max_version;
483
484 if !operations.is_empty() {
485 let max_operation = operations
486 .iter()
487 .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
488 .unwrap();
489
490 max_version = vec![proto::VectorClockEntry {
491 replica_id: *max_operation.replica_id.as_ref() as u32,
492 timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
493 }];
494
495 // get current channel participants and save the max operation above
496 self.save_max_operation(
497 user,
498 buffer.id,
499 buffer.epoch,
500 *max_operation.replica_id.as_ref(),
501 *max_operation.lamport_timestamp.as_ref(),
502 &tx,
503 )
504 .await?;
505
506 buffer_operation::Entity::insert_many(operations)
507 .on_conflict(
508 OnConflict::columns([
509 buffer_operation::Column::BufferId,
510 buffer_operation::Column::Epoch,
511 buffer_operation::Column::LamportTimestamp,
512 buffer_operation::Column::ReplicaId,
513 ])
514 .do_nothing()
515 .to_owned(),
516 )
517 .exec(&*tx)
518 .await?;
519 } else {
520 max_version = Vec::new();
521 }
522
523 let mut connections = HashSet::default();
524 let mut rows = channel_buffer_collaborator::Entity::find()
525 .filter(
526 Condition::all()
527 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
528 )
529 .stream(&*tx)
530 .await?;
531 while let Some(row) = rows.next().await {
532 let row = row?;
533 connections.insert(ConnectionId {
534 id: row.connection_id as u32,
535 owner_id: row.connection_server_id.0 as u32,
536 });
537 }
538
539 Ok((connections, buffer.epoch, max_version))
540 })
541 .await
542 }
543
544 async fn save_max_operation(
545 &self,
546 user_id: UserId,
547 buffer_id: BufferId,
548 epoch: i32,
549 replica_id: i32,
550 lamport_timestamp: i32,
551 tx: &DatabaseTransaction,
552 ) -> Result<()> {
553 buffer::Entity::update(buffer::ActiveModel {
554 id: ActiveValue::Unchanged(buffer_id),
555 epoch: ActiveValue::Unchanged(epoch),
556 latest_operation_epoch: ActiveValue::Set(Some(epoch)),
557 latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
558 latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
559 channel_id: ActiveValue::NotSet,
560 })
561 .exec(tx)
562 .await?;
563
564 use observed_buffer_edits::Column;
565 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
566 user_id: ActiveValue::Set(user_id),
567 buffer_id: ActiveValue::Set(buffer_id),
568 epoch: ActiveValue::Set(epoch),
569 replica_id: ActiveValue::Set(replica_id),
570 lamport_timestamp: ActiveValue::Set(lamport_timestamp),
571 })
572 .on_conflict(
573 OnConflict::columns([Column::UserId, Column::BufferId])
574 .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
575 .action_cond_where(
576 Condition::any().add(Column::Epoch.lt(epoch)).add(
577 Condition::all().add(Column::Epoch.eq(epoch)).add(
578 Condition::any()
579 .add(Column::LamportTimestamp.lt(lamport_timestamp))
580 .add(
581 Column::LamportTimestamp
582 .eq(lamport_timestamp)
583 .and(Column::ReplicaId.lt(replica_id)),
584 ),
585 ),
586 ),
587 )
588 .to_owned(),
589 )
590 .exec_without_returning(tx)
591 .await?;
592
593 Ok(())
594 }
595
596 async fn get_buffer_operation_serialization_version(
597 &self,
598 buffer_id: BufferId,
599 epoch: i32,
600 tx: &DatabaseTransaction,
601 ) -> Result<i32> {
602 Ok(buffer_snapshot::Entity::find()
603 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
604 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
605 .select_only()
606 .column(buffer_snapshot::Column::OperationSerializationVersion)
607 .into_values::<_, QueryOperationSerializationVersion>()
608 .one(tx)
609 .await?
610 .context("missing buffer snapshot")?)
611 }
612
613 pub async fn get_channel_buffer(
614 &self,
615 channel_id: ChannelId,
616 tx: &DatabaseTransaction,
617 ) -> Result<buffer::Model> {
618 Ok(channel::Model {
619 id: channel_id,
620 ..Default::default()
621 }
622 .find_related(buffer::Entity)
623 .one(tx)
624 .await?
625 .context("no such buffer")?)
626 }
627
628 async fn get_buffer_state(
629 &self,
630 buffer: &buffer::Model,
631 tx: &DatabaseTransaction,
632 ) -> Result<(
633 String,
634 Vec<proto::Operation>,
635 Option<buffer_operation::Model>,
636 )> {
637 let id = buffer.id;
638 let (base_text, version) = if buffer.epoch > 0 {
639 let snapshot = buffer_snapshot::Entity::find()
640 .filter(
641 buffer_snapshot::Column::BufferId
642 .eq(id)
643 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
644 )
645 .one(tx)
646 .await?
647 .context("no such snapshot")?;
648
649 let version = snapshot.operation_serialization_version;
650 (snapshot.text, version)
651 } else {
652 (String::new(), storage::SERIALIZATION_VERSION)
653 };
654
655 let mut rows = buffer_operation::Entity::find()
656 .filter(
657 buffer_operation::Column::BufferId
658 .eq(id)
659 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
660 )
661 .order_by_asc(buffer_operation::Column::LamportTimestamp)
662 .order_by_asc(buffer_operation::Column::ReplicaId)
663 .stream(tx)
664 .await?;
665
666 let mut operations = Vec::new();
667 let mut last_row = None;
668 while let Some(row) = rows.next().await {
669 let row = row?;
670 last_row = Some(buffer_operation::Model {
671 buffer_id: row.buffer_id,
672 epoch: row.epoch,
673 lamport_timestamp: row.lamport_timestamp,
674 replica_id: row.replica_id,
675 value: Default::default(),
676 });
677 operations.push(proto::Operation {
678 variant: Some(operation_from_storage(row, version)?),
679 });
680 }
681
682 Ok((base_text, operations, last_row))
683 }
684
685 async fn snapshot_channel_buffer(
686 &self,
687 channel_id: ChannelId,
688 tx: &DatabaseTransaction,
689 ) -> Result<()> {
690 let buffer = self.get_channel_buffer(channel_id, tx).await?;
691 let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
692 if operations.is_empty() {
693 return Ok(());
694 }
695
696 let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
697 text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
698
699 let base_text = text_buffer.text();
700 let epoch = buffer.epoch + 1;
701
702 buffer_snapshot::Model {
703 buffer_id: buffer.id,
704 epoch,
705 text: base_text,
706 operation_serialization_version: storage::SERIALIZATION_VERSION,
707 }
708 .into_active_model()
709 .insert(tx)
710 .await?;
711
712 buffer::ActiveModel {
713 id: ActiveValue::Unchanged(buffer.id),
714 epoch: ActiveValue::Set(epoch),
715 latest_operation_epoch: ActiveValue::NotSet,
716 latest_operation_replica_id: ActiveValue::NotSet,
717 latest_operation_lamport_timestamp: ActiveValue::NotSet,
718 channel_id: ActiveValue::NotSet,
719 }
720 .save(tx)
721 .await?;
722
723 Ok(())
724 }
725
726 pub async fn observe_buffer_version(
727 &self,
728 buffer_id: BufferId,
729 user_id: UserId,
730 epoch: i32,
731 version: &[proto::VectorClockEntry],
732 ) -> Result<()> {
733 self.transaction(|tx| async move {
734 // For now, combine concurrent operations.
735 let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
736 return Ok(());
737 };
738 self.save_max_operation(
739 user_id,
740 buffer_id,
741 epoch,
742 component.replica_id as i32,
743 component.timestamp as i32,
744 &tx,
745 )
746 .await?;
747 Ok(())
748 })
749 .await
750 }
751
752 pub async fn observed_channel_buffer_changes(
753 &self,
754 channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
755 user_id: UserId,
756 tx: &DatabaseTransaction,
757 ) -> Result<Vec<proto::ChannelBufferVersion>> {
758 let observed_operations = observed_buffer_edits::Entity::find()
759 .filter(observed_buffer_edits::Column::UserId.eq(user_id))
760 .filter(
761 observed_buffer_edits::Column::BufferId
762 .is_in(channel_ids_by_buffer_id.keys().copied()),
763 )
764 .all(tx)
765 .await?;
766
767 Ok(observed_operations
768 .iter()
769 .flat_map(|op| {
770 Some(proto::ChannelBufferVersion {
771 channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
772 epoch: op.epoch as u64,
773 version: vec![proto::VectorClockEntry {
774 replica_id: op.replica_id as u32,
775 timestamp: op.lamport_timestamp as u32,
776 }],
777 })
778 })
779 .collect())
780 }
781}
782
783fn operation_to_storage(
784 operation: &proto::Operation,
785 buffer: &buffer::Model,
786 _format: i32,
787) -> Option<buffer_operation::ActiveModel> {
788 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
789 proto::operation::Variant::Edit(operation) => (
790 operation.replica_id,
791 operation.lamport_timestamp,
792 storage::Operation {
793 version: version_to_storage(&operation.version),
794 is_undo: false,
795 edit_ranges: operation
796 .ranges
797 .iter()
798 .map(|range| storage::Range {
799 start: range.start,
800 end: range.end,
801 })
802 .collect(),
803 edit_texts: operation.new_text.clone(),
804 undo_counts: Vec::new(),
805 },
806 ),
807 proto::operation::Variant::Undo(operation) => (
808 operation.replica_id,
809 operation.lamport_timestamp,
810 storage::Operation {
811 version: version_to_storage(&operation.version),
812 is_undo: true,
813 edit_ranges: Vec::new(),
814 edit_texts: Vec::new(),
815 undo_counts: operation
816 .counts
817 .iter()
818 .map(|entry| storage::UndoCount {
819 replica_id: entry.replica_id,
820 lamport_timestamp: entry.lamport_timestamp,
821 count: entry.count,
822 })
823 .collect(),
824 },
825 ),
826 _ => None?,
827 };
828
829 Some(buffer_operation::ActiveModel {
830 buffer_id: ActiveValue::Set(buffer.id),
831 epoch: ActiveValue::Set(buffer.epoch),
832 replica_id: ActiveValue::Set(replica_id as i32),
833 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
834 value: ActiveValue::Set(value.encode_to_vec()),
835 })
836}
837
838fn operation_from_storage(
839 row: buffer_operation::Model,
840 _format_version: i32,
841) -> Result<proto::operation::Variant, Error> {
842 let operation =
843 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{error}"))?;
844 let version = version_from_storage(&operation.version);
845 Ok(if operation.is_undo {
846 proto::operation::Variant::Undo(proto::operation::Undo {
847 replica_id: row.replica_id as u32,
848 lamport_timestamp: row.lamport_timestamp as u32,
849 version,
850 counts: operation
851 .undo_counts
852 .iter()
853 .map(|entry| proto::UndoCount {
854 replica_id: entry.replica_id,
855 lamport_timestamp: entry.lamport_timestamp,
856 count: entry.count,
857 })
858 .collect(),
859 })
860 } else {
861 proto::operation::Variant::Edit(proto::operation::Edit {
862 replica_id: row.replica_id as u32,
863 lamport_timestamp: row.lamport_timestamp as u32,
864 version,
865 ranges: operation
866 .edit_ranges
867 .into_iter()
868 .map(|range| proto::Range {
869 start: range.start,
870 end: range.end,
871 })
872 .collect(),
873 new_text: operation.edit_texts,
874 })
875 })
876}
877
878fn version_to_storage(version: &[proto::VectorClockEntry]) -> Vec<storage::VectorClockEntry> {
879 version
880 .iter()
881 .map(|entry| storage::VectorClockEntry {
882 replica_id: entry.replica_id,
883 timestamp: entry.timestamp,
884 })
885 .collect()
886}
887
888fn version_from_storage(version: &[storage::VectorClockEntry]) -> Vec<proto::VectorClockEntry> {
889 version
890 .iter()
891 .map(|entry| proto::VectorClockEntry {
892 replica_id: entry.replica_id,
893 timestamp: entry.timestamp,
894 })
895 .collect()
896}
897
898// This is currently a manual copy of the deserialization code in the client's language crate
899pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
900 match operation.variant? {
901 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
902 timestamp: clock::Lamport {
903 replica_id: edit.replica_id as text::ReplicaId,
904 value: edit.lamport_timestamp,
905 },
906 version: version_from_wire(&edit.version),
907 ranges: edit
908 .ranges
909 .into_iter()
910 .map(|range| {
911 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
912 })
913 .collect(),
914 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
915 })),
916 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
917 timestamp: clock::Lamport {
918 replica_id: undo.replica_id as text::ReplicaId,
919 value: undo.lamport_timestamp,
920 },
921 version: version_from_wire(&undo.version),
922 counts: undo
923 .counts
924 .into_iter()
925 .map(|c| {
926 (
927 clock::Lamport {
928 replica_id: c.replica_id as text::ReplicaId,
929 value: c.lamport_timestamp,
930 },
931 c.count,
932 )
933 })
934 .collect(),
935 })),
936 _ => None,
937 }
938}
939
940fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
941 let mut version = clock::Global::new();
942 for entry in message {
943 version.observe(clock::Lamport {
944 replica_id: entry.replica_id as text::ReplicaId,
945 value: entry.timestamp,
946 });
947 }
948 version
949}
950
951fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
952 let mut message = Vec::new();
953 for entry in version.iter() {
954 message.push(proto::VectorClockEntry {
955 replica_id: entry.replica_id as u32,
956 timestamp: entry.value,
957 });
958 }
959 message
960}
961
962#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
963enum QueryOperationSerializationVersion {
964 OperationSerializationVersion,
965}
966
967mod storage {
968 #![allow(non_snake_case)]
969 use prost::Message;
970 pub const SERIALIZATION_VERSION: i32 = 1;
971
972 #[derive(Message)]
973 pub struct Operation {
974 #[prost(message, repeated, tag = "2")]
975 pub version: Vec<VectorClockEntry>,
976 #[prost(bool, tag = "3")]
977 pub is_undo: bool,
978 #[prost(message, repeated, tag = "4")]
979 pub edit_ranges: Vec<Range>,
980 #[prost(string, repeated, tag = "5")]
981 pub edit_texts: Vec<String>,
982 #[prost(message, repeated, tag = "6")]
983 pub undo_counts: Vec<UndoCount>,
984 }
985
986 #[derive(Message)]
987 pub struct VectorClockEntry {
988 #[prost(uint32, tag = "1")]
989 pub replica_id: u32,
990 #[prost(uint32, tag = "2")]
991 pub timestamp: u32,
992 }
993
994 #[derive(Message)]
995 pub struct Range {
996 #[prost(uint64, tag = "1")]
997 pub start: u64,
998 #[prost(uint64, tag = "2")]
999 pub end: u64,
1000 }
1001
1002 #[derive(Message)]
1003 pub struct UndoCount {
1004 #[prost(uint32, tag = "1")]
1005 pub replica_id: u32,
1006 #[prost(uint32, tag = "2")]
1007 pub lamport_timestamp: u32,
1008 #[prost(uint32, tag = "3")]
1009 pub count: u32,
1010 }
1011}