buffers.rs

  1use super::*;
  2use prost::Message;
  3
  4pub struct Buffer {
  5    pub base_text: String,
  6    pub operations: Vec<proto::Operation>,
  7}
  8
  9impl Database {
 10    pub async fn create_buffer(&self) -> Result<BufferId> {
 11        self.transaction(|tx| async move {
 12            let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
 13            Ok(buffer.id)
 14        })
 15        .await
 16    }
 17
 18    pub async fn update_buffer(
 19        &self,
 20        buffer_id: BufferId,
 21        operations: &[proto::Operation],
 22    ) -> Result<()> {
 23        self.transaction(|tx| async move {
 24            let buffer = buffer::Entity::find_by_id(buffer_id)
 25                .one(&*tx)
 26                .await?
 27                .ok_or_else(|| anyhow!("no such buffer"))?;
 28            buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
 29                match operation.variant.as_ref()? {
 30                    proto::operation::Variant::Edit(operation) => {
 31                        let value =
 32                            serialize_edit_operation(&operation.ranges, &operation.new_text);
 33                        let version = serialize_version(&operation.version);
 34                        Some(buffer_operation::ActiveModel {
 35                            buffer_id: ActiveValue::Set(buffer_id),
 36                            epoch: ActiveValue::Set(buffer.epoch),
 37                            replica_id: ActiveValue::Set(operation.replica_id as i32),
 38                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
 39                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
 40                            is_undo: ActiveValue::Set(false),
 41                            version: ActiveValue::Set(version),
 42                            value: ActiveValue::Set(value),
 43                        })
 44                    }
 45                    proto::operation::Variant::Undo(operation) => {
 46                        let value = serialize_undo_operation(&operation.counts);
 47                        let version = serialize_version(&operation.version);
 48                        Some(buffer_operation::ActiveModel {
 49                            buffer_id: ActiveValue::Set(buffer_id),
 50                            epoch: ActiveValue::Set(buffer.epoch),
 51                            replica_id: ActiveValue::Set(operation.replica_id as i32),
 52                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
 53                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
 54                            is_undo: ActiveValue::Set(true),
 55                            version: ActiveValue::Set(version),
 56                            value: ActiveValue::Set(value),
 57                        })
 58                    }
 59                    proto::operation::Variant::UpdateSelections(_) => None,
 60                    proto::operation::Variant::UpdateDiagnostics(_) => None,
 61                    proto::operation::Variant::UpdateCompletionTriggers(_) => None,
 62                }
 63            }))
 64            .exec(&*tx)
 65            .await?;
 66
 67            Ok(())
 68        })
 69        .await
 70    }
 71
 72    pub async fn get_buffer(&self, id: BufferId) -> Result<Buffer> {
 73        self.transaction(|tx| async move {
 74            let buffer = buffer::Entity::find_by_id(id)
 75                .one(&*tx)
 76                .await?
 77                .ok_or_else(|| anyhow!("no such buffer"))?;
 78
 79            let base_text = if buffer.epoch > 0 {
 80                buffer_snapshot::Entity::find()
 81                    .filter(
 82                        buffer_snapshot::Column::BufferId
 83                            .eq(id)
 84                            .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
 85                    )
 86                    .one(&*tx)
 87                    .await?
 88                    .ok_or_else(|| anyhow!("no such snapshot"))?
 89                    .text
 90            } else {
 91                String::new()
 92            };
 93
 94            let mut rows = buffer_operation::Entity::find()
 95                .filter(
 96                    buffer_operation::Column::BufferId
 97                        .eq(id)
 98                        .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
 99                )
100                .stream(&*tx)
101                .await?;
102            let mut operations = Vec::new();
103            while let Some(row) = rows.next().await {
104                let row = row?;
105                let version = deserialize_version(&row.version)?;
106                let operation = if row.is_undo {
107                    let counts = deserialize_undo_operation(&row.value)?;
108                    proto::operation::Variant::Undo(proto::operation::Undo {
109                        replica_id: row.replica_id as u32,
110                        local_timestamp: row.local_timestamp as u32,
111                        lamport_timestamp: row.lamport_timestamp as u32,
112                        version,
113                        counts,
114                    })
115                } else {
116                    let (ranges, new_text) = deserialize_edit_operation(&row.value)?;
117                    proto::operation::Variant::Edit(proto::operation::Edit {
118                        replica_id: row.replica_id as u32,
119                        local_timestamp: row.local_timestamp as u32,
120                        lamport_timestamp: row.lamport_timestamp as u32,
121                        version,
122                        ranges,
123                        new_text,
124                    })
125                };
126                operations.push(proto::Operation {
127                    variant: Some(operation),
128                })
129            }
130
131            Ok(Buffer {
132                base_text,
133                operations,
134            })
135        })
136        .await
137    }
138}
139
140mod storage {
141    #![allow(non_snake_case)]
142
143    use prost::Message;
144
145    pub const VERSION: usize = 1;
146
147    #[derive(Message)]
148    pub struct VectorClock {
149        #[prost(message, repeated, tag = "1")]
150        pub entries: Vec<VectorClockEntry>,
151    }
152
153    #[derive(Message)]
154    pub struct VectorClockEntry {
155        #[prost(uint32, tag = "1")]
156        pub replica_id: u32,
157        #[prost(uint32, tag = "2")]
158        pub timestamp: u32,
159    }
160
161    #[derive(Message)]
162    pub struct TextEdit {
163        #[prost(message, repeated, tag = "1")]
164        pub ranges: Vec<Range>,
165        #[prost(string, repeated, tag = "2")]
166        pub texts: Vec<String>,
167    }
168
169    #[derive(Message)]
170    pub struct Range {
171        #[prost(uint64, tag = "1")]
172        pub start: u64,
173        #[prost(uint64, tag = "2")]
174        pub end: u64,
175    }
176
177    #[derive(Message)]
178    pub struct Undo {
179        #[prost(message, repeated, tag = "1")]
180        pub entries: Vec<UndoCount>,
181    }
182
183    #[derive(Message)]
184    pub struct UndoCount {
185        #[prost(uint32, tag = "1")]
186        pub replica_id: u32,
187        #[prost(uint32, tag = "2")]
188        pub local_timestamp: u32,
189        #[prost(uint32, tag = "3")]
190        pub count: u32,
191    }
192}
193
194fn serialize_version(version: &Vec<proto::VectorClockEntry>) -> Vec<u8> {
195    storage::VectorClock {
196        entries: version
197            .iter()
198            .map(|entry| storage::VectorClockEntry {
199                replica_id: entry.replica_id,
200                timestamp: entry.timestamp,
201            })
202            .collect(),
203    }
204    .encode_to_vec()
205}
206
207fn deserialize_version(bytes: &[u8]) -> Result<Vec<proto::VectorClockEntry>> {
208    let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?;
209    Ok(clock
210        .entries
211        .into_iter()
212        .map(|entry| proto::VectorClockEntry {
213            replica_id: entry.replica_id,
214            timestamp: entry.timestamp,
215        })
216        .collect())
217}
218
219fn serialize_edit_operation(ranges: &[proto::Range], texts: &[String]) -> Vec<u8> {
220    storage::TextEdit {
221        ranges: ranges
222            .iter()
223            .map(|range| storage::Range {
224                start: range.start,
225                end: range.end,
226            })
227            .collect(),
228        texts: texts.to_vec(),
229    }
230    .encode_to_vec()
231}
232
233fn deserialize_edit_operation(bytes: &[u8]) -> Result<(Vec<proto::Range>, Vec<String>)> {
234    let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?;
235    let ranges = edit
236        .ranges
237        .into_iter()
238        .map(|range| proto::Range {
239            start: range.start,
240            end: range.end,
241        })
242        .collect();
243    Ok((ranges, edit.texts))
244}
245
246fn serialize_undo_operation(counts: &Vec<proto::UndoCount>) -> Vec<u8> {
247    storage::Undo {
248        entries: counts
249            .iter()
250            .map(|entry| storage::UndoCount {
251                replica_id: entry.replica_id,
252                local_timestamp: entry.local_timestamp,
253                count: entry.count,
254            })
255            .collect(),
256    }
257    .encode_to_vec()
258}
259
260fn deserialize_undo_operation(bytes: &[u8]) -> Result<Vec<proto::UndoCount>> {
261    let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?;
262    Ok(undo
263        .entries
264        .iter()
265        .map(|entry| proto::UndoCount {
266            replica_id: entry.replica_id,
267            local_timestamp: entry.local_timestamp,
268            count: entry.count,
269        })
270        .collect())
271}