1use super::*;
2use prost::Message;
3use text::{EditOperation, UndoOperation};
4
5pub struct LeftChannelBuffer {
6 pub channel_id: ChannelId,
7 pub collaborators: Vec<proto::Collaborator>,
8 pub connections: Vec<ConnectionId>,
9}
10
11impl Database {
12 /// Open a channel buffer. Returns the current contents, and adds you to the list of people
13 /// to notify on changes.
14 pub async fn join_channel_buffer(
15 &self,
16 channel_id: ChannelId,
17 user_id: UserId,
18 connection: ConnectionId,
19 ) -> Result<proto::JoinChannelBufferResponse> {
20 self.transaction(|tx| async move {
21 let channel = self.get_channel_internal(channel_id, &tx).await?;
22 self.check_user_is_channel_participant(&channel, user_id, &tx)
23 .await?;
24
25 let buffer = channel::Model {
26 id: channel_id,
27 ..Default::default()
28 }
29 .find_related(buffer::Entity)
30 .one(&*tx)
31 .await?;
32
33 let buffer = if let Some(buffer) = buffer {
34 buffer
35 } else {
36 let buffer = buffer::ActiveModel {
37 channel_id: ActiveValue::Set(channel_id),
38 ..Default::default()
39 }
40 .insert(&*tx)
41 .await?;
42 buffer_snapshot::ActiveModel {
43 buffer_id: ActiveValue::Set(buffer.id),
44 epoch: ActiveValue::Set(0),
45 text: ActiveValue::Set(String::new()),
46 operation_serialization_version: ActiveValue::Set(
47 storage::SERIALIZATION_VERSION,
48 ),
49 }
50 .insert(&*tx)
51 .await?;
52 buffer
53 };
54
55 // Join the collaborators
56 let mut collaborators = channel_buffer_collaborator::Entity::find()
57 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
58 .all(&*tx)
59 .await?;
60 let replica_ids = collaborators
61 .iter()
62 .map(|c| c.replica_id)
63 .collect::<HashSet<_>>();
64 let mut replica_id = ReplicaId(0);
65 while replica_ids.contains(&replica_id) {
66 replica_id.0 += 1;
67 }
68 let collaborator = channel_buffer_collaborator::ActiveModel {
69 channel_id: ActiveValue::Set(channel_id),
70 connection_id: ActiveValue::Set(connection.id as i32),
71 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
72 user_id: ActiveValue::Set(user_id),
73 replica_id: ActiveValue::Set(replica_id),
74 ..Default::default()
75 }
76 .insert(&*tx)
77 .await?;
78 collaborators.push(collaborator);
79
80 let (base_text, operations, max_operation) =
81 self.get_buffer_state(&buffer, &tx).await?;
82
83 // Save the last observed operation
84 if let Some(op) = max_operation {
85 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
86 user_id: ActiveValue::Set(user_id),
87 buffer_id: ActiveValue::Set(buffer.id),
88 epoch: ActiveValue::Set(op.epoch),
89 lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
90 replica_id: ActiveValue::Set(op.replica_id),
91 })
92 .on_conflict(
93 OnConflict::columns([
94 observed_buffer_edits::Column::UserId,
95 observed_buffer_edits::Column::BufferId,
96 ])
97 .update_columns([
98 observed_buffer_edits::Column::Epoch,
99 observed_buffer_edits::Column::LamportTimestamp,
100 ])
101 .to_owned(),
102 )
103 .exec(&*tx)
104 .await?;
105 }
106
107 Ok(proto::JoinChannelBufferResponse {
108 buffer_id: buffer.id.to_proto(),
109 replica_id: replica_id.to_proto() as u32,
110 base_text,
111 operations,
112 epoch: buffer.epoch as u64,
113 collaborators: collaborators
114 .into_iter()
115 .map(|collaborator| proto::Collaborator {
116 peer_id: Some(collaborator.connection().into()),
117 user_id: collaborator.user_id.to_proto(),
118 replica_id: collaborator.replica_id.0 as u32,
119 })
120 .collect(),
121 })
122 })
123 .await
124 }
125
126 /// Rejoin a channel buffer (after a connection interruption)
127 pub async fn rejoin_channel_buffers(
128 &self,
129 buffers: &[proto::ChannelBufferVersion],
130 user_id: UserId,
131 connection_id: ConnectionId,
132 ) -> Result<Vec<RejoinedChannelBuffer>> {
133 self.transaction(|tx| async move {
134 let mut results = Vec::new();
135 for client_buffer in buffers {
136 let channel = self
137 .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
138 .await?;
139 if self
140 .check_user_is_channel_participant(&channel, user_id, &tx)
141 .await
142 .is_err()
143 {
144 log::info!("user is not a member of channel");
145 continue;
146 }
147
148 let buffer = self.get_channel_buffer(channel.id, &tx).await?;
149 let mut collaborators = channel_buffer_collaborator::Entity::find()
150 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
151 .all(&*tx)
152 .await?;
153
154 // If the buffer epoch hasn't changed since the client lost
155 // connection, then the client's buffer can be synchronized with
156 // the server's buffer.
157 if buffer.epoch as u64 != client_buffer.epoch {
158 log::info!("can't rejoin buffer, epoch has changed");
159 continue;
160 }
161
162 // Find the collaborator record for this user's previous lost
163 // connection. Update it with the new connection id.
164 let Some(self_collaborator) =
165 collaborators.iter_mut().find(|c| c.user_id == user_id)
166 else {
167 log::info!("can't rejoin buffer, no previous collaborator found");
168 continue;
169 };
170 let old_connection_id = self_collaborator.connection();
171 *self_collaborator = channel_buffer_collaborator::ActiveModel {
172 id: ActiveValue::Unchanged(self_collaborator.id),
173 connection_id: ActiveValue::Set(connection_id.id as i32),
174 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
175 connection_lost: ActiveValue::Set(false),
176 ..Default::default()
177 }
178 .update(&*tx)
179 .await?;
180
181 let client_version = version_from_wire(&client_buffer.version);
182 let serialization_version = self
183 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
184 .await?;
185
186 let mut rows = buffer_operation::Entity::find()
187 .filter(
188 buffer_operation::Column::BufferId
189 .eq(buffer.id)
190 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
191 )
192 .stream(&*tx)
193 .await?;
194
195 // Find the server's version vector and any operations
196 // that the client has not seen.
197 let mut server_version = clock::Global::new();
198 let mut operations = Vec::new();
199 while let Some(row) = rows.next().await {
200 let row = row?;
201 let timestamp = clock::Lamport {
202 replica_id: row.replica_id as u16,
203 value: row.lamport_timestamp as u32,
204 };
205 server_version.observe(timestamp);
206 if !client_version.observed(timestamp) {
207 operations.push(proto::Operation {
208 variant: Some(operation_from_storage(row, serialization_version)?),
209 })
210 }
211 }
212
213 results.push(RejoinedChannelBuffer {
214 old_connection_id,
215 buffer: proto::RejoinedChannelBuffer {
216 channel_id: client_buffer.channel_id,
217 version: version_to_wire(&server_version),
218 operations,
219 collaborators: collaborators
220 .into_iter()
221 .map(|collaborator| proto::Collaborator {
222 peer_id: Some(collaborator.connection().into()),
223 user_id: collaborator.user_id.to_proto(),
224 replica_id: collaborator.replica_id.0 as u32,
225 })
226 .collect(),
227 },
228 });
229 }
230
231 Ok(results)
232 })
233 .await
234 }
235
236 /// Clear out any buffer collaborators who are no longer collaborating.
237 pub async fn clear_stale_channel_buffer_collaborators(
238 &self,
239 channel_id: ChannelId,
240 server_id: ServerId,
241 ) -> Result<RefreshedChannelBuffer> {
242 self.transaction(|tx| async move {
243 let db_collaborators = channel_buffer_collaborator::Entity::find()
244 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
245 .all(&*tx)
246 .await?;
247
248 let mut connection_ids = Vec::new();
249 let mut collaborators = Vec::new();
250 let mut collaborator_ids_to_remove = Vec::new();
251 for db_collaborator in &db_collaborators {
252 if !db_collaborator.connection_lost
253 && db_collaborator.connection_server_id == server_id
254 {
255 connection_ids.push(db_collaborator.connection());
256 collaborators.push(proto::Collaborator {
257 peer_id: Some(db_collaborator.connection().into()),
258 replica_id: db_collaborator.replica_id.0 as u32,
259 user_id: db_collaborator.user_id.to_proto(),
260 })
261 } else {
262 collaborator_ids_to_remove.push(db_collaborator.id);
263 }
264 }
265
266 channel_buffer_collaborator::Entity::delete_many()
267 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
268 .exec(&*tx)
269 .await?;
270
271 Ok(RefreshedChannelBuffer {
272 connection_ids,
273 collaborators,
274 })
275 })
276 .await
277 }
278
279 /// Close the channel buffer, and stop receiving updates for it.
280 pub async fn leave_channel_buffer(
281 &self,
282 channel_id: ChannelId,
283 connection: ConnectionId,
284 ) -> Result<LeftChannelBuffer> {
285 self.transaction(|tx| async move {
286 self.leave_channel_buffer_internal(channel_id, connection, &tx)
287 .await
288 })
289 .await
290 }
291
292 /// Close the channel buffer, and stop receiving updates for it.
293 pub async fn channel_buffer_connection_lost(
294 &self,
295 connection: ConnectionId,
296 tx: &DatabaseTransaction,
297 ) -> Result<()> {
298 channel_buffer_collaborator::Entity::update_many()
299 .filter(
300 Condition::all()
301 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
302 .add(
303 channel_buffer_collaborator::Column::ConnectionServerId
304 .eq(connection.owner_id as i32),
305 ),
306 )
307 .set(channel_buffer_collaborator::ActiveModel {
308 connection_lost: ActiveValue::set(true),
309 ..Default::default()
310 })
311 .exec(tx)
312 .await?;
313 Ok(())
314 }
315
316 /// Close all open channel buffers
317 pub async fn leave_channel_buffers(
318 &self,
319 connection: ConnectionId,
320 ) -> Result<Vec<LeftChannelBuffer>> {
321 self.transaction(|tx| async move {
322 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
323 enum QueryChannelIds {
324 ChannelId,
325 }
326
327 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
328 .select_only()
329 .column(channel_buffer_collaborator::Column::ChannelId)
330 .filter(Condition::all().add(
331 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
332 ))
333 .into_values::<_, QueryChannelIds>()
334 .all(&*tx)
335 .await?;
336
337 let mut result = Vec::new();
338 for channel_id in channel_ids {
339 let left_channel_buffer = self
340 .leave_channel_buffer_internal(channel_id, connection, &tx)
341 .await?;
342 result.push(left_channel_buffer);
343 }
344
345 Ok(result)
346 })
347 .await
348 }
349
350 async fn leave_channel_buffer_internal(
351 &self,
352 channel_id: ChannelId,
353 connection: ConnectionId,
354 tx: &DatabaseTransaction,
355 ) -> Result<LeftChannelBuffer> {
356 let result = channel_buffer_collaborator::Entity::delete_many()
357 .filter(
358 Condition::all()
359 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
360 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
361 .add(
362 channel_buffer_collaborator::Column::ConnectionServerId
363 .eq(connection.owner_id as i32),
364 ),
365 )
366 .exec(tx)
367 .await?;
368 if result.rows_affected == 0 {
369 Err(anyhow!("not a collaborator on this project"))?;
370 }
371
372 let mut collaborators = Vec::new();
373 let mut connections = Vec::new();
374 let mut rows = channel_buffer_collaborator::Entity::find()
375 .filter(
376 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
377 )
378 .stream(tx)
379 .await?;
380 while let Some(row) = rows.next().await {
381 let row = row?;
382 let connection = row.connection();
383 connections.push(connection);
384 collaborators.push(proto::Collaborator {
385 peer_id: Some(connection.into()),
386 replica_id: row.replica_id.0 as u32,
387 user_id: row.user_id.to_proto(),
388 });
389 }
390
391 drop(rows);
392
393 if collaborators.is_empty() {
394 self.snapshot_channel_buffer(channel_id, tx).await?;
395 }
396
397 Ok(LeftChannelBuffer {
398 channel_id,
399 collaborators,
400 connections,
401 })
402 }
403
404 pub async fn get_channel_buffer_collaborators(
405 &self,
406 channel_id: ChannelId,
407 ) -> Result<Vec<UserId>> {
408 self.transaction(|tx| async move {
409 self.get_channel_buffer_collaborators_internal(channel_id, &tx)
410 .await
411 })
412 .await
413 }
414
415 async fn get_channel_buffer_collaborators_internal(
416 &self,
417 channel_id: ChannelId,
418 tx: &DatabaseTransaction,
419 ) -> Result<Vec<UserId>> {
420 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
421 enum QueryUserIds {
422 UserId,
423 }
424
425 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
426 .select_only()
427 .column(channel_buffer_collaborator::Column::UserId)
428 .filter(
429 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
430 )
431 .into_values::<_, QueryUserIds>()
432 .all(tx)
433 .await?;
434
435 Ok(users)
436 }
437
438 pub async fn update_channel_buffer(
439 &self,
440 channel_id: ChannelId,
441 user: UserId,
442 operations: &[proto::Operation],
443 ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
444 self.transaction(move |tx| async move {
445 let channel = self.get_channel_internal(channel_id, &tx).await?;
446
447 let mut requires_write_permission = false;
448 for op in operations.iter() {
449 match op.variant {
450 None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
451 Some(_) => requires_write_permission = true,
452 }
453 }
454 if requires_write_permission {
455 self.check_user_is_channel_member(&channel, user, &tx)
456 .await?;
457 } else {
458 self.check_user_is_channel_participant(&channel, user, &tx)
459 .await?;
460 }
461
462 let buffer = buffer::Entity::find()
463 .filter(buffer::Column::ChannelId.eq(channel_id))
464 .one(&*tx)
465 .await?
466 .ok_or_else(|| anyhow!("no such buffer"))?;
467
468 let serialization_version = self
469 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
470 .await?;
471
472 let operations = operations
473 .iter()
474 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
475 .collect::<Vec<_>>();
476
477 let max_version;
478
479 if !operations.is_empty() {
480 let max_operation = operations
481 .iter()
482 .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
483 .unwrap();
484
485 max_version = vec![proto::VectorClockEntry {
486 replica_id: *max_operation.replica_id.as_ref() as u32,
487 timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
488 }];
489
490 // get current channel participants and save the max operation above
491 self.save_max_operation(
492 user,
493 buffer.id,
494 buffer.epoch,
495 *max_operation.replica_id.as_ref(),
496 *max_operation.lamport_timestamp.as_ref(),
497 &tx,
498 )
499 .await?;
500
501 buffer_operation::Entity::insert_many(operations)
502 .on_conflict(
503 OnConflict::columns([
504 buffer_operation::Column::BufferId,
505 buffer_operation::Column::Epoch,
506 buffer_operation::Column::LamportTimestamp,
507 buffer_operation::Column::ReplicaId,
508 ])
509 .do_nothing()
510 .to_owned(),
511 )
512 .exec(&*tx)
513 .await?;
514 } else {
515 max_version = Vec::new();
516 }
517
518 let mut connections = HashSet::default();
519 let mut rows = channel_buffer_collaborator::Entity::find()
520 .filter(
521 Condition::all()
522 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
523 )
524 .stream(&*tx)
525 .await?;
526 while let Some(row) = rows.next().await {
527 let row = row?;
528 connections.insert(ConnectionId {
529 id: row.connection_id as u32,
530 owner_id: row.connection_server_id.0 as u32,
531 });
532 }
533
534 Ok((connections, buffer.epoch, max_version))
535 })
536 .await
537 }
538
539 async fn save_max_operation(
540 &self,
541 user_id: UserId,
542 buffer_id: BufferId,
543 epoch: i32,
544 replica_id: i32,
545 lamport_timestamp: i32,
546 tx: &DatabaseTransaction,
547 ) -> Result<()> {
548 buffer::Entity::update(buffer::ActiveModel {
549 id: ActiveValue::Unchanged(buffer_id),
550 epoch: ActiveValue::Unchanged(epoch),
551 latest_operation_epoch: ActiveValue::Set(Some(epoch)),
552 latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
553 latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
554 channel_id: ActiveValue::NotSet,
555 })
556 .exec(tx)
557 .await?;
558
559 use observed_buffer_edits::Column;
560 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
561 user_id: ActiveValue::Set(user_id),
562 buffer_id: ActiveValue::Set(buffer_id),
563 epoch: ActiveValue::Set(epoch),
564 replica_id: ActiveValue::Set(replica_id),
565 lamport_timestamp: ActiveValue::Set(lamport_timestamp),
566 })
567 .on_conflict(
568 OnConflict::columns([Column::UserId, Column::BufferId])
569 .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
570 .action_cond_where(
571 Condition::any().add(Column::Epoch.lt(epoch)).add(
572 Condition::all().add(Column::Epoch.eq(epoch)).add(
573 Condition::any()
574 .add(Column::LamportTimestamp.lt(lamport_timestamp))
575 .add(
576 Column::LamportTimestamp
577 .eq(lamport_timestamp)
578 .and(Column::ReplicaId.lt(replica_id)),
579 ),
580 ),
581 ),
582 )
583 .to_owned(),
584 )
585 .exec_without_returning(tx)
586 .await?;
587
588 Ok(())
589 }
590
591 async fn get_buffer_operation_serialization_version(
592 &self,
593 buffer_id: BufferId,
594 epoch: i32,
595 tx: &DatabaseTransaction,
596 ) -> Result<i32> {
597 Ok(buffer_snapshot::Entity::find()
598 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
599 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
600 .select_only()
601 .column(buffer_snapshot::Column::OperationSerializationVersion)
602 .into_values::<_, QueryOperationSerializationVersion>()
603 .one(tx)
604 .await?
605 .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
606 }
607
608 pub async fn get_channel_buffer(
609 &self,
610 channel_id: ChannelId,
611 tx: &DatabaseTransaction,
612 ) -> Result<buffer::Model> {
613 Ok(channel::Model {
614 id: channel_id,
615 ..Default::default()
616 }
617 .find_related(buffer::Entity)
618 .one(tx)
619 .await?
620 .ok_or_else(|| anyhow!("no such buffer"))?)
621 }
622
623 async fn get_buffer_state(
624 &self,
625 buffer: &buffer::Model,
626 tx: &DatabaseTransaction,
627 ) -> Result<(
628 String,
629 Vec<proto::Operation>,
630 Option<buffer_operation::Model>,
631 )> {
632 let id = buffer.id;
633 let (base_text, version) = if buffer.epoch > 0 {
634 let snapshot = buffer_snapshot::Entity::find()
635 .filter(
636 buffer_snapshot::Column::BufferId
637 .eq(id)
638 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
639 )
640 .one(tx)
641 .await?
642 .ok_or_else(|| anyhow!("no such snapshot"))?;
643
644 let version = snapshot.operation_serialization_version;
645 (snapshot.text, version)
646 } else {
647 (String::new(), storage::SERIALIZATION_VERSION)
648 };
649
650 let mut rows = buffer_operation::Entity::find()
651 .filter(
652 buffer_operation::Column::BufferId
653 .eq(id)
654 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
655 )
656 .order_by_asc(buffer_operation::Column::LamportTimestamp)
657 .order_by_asc(buffer_operation::Column::ReplicaId)
658 .stream(tx)
659 .await?;
660
661 let mut operations = Vec::new();
662 let mut last_row = None;
663 while let Some(row) = rows.next().await {
664 let row = row?;
665 last_row = Some(buffer_operation::Model {
666 buffer_id: row.buffer_id,
667 epoch: row.epoch,
668 lamport_timestamp: row.lamport_timestamp,
669 replica_id: row.replica_id,
670 value: Default::default(),
671 });
672 operations.push(proto::Operation {
673 variant: Some(operation_from_storage(row, version)?),
674 });
675 }
676
677 Ok((base_text, operations, last_row))
678 }
679
680 async fn snapshot_channel_buffer(
681 &self,
682 channel_id: ChannelId,
683 tx: &DatabaseTransaction,
684 ) -> Result<()> {
685 let buffer = self.get_channel_buffer(channel_id, tx).await?;
686 let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
687 if operations.is_empty() {
688 return Ok(());
689 }
690
691 let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
692 text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
693
694 let base_text = text_buffer.text();
695 let epoch = buffer.epoch + 1;
696
697 buffer_snapshot::Model {
698 buffer_id: buffer.id,
699 epoch,
700 text: base_text,
701 operation_serialization_version: storage::SERIALIZATION_VERSION,
702 }
703 .into_active_model()
704 .insert(tx)
705 .await?;
706
707 buffer::ActiveModel {
708 id: ActiveValue::Unchanged(buffer.id),
709 epoch: ActiveValue::Set(epoch),
710 latest_operation_epoch: ActiveValue::NotSet,
711 latest_operation_replica_id: ActiveValue::NotSet,
712 latest_operation_lamport_timestamp: ActiveValue::NotSet,
713 channel_id: ActiveValue::NotSet,
714 }
715 .save(tx)
716 .await?;
717
718 Ok(())
719 }
720
721 pub async fn observe_buffer_version(
722 &self,
723 buffer_id: BufferId,
724 user_id: UserId,
725 epoch: i32,
726 version: &[proto::VectorClockEntry],
727 ) -> Result<()> {
728 self.transaction(|tx| async move {
729 // For now, combine concurrent operations.
730 let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
731 return Ok(());
732 };
733 self.save_max_operation(
734 user_id,
735 buffer_id,
736 epoch,
737 component.replica_id as i32,
738 component.timestamp as i32,
739 &tx,
740 )
741 .await?;
742 Ok(())
743 })
744 .await
745 }
746
747 pub async fn observed_channel_buffer_changes(
748 &self,
749 channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
750 user_id: UserId,
751 tx: &DatabaseTransaction,
752 ) -> Result<Vec<proto::ChannelBufferVersion>> {
753 let observed_operations = observed_buffer_edits::Entity::find()
754 .filter(observed_buffer_edits::Column::UserId.eq(user_id))
755 .filter(
756 observed_buffer_edits::Column::BufferId
757 .is_in(channel_ids_by_buffer_id.keys().copied()),
758 )
759 .all(tx)
760 .await?;
761
762 Ok(observed_operations
763 .iter()
764 .flat_map(|op| {
765 Some(proto::ChannelBufferVersion {
766 channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
767 epoch: op.epoch as u64,
768 version: vec![proto::VectorClockEntry {
769 replica_id: op.replica_id as u32,
770 timestamp: op.lamport_timestamp as u32,
771 }],
772 })
773 })
774 .collect())
775 }
776}
777
778fn operation_to_storage(
779 operation: &proto::Operation,
780 buffer: &buffer::Model,
781 _format: i32,
782) -> Option<buffer_operation::ActiveModel> {
783 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
784 proto::operation::Variant::Edit(operation) => (
785 operation.replica_id,
786 operation.lamport_timestamp,
787 storage::Operation {
788 version: version_to_storage(&operation.version),
789 is_undo: false,
790 edit_ranges: operation
791 .ranges
792 .iter()
793 .map(|range| storage::Range {
794 start: range.start,
795 end: range.end,
796 })
797 .collect(),
798 edit_texts: operation.new_text.clone(),
799 undo_counts: Vec::new(),
800 },
801 ),
802 proto::operation::Variant::Undo(operation) => (
803 operation.replica_id,
804 operation.lamport_timestamp,
805 storage::Operation {
806 version: version_to_storage(&operation.version),
807 is_undo: true,
808 edit_ranges: Vec::new(),
809 edit_texts: Vec::new(),
810 undo_counts: operation
811 .counts
812 .iter()
813 .map(|entry| storage::UndoCount {
814 replica_id: entry.replica_id,
815 lamport_timestamp: entry.lamport_timestamp,
816 count: entry.count,
817 })
818 .collect(),
819 },
820 ),
821 _ => None?,
822 };
823
824 Some(buffer_operation::ActiveModel {
825 buffer_id: ActiveValue::Set(buffer.id),
826 epoch: ActiveValue::Set(buffer.epoch),
827 replica_id: ActiveValue::Set(replica_id as i32),
828 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
829 value: ActiveValue::Set(value.encode_to_vec()),
830 })
831}
832
833fn operation_from_storage(
834 row: buffer_operation::Model,
835 _format_version: i32,
836) -> Result<proto::operation::Variant, Error> {
837 let operation =
838 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
839 let version = version_from_storage(&operation.version);
840 Ok(if operation.is_undo {
841 proto::operation::Variant::Undo(proto::operation::Undo {
842 replica_id: row.replica_id as u32,
843 lamport_timestamp: row.lamport_timestamp as u32,
844 version,
845 counts: operation
846 .undo_counts
847 .iter()
848 .map(|entry| proto::UndoCount {
849 replica_id: entry.replica_id,
850 lamport_timestamp: entry.lamport_timestamp,
851 count: entry.count,
852 })
853 .collect(),
854 })
855 } else {
856 proto::operation::Variant::Edit(proto::operation::Edit {
857 replica_id: row.replica_id as u32,
858 lamport_timestamp: row.lamport_timestamp as u32,
859 version,
860 ranges: operation
861 .edit_ranges
862 .into_iter()
863 .map(|range| proto::Range {
864 start: range.start,
865 end: range.end,
866 })
867 .collect(),
868 new_text: operation.edit_texts,
869 })
870 })
871}
872
873fn version_to_storage(version: &[proto::VectorClockEntry]) -> Vec<storage::VectorClockEntry> {
874 version
875 .iter()
876 .map(|entry| storage::VectorClockEntry {
877 replica_id: entry.replica_id,
878 timestamp: entry.timestamp,
879 })
880 .collect()
881}
882
883fn version_from_storage(version: &[storage::VectorClockEntry]) -> Vec<proto::VectorClockEntry> {
884 version
885 .iter()
886 .map(|entry| proto::VectorClockEntry {
887 replica_id: entry.replica_id,
888 timestamp: entry.timestamp,
889 })
890 .collect()
891}
892
893// This is currently a manual copy of the deserialization code in the client's language crate
894pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
895 match operation.variant? {
896 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
897 timestamp: clock::Lamport {
898 replica_id: edit.replica_id as text::ReplicaId,
899 value: edit.lamport_timestamp,
900 },
901 version: version_from_wire(&edit.version),
902 ranges: edit
903 .ranges
904 .into_iter()
905 .map(|range| {
906 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
907 })
908 .collect(),
909 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
910 })),
911 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
912 timestamp: clock::Lamport {
913 replica_id: undo.replica_id as text::ReplicaId,
914 value: undo.lamport_timestamp,
915 },
916 version: version_from_wire(&undo.version),
917 counts: undo
918 .counts
919 .into_iter()
920 .map(|c| {
921 (
922 clock::Lamport {
923 replica_id: c.replica_id as text::ReplicaId,
924 value: c.lamport_timestamp,
925 },
926 c.count,
927 )
928 })
929 .collect(),
930 })),
931 _ => None,
932 }
933}
934
935fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
936 let mut version = clock::Global::new();
937 for entry in message {
938 version.observe(clock::Lamport {
939 replica_id: entry.replica_id as text::ReplicaId,
940 value: entry.timestamp,
941 });
942 }
943 version
944}
945
946fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
947 let mut message = Vec::new();
948 for entry in version.iter() {
949 message.push(proto::VectorClockEntry {
950 replica_id: entry.replica_id as u32,
951 timestamp: entry.value,
952 });
953 }
954 message
955}
956
957#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
958enum QueryOperationSerializationVersion {
959 OperationSerializationVersion,
960}
961
962mod storage {
963 #![allow(non_snake_case)]
964 use prost::Message;
965 pub const SERIALIZATION_VERSION: i32 = 1;
966
967 #[derive(Message)]
968 pub struct Operation {
969 #[prost(message, repeated, tag = "2")]
970 pub version: Vec<VectorClockEntry>,
971 #[prost(bool, tag = "3")]
972 pub is_undo: bool,
973 #[prost(message, repeated, tag = "4")]
974 pub edit_ranges: Vec<Range>,
975 #[prost(string, repeated, tag = "5")]
976 pub edit_texts: Vec<String>,
977 #[prost(message, repeated, tag = "6")]
978 pub undo_counts: Vec<UndoCount>,
979 }
980
981 #[derive(Message)]
982 pub struct VectorClockEntry {
983 #[prost(uint32, tag = "1")]
984 pub replica_id: u32,
985 #[prost(uint32, tag = "2")]
986 pub timestamp: u32,
987 }
988
989 #[derive(Message)]
990 pub struct Range {
991 #[prost(uint64, tag = "1")]
992 pub start: u64,
993 #[prost(uint64, tag = "2")]
994 pub end: u64,
995 }
996
997 #[derive(Message)]
998 pub struct UndoCount {
999 #[prost(uint32, tag = "1")]
1000 pub replica_id: u32,
1001 #[prost(uint32, tag = "2")]
1002 pub lamport_timestamp: u32,
1003 #[prost(uint32, tag = "3")]
1004 pub count: u32,
1005 }
1006}