1use super::*;
2use prost::Message;
3use text::{EditOperation, UndoOperation};
4
5pub struct LeftChannelBuffer {
6 pub channel_id: ChannelId,
7 pub collaborators: Vec<proto::Collaborator>,
8 pub connections: Vec<ConnectionId>,
9}
10
11impl Database {
12 /// Open a channel buffer. Returns the current contents, and adds you to the list of people
13 /// to notify on changes.
14 pub async fn join_channel_buffer(
15 &self,
16 channel_id: ChannelId,
17 user_id: UserId,
18 connection: ConnectionId,
19 ) -> Result<proto::JoinChannelBufferResponse> {
20 self.transaction(|tx| async move {
21 let channel = self.get_channel_internal(channel_id, &tx).await?;
22 self.check_user_is_channel_participant(&channel, user_id, &tx)
23 .await?;
24
25 let buffer = channel::Model {
26 id: channel_id,
27 ..Default::default()
28 }
29 .find_related(buffer::Entity)
30 .one(&*tx)
31 .await?;
32
33 let buffer = if let Some(buffer) = buffer {
34 buffer
35 } else {
36 let buffer = buffer::ActiveModel {
37 channel_id: ActiveValue::Set(channel_id),
38 ..Default::default()
39 }
40 .insert(&*tx)
41 .await?;
42 buffer_snapshot::ActiveModel {
43 buffer_id: ActiveValue::Set(buffer.id),
44 epoch: ActiveValue::Set(0),
45 text: ActiveValue::Set(String::new()),
46 operation_serialization_version: ActiveValue::Set(
47 storage::SERIALIZATION_VERSION,
48 ),
49 }
50 .insert(&*tx)
51 .await?;
52 buffer
53 };
54
55 // Join the collaborators
56 let mut collaborators = channel_buffer_collaborator::Entity::find()
57 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
58 .all(&*tx)
59 .await?;
60 let replica_ids = collaborators
61 .iter()
62 .map(|c| c.replica_id)
63 .collect::<HashSet<_>>();
64 let mut replica_id = ReplicaId(0);
65 while replica_ids.contains(&replica_id) {
66 replica_id.0 += 1;
67 }
68 let collaborator = channel_buffer_collaborator::ActiveModel {
69 channel_id: ActiveValue::Set(channel_id),
70 connection_id: ActiveValue::Set(connection.id as i32),
71 connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
72 user_id: ActiveValue::Set(user_id),
73 replica_id: ActiveValue::Set(replica_id),
74 ..Default::default()
75 }
76 .insert(&*tx)
77 .await?;
78 collaborators.push(collaborator);
79
80 let (base_text, operations, max_operation) =
81 self.get_buffer_state(&buffer, &tx).await?;
82
83 // Save the last observed operation
84 if let Some(op) = max_operation {
85 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
86 user_id: ActiveValue::Set(user_id),
87 buffer_id: ActiveValue::Set(buffer.id),
88 epoch: ActiveValue::Set(op.epoch),
89 lamport_timestamp: ActiveValue::Set(op.lamport_timestamp),
90 replica_id: ActiveValue::Set(op.replica_id),
91 })
92 .on_conflict(
93 OnConflict::columns([
94 observed_buffer_edits::Column::UserId,
95 observed_buffer_edits::Column::BufferId,
96 ])
97 .update_columns([
98 observed_buffer_edits::Column::Epoch,
99 observed_buffer_edits::Column::LamportTimestamp,
100 ])
101 .to_owned(),
102 )
103 .exec(&*tx)
104 .await?;
105 }
106
107 Ok(proto::JoinChannelBufferResponse {
108 buffer_id: buffer.id.to_proto(),
109 replica_id: replica_id.to_proto() as u32,
110 base_text,
111 operations,
112 epoch: buffer.epoch as u64,
113 collaborators: collaborators
114 .into_iter()
115 .map(|collaborator| proto::Collaborator {
116 peer_id: Some(collaborator.connection().into()),
117 user_id: collaborator.user_id.to_proto(),
118 replica_id: collaborator.replica_id.0 as u32,
119 is_host: false,
120 })
121 .collect(),
122 })
123 })
124 .await
125 }
126
127 /// Rejoin a channel buffer (after a connection interruption)
128 pub async fn rejoin_channel_buffers(
129 &self,
130 buffers: &[proto::ChannelBufferVersion],
131 user_id: UserId,
132 connection_id: ConnectionId,
133 ) -> Result<Vec<RejoinedChannelBuffer>> {
134 self.transaction(|tx| async move {
135 let mut results = Vec::new();
136 for client_buffer in buffers {
137 let channel = self
138 .get_channel_internal(ChannelId::from_proto(client_buffer.channel_id), &tx)
139 .await?;
140 if self
141 .check_user_is_channel_participant(&channel, user_id, &tx)
142 .await
143 .is_err()
144 {
145 log::info!("user is not a member of channel");
146 continue;
147 }
148
149 let buffer = self.get_channel_buffer(channel.id, &tx).await?;
150 let mut collaborators = channel_buffer_collaborator::Entity::find()
151 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel.id))
152 .all(&*tx)
153 .await?;
154
155 // If the buffer epoch hasn't changed since the client lost
156 // connection, then the client's buffer can be synchronized with
157 // the server's buffer.
158 if buffer.epoch as u64 != client_buffer.epoch {
159 log::info!("can't rejoin buffer, epoch has changed");
160 continue;
161 }
162
163 // Find the collaborator record for this user's previous lost
164 // connection. Update it with the new connection id.
165 let Some(self_collaborator) =
166 collaborators.iter_mut().find(|c| c.user_id == user_id)
167 else {
168 log::info!("can't rejoin buffer, no previous collaborator found");
169 continue;
170 };
171 let old_connection_id = self_collaborator.connection();
172 *self_collaborator = channel_buffer_collaborator::ActiveModel {
173 id: ActiveValue::Unchanged(self_collaborator.id),
174 connection_id: ActiveValue::Set(connection_id.id as i32),
175 connection_server_id: ActiveValue::Set(ServerId(connection_id.owner_id as i32)),
176 connection_lost: ActiveValue::Set(false),
177 ..Default::default()
178 }
179 .update(&*tx)
180 .await?;
181
182 let client_version = version_from_wire(&client_buffer.version);
183 let serialization_version = self
184 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
185 .await?;
186
187 let mut rows = buffer_operation::Entity::find()
188 .filter(
189 buffer_operation::Column::BufferId
190 .eq(buffer.id)
191 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
192 )
193 .stream(&*tx)
194 .await?;
195
196 // Find the server's version vector and any operations
197 // that the client has not seen.
198 let mut server_version = clock::Global::new();
199 let mut operations = Vec::new();
200 while let Some(row) = rows.next().await {
201 let row = row?;
202 let timestamp = clock::Lamport {
203 replica_id: row.replica_id as u16,
204 value: row.lamport_timestamp as u32,
205 };
206 server_version.observe(timestamp);
207 if !client_version.observed(timestamp) {
208 operations.push(proto::Operation {
209 variant: Some(operation_from_storage(row, serialization_version)?),
210 })
211 }
212 }
213
214 results.push(RejoinedChannelBuffer {
215 old_connection_id,
216 buffer: proto::RejoinedChannelBuffer {
217 channel_id: client_buffer.channel_id,
218 version: version_to_wire(&server_version),
219 operations,
220 collaborators: collaborators
221 .into_iter()
222 .map(|collaborator| proto::Collaborator {
223 peer_id: Some(collaborator.connection().into()),
224 user_id: collaborator.user_id.to_proto(),
225 replica_id: collaborator.replica_id.0 as u32,
226 is_host: false,
227 })
228 .collect(),
229 },
230 });
231 }
232
233 Ok(results)
234 })
235 .await
236 }
237
238 /// Clear out any buffer collaborators who are no longer collaborating.
239 pub async fn clear_stale_channel_buffer_collaborators(
240 &self,
241 channel_id: ChannelId,
242 server_id: ServerId,
243 ) -> Result<RefreshedChannelBuffer> {
244 self.transaction(|tx| async move {
245 let db_collaborators = channel_buffer_collaborator::Entity::find()
246 .filter(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
247 .all(&*tx)
248 .await?;
249
250 let mut connection_ids = Vec::new();
251 let mut collaborators = Vec::new();
252 let mut collaborator_ids_to_remove = Vec::new();
253 for db_collaborator in &db_collaborators {
254 if !db_collaborator.connection_lost
255 && db_collaborator.connection_server_id == server_id
256 {
257 connection_ids.push(db_collaborator.connection());
258 collaborators.push(proto::Collaborator {
259 peer_id: Some(db_collaborator.connection().into()),
260 replica_id: db_collaborator.replica_id.0 as u32,
261 user_id: db_collaborator.user_id.to_proto(),
262 is_host: false,
263 })
264 } else {
265 collaborator_ids_to_remove.push(db_collaborator.id);
266 }
267 }
268
269 channel_buffer_collaborator::Entity::delete_many()
270 .filter(channel_buffer_collaborator::Column::Id.is_in(collaborator_ids_to_remove))
271 .exec(&*tx)
272 .await?;
273
274 Ok(RefreshedChannelBuffer {
275 connection_ids,
276 collaborators,
277 })
278 })
279 .await
280 }
281
282 /// Close the channel buffer, and stop receiving updates for it.
283 pub async fn leave_channel_buffer(
284 &self,
285 channel_id: ChannelId,
286 connection: ConnectionId,
287 ) -> Result<LeftChannelBuffer> {
288 self.transaction(|tx| async move {
289 self.leave_channel_buffer_internal(channel_id, connection, &tx)
290 .await
291 })
292 .await
293 }
294
295 /// Close the channel buffer, and stop receiving updates for it.
296 pub async fn channel_buffer_connection_lost(
297 &self,
298 connection: ConnectionId,
299 tx: &DatabaseTransaction,
300 ) -> Result<()> {
301 channel_buffer_collaborator::Entity::update_many()
302 .filter(
303 Condition::all()
304 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
305 .add(
306 channel_buffer_collaborator::Column::ConnectionServerId
307 .eq(connection.owner_id as i32),
308 ),
309 )
310 .set(channel_buffer_collaborator::ActiveModel {
311 connection_lost: ActiveValue::set(true),
312 ..Default::default()
313 })
314 .exec(tx)
315 .await?;
316 Ok(())
317 }
318
319 /// Close all open channel buffers
320 pub async fn leave_channel_buffers(
321 &self,
322 connection: ConnectionId,
323 ) -> Result<Vec<LeftChannelBuffer>> {
324 self.transaction(|tx| async move {
325 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
326 enum QueryChannelIds {
327 ChannelId,
328 }
329
330 let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
331 .select_only()
332 .column(channel_buffer_collaborator::Column::ChannelId)
333 .filter(Condition::all().add(
334 channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
335 ))
336 .into_values::<_, QueryChannelIds>()
337 .all(&*tx)
338 .await?;
339
340 let mut result = Vec::new();
341 for channel_id in channel_ids {
342 let left_channel_buffer = self
343 .leave_channel_buffer_internal(channel_id, connection, &tx)
344 .await?;
345 result.push(left_channel_buffer);
346 }
347
348 Ok(result)
349 })
350 .await
351 }
352
353 async fn leave_channel_buffer_internal(
354 &self,
355 channel_id: ChannelId,
356 connection: ConnectionId,
357 tx: &DatabaseTransaction,
358 ) -> Result<LeftChannelBuffer> {
359 let result = channel_buffer_collaborator::Entity::delete_many()
360 .filter(
361 Condition::all()
362 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
363 .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
364 .add(
365 channel_buffer_collaborator::Column::ConnectionServerId
366 .eq(connection.owner_id as i32),
367 ),
368 )
369 .exec(tx)
370 .await?;
371 if result.rows_affected == 0 {
372 Err(anyhow!("not a collaborator on this project"))?;
373 }
374
375 let mut collaborators = Vec::new();
376 let mut connections = Vec::new();
377 let mut rows = channel_buffer_collaborator::Entity::find()
378 .filter(
379 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
380 )
381 .stream(tx)
382 .await?;
383 while let Some(row) = rows.next().await {
384 let row = row?;
385 let connection = row.connection();
386 connections.push(connection);
387 collaborators.push(proto::Collaborator {
388 peer_id: Some(connection.into()),
389 replica_id: row.replica_id.0 as u32,
390 user_id: row.user_id.to_proto(),
391 is_host: false,
392 });
393 }
394
395 drop(rows);
396
397 if collaborators.is_empty() {
398 self.snapshot_channel_buffer(channel_id, tx).await?;
399 }
400
401 Ok(LeftChannelBuffer {
402 channel_id,
403 collaborators,
404 connections,
405 })
406 }
407
408 pub async fn get_channel_buffer_collaborators(
409 &self,
410 channel_id: ChannelId,
411 ) -> Result<Vec<UserId>> {
412 self.transaction(|tx| async move {
413 self.get_channel_buffer_collaborators_internal(channel_id, &tx)
414 .await
415 })
416 .await
417 }
418
419 async fn get_channel_buffer_collaborators_internal(
420 &self,
421 channel_id: ChannelId,
422 tx: &DatabaseTransaction,
423 ) -> Result<Vec<UserId>> {
424 #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
425 enum QueryUserIds {
426 UserId,
427 }
428
429 let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
430 .select_only()
431 .column(channel_buffer_collaborator::Column::UserId)
432 .filter(
433 Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
434 )
435 .into_values::<_, QueryUserIds>()
436 .all(tx)
437 .await?;
438
439 Ok(users)
440 }
441
442 pub async fn update_channel_buffer(
443 &self,
444 channel_id: ChannelId,
445 user: UserId,
446 operations: &[proto::Operation],
447 ) -> Result<(HashSet<ConnectionId>, i32, Vec<proto::VectorClockEntry>)> {
448 self.transaction(move |tx| async move {
449 let channel = self.get_channel_internal(channel_id, &tx).await?;
450
451 let mut requires_write_permission = false;
452 for op in operations.iter() {
453 match op.variant {
454 None | Some(proto::operation::Variant::UpdateSelections(_)) => {}
455 Some(_) => requires_write_permission = true,
456 }
457 }
458 if requires_write_permission {
459 self.check_user_is_channel_member(&channel, user, &tx)
460 .await?;
461 } else {
462 self.check_user_is_channel_participant(&channel, user, &tx)
463 .await?;
464 }
465
466 let buffer = buffer::Entity::find()
467 .filter(buffer::Column::ChannelId.eq(channel_id))
468 .one(&*tx)
469 .await?
470 .ok_or_else(|| anyhow!("no such buffer"))?;
471
472 let serialization_version = self
473 .get_buffer_operation_serialization_version(buffer.id, buffer.epoch, &tx)
474 .await?;
475
476 let operations = operations
477 .iter()
478 .filter_map(|op| operation_to_storage(op, &buffer, serialization_version))
479 .collect::<Vec<_>>();
480
481 let max_version;
482
483 if !operations.is_empty() {
484 let max_operation = operations
485 .iter()
486 .max_by_key(|op| (op.lamport_timestamp.as_ref(), op.replica_id.as_ref()))
487 .unwrap();
488
489 max_version = vec![proto::VectorClockEntry {
490 replica_id: *max_operation.replica_id.as_ref() as u32,
491 timestamp: *max_operation.lamport_timestamp.as_ref() as u32,
492 }];
493
494 // get current channel participants and save the max operation above
495 self.save_max_operation(
496 user,
497 buffer.id,
498 buffer.epoch,
499 *max_operation.replica_id.as_ref(),
500 *max_operation.lamport_timestamp.as_ref(),
501 &tx,
502 )
503 .await?;
504
505 buffer_operation::Entity::insert_many(operations)
506 .on_conflict(
507 OnConflict::columns([
508 buffer_operation::Column::BufferId,
509 buffer_operation::Column::Epoch,
510 buffer_operation::Column::LamportTimestamp,
511 buffer_operation::Column::ReplicaId,
512 ])
513 .do_nothing()
514 .to_owned(),
515 )
516 .exec(&*tx)
517 .await?;
518 } else {
519 max_version = Vec::new();
520 }
521
522 let mut connections = HashSet::default();
523 let mut rows = channel_buffer_collaborator::Entity::find()
524 .filter(
525 Condition::all()
526 .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
527 )
528 .stream(&*tx)
529 .await?;
530 while let Some(row) = rows.next().await {
531 let row = row?;
532 connections.insert(ConnectionId {
533 id: row.connection_id as u32,
534 owner_id: row.connection_server_id.0 as u32,
535 });
536 }
537
538 Ok((connections, buffer.epoch, max_version))
539 })
540 .await
541 }
542
543 async fn save_max_operation(
544 &self,
545 user_id: UserId,
546 buffer_id: BufferId,
547 epoch: i32,
548 replica_id: i32,
549 lamport_timestamp: i32,
550 tx: &DatabaseTransaction,
551 ) -> Result<()> {
552 buffer::Entity::update(buffer::ActiveModel {
553 id: ActiveValue::Unchanged(buffer_id),
554 epoch: ActiveValue::Unchanged(epoch),
555 latest_operation_epoch: ActiveValue::Set(Some(epoch)),
556 latest_operation_replica_id: ActiveValue::Set(Some(replica_id)),
557 latest_operation_lamport_timestamp: ActiveValue::Set(Some(lamport_timestamp)),
558 channel_id: ActiveValue::NotSet,
559 })
560 .exec(tx)
561 .await?;
562
563 use observed_buffer_edits::Column;
564 observed_buffer_edits::Entity::insert(observed_buffer_edits::ActiveModel {
565 user_id: ActiveValue::Set(user_id),
566 buffer_id: ActiveValue::Set(buffer_id),
567 epoch: ActiveValue::Set(epoch),
568 replica_id: ActiveValue::Set(replica_id),
569 lamport_timestamp: ActiveValue::Set(lamport_timestamp),
570 })
571 .on_conflict(
572 OnConflict::columns([Column::UserId, Column::BufferId])
573 .update_columns([Column::Epoch, Column::LamportTimestamp, Column::ReplicaId])
574 .action_cond_where(
575 Condition::any().add(Column::Epoch.lt(epoch)).add(
576 Condition::all().add(Column::Epoch.eq(epoch)).add(
577 Condition::any()
578 .add(Column::LamportTimestamp.lt(lamport_timestamp))
579 .add(
580 Column::LamportTimestamp
581 .eq(lamport_timestamp)
582 .and(Column::ReplicaId.lt(replica_id)),
583 ),
584 ),
585 ),
586 )
587 .to_owned(),
588 )
589 .exec_without_returning(tx)
590 .await?;
591
592 Ok(())
593 }
594
595 async fn get_buffer_operation_serialization_version(
596 &self,
597 buffer_id: BufferId,
598 epoch: i32,
599 tx: &DatabaseTransaction,
600 ) -> Result<i32> {
601 Ok(buffer_snapshot::Entity::find()
602 .filter(buffer_snapshot::Column::BufferId.eq(buffer_id))
603 .filter(buffer_snapshot::Column::Epoch.eq(epoch))
604 .select_only()
605 .column(buffer_snapshot::Column::OperationSerializationVersion)
606 .into_values::<_, QueryOperationSerializationVersion>()
607 .one(tx)
608 .await?
609 .ok_or_else(|| anyhow!("missing buffer snapshot"))?)
610 }
611
612 pub async fn get_channel_buffer(
613 &self,
614 channel_id: ChannelId,
615 tx: &DatabaseTransaction,
616 ) -> Result<buffer::Model> {
617 Ok(channel::Model {
618 id: channel_id,
619 ..Default::default()
620 }
621 .find_related(buffer::Entity)
622 .one(tx)
623 .await?
624 .ok_or_else(|| anyhow!("no such buffer"))?)
625 }
626
627 async fn get_buffer_state(
628 &self,
629 buffer: &buffer::Model,
630 tx: &DatabaseTransaction,
631 ) -> Result<(
632 String,
633 Vec<proto::Operation>,
634 Option<buffer_operation::Model>,
635 )> {
636 let id = buffer.id;
637 let (base_text, version) = if buffer.epoch > 0 {
638 let snapshot = buffer_snapshot::Entity::find()
639 .filter(
640 buffer_snapshot::Column::BufferId
641 .eq(id)
642 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
643 )
644 .one(tx)
645 .await?
646 .ok_or_else(|| anyhow!("no such snapshot"))?;
647
648 let version = snapshot.operation_serialization_version;
649 (snapshot.text, version)
650 } else {
651 (String::new(), storage::SERIALIZATION_VERSION)
652 };
653
654 let mut rows = buffer_operation::Entity::find()
655 .filter(
656 buffer_operation::Column::BufferId
657 .eq(id)
658 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
659 )
660 .order_by_asc(buffer_operation::Column::LamportTimestamp)
661 .order_by_asc(buffer_operation::Column::ReplicaId)
662 .stream(tx)
663 .await?;
664
665 let mut operations = Vec::new();
666 let mut last_row = None;
667 while let Some(row) = rows.next().await {
668 let row = row?;
669 last_row = Some(buffer_operation::Model {
670 buffer_id: row.buffer_id,
671 epoch: row.epoch,
672 lamport_timestamp: row.lamport_timestamp,
673 replica_id: row.replica_id,
674 value: Default::default(),
675 });
676 operations.push(proto::Operation {
677 variant: Some(operation_from_storage(row, version)?),
678 });
679 }
680
681 Ok((base_text, operations, last_row))
682 }
683
684 async fn snapshot_channel_buffer(
685 &self,
686 channel_id: ChannelId,
687 tx: &DatabaseTransaction,
688 ) -> Result<()> {
689 let buffer = self.get_channel_buffer(channel_id, tx).await?;
690 let (base_text, operations, _) = self.get_buffer_state(&buffer, tx).await?;
691 if operations.is_empty() {
692 return Ok(());
693 }
694
695 let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
696 text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
697
698 let base_text = text_buffer.text();
699 let epoch = buffer.epoch + 1;
700
701 buffer_snapshot::Model {
702 buffer_id: buffer.id,
703 epoch,
704 text: base_text,
705 operation_serialization_version: storage::SERIALIZATION_VERSION,
706 }
707 .into_active_model()
708 .insert(tx)
709 .await?;
710
711 buffer::ActiveModel {
712 id: ActiveValue::Unchanged(buffer.id),
713 epoch: ActiveValue::Set(epoch),
714 latest_operation_epoch: ActiveValue::NotSet,
715 latest_operation_replica_id: ActiveValue::NotSet,
716 latest_operation_lamport_timestamp: ActiveValue::NotSet,
717 channel_id: ActiveValue::NotSet,
718 }
719 .save(tx)
720 .await?;
721
722 Ok(())
723 }
724
725 pub async fn observe_buffer_version(
726 &self,
727 buffer_id: BufferId,
728 user_id: UserId,
729 epoch: i32,
730 version: &[proto::VectorClockEntry],
731 ) -> Result<()> {
732 self.transaction(|tx| async move {
733 // For now, combine concurrent operations.
734 let Some(component) = version.iter().max_by_key(|version| version.timestamp) else {
735 return Ok(());
736 };
737 self.save_max_operation(
738 user_id,
739 buffer_id,
740 epoch,
741 component.replica_id as i32,
742 component.timestamp as i32,
743 &tx,
744 )
745 .await?;
746 Ok(())
747 })
748 .await
749 }
750
751 pub async fn observed_channel_buffer_changes(
752 &self,
753 channel_ids_by_buffer_id: &HashMap<BufferId, ChannelId>,
754 user_id: UserId,
755 tx: &DatabaseTransaction,
756 ) -> Result<Vec<proto::ChannelBufferVersion>> {
757 let observed_operations = observed_buffer_edits::Entity::find()
758 .filter(observed_buffer_edits::Column::UserId.eq(user_id))
759 .filter(
760 observed_buffer_edits::Column::BufferId
761 .is_in(channel_ids_by_buffer_id.keys().copied()),
762 )
763 .all(tx)
764 .await?;
765
766 Ok(observed_operations
767 .iter()
768 .flat_map(|op| {
769 Some(proto::ChannelBufferVersion {
770 channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(),
771 epoch: op.epoch as u64,
772 version: vec![proto::VectorClockEntry {
773 replica_id: op.replica_id as u32,
774 timestamp: op.lamport_timestamp as u32,
775 }],
776 })
777 })
778 .collect())
779 }
780}
781
782fn operation_to_storage(
783 operation: &proto::Operation,
784 buffer: &buffer::Model,
785 _format: i32,
786) -> Option<buffer_operation::ActiveModel> {
787 let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
788 proto::operation::Variant::Edit(operation) => (
789 operation.replica_id,
790 operation.lamport_timestamp,
791 storage::Operation {
792 version: version_to_storage(&operation.version),
793 is_undo: false,
794 edit_ranges: operation
795 .ranges
796 .iter()
797 .map(|range| storage::Range {
798 start: range.start,
799 end: range.end,
800 })
801 .collect(),
802 edit_texts: operation.new_text.clone(),
803 undo_counts: Vec::new(),
804 },
805 ),
806 proto::operation::Variant::Undo(operation) => (
807 operation.replica_id,
808 operation.lamport_timestamp,
809 storage::Operation {
810 version: version_to_storage(&operation.version),
811 is_undo: true,
812 edit_ranges: Vec::new(),
813 edit_texts: Vec::new(),
814 undo_counts: operation
815 .counts
816 .iter()
817 .map(|entry| storage::UndoCount {
818 replica_id: entry.replica_id,
819 lamport_timestamp: entry.lamport_timestamp,
820 count: entry.count,
821 })
822 .collect(),
823 },
824 ),
825 _ => None?,
826 };
827
828 Some(buffer_operation::ActiveModel {
829 buffer_id: ActiveValue::Set(buffer.id),
830 epoch: ActiveValue::Set(buffer.epoch),
831 replica_id: ActiveValue::Set(replica_id as i32),
832 lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
833 value: ActiveValue::Set(value.encode_to_vec()),
834 })
835}
836
837fn operation_from_storage(
838 row: buffer_operation::Model,
839 _format_version: i32,
840) -> Result<proto::operation::Variant, Error> {
841 let operation =
842 storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
843 let version = version_from_storage(&operation.version);
844 Ok(if operation.is_undo {
845 proto::operation::Variant::Undo(proto::operation::Undo {
846 replica_id: row.replica_id as u32,
847 lamport_timestamp: row.lamport_timestamp as u32,
848 version,
849 counts: operation
850 .undo_counts
851 .iter()
852 .map(|entry| proto::UndoCount {
853 replica_id: entry.replica_id,
854 lamport_timestamp: entry.lamport_timestamp,
855 count: entry.count,
856 })
857 .collect(),
858 })
859 } else {
860 proto::operation::Variant::Edit(proto::operation::Edit {
861 replica_id: row.replica_id as u32,
862 lamport_timestamp: row.lamport_timestamp as u32,
863 version,
864 ranges: operation
865 .edit_ranges
866 .into_iter()
867 .map(|range| proto::Range {
868 start: range.start,
869 end: range.end,
870 })
871 .collect(),
872 new_text: operation.edit_texts,
873 })
874 })
875}
876
877fn version_to_storage(version: &[proto::VectorClockEntry]) -> Vec<storage::VectorClockEntry> {
878 version
879 .iter()
880 .map(|entry| storage::VectorClockEntry {
881 replica_id: entry.replica_id,
882 timestamp: entry.timestamp,
883 })
884 .collect()
885}
886
887fn version_from_storage(version: &[storage::VectorClockEntry]) -> Vec<proto::VectorClockEntry> {
888 version
889 .iter()
890 .map(|entry| proto::VectorClockEntry {
891 replica_id: entry.replica_id,
892 timestamp: entry.timestamp,
893 })
894 .collect()
895}
896
897// This is currently a manual copy of the deserialization code in the client's language crate
898pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
899 match operation.variant? {
900 proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
901 timestamp: clock::Lamport {
902 replica_id: edit.replica_id as text::ReplicaId,
903 value: edit.lamport_timestamp,
904 },
905 version: version_from_wire(&edit.version),
906 ranges: edit
907 .ranges
908 .into_iter()
909 .map(|range| {
910 text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
911 })
912 .collect(),
913 new_text: edit.new_text.into_iter().map(Arc::from).collect(),
914 })),
915 proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
916 timestamp: clock::Lamport {
917 replica_id: undo.replica_id as text::ReplicaId,
918 value: undo.lamport_timestamp,
919 },
920 version: version_from_wire(&undo.version),
921 counts: undo
922 .counts
923 .into_iter()
924 .map(|c| {
925 (
926 clock::Lamport {
927 replica_id: c.replica_id as text::ReplicaId,
928 value: c.lamport_timestamp,
929 },
930 c.count,
931 )
932 })
933 .collect(),
934 })),
935 _ => None,
936 }
937}
938
939fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
940 let mut version = clock::Global::new();
941 for entry in message {
942 version.observe(clock::Lamport {
943 replica_id: entry.replica_id as text::ReplicaId,
944 value: entry.timestamp,
945 });
946 }
947 version
948}
949
950fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
951 let mut message = Vec::new();
952 for entry in version.iter() {
953 message.push(proto::VectorClockEntry {
954 replica_id: entry.replica_id as u32,
955 timestamp: entry.value,
956 });
957 }
958 message
959}
960
961#[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
962enum QueryOperationSerializationVersion {
963 OperationSerializationVersion,
964}
965
966mod storage {
967 #![allow(non_snake_case)]
968 use prost::Message;
969 pub const SERIALIZATION_VERSION: i32 = 1;
970
971 #[derive(Message)]
972 pub struct Operation {
973 #[prost(message, repeated, tag = "2")]
974 pub version: Vec<VectorClockEntry>,
975 #[prost(bool, tag = "3")]
976 pub is_undo: bool,
977 #[prost(message, repeated, tag = "4")]
978 pub edit_ranges: Vec<Range>,
979 #[prost(string, repeated, tag = "5")]
980 pub edit_texts: Vec<String>,
981 #[prost(message, repeated, tag = "6")]
982 pub undo_counts: Vec<UndoCount>,
983 }
984
985 #[derive(Message)]
986 pub struct VectorClockEntry {
987 #[prost(uint32, tag = "1")]
988 pub replica_id: u32,
989 #[prost(uint32, tag = "2")]
990 pub timestamp: u32,
991 }
992
993 #[derive(Message)]
994 pub struct Range {
995 #[prost(uint64, tag = "1")]
996 pub start: u64,
997 #[prost(uint64, tag = "2")]
998 pub end: u64,
999 }
1000
1001 #[derive(Message)]
1002 pub struct UndoCount {
1003 #[prost(uint32, tag = "1")]
1004 pub replica_id: u32,
1005 #[prost(uint32, tag = "2")]
1006 pub lamport_timestamp: u32,
1007 #[prost(uint32, tag = "3")]
1008 pub count: u32,
1009 }
1010}