1use super::*;
2use prost::Message;
3
4pub struct Buffer {
5 pub base_text: String,
6 pub operations: Vec<proto::Operation>,
7}
8
9impl Database {
10 pub async fn create_buffer(&self) -> Result<BufferId> {
11 self.transaction(|tx| async move {
12 let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
13 Ok(buffer.id)
14 })
15 .await
16 }
17
18 pub async fn update_buffer(
19 &self,
20 buffer_id: BufferId,
21 operations: &[proto::Operation],
22 ) -> Result<()> {
23 self.transaction(|tx| async move {
24 let buffer = buffer::Entity::find_by_id(buffer_id)
25 .one(&*tx)
26 .await?
27 .ok_or_else(|| anyhow!("no such buffer"))?;
28 buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
29 match operation.variant.as_ref()? {
30 proto::operation::Variant::Edit(operation) => {
31 let value =
32 serialize_edit_operation(&operation.ranges, &operation.new_text);
33 let version = serialize_version(&operation.version);
34 Some(buffer_operation::ActiveModel {
35 buffer_id: ActiveValue::Set(buffer_id),
36 epoch: ActiveValue::Set(buffer.epoch),
37 replica_id: ActiveValue::Set(operation.replica_id as i32),
38 lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
39 local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
40 is_undo: ActiveValue::Set(false),
41 version: ActiveValue::Set(version),
42 value: ActiveValue::Set(value),
43 })
44 }
45 proto::operation::Variant::Undo(operation) => {
46 let value = serialize_undo_operation(&operation.counts);
47 let version = serialize_version(&operation.version);
48 Some(buffer_operation::ActiveModel {
49 buffer_id: ActiveValue::Set(buffer_id),
50 epoch: ActiveValue::Set(buffer.epoch),
51 replica_id: ActiveValue::Set(operation.replica_id as i32),
52 lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
53 local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
54 is_undo: ActiveValue::Set(true),
55 version: ActiveValue::Set(version),
56 value: ActiveValue::Set(value),
57 })
58 }
59 proto::operation::Variant::UpdateSelections(_) => None,
60 proto::operation::Variant::UpdateDiagnostics(_) => None,
61 proto::operation::Variant::UpdateCompletionTriggers(_) => None,
62 }
63 }))
64 .exec(&*tx)
65 .await?;
66
67 Ok(())
68 })
69 .await
70 }
71
72 pub async fn get_buffer(&self, id: BufferId) -> Result<Buffer> {
73 self.transaction(|tx| async move {
74 let buffer = buffer::Entity::find_by_id(id)
75 .one(&*tx)
76 .await?
77 .ok_or_else(|| anyhow!("no such buffer"))?;
78
79 let base_text = if buffer.epoch > 0 {
80 buffer_snapshot::Entity::find()
81 .filter(
82 buffer_snapshot::Column::BufferId
83 .eq(id)
84 .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
85 )
86 .one(&*tx)
87 .await?
88 .ok_or_else(|| anyhow!("no such snapshot"))?
89 .text
90 } else {
91 String::new()
92 };
93
94 let mut rows = buffer_operation::Entity::find()
95 .filter(
96 buffer_operation::Column::BufferId
97 .eq(id)
98 .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
99 )
100 .stream(&*tx)
101 .await?;
102 let mut operations = Vec::new();
103 while let Some(row) = rows.next().await {
104 let row = row?;
105 let version = deserialize_version(&row.version)?;
106 let operation = if row.is_undo {
107 let counts = deserialize_undo_operation(&row.value)?;
108 proto::operation::Variant::Undo(proto::operation::Undo {
109 replica_id: row.replica_id as u32,
110 local_timestamp: row.local_timestamp as u32,
111 lamport_timestamp: row.lamport_timestamp as u32,
112 version,
113 counts,
114 })
115 } else {
116 let (ranges, new_text) = deserialize_edit_operation(&row.value)?;
117 proto::operation::Variant::Edit(proto::operation::Edit {
118 replica_id: row.replica_id as u32,
119 local_timestamp: row.local_timestamp as u32,
120 lamport_timestamp: row.lamport_timestamp as u32,
121 version,
122 ranges,
123 new_text,
124 })
125 };
126 operations.push(proto::Operation {
127 variant: Some(operation),
128 })
129 }
130
131 Ok(Buffer {
132 base_text,
133 operations,
134 })
135 })
136 .await
137 }
138}
139
140mod storage {
141 #![allow(non_snake_case)]
142
143 use prost::Message;
144
145 pub const VERSION: usize = 1;
146
147 #[derive(Message)]
148 pub struct VectorClock {
149 #[prost(message, repeated, tag = "1")]
150 pub entries: Vec<VectorClockEntry>,
151 }
152
153 #[derive(Message)]
154 pub struct VectorClockEntry {
155 #[prost(uint32, tag = "1")]
156 pub replica_id: u32,
157 #[prost(uint32, tag = "2")]
158 pub timestamp: u32,
159 }
160
161 #[derive(Message)]
162 pub struct TextEdit {
163 #[prost(message, repeated, tag = "1")]
164 pub ranges: Vec<Range>,
165 #[prost(string, repeated, tag = "2")]
166 pub texts: Vec<String>,
167 }
168
169 #[derive(Message)]
170 pub struct Range {
171 #[prost(uint64, tag = "1")]
172 pub start: u64,
173 #[prost(uint64, tag = "2")]
174 pub end: u64,
175 }
176
177 #[derive(Message)]
178 pub struct Undo {
179 #[prost(message, repeated, tag = "1")]
180 pub entries: Vec<UndoCount>,
181 }
182
183 #[derive(Message)]
184 pub struct UndoCount {
185 #[prost(uint32, tag = "1")]
186 pub replica_id: u32,
187 #[prost(uint32, tag = "2")]
188 pub local_timestamp: u32,
189 #[prost(uint32, tag = "3")]
190 pub count: u32,
191 }
192}
193
194fn serialize_version(version: &Vec<proto::VectorClockEntry>) -> Vec<u8> {
195 storage::VectorClock {
196 entries: version
197 .iter()
198 .map(|entry| storage::VectorClockEntry {
199 replica_id: entry.replica_id,
200 timestamp: entry.timestamp,
201 })
202 .collect(),
203 }
204 .encode_to_vec()
205}
206
207fn deserialize_version(bytes: &[u8]) -> Result<Vec<proto::VectorClockEntry>> {
208 let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?;
209 Ok(clock
210 .entries
211 .into_iter()
212 .map(|entry| proto::VectorClockEntry {
213 replica_id: entry.replica_id,
214 timestamp: entry.timestamp,
215 })
216 .collect())
217}
218
219fn serialize_edit_operation(ranges: &[proto::Range], texts: &[String]) -> Vec<u8> {
220 storage::TextEdit {
221 ranges: ranges
222 .iter()
223 .map(|range| storage::Range {
224 start: range.start,
225 end: range.end,
226 })
227 .collect(),
228 texts: texts.to_vec(),
229 }
230 .encode_to_vec()
231}
232
233fn deserialize_edit_operation(bytes: &[u8]) -> Result<(Vec<proto::Range>, Vec<String>)> {
234 let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?;
235 let ranges = edit
236 .ranges
237 .into_iter()
238 .map(|range| proto::Range {
239 start: range.start,
240 end: range.end,
241 })
242 .collect();
243 Ok((ranges, edit.texts))
244}
245
246fn serialize_undo_operation(counts: &Vec<proto::UndoCount>) -> Vec<u8> {
247 storage::Undo {
248 entries: counts
249 .iter()
250 .map(|entry| storage::UndoCount {
251 replica_id: entry.replica_id,
252 local_timestamp: entry.local_timestamp,
253 count: entry.count,
254 })
255 .collect(),
256 }
257 .encode_to_vec()
258}
259
260fn deserialize_undo_operation(bytes: &[u8]) -> Result<Vec<proto::UndoCount>> {
261 let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?;
262 Ok(undo
263 .entries
264 .iter()
265 .map(|entry| proto::UndoCount {
266 replica_id: entry.replica_id,
267 local_timestamp: entry.local_timestamp,
268 count: entry.count,
269 })
270 .collect())
271}