diff --git a/crates/acp_thread/src/diff.rs b/crates/acp_thread/src/diff.rs index 15de12af27fe233bad4ad8ebb2893ffa5fbdd598..055b2f7fb86ffe9d7f12459b6b16405ce77815a0 100644 --- a/crates/acp_thread/src/diff.rs +++ b/crates/acp_thread/src/diff.rs @@ -236,21 +236,21 @@ impl PendingDiff { fn finalize(&self, cx: &mut Context) -> FinalizedDiff { let ranges = self.excerpt_ranges(cx); let base_text = self.base_text.clone(); - let language_registry = self.new_buffer.read(cx).language_registry(); + let new_buffer = self.new_buffer.read(cx); + let language_registry = new_buffer.language_registry(); - let path = self - .new_buffer - .read(cx) + let path = new_buffer .file() .map(|file| file.path().display(file.path_style(cx))) .unwrap_or("untitled".into()) .into(); + let replica_id = new_buffer.replica_id(); // Replace the buffer in the multibuffer with the snapshot let buffer = cx.new(|cx| { let language = self.new_buffer.read(cx).language().cloned(); let buffer = TextBuffer::new_normalized( - 0, + replica_id, cx.entity_id().as_non_zero_u64().into(), self.new_buffer.read(cx).line_ending(), self.new_buffer.read(cx).as_rope().clone(), diff --git a/crates/agent/src/edit_agent/streaming_fuzzy_matcher.rs b/crates/agent/src/edit_agent/streaming_fuzzy_matcher.rs index 386b8204400a157b37b2f356829fa27df3abca92..904ec05a8c7565d5052cd546fc0bf6d723ffa375 100644 --- a/crates/agent/src/edit_agent/streaming_fuzzy_matcher.rs +++ b/crates/agent/src/edit_agent/streaming_fuzzy_matcher.rs @@ -308,12 +308,13 @@ mod tests { use indoc::indoc; use language::{BufferId, TextBuffer}; use rand::prelude::*; + use text::ReplicaId; use util::test::{generate_marked_text, marked_text_ranges}; #[test] fn test_empty_query() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), "Hello world\nThis is a test\nFoo bar baz", ); @@ -327,7 +328,7 @@ mod tests { #[test] fn test_streaming_exact_match() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), "Hello world\nThis is a test\nFoo bar baz", ); @@ -351,7 +352,7 @@ mod tests { #[test] fn test_streaming_fuzzy_match() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), indoc! {" function foo(a, b) { @@ -385,7 +386,7 @@ mod tests { #[test] fn test_incremental_improvement() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), "Line 1\nLine 2\nLine 3\nLine 4\nLine 5", ); @@ -410,7 +411,7 @@ mod tests { #[test] fn test_incomplete_lines_buffering() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), indoc! {" The quick brown fox @@ -437,7 +438,7 @@ mod tests { #[test] fn test_multiline_fuzzy_match() { let buffer = TextBuffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), indoc! {r#" impl Display for User { @@ -691,7 +692,11 @@ mod tests { } "#}; - let buffer = TextBuffer::new(0, BufferId::new(1).unwrap(), text.to_string()); + let buffer = TextBuffer::new( + ReplicaId::LOCAL, + BufferId::new(1).unwrap(), + text.to_string(), + ); let snapshot = buffer.snapshot(); let mut matcher = StreamingFuzzyMatcher::new(snapshot.clone()); @@ -724,7 +729,7 @@ mod tests { #[track_caller] fn assert_location_resolution(text_with_expected_range: &str, query: &str, rng: &mut StdRng) { let (text, expected_ranges) = marked_text_ranges(text_with_expected_range, false); - let buffer = TextBuffer::new(0, BufferId::new(1).unwrap(), text.clone()); + let buffer = TextBuffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text.clone()); let snapshot = buffer.snapshot(); let mut matcher = StreamingFuzzyMatcher::new(snapshot); diff --git a/crates/assistant_context/src/assistant_context.rs b/crates/assistant_context/src/assistant_context.rs index 6c06cc2c8ec7f845b1e6d49631a1bea6755a62d0..5a1fa707ff04ac3b0cd719c3d0a5e67dfeb3e625 100644 --- a/crates/assistant_context/src/assistant_context.rs +++ b/crates/assistant_context/src/assistant_context.rs @@ -486,7 +486,7 @@ pub enum ContextSummary { Error, } -#[derive(Default, Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct ContextSummaryContent { pub text: String, pub done: bool, @@ -523,7 +523,11 @@ impl ContextSummary { match self { ContextSummary::Content(content) => content, ContextSummary::Pending | ContextSummary::Error => { - let content = ContextSummaryContent::default(); + let content = ContextSummaryContent { + text: "".to_string(), + done: false, + timestamp: clock::Lamport::MIN, + }; *self = ContextSummary::Content(content); self.content_as_mut().unwrap() } @@ -796,7 +800,7 @@ impl AssistantContext { }; let first_message_id = MessageId(clock::Lamport { - replica_id: 0, + replica_id: ReplicaId::LOCAL, value: 0, }); let message = MessageAnchor { @@ -2692,7 +2696,7 @@ impl AssistantContext { self.summary = ContextSummary::Content(ContextSummaryContent { text: "".to_string(), done: false, - timestamp: clock::Lamport::default(), + timestamp: clock::Lamport::MIN, }); replace_old = true; } @@ -3117,7 +3121,7 @@ impl SavedContext { let mut first_message_metadata = None; for message in self.messages { - if message.id == MessageId(clock::Lamport::default()) { + if message.id == MessageId(clock::Lamport::MIN) { first_message_metadata = Some(message.metadata); } else { operations.push(ContextOperation::InsertMessage { @@ -3141,7 +3145,7 @@ impl SavedContext { if let Some(metadata) = first_message_metadata { let timestamp = next_timestamp.tick(); operations.push(ContextOperation::UpdateMessage { - message_id: MessageId(clock::Lamport::default()), + message_id: MessageId(clock::Lamport::MIN), metadata: MessageMetadata { role: metadata.role, status: metadata.status, diff --git a/crates/assistant_context/src/assistant_context_tests.rs b/crates/assistant_context/src/assistant_context_tests.rs index 413e32dfcb14273920e9ae4110e5905bdbae5956..2d987f9f845b471438cfb3eb0667fbc36161c53c 100644 --- a/crates/assistant_context/src/assistant_context_tests.rs +++ b/crates/assistant_context/src/assistant_context_tests.rs @@ -741,7 +741,7 @@ async fn test_serialization(cx: &mut TestAppContext) { ); } -#[gpui::test(iterations = 100)] +#[gpui::test(iterations = 25)] async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: StdRng) { cx.update(init_test); @@ -771,7 +771,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std let context = cx.new(|cx| { AssistantContext::new( context_id.clone(), - i as ReplicaId, + ReplicaId::new(i as u16), language::Capability::ReadWrite, registry.clone(), prompt_builder.clone(), @@ -789,7 +789,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std if let ContextEvent::Operation(op) = event { network .lock() - .broadcast(i as ReplicaId, vec![op.to_proto()]); + .broadcast(ReplicaId::new(i as u16), vec![op.to_proto()]); } } }) @@ -797,7 +797,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std }); contexts.push(context); - network.lock().add_peer(i as ReplicaId); + network.lock().add_peer(ReplicaId::new(i as u16)); } let mut mutation_count = operations; @@ -943,9 +943,9 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std mutation_count -= 1; } _ => { - let replica_id = context_index as ReplicaId; + let replica_id = ReplicaId::new(context_index as u16); if network.lock().is_disconnected(replica_id) { - network.lock().reconnect_peer(replica_id, 0); + network.lock().reconnect_peer(replica_id, ReplicaId::new(0)); let (ops_to_send, ops_to_receive) = cx.read(|cx| { let host_context = &contexts[0].read(cx); @@ -971,7 +971,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std network.lock().broadcast(replica_id, ops_to_send); context.update(cx, |context, cx| context.apply_ops(ops_to_receive, cx)); - } else if rng.random_bool(0.1) && replica_id != 0 { + } else if rng.random_bool(0.1) && replica_id != ReplicaId::new(0) { log::info!("Context {}: disconnecting", context_index); network.lock().disconnect_peer(replica_id); } else if network.lock().has_unreceived(replica_id) { @@ -996,25 +996,25 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std assert_eq!( context.buffer.read(cx).text(), first_context.buffer.read(cx).text(), - "Context {} text != Context 0 text", + "Context {:?} text != Context 0 text", context.buffer.read(cx).replica_id() ); assert_eq!( context.message_anchors, first_context.message_anchors, - "Context {} messages != Context 0 messages", + "Context {:?} messages != Context 0 messages", context.buffer.read(cx).replica_id() ); assert_eq!( context.messages_metadata, first_context.messages_metadata, - "Context {} message metadata != Context 0 message metadata", + "Context {:?} message metadata != Context 0 message metadata", context.buffer.read(cx).replica_id() ); assert_eq!( context.slash_command_output_sections, first_context.slash_command_output_sections, - "Context {} slash command output sections != Context 0 slash command output sections", + "Context {:?} slash command output sections != Context 0 slash command output sections", context.buffer.read(cx).replica_id() ); } diff --git a/crates/buffer_diff/src/buffer_diff.rs b/crates/buffer_diff/src/buffer_diff.rs index 13479f6428b02d52f45415a989b694cc04ab5c25..b6883d39a76d212a9d9505999ad2fab9df2d9d82 100644 --- a/crates/buffer_diff/src/buffer_diff.rs +++ b/crates/buffer_diff/src/buffer_diff.rs @@ -85,7 +85,7 @@ struct PendingHunk { new_status: DiffHunkSecondaryStatus, } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct DiffHunkSummary { buffer_range: Range, } @@ -114,7 +114,9 @@ impl sum_tree::Summary for DiffHunkSummary { type Context<'a> = &'a text::BufferSnapshot; fn zero(_cx: Self::Context<'_>) -> Self { - Default::default() + DiffHunkSummary { + buffer_range: Anchor::MIN..Anchor::MIN, + } } fn add_summary(&mut self, other: &Self, buffer: Self::Context<'_>) { @@ -937,7 +939,9 @@ impl BufferDiff { pub fn clear_pending_hunks(&mut self, cx: &mut Context) { if self.secondary_diff.is_some() { - self.inner.pending_hunks = SumTree::from_summary(DiffHunkSummary::default()); + self.inner.pending_hunks = SumTree::from_summary(DiffHunkSummary { + buffer_range: Anchor::MIN..Anchor::MIN, + }); cx.emit(BufferDiffEvent::DiffChanged { changed_range: Some(Anchor::MIN..Anchor::MAX), }); @@ -1368,7 +1372,7 @@ mod tests { use gpui::TestAppContext; use pretty_assertions::{assert_eq, assert_ne}; use rand::{Rng as _, rngs::StdRng}; - use text::{Buffer, BufferId, Rope}; + use text::{Buffer, BufferId, ReplicaId, Rope}; use unindent::Unindent as _; use util::test::marked_text_ranges; @@ -1393,7 +1397,7 @@ mod tests { " .unindent(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text); let mut diff = BufferDiffSnapshot::new_sync(buffer.clone(), diff_base.clone(), cx); assert_hunks( diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer), @@ -1467,7 +1471,7 @@ mod tests { " .unindent(); - let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text); + let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text); let unstaged_diff = BufferDiffSnapshot::new_sync(buffer.clone(), index_text, cx); let mut uncommitted_diff = BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx); @@ -1536,7 +1540,7 @@ mod tests { " .unindent(); - let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text); + let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text); let diff = cx .update(|cx| { BufferDiffSnapshot::new_with_base_text( @@ -1799,7 +1803,7 @@ mod tests { for example in table { let (buffer_text, ranges) = marked_text_ranges(&example.buffer_marked_text, false); - let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text); + let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text); let hunk_range = buffer.anchor_before(ranges[0].start)..buffer.anchor_before(ranges[0].end); @@ -1872,7 +1876,11 @@ mod tests { " .unindent(); - let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text.clone()); + let buffer = Buffer::new( + ReplicaId::LOCAL, + BufferId::new(1).unwrap(), + buffer_text.clone(), + ); let unstaged = BufferDiffSnapshot::new_sync(buffer.clone(), index_text, cx); let uncommitted = BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx); let unstaged_diff = cx.new(|cx| { @@ -1945,7 +1953,7 @@ mod tests { " .unindent(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text_1); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text_1); let empty_diff = cx.update(|cx| BufferDiffSnapshot::empty(&buffer, cx)); let diff_1 = BufferDiffSnapshot::new_sync(buffer.clone(), base_text.clone(), cx); diff --git a/crates/channel/src/channel_buffer.rs b/crates/channel/src/channel_buffer.rs index 828248b330b6ef6cfe0e13eab426de2900d364b2..efa0850753887c2116ee7916727a870a3528b627 100644 --- a/crates/channel/src/channel_buffer.rs +++ b/crates/channel/src/channel_buffer.rs @@ -9,7 +9,7 @@ use rpc::{ proto::{self, PeerId}, }; use std::{sync::Arc, time::Duration}; -use text::BufferId; +use text::{BufferId, ReplicaId}; use util::ResultExt; pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250); @@ -65,7 +65,12 @@ impl ChannelBuffer { let buffer = cx.new(|cx| { let capability = channel_store.read(cx).channel_capability(channel.id); - language::Buffer::remote(buffer_id, response.replica_id as u16, capability, base_text) + language::Buffer::remote( + buffer_id, + ReplicaId::new(response.replica_id as u16), + capability, + base_text, + ) })?; buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?; @@ -272,7 +277,7 @@ impl ChannelBuffer { self.connected } - pub fn replica_id(&self, cx: &App) -> u16 { + pub fn replica_id(&self, cx: &App) -> ReplicaId { self.buffer.read(cx).replica_id() } } diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index de0668b406c512eabfc70f4702466f013eb8c515..525a3e960ce8bc2aede4b0665af23ab3c33cac15 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -943,7 +943,7 @@ impl Collaborator { pub fn from_proto(message: proto::Collaborator) -> Result { Ok(Self { peer_id: message.peer_id.context("invalid peer id")?, - replica_id: message.replica_id as ReplicaId, + replica_id: ReplicaId::new(message.replica_id as u16), user_id: message.user_id as UserId, is_host: message.is_host, committer_name: message.committer_name, diff --git a/crates/clock/src/clock.rs b/crates/clock/src/clock.rs index 64645c9b46f68416c6792b17258baf8e49ca9585..a3cf2b819ec11e22ac533e7743ee884487ef9724 100644 --- a/crates/clock/src/clock.rs +++ b/crates/clock/src/clock.rs @@ -4,33 +4,73 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use std::{ cmp::{self, Ordering}, - fmt, iter, + fmt, }; pub use system_clock::*; -pub const LOCAL_BRANCH_REPLICA_ID: u16 = u16::MAX; -pub const AGENT_REPLICA_ID: u16 = u16::MAX - 1; - /// A unique identifier for each distributed node. -pub type ReplicaId = u16; +#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)] +pub struct ReplicaId(u16); + +impl ReplicaId { + /// The local replica + pub const LOCAL: ReplicaId = ReplicaId(0); + /// The remote replica of the connected remote server. + pub const REMOTE_SERVER: ReplicaId = ReplicaId(1); + /// The agent's unique identifier. + pub const AGENT: ReplicaId = ReplicaId(2); + /// A local branch. + pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3); + /// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica. + pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(Self::LOCAL_BRANCH.0 + 1); + + pub fn new(id: u16) -> Self { + ReplicaId(id) + } + + pub fn as_u16(&self) -> u16 { + self.0 + } + + pub fn is_remote(self) -> bool { + self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID + } +} + +impl fmt::Debug for ReplicaId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if *self == ReplicaId::LOCAL { + write!(f, "") + } else if *self == ReplicaId::REMOTE_SERVER { + write!(f, "") + } else if *self == ReplicaId::AGENT { + write!(f, "") + } else if *self == ReplicaId::LOCAL_BRANCH { + write!(f, "") + } else { + write!(f, "{}", self.0) + } + } +} /// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp). pub type Seq = u32; /// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp), /// used to determine the ordering of events in the editor. -#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct Lamport { pub replica_id: ReplicaId, pub value: Seq, } -/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock). +/// A [version vector](https://en.wikipedia.org/wiki/Version_vector). #[derive(Clone, Default, Hash, Eq, PartialEq)] pub struct Global { - values: SmallVec<[u32; 8]>, - local_branch_value: u32, + // 4 is chosen as it is the biggest count that does not increase the size of the field itself. + // Coincidentally, it also covers all the important non-collab replica ids. + values: SmallVec<[u32; 4]>, } impl Global { @@ -38,30 +78,31 @@ impl Global { Self::default() } + /// Fetches the sequence number for the given replica ID. pub fn get(&self, replica_id: ReplicaId) -> Seq { - if replica_id == LOCAL_BRANCH_REPLICA_ID { - self.local_branch_value - } else { - self.values.get(replica_id as usize).copied().unwrap_or(0) as Seq - } + self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq } + /// Observe the lamport timestampe. + /// + /// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp. pub fn observe(&mut self, timestamp: Lamport) { + debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id); if timestamp.value > 0 { - if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID { - self.local_branch_value = cmp::max(self.local_branch_value, timestamp.value); - } else { - let new_len = timestamp.replica_id as usize + 1; - if new_len > self.values.len() { - self.values.resize(new_len, 0); - } - - let entry = &mut self.values[timestamp.replica_id as usize]; - *entry = cmp::max(*entry, timestamp.value); + let new_len = timestamp.replica_id.0 as usize + 1; + if new_len > self.values.len() { + self.values.resize(new_len, 0); } + + let entry = &mut self.values[timestamp.replica_id.0 as usize]; + *entry = cmp::max(*entry, timestamp.value); } } + /// Join another global. + /// + /// This observes all timestamps from the other global. + #[doc(alias = "synchronize")] pub fn join(&mut self, other: &Self) { if other.values.len() > self.values.len() { self.values.resize(other.values.len(), 0); @@ -70,34 +111,36 @@ impl Global { for (left, right) in self.values.iter_mut().zip(&other.values) { *left = cmp::max(*left, *right); } - - self.local_branch_value = cmp::max(self.local_branch_value, other.local_branch_value); } + /// Meet another global. + /// + /// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals. pub fn meet(&mut self, other: &Self) { if other.values.len() > self.values.len() { self.values.resize(other.values.len(), 0); } let mut new_len = 0; - for (ix, (left, right)) in self - .values - .iter_mut() - .zip(other.values.iter().chain(iter::repeat(&0))) - .enumerate() - { - if *left == 0 { - *left = *right; - } else if *right > 0 { - *left = cmp::min(*left, *right); + for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() { + match (*left, right) { + // left has not observed the replica + (0, _) => *left = right, + // right has not observed the replica + (_, 0) => (), + (_, _) => *left = cmp::min(*left, right), } - if *left != 0 { new_len = ix + 1; } } - self.values.resize(new_len, 0); - self.local_branch_value = cmp::min(self.local_branch_value, other.local_branch_value); + if other.values.len() == self.values.len() { + // only truncate if other was equal or shorter (which at this point + // cant be due to the resize above) to `self` as otherwise we would + // truncate the unprocessed tail that is guaranteed to contain + // non-null timestamps + self.values.truncate(new_len); + } } pub fn observed(&self, timestamp: Lamport) -> bool { @@ -105,20 +148,18 @@ impl Global { } pub fn observed_any(&self, other: &Self) -> bool { - self.values - .iter() - .zip(other.values.iter()) - .any(|(left, right)| *right > 0 && left >= right) - || (other.local_branch_value > 0 && self.local_branch_value >= other.local_branch_value) + self.iter() + .zip(other.iter()) + .any(|(left, right)| right.value > 0 && left.value >= right.value) } pub fn observed_all(&self, other: &Self) -> bool { - let mut rhs = other.values.iter(); - self.values.iter().all(|left| match rhs.next() { - Some(right) => left >= right, - None => true, - }) && rhs.next().is_none() - && self.local_branch_value >= other.local_branch_value + if self.values.len() < other.values.len() { + return false; + } + self.iter() + .zip(other.iter()) + .all(|(left, right)| left.value >= right.value) } pub fn changed_since(&self, other: &Self) -> bool { @@ -128,21 +169,21 @@ impl Global { .iter() .zip(other.values.iter()) .any(|(left, right)| left > right) - || self.local_branch_value > other.local_branch_value } + pub fn most_recent(&self) -> Option { + self.iter().max_by_key(|timestamp| timestamp.value) + } + + /// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica. pub fn iter(&self) -> impl Iterator + '_ { self.values .iter() .enumerate() .map(|(replica_id, seq)| Lamport { - replica_id: replica_id as ReplicaId, + replica_id: ReplicaId(replica_id as u16), value: *seq, }) - .chain((self.local_branch_value > 0).then_some(Lamport { - replica_id: LOCAL_BRANCH_REPLICA_ID, - value: self.local_branch_value, - })) } } @@ -173,12 +214,12 @@ impl PartialOrd for Lamport { impl Lamport { pub const MIN: Self = Self { - replica_id: ReplicaId::MIN, + replica_id: ReplicaId(u16::MIN), value: Seq::MIN, }; pub const MAX: Self = Self { - replica_id: ReplicaId::MAX, + replica_id: ReplicaId(u16::MAX), value: Seq::MAX, }; @@ -190,7 +231,7 @@ impl Lamport { } pub fn as_u64(self) -> u64 { - ((self.value as u64) << 32) | (self.replica_id as u64) + ((self.value as u64) << 32) | (self.replica_id.0 as u64) } pub fn tick(&mut self) -> Self { @@ -211,7 +252,7 @@ impl fmt::Debug for Lamport { } else if *self == Self::MIN { write!(f, "Lamport {{MIN}}") } else { - write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value) + write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value) } } } @@ -220,16 +261,10 @@ impl fmt::Debug for Global { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Global {{")?; for timestamp in self.iter() { - if timestamp.replica_id > 0 { + if timestamp.replica_id.0 > 0 { write!(f, ", ")?; } - if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID { - write!(f, ": {}", timestamp.value)?; - } else if timestamp.replica_id == AGENT_REPLICA_ID { - write!(f, ": {}", timestamp.value)?; - } else { - write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?; - } + write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?; } write!(f, "}}") } diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index 2e6b4719d1c126230849ac81bc1f215092bc0b5e..6c4cd58d132bdeaaa791f4da8406e0e6d9052981 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -62,9 +62,9 @@ impl Database { .iter() .map(|c| c.replica_id) .collect::>(); - let mut replica_id = ReplicaId(0); + let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32); while replica_ids.contains(&replica_id) { - replica_id.0 += 1; + replica_id = ReplicaId(replica_id.0 + 1); } let collaborator = channel_buffer_collaborator::ActiveModel { channel_id: ActiveValue::Set(channel_id), @@ -203,7 +203,7 @@ impl Database { while let Some(row) = rows.next().await { let row = row?; let timestamp = clock::Lamport { - replica_id: row.replica_id as u16, + replica_id: clock::ReplicaId::new(row.replica_id as u16), value: row.lamport_timestamp as u32, }; server_version.observe(timestamp); @@ -701,7 +701,11 @@ impl Database { return Ok(()); } - let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text); + let mut text_buffer = text::Buffer::new( + clock::ReplicaId::LOCAL, + text::BufferId::new(1).unwrap(), + base_text, + ); text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire)); let base_text = text_buffer.text(); @@ -934,7 +938,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option Some(text::Operation::Edit(EditOperation { timestamp: clock::Lamport { - replica_id: edit.replica_id as text::ReplicaId, + replica_id: clock::ReplicaId::new(edit.replica_id as u16), value: edit.lamport_timestamp, }, version: version_from_wire(&edit.version), @@ -949,7 +953,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option Some(text::Operation::Undo(UndoOperation { timestamp: clock::Lamport { - replica_id: undo.replica_id as text::ReplicaId, + replica_id: clock::ReplicaId::new(undo.replica_id as u16), value: undo.lamport_timestamp, }, version: version_from_wire(&undo.version), @@ -959,7 +963,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option clock::Global { let mut version = clock::Global::new(); for entry in message { version.observe(clock::Lamport { - replica_id: entry.replica_id as text::ReplicaId, + replica_id: clock::ReplicaId::new(entry.replica_id as u16), value: entry.timestamp, }); } @@ -986,7 +990,7 @@ fn version_to_wire(version: &clock::Global) -> Vec { let mut message = Vec::new(); for entry in version.iter() { message.push(proto::VectorClockEntry { - replica_id: entry.replica_id as u32, + replica_id: entry.replica_id.as_u16() as u32, timestamp: entry.value, }); } diff --git a/crates/collab/src/db/queries/projects.rs b/crates/collab/src/db/queries/projects.rs index c1f9043a550ea4ea02a9fc241aed9e810f03d0e2..51a0ef83323ec70675283d2fdec7ca1ad791b12d 100644 --- a/crates/collab/src/db/queries/projects.rs +++ b/crates/collab/src/db/queries/projects.rs @@ -91,14 +91,18 @@ impl Database { .await?; } - let replica_id = if is_ssh_project { 1 } else { 0 }; + let replica_id = if is_ssh_project { + clock::ReplicaId::REMOTE_SERVER + } else { + clock::ReplicaId::LOCAL + }; project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), connection_id: ActiveValue::set(connection.id as i32), connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)), user_id: ActiveValue::set(participant.user_id), - replica_id: ActiveValue::set(ReplicaId(replica_id)), + replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)), is_host: ActiveValue::set(true), id: ActiveValue::NotSet, committer_name: ActiveValue::Set(None), @@ -841,7 +845,7 @@ impl Database { .iter() .map(|c| c.replica_id) .collect::>(); - let mut replica_id = ReplicaId(1); + let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32); while replica_ids.contains(&replica_id) { replica_id.0 += 1; } diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index 49da0f37148cffc117c9423a8cbb18a65fe2d290..4eae7a54cba4a906351f05e5945cff5691fd1126 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -1,7 +1,7 @@ use super::*; use crate::test_both_dbs; use language::proto::{self, serialize_version}; -use text::Buffer; +use text::{Buffer, ReplicaId}; test_both_dbs!( test_channel_buffers, @@ -70,7 +70,11 @@ async fn test_channel_buffers(db: &Arc) { .await .unwrap(); - let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string()); + let mut buffer_a = Buffer::new( + ReplicaId::new(0), + text::BufferId::new(1).unwrap(), + "".to_string(), + ); let operations = vec![ buffer_a.edit([(0..0, "hello world")]), buffer_a.edit([(5..5, ", cruel")]), @@ -95,7 +99,7 @@ async fn test_channel_buffers(db: &Arc) { .unwrap(); let mut buffer_b = Buffer::new( - 0, + ReplicaId::new(0), text::BufferId::new(1).unwrap(), buffer_response_b.base_text, ); @@ -124,7 +128,7 @@ async fn test_channel_buffers(db: &Arc) { rpc::proto::Collaborator { user_id: a_id.to_proto(), peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }), - replica_id: 0, + replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32, is_host: false, committer_name: None, committer_email: None, @@ -132,7 +136,7 @@ async fn test_channel_buffers(db: &Arc) { rpc::proto::Collaborator { user_id: b_id.to_proto(), peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }), - replica_id: 1, + replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1, is_host: false, committer_name: None, committer_email: None, @@ -228,7 +232,8 @@ async fn test_channel_buffers_last_operations(db: &Database) { .await .unwrap(); - db.join_channel_buffer(channel, user_id, connection_id) + let res = db + .join_channel_buffer(channel, user_id, connection_id) .await .unwrap(); @@ -239,7 +244,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { ); text_buffers.push(Buffer::new( - 0, + ReplicaId::new(res.replica_id as u16), text::BufferId::new(1).unwrap(), "".to_string(), )); @@ -276,7 +281,12 @@ async fn test_channel_buffers_last_operations(db: &Database) { db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id) .await .unwrap(); - text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string()); + let replica_id = text_buffers[1].replica_id(); + text_buffers[1] = Buffer::new( + replica_id, + text::BufferId::new(1).unwrap(), + "def".to_string(), + ); update_buffer( buffers[1].channel_id, user_id, @@ -304,20 +314,32 @@ async fn test_channel_buffers_last_operations(db: &Database) { rpc::proto::ChannelBufferVersion { channel_id: buffers[0].channel_id.to_proto(), epoch: 0, - version: serialize_version(&text_buffers[0].version()), + version: serialize_version(&text_buffers[0].version()) + .into_iter() + .filter( + |vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32 + ) + .collect::>(), }, rpc::proto::ChannelBufferVersion { channel_id: buffers[1].channel_id.to_proto(), epoch: 1, version: serialize_version(&text_buffers[1].version()) .into_iter() - .filter(|vector| vector.replica_id == text_buffers[1].replica_id() as u32) + .filter( + |vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32 + ) .collect::>(), }, rpc::proto::ChannelBufferVersion { channel_id: buffers[2].channel_id.to_proto(), epoch: 0, - version: serialize_version(&text_buffers[2].version()), + version: serialize_version(&text_buffers[2].version()) + .into_iter() + .filter( + |vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32 + ) + .collect::>(), }, ] ); diff --git a/crates/editor/src/display_map/inlay_map.rs b/crates/editor/src/display_map/inlay_map.rs index c4532a93f1d50e91dbd4791b4621b74ee0813cbe..7aeb14fe0eef687ed375e28c6a726799e3876b12 100644 --- a/crates/editor/src/display_map/inlay_map.rs +++ b/crates/editor/src/display_map/inlay_map.rs @@ -1278,7 +1278,7 @@ mod tests { Anchor::min(), &InlayHint { label: InlayHintLabel::String("a".to_string()), - position: text::Anchor::default(), + position: text::Anchor::MIN, padding_left: false, padding_right: false, tooltip: None, @@ -1298,7 +1298,7 @@ mod tests { Anchor::min(), &InlayHint { label: InlayHintLabel::String("a".to_string()), - position: text::Anchor::default(), + position: text::Anchor::MIN, padding_left: true, padding_right: true, tooltip: None, @@ -1318,7 +1318,7 @@ mod tests { Anchor::min(), &InlayHint { label: InlayHintLabel::String(" a ".to_string()), - position: text::Anchor::default(), + position: text::Anchor::MIN, padding_left: false, padding_right: false, tooltip: None, @@ -1338,7 +1338,7 @@ mod tests { Anchor::min(), &InlayHint { label: InlayHintLabel::String(" a ".to_string()), - position: text::Anchor::default(), + position: text::Anchor::MIN, padding_left: true, padding_right: true, tooltip: None, @@ -1361,7 +1361,7 @@ mod tests { Anchor::min(), &InlayHint { label: InlayHintLabel::String("🎨".to_string()), - position: text::Anchor::default(), + position: text::Anchor::MIN, padding_left: true, padding_right: true, tooltip: None, diff --git a/crates/editor/src/editor.rs b/crates/editor/src/editor.rs index 9baa1c892e8dc7e0f62cdc2c0e7abbed82ae9cdd..3b2e30a1761af381c4a3a52e660f8d26dc043ce8 100644 --- a/crates/editor/src/editor.rs +++ b/crates/editor/src/editor.rs @@ -82,7 +82,7 @@ use anyhow::{Context as _, Result, anyhow}; use blink_manager::BlinkManager; use buffer_diff::DiffHunkStatus; use client::{Collaborator, ParticipantIndex, parse_zed_link}; -use clock::{AGENT_REPLICA_ID, ReplicaId}; +use clock::ReplicaId; use code_context_menus::{ AvailableCodeAction, CodeActionContents, CodeActionsItem, CodeActionsMenu, CodeContextMenu, CompletionsMenu, ContextMenuOrigin, @@ -1301,7 +1301,7 @@ enum SelectionHistoryMode { #[derive(Clone, PartialEq, Eq, Hash)] struct HoveredCursor { - replica_id: u16, + replica_id: ReplicaId, selection_id: usize, } @@ -23482,7 +23482,7 @@ impl EditorSnapshot { self.buffer_snapshot() .selections_in_range(range, false) .filter_map(move |(replica_id, line_mode, cursor_shape, selection)| { - if replica_id == AGENT_REPLICA_ID { + if replica_id == ReplicaId::AGENT { Some(RemoteSelection { replica_id, selection, diff --git a/crates/git_ui/src/commit_view.rs b/crates/git_ui/src/commit_view.rs index 201a699e2f0e8527ed62babdc941febcf9426a2d..f89afb0a64b4377b235866c4da66e8255f2320d1 100644 --- a/crates/git_ui/src/commit_view.rs +++ b/crates/git_ui/src/commit_view.rs @@ -8,7 +8,7 @@ use gpui::{ }; use language::{ Anchor, Buffer, Capability, DiskState, File, LanguageRegistry, LineEnding, OffsetRangeExt as _, - Point, Rope, TextBuffer, + Point, ReplicaId, Rope, TextBuffer, }; use multi_buffer::PathKey; use project::{Project, WorktreeId, git_store::Repository}; @@ -135,7 +135,7 @@ impl CommitView { }); let buffer = cx.new(|cx| { let buffer = TextBuffer::new_normalized( - 0, + ReplicaId::LOCAL, cx.entity_id().as_non_zero_u64().into(), LineEnding::default(), format_commit(&commit).into(), @@ -316,7 +316,7 @@ async fn build_buffer( }; let buffer = cx.new(|cx| { let buffer = TextBuffer::new_normalized( - 0, + ReplicaId::LOCAL, cx.entity_id().as_non_zero_u64().into(), line_ending, text, diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index 1605eea051b660f7285481223b0b3b9f97aef732..3b90ae6ba5df5484c79f90cf91649b9f363e92b6 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -18,8 +18,8 @@ pub use crate::{ proto, }; use anyhow::{Context as _, Result}; +use clock::Lamport; pub use clock::ReplicaId; -use clock::{AGENT_REPLICA_ID, Lamport}; use collections::HashMap; use fs::MTime; use futures::channel::oneshot; @@ -828,7 +828,11 @@ impl Buffer { /// Create a new buffer with the given base text. pub fn local>(base_text: T, cx: &Context) -> Self { Self::build( - TextBuffer::new(0, cx.entity_id().as_non_zero_u64().into(), base_text.into()), + TextBuffer::new( + ReplicaId::LOCAL, + cx.entity_id().as_non_zero_u64().into(), + base_text.into(), + ), None, Capability::ReadWrite, ) @@ -842,7 +846,7 @@ impl Buffer { ) -> Self { Self::build( TextBuffer::new_normalized( - 0, + ReplicaId::LOCAL, cx.entity_id().as_non_zero_u64().into(), line_ending, base_text_normalized, @@ -991,10 +995,10 @@ impl Buffer { language: None, remote_selections: Default::default(), diagnostics: Default::default(), - diagnostics_timestamp: Default::default(), + diagnostics_timestamp: Lamport::MIN, completion_triggers: Default::default(), completion_triggers_per_language_server: Default::default(), - completion_triggers_timestamp: Default::default(), + completion_triggers_timestamp: Lamport::MIN, deferred_ops: OperationQueue::new(), has_conflict: false, change_bits: Default::default(), @@ -1012,7 +1016,8 @@ impl Buffer { let buffer_id = entity_id.as_non_zero_u64().into(); async move { let text = - TextBuffer::new_normalized(0, buffer_id, Default::default(), text).snapshot(); + TextBuffer::new_normalized(ReplicaId::LOCAL, buffer_id, Default::default(), text) + .snapshot(); let mut syntax = SyntaxMap::new(&text).snapshot(); if let Some(language) = language.clone() { let language_registry = language_registry.clone(); @@ -1033,8 +1038,13 @@ impl Buffer { pub fn build_empty_snapshot(cx: &mut App) -> BufferSnapshot { let entity_id = cx.reserve_entity::().entity_id(); let buffer_id = entity_id.as_non_zero_u64().into(); - let text = - TextBuffer::new_normalized(0, buffer_id, Default::default(), Rope::new()).snapshot(); + let text = TextBuffer::new_normalized( + ReplicaId::LOCAL, + buffer_id, + Default::default(), + Rope::new(), + ) + .snapshot(); let syntax = SyntaxMap::new(&text).snapshot(); BufferSnapshot { text, @@ -1056,7 +1066,9 @@ impl Buffer { ) -> BufferSnapshot { let entity_id = cx.reserve_entity::().entity_id(); let buffer_id = entity_id.as_non_zero_u64().into(); - let text = TextBuffer::new_normalized(0, buffer_id, Default::default(), text).snapshot(); + let text = + TextBuffer::new_normalized(ReplicaId::LOCAL, buffer_id, Default::default(), text) + .snapshot(); let mut syntax = SyntaxMap::new(&text).snapshot(); if let Some(language) = language.clone() { syntax.reparse(&text, language_registry, language); @@ -2260,7 +2272,7 @@ impl Buffer { ) { let lamport_timestamp = self.text.lamport_clock.tick(); self.remote_selections.insert( - AGENT_REPLICA_ID, + ReplicaId::AGENT, SelectionSet { selections, lamport_timestamp, @@ -2917,7 +2929,7 @@ impl Buffer { edits.push((range, new_text)); } - log::info!("mutating buffer {} with {:?}", self.replica_id(), edits); + log::info!("mutating buffer {:?} with {:?}", self.replica_id(), edits); self.edit(edits, None, cx); } diff --git a/crates/language/src/buffer_tests.rs b/crates/language/src/buffer_tests.rs index 0e23a69f075fc35b2f284c244a15428d57a354a4..f824639ad762191f4168586551af51fb4e37c8dc 100644 --- a/crates/language/src/buffer_tests.rs +++ b/crates/language/src/buffer_tests.rs @@ -70,7 +70,13 @@ fn test_line_endings(cx: &mut gpui::App) { fn test_set_line_ending(cx: &mut TestAppContext) { let base = cx.new(|cx| Buffer::local("one\ntwo\nthree\n", cx)); let base_replica = cx.new(|cx| { - Buffer::from_proto(1, Capability::ReadWrite, base.read(cx).to_proto(cx), None).unwrap() + Buffer::from_proto( + ReplicaId::new(1), + Capability::ReadWrite, + base.read(cx).to_proto(cx), + None, + ) + .unwrap() }); base.update(cx, |_buffer, cx| { cx.subscribe(&base_replica, |this, _, event, cx| { @@ -397,7 +403,7 @@ fn test_edit_events(cx: &mut gpui::App) { let buffer2 = cx.new(|cx| { Buffer::remote( BufferId::from(cx.entity_id().as_non_zero_u64()), - 1, + ReplicaId::new(1), Capability::ReadWrite, "abcdef", ) @@ -2775,7 +2781,8 @@ fn test_serialization(cx: &mut gpui::App) { .background_executor() .block(buffer1.read(cx).serialize_ops(None, cx)); let buffer2 = cx.new(|cx| { - let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap(); + let mut buffer = + Buffer::from_proto(ReplicaId::new(1), Capability::ReadWrite, state, None).unwrap(); buffer.apply_ops( ops.into_iter() .map(|op| proto::deserialize_operation(op).unwrap()), @@ -2794,7 +2801,13 @@ fn test_branch_and_merge(cx: &mut TestAppContext) { // Create a remote replica of the base buffer. let base_replica = cx.new(|cx| { - Buffer::from_proto(1, Capability::ReadWrite, base.read(cx).to_proto(cx), None).unwrap() + Buffer::from_proto( + ReplicaId::new(1), + Capability::ReadWrite, + base.read(cx).to_proto(cx), + None, + ) + .unwrap() }); base.update(cx, |_buffer, cx| { cx.subscribe(&base_replica, |this, _, event, cx| { @@ -3108,7 +3121,8 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { .background_executor() .block(base_buffer.read(cx).serialize_ops(None, cx)); let mut buffer = - Buffer::from_proto(i as ReplicaId, Capability::ReadWrite, state, None).unwrap(); + Buffer::from_proto(ReplicaId::new(i as u16), Capability::ReadWrite, state, None) + .unwrap(); buffer.apply_ops( ops.into_iter() .map(|op| proto::deserialize_operation(op).unwrap()), @@ -3133,9 +3147,9 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { }); buffers.push(buffer); - replica_ids.push(i as ReplicaId); - network.lock().add_peer(i as ReplicaId); - log::info!("Adding initial peer with replica id {}", i); + replica_ids.push(ReplicaId::new(i as u16)); + network.lock().add_peer(ReplicaId::new(i as u16)); + log::info!("Adding initial peer with replica id {:?}", replica_ids[i]); } log::info!("initial text: {:?}", base_text); @@ -3155,14 +3169,14 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { buffer.start_transaction_at(now); buffer.randomly_edit(&mut rng, 5, cx); buffer.end_transaction_at(now, cx); - log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text()); + log::info!("buffer {:?} text: {:?}", buffer.replica_id(), buffer.text()); }); mutation_count -= 1; } 30..=39 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { if rng.random_bool(0.2) { - log::info!("peer {} clearing active selections", replica_id); + log::info!("peer {:?} clearing active selections", replica_id); active_selections.remove(&replica_id); buffer.remove_active_selections(cx); } else { @@ -3179,7 +3193,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { } let selections: Arc<[Selection]> = selections.into(); log::info!( - "peer {} setting active selections: {:?}", + "peer {:?} setting active selections: {:?}", replica_id, selections ); @@ -3189,7 +3203,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { }); mutation_count -= 1; } - 40..=49 if mutation_count != 0 && replica_id == 0 => { + 40..=49 if mutation_count != 0 && replica_id == ReplicaId::REMOTE_SERVER => { let entry_count = rng.random_range(1..=5); buffer.update(cx, |buffer, cx| { let diagnostics = DiagnosticSet::new( @@ -3207,7 +3221,11 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { }), buffer, ); - log::info!("peer {} setting diagnostics: {:?}", replica_id, diagnostics); + log::info!( + "peer {:?} setting diagnostics: {:?}", + replica_id, + diagnostics + ); buffer.update_diagnostics(LanguageServerId(0), diagnostics, cx); }); mutation_count -= 1; @@ -3217,12 +3235,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { let old_buffer_ops = cx .background_executor() .block(buffer.read(cx).serialize_ops(None, cx)); - let new_replica_id = (0..=replica_ids.len() as ReplicaId) + let new_replica_id = (0..=replica_ids.len() as u16) + .map(ReplicaId::new) .filter(|replica_id| *replica_id != buffer.read(cx).replica_id()) .choose(&mut rng) .unwrap(); log::info!( - "Adding new replica {} (replicating from {})", + "Adding new replica {:?} (replicating from {:?})", new_replica_id, replica_id ); @@ -3241,7 +3260,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { cx, ); log::info!( - "New replica {} text: {:?}", + "New replica {:?} text: {:?}", new_buffer.replica_id(), new_buffer.text() ); @@ -3264,7 +3283,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { })); network.lock().replicate(replica_id, new_replica_id); - if new_replica_id as usize == replica_ids.len() { + if new_replica_id.as_u16() as usize == replica_ids.len() { replica_ids.push(new_replica_id); } else { let new_buffer = new_buffer.take().unwrap(); @@ -3276,7 +3295,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { .map(|op| proto::deserialize_operation(op).unwrap()); if ops.len() > 0 { log::info!( - "peer {} (version: {:?}) applying {} ops from the network. {:?}", + "peer {:?} (version: {:?}) applying {} ops from the network. {:?}", new_replica_id, buffer.read(cx).version(), ops.len(), @@ -3287,13 +3306,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { }); } } - buffers[new_replica_id as usize] = new_buffer; + buffers[new_replica_id.as_u16() as usize] = new_buffer; } } 60..=69 if mutation_count != 0 => { buffer.update(cx, |buffer, cx| { buffer.randomly_undo_redo(&mut rng, cx); - log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text()); + log::info!("buffer {:?} text: {:?}", buffer.replica_id(), buffer.text()); }); mutation_count -= 1; } @@ -3305,7 +3324,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { .map(|op| proto::deserialize_operation(op).unwrap()); if ops.len() > 0 { log::info!( - "peer {} (version: {:?}) applying {} ops from the network. {:?}", + "peer {:?} (version: {:?}) applying {} ops from the network. {:?}", replica_id, buffer.read(cx).version(), ops.len(), @@ -3335,13 +3354,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { assert_eq!( buffer.version(), first_buffer.version(), - "Replica {} version != Replica 0 version", + "Replica {:?} version != Replica 0 version", buffer.replica_id() ); assert_eq!( buffer.text(), first_buffer.text(), - "Replica {} text != Replica 0 text", + "Replica {:?} text != Replica 0 text", buffer.replica_id() ); assert_eq!( @@ -3351,7 +3370,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { first_buffer .diagnostics_in_range::<_, usize>(0..first_buffer.len(), false) .collect::>(), - "Replica {} diagnostics != Replica 0 diagnostics", + "Replica {:?} diagnostics != Replica 0 diagnostics", buffer.replica_id() ); } @@ -3370,7 +3389,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) { assert_eq!( actual_remote_selections, expected_remote_selections, - "Replica {} remote selections != expected selections", + "Replica {:?} remote selections != expected selections", buffer.replica_id() ); } diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index bc85b10859632fc3e2cf61c663b7159a023f4f3a..5c8200b84002c104ce1e2c3d1a42aff5876bd1ee 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -39,14 +39,14 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { crate::Operation::Buffer(text::Operation::Undo(undo)) => { proto::operation::Variant::Undo(proto::operation::Undo { - replica_id: undo.timestamp.replica_id as u32, + replica_id: undo.timestamp.replica_id.as_u16() as u32, lamport_timestamp: undo.timestamp.value, version: serialize_version(&undo.version), counts: undo .counts .iter() .map(|(edit_id, count)| proto::UndoCount { - replica_id: edit_id.replica_id as u32, + replica_id: edit_id.replica_id.as_u16() as u32, lamport_timestamp: edit_id.value, count: *count, }) @@ -60,7 +60,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { lamport_timestamp, cursor_shape, } => proto::operation::Variant::UpdateSelections(proto::operation::UpdateSelections { - replica_id: lamport_timestamp.replica_id as u32, + replica_id: lamport_timestamp.replica_id.as_u16() as u32, lamport_timestamp: lamport_timestamp.value, selections: serialize_selections(selections), line_mode: *line_mode, @@ -72,7 +72,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { server_id, diagnostics, } => proto::operation::Variant::UpdateDiagnostics(proto::UpdateDiagnostics { - replica_id: lamport_timestamp.replica_id as u32, + replica_id: lamport_timestamp.replica_id.as_u16() as u32, lamport_timestamp: lamport_timestamp.value, server_id: server_id.0 as u64, diagnostics: serialize_diagnostics(diagnostics.iter()), @@ -84,7 +84,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { server_id, } => proto::operation::Variant::UpdateCompletionTriggers( proto::operation::UpdateCompletionTriggers { - replica_id: lamport_timestamp.replica_id as u32, + replica_id: lamport_timestamp.replica_id.as_u16() as u32, lamport_timestamp: lamport_timestamp.value, triggers: triggers.clone(), language_server_id: server_id.to_proto(), @@ -95,7 +95,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { line_ending, lamport_timestamp, } => proto::operation::Variant::UpdateLineEnding(proto::operation::UpdateLineEnding { - replica_id: lamport_timestamp.replica_id as u32, + replica_id: lamport_timestamp.replica_id.as_u16() as u32, lamport_timestamp: lamport_timestamp.value, line_ending: serialize_line_ending(*line_ending) as i32, }), @@ -106,7 +106,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation { /// Serializes an [`EditOperation`] to be sent over RPC. pub fn serialize_edit_operation(operation: &EditOperation) -> proto::operation::Edit { proto::operation::Edit { - replica_id: operation.timestamp.replica_id as u32, + replica_id: operation.timestamp.replica_id.as_u16() as u32, lamport_timestamp: operation.timestamp.value, version: serialize_version(&operation.version), ranges: operation.ranges.iter().map(serialize_range).collect(), @@ -123,12 +123,12 @@ pub fn serialize_undo_map_entry( (edit_id, counts): (&clock::Lamport, &[(clock::Lamport, u32)]), ) -> proto::UndoMapEntry { proto::UndoMapEntry { - replica_id: edit_id.replica_id as u32, + replica_id: edit_id.replica_id.as_u16() as u32, local_timestamp: edit_id.value, counts: counts .iter() .map(|(undo_id, count)| proto::UndoCount { - replica_id: undo_id.replica_id as u32, + replica_id: undo_id.replica_id.as_u16() as u32, lamport_timestamp: undo_id.value, count: *count, }) @@ -246,7 +246,7 @@ pub fn serialize_diagnostics<'a>( /// Serializes an [`Anchor`] to be sent over RPC. pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor { proto::Anchor { - replica_id: anchor.timestamp.replica_id as u32, + replica_id: anchor.timestamp.replica_id.as_u16() as u32, timestamp: anchor.timestamp.value, offset: anchor.offset as u64, bias: match anchor.bias { @@ -283,7 +283,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { crate::Operation::Buffer(text::Operation::Undo(UndoOperation { timestamp: clock::Lamport { - replica_id: undo.replica_id as ReplicaId, + replica_id: ReplicaId::new(undo.replica_id as u16), value: undo.lamport_timestamp, }, version: deserialize_version(&undo.version), @@ -293,7 +293,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result Result Result { crate::Operation::UpdateDiagnostics { lamport_timestamp: clock::Lamport { - replica_id: message.replica_id as ReplicaId, + replica_id: ReplicaId::new(message.replica_id as u16), value: message.lamport_timestamp, }, server_id: LanguageServerId(message.server_id as usize), @@ -344,7 +344,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result Result { crate::Operation::UpdateLineEnding { lamport_timestamp: clock::Lamport { - replica_id: message.replica_id as ReplicaId, + replica_id: ReplicaId::new(message.replica_id as u16), value: message.lamport_timestamp, }, line_ending: deserialize_line_ending( @@ -370,7 +370,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result EditOperation { EditOperation { timestamp: clock::Lamport { - replica_id: edit.replica_id as ReplicaId, + replica_id: ReplicaId::new(edit.replica_id as u16), value: edit.lamport_timestamp, }, version: deserialize_version(&edit.version), @@ -385,7 +385,7 @@ pub fn deserialize_undo_map_entry( ) -> (clock::Lamport, Vec<(clock::Lamport, u32)>) { ( clock::Lamport { - replica_id: entry.replica_id as u16, + replica_id: ReplicaId::new(entry.replica_id as u16), value: entry.local_timestamp, }, entry @@ -394,7 +394,7 @@ pub fn deserialize_undo_map_entry( .map(|undo_count| { ( clock::Lamport { - replica_id: undo_count.replica_id as u16, + replica_id: ReplicaId::new(undo_count.replica_id as u16), value: undo_count.lamport_timestamp, }, undo_count.count, @@ -480,7 +480,7 @@ pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { }; Some(Anchor { timestamp: clock::Lamport { - replica_id: anchor.replica_id as ReplicaId, + replica_id: ReplicaId::new(anchor.replica_id as u16), value: anchor.timestamp, }, offset: anchor.offset as usize, @@ -524,7 +524,7 @@ pub fn lamport_timestamp_for_operation(operation: &proto::Operation) -> Option Result proto::LamportTimestamp { proto::LamportTimestamp { - replica_id: timestamp.replica_id as u32, + replica_id: timestamp.replica_id.as_u16() as u32, value: timestamp.value, } } @@ -567,7 +567,7 @@ pub fn serialize_timestamp(timestamp: clock::Lamport) -> proto::LamportTimestamp /// Deserializes a [`clock::Lamport`] timestamp from the RPC representation. pub fn deserialize_timestamp(timestamp: proto::LamportTimestamp) -> clock::Lamport { clock::Lamport { - replica_id: timestamp.replica_id as ReplicaId, + replica_id: ReplicaId::new(timestamp.replica_id as u16), value: timestamp.value, } } @@ -590,7 +590,7 @@ pub fn deserialize_version(message: &[proto::VectorClockEntry]) -> clock::Global let mut version = clock::Global::new(); for entry in message { version.observe(clock::Lamport { - replica_id: entry.replica_id as ReplicaId, + replica_id: ReplicaId::new(entry.replica_id as u16), value: entry.timestamp, }); } @@ -602,7 +602,7 @@ pub fn serialize_version(version: &clock::Global) -> Vec (Buf .now_or_never() .unwrap() .unwrap(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); let mut mutated_syntax_map = SyntaxMap::new(&buffer); mutated_syntax_map.set_language_registry(registry.clone()); diff --git a/crates/multi_buffer/src/multi_buffer.rs b/crates/multi_buffer/src/multi_buffer.rs index 18a619212dba49bf1c384679ec90120af80e0e2a..94966708ac0e49fc01c7e1617fdd41149fc0a4e9 100644 --- a/crates/multi_buffer/src/multi_buffer.rs +++ b/crates/multi_buffer/src/multi_buffer.rs @@ -666,7 +666,7 @@ impl MultiBuffer { paths_by_excerpt: Default::default(), buffer_changed_since_sync: Default::default(), history: History { - next_transaction_id: clock::Lamport::default(), + next_transaction_id: clock::Lamport::MIN, undo_stack: Vec::new(), redo_stack: Vec::new(), transaction_depth: 0, diff --git a/crates/multi_buffer/src/multi_buffer_tests.rs b/crates/multi_buffer/src/multi_buffer_tests.rs index 1532e5b68ec29e6befbbd17fa97b966449874822..49db1fc2e264583f90f1a96195c560f0e52e8205 100644 --- a/crates/multi_buffer/src/multi_buffer_tests.rs +++ b/crates/multi_buffer/src/multi_buffer_tests.rs @@ -78,7 +78,9 @@ fn test_remote(cx: &mut App) { let ops = cx .background_executor() .block(host_buffer.read(cx).serialize_ops(None, cx)); - let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap(); + let mut buffer = + Buffer::from_proto(ReplicaId::REMOTE_SERVER, Capability::ReadWrite, state, None) + .unwrap(); buffer.apply_ops( ops.into_iter() .map(|op| language::proto::deserialize_operation(op).unwrap()), @@ -3636,7 +3638,7 @@ fn assert_position_translation(snapshot: &MultiBufferSnapshot) { fn assert_line_indents(snapshot: &MultiBufferSnapshot) { let max_row = snapshot.max_point().row; let buffer_id = snapshot.excerpts().next().unwrap().1.remote_id(); - let text = text::Buffer::new(0, buffer_id, snapshot.text()); + let text = text::Buffer::new(ReplicaId::LOCAL, buffer_id, snapshot.text()); let mut line_indents = text .line_indents_in_row_range(0..max_row + 1) .collect::>(); diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 8a4d4f7918c12abd94cf7bf8fc97c939db7ce033..9c7caa6280c6502a5279a48d63e1ee4f42d9e11c 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -25,7 +25,7 @@ use rpc::{ }; use smol::channel::Receiver; use std::{io, pin::pin, sync::Arc, time::Instant}; -use text::BufferId; +use text::{BufferId, ReplicaId}; use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath}; use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId}; @@ -158,7 +158,7 @@ impl RemoteBufferStore { pub fn handle_create_buffer_for_peer( &mut self, envelope: TypedEnvelope, - replica_id: u16, + replica_id: ReplicaId, capability: Capability, cx: &mut Context, ) -> Result>> { @@ -626,7 +626,9 @@ impl LocalBufferStore { cx.spawn(async move |_, cx| { let loaded = load_file.await?; let text_buffer = cx - .background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) + .background_spawn(async move { + text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text) + }) .await; cx.insert_entity(reservation, |_| { Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite) @@ -639,7 +641,7 @@ impl LocalBufferStore { Ok(buffer) => Ok(buffer), Err(error) if is_not_found_error(&error) => cx.new(|cx| { let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); - let text_buffer = text::Buffer::new(0, buffer_id, ""); + let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, ""); Buffer::build( text_buffer, Some(Arc::new(File { @@ -917,7 +919,7 @@ impl BufferStore { path: file.path.clone(), worktree_id: file.worktree_id(cx), }); - let is_remote = buffer.replica_id() != 0; + let is_remote = buffer.replica_id().is_remote(); let open_buffer = OpenBuffer::Complete { buffer: buffer_entity.downgrade(), }; @@ -1317,7 +1319,7 @@ impl BufferStore { pub fn handle_create_buffer_for_peer( &mut self, envelope: TypedEnvelope, - replica_id: u16, + replica_id: ReplicaId, capability: Capability, cx: &mut Context, ) -> Result<()> { diff --git a/crates/project/src/git_store/conflict_set.rs b/crates/project/src/git_store/conflict_set.rs index 879280c885a0bfda20e5faa70e1e07f7d9fe038c..160a384a4a0ff4481c97b6eda75faded28f01624 100644 --- a/crates/project/src/git_store/conflict_set.rs +++ b/crates/project/src/git_store/conflict_set.rs @@ -271,7 +271,7 @@ mod tests { use language::language_settings::AllLanguageSettings; use serde_json::json; use settings::Settings as _; - use text::{Buffer, BufferId, Point, ToOffset as _}; + use text::{Buffer, BufferId, Point, ReplicaId, ToOffset as _}; use unindent::Unindent as _; use util::{path, rel_path::rel_path}; use worktree::WorktreeSettings; @@ -299,7 +299,7 @@ mod tests { .unindent(); let buffer_id = BufferId::new(1).unwrap(); - let buffer = Buffer::new(0, buffer_id, test_content); + let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content); let snapshot = buffer.snapshot(); let conflict_snapshot = ConflictSet::parse(&snapshot); @@ -374,7 +374,7 @@ mod tests { .unindent(); let buffer_id = BufferId::new(1).unwrap(); - let buffer = Buffer::new(0, buffer_id, test_content); + let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content); let snapshot = buffer.snapshot(); let conflict_snapshot = ConflictSet::parse(&snapshot); @@ -405,7 +405,7 @@ mod tests { >>>>>>> "# .unindent(); let buffer_id = BufferId::new(1).unwrap(); - let buffer = Buffer::new(0, buffer_id, test_content); + let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content); let snapshot = buffer.snapshot(); let conflict_snapshot = ConflictSet::parse(&snapshot); @@ -447,7 +447,7 @@ mod tests { .unindent(); let buffer_id = BufferId::new(1).unwrap(); - let buffer = Buffer::new(0, buffer_id, test_content.clone()); + let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content.clone()); let snapshot = buffer.snapshot(); let conflict_snapshot = ConflictSet::parse(&snapshot); diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index bdce7c26fd4b40dd37fd9bf67a03490bdf3c7bc5..c68d14d38866b2fbe8a97792bdf46a94469124a1 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -3025,9 +3025,8 @@ impl LocalLspStore { Some(buffer_to_edit.read(cx).saved_version().clone()) }; - let most_recent_edit = version.and_then(|version| { - version.iter().max_by_key(|timestamp| timestamp.value) - }); + let most_recent_edit = + version.and_then(|version| version.most_recent()); // Check if the edit that triggered that edit has been made by this participant. if let Some(most_recent_edit) = most_recent_edit { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 56a2811f07a4c3f37c610df48bb7c6db1904f9f2..4a7ac5fa50fac05bb81e5374fd213a6e4ff5bda4 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1560,7 +1560,7 @@ impl Project { })?; let agent_server_store = cx.new(|cx| AgentServerStore::collab(cx))?; - let replica_id = response.payload.replica_id as ReplicaId; + let replica_id = ReplicaId::new(response.payload.replica_id as u16); let project = cx.new(|cx| { let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([]), cx); @@ -1975,9 +1975,9 @@ impl Project { ProjectClientState::Remote { replica_id, .. } => replica_id, _ => { if self.remote_client.is_some() { - 1 + ReplicaId::REMOTE_SERVER } else { - 0 + ReplicaId::LOCAL } } } diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 24612d974d43fa2d8b9ad7bf188c7e3b51726f25..43f233b2d1fdb8ffdeca6fe7e40d7c28a8e3084c 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -4307,7 +4307,7 @@ async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) { let remote = cx.update(|cx| { Worktree::remote( 0, - 1, + ReplicaId::REMOTE_SERVER, metadata, project.read(cx).client().into(), project.read(cx).path_style(cx), diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index 670b405ed33757117ec62bfbbb4c947f79e5026a..e6da207dadbde3ebc725fbb84ed19b3b35414f87 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -551,7 +551,7 @@ impl WorktreeStore { let worktree = cx.update(|cx| { Worktree::remote( REMOTE_SERVER_PROJECT_ID, - 0, + ReplicaId::REMOTE_SERVER, proto::WorktreeMetadata { id: response.worktree_id, root_name, diff --git a/crates/text/src/anchor.rs b/crates/text/src/anchor.rs index 56172c21afcf9fa70a9039218feea59f055a27c5..6b0db2f9352997cd5cef4544edc388bc9c5cd209 100644 --- a/crates/text/src/anchor.rs +++ b/crates/text/src/anchor.rs @@ -6,7 +6,7 @@ use std::{cmp::Ordering, fmt::Debug, ops::Range}; use sum_tree::{Bias, Dimensions}; /// A timestamped position in a buffer -#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash, Default)] +#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)] pub struct Anchor { pub timestamp: clock::Lamport, /// The byte offset in the buffer diff --git a/crates/text/src/operation_queue.rs b/crates/text/src/operation_queue.rs index 6604817edfe2dcc243ba837a770b361bd505a7ef..f87af381ff314f91469a9b5d438e667fe6ea190f 100644 --- a/crates/text/src/operation_queue.rs +++ b/crates/text/src/operation_queue.rs @@ -1,3 +1,4 @@ +use clock::Lamport; use std::{fmt::Debug, ops::Add}; use sum_tree::{ContextLessSummary, Dimension, Edit, Item, KeyedItem, SumTree}; @@ -11,10 +12,10 @@ struct OperationItem(T); #[derive(Clone, Debug)] pub struct OperationQueue(SumTree>); -#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)] +#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] pub struct OperationKey(clock::Lamport); -#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct OperationSummary { pub key: OperationKey, pub len: usize, @@ -69,7 +70,10 @@ impl OperationQueue { impl ContextLessSummary for OperationSummary { fn zero() -> Self { - Default::default() + OperationSummary { + key: OperationKey::new(Lamport::MIN), + len: 0, + } } fn add_summary(&mut self, other: &Self) { @@ -93,7 +97,7 @@ impl Add<&Self> for OperationSummary { impl Dimension<'_, OperationSummary> for OperationKey { fn zero(_cx: ()) -> Self { - Default::default() + OperationKey::new(Lamport::MIN) } fn add_summary(&mut self, summary: &OperationSummary, _: ()) { @@ -123,11 +127,13 @@ impl KeyedItem for OperationItem { #[cfg(test)] mod tests { + use clock::ReplicaId; + use super::*; #[test] fn test_len() { - let mut clock = clock::Lamport::new(0); + let mut clock = clock::Lamport::new(ReplicaId::LOCAL); let mut queue = OperationQueue::new(); assert_eq!(queue.len(), 0); diff --git a/crates/text/src/tests.rs b/crates/text/src/tests.rs index 4298e704ab5f8fbe57af363379395ef23624cfcf..c9e04e407ffdb8ffde6b139e01d78822e54e1a4b 100644 --- a/crates/text/src/tests.rs +++ b/crates/text/src/tests.rs @@ -16,7 +16,7 @@ fn init_logger() { #[test] fn test_edit() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "abc"); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "abc"); assert_eq!(buffer.text(), "abc"); buffer.edit([(3..3, "def")]); assert_eq!(buffer.text(), "abcdef"); @@ -40,7 +40,11 @@ fn test_random_edits(mut rng: StdRng) { let mut reference_string = RandomCharIter::new(&mut rng) .take(reference_string_len) .collect::(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), reference_string.clone()); + let mut buffer = Buffer::new( + ReplicaId::LOCAL, + BufferId::new(1).unwrap(), + reference_string.clone(), + ); LineEnding::normalize(&mut reference_string); buffer.set_group_interval(Duration::from_millis(rng.random_range(0..=200))); @@ -176,7 +180,11 @@ fn test_line_endings() { LineEnding::Windows ); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "one\r\ntwo\rthree"); + let mut buffer = Buffer::new( + ReplicaId::LOCAL, + BufferId::new(1).unwrap(), + "one\r\ntwo\rthree", + ); assert_eq!(buffer.text(), "one\ntwo\nthree"); assert_eq!(buffer.line_ending(), LineEnding::Windows); buffer.check_invariants(); @@ -190,7 +198,7 @@ fn test_line_endings() { #[test] fn test_line_len() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); buffer.edit([(0..0, "abcd\nefg\nhij")]); buffer.edit([(12..12, "kl\nmno")]); buffer.edit([(18..18, "\npqrs\n")]); @@ -207,7 +215,7 @@ fn test_line_len() { #[test] fn test_common_prefix_at_position() { let text = "a = str; b = δα"; - let buffer = Buffer::new(0, BufferId::new(1).unwrap(), text); + let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text); let offset1 = offset_after(text, "str"); let offset2 = offset_after(text, "δα"); @@ -256,7 +264,7 @@ fn test_common_prefix_at_position() { #[test] fn test_text_summary_for_range() { let buffer = Buffer::new( - 0, + ReplicaId::LOCAL, BufferId::new(1).unwrap(), "ab\nefg\nhklm\nnopqrs\ntuvwxyz", ); @@ -348,7 +356,7 @@ fn test_text_summary_for_range() { #[test] fn test_chars_at() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); buffer.edit([(0..0, "abcd\nefgh\nij")]); buffer.edit([(12..12, "kl\nmno")]); buffer.edit([(18..18, "\npqrs")]); @@ -370,7 +378,7 @@ fn test_chars_at() { assert_eq!(chars.collect::(), "PQrs"); // Regression test: - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); buffer.edit([(0..0, "[workspace]\nmembers = [\n \"xray_core\",\n \"xray_server\",\n \"xray_cli\",\n \"xray_wasm\",\n]\n")]); buffer.edit([(60..60, "\n")]); @@ -380,7 +388,7 @@ fn test_chars_at() { #[test] fn test_anchors() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); buffer.edit([(0..0, "abc")]); let left_anchor = buffer.anchor_before(2); let right_anchor = buffer.anchor_after(2); @@ -498,7 +506,7 @@ fn test_anchors() { #[test] fn test_anchors_at_start_and_end() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), ""); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), ""); let before_start_anchor = buffer.anchor_before(0); let after_end_anchor = buffer.anchor_after(0); @@ -521,7 +529,7 @@ fn test_anchors_at_start_and_end() { #[test] fn test_undo_redo() { - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "1234"); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "1234"); // Set group interval to zero so as to not group edits in the undo stack. buffer.set_group_interval(Duration::from_secs(0)); @@ -558,7 +566,7 @@ fn test_undo_redo() { #[test] fn test_history() { let mut now = Instant::now(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "123456"); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "123456"); buffer.set_group_interval(Duration::from_millis(300)); let transaction_1 = buffer.start_transaction_at(now).unwrap(); @@ -625,7 +633,7 @@ fn test_history() { #[test] fn test_finalize_last_transaction() { let now = Instant::now(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "123456"); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "123456"); buffer.history.group_interval = Duration::from_millis(1); buffer.start_transaction_at(now); @@ -661,7 +669,7 @@ fn test_finalize_last_transaction() { #[test] fn test_edited_ranges_for_transaction() { let now = Instant::now(); - let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "1234567"); + let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "1234567"); buffer.start_transaction_at(now); buffer.edit([(2..4, "cd")]); @@ -700,9 +708,9 @@ fn test_edited_ranges_for_transaction() { fn test_concurrent_edits() { let text = "abcdef"; - let mut buffer1 = Buffer::new(1, BufferId::new(1).unwrap(), text); - let mut buffer2 = Buffer::new(2, BufferId::new(1).unwrap(), text); - let mut buffer3 = Buffer::new(3, BufferId::new(1).unwrap(), text); + let mut buffer1 = Buffer::new(ReplicaId::new(1), BufferId::new(1).unwrap(), text); + let mut buffer2 = Buffer::new(ReplicaId::new(2), BufferId::new(1).unwrap(), text); + let mut buffer3 = Buffer::new(ReplicaId::new(3), BufferId::new(1).unwrap(), text); let buf1_op = buffer1.edit([(1..2, "12")]); assert_eq!(buffer1.text(), "a12cdef"); @@ -741,11 +749,15 @@ fn test_random_concurrent_edits(mut rng: StdRng) { let mut network = Network::new(rng.clone()); for i in 0..peers { - let mut buffer = Buffer::new(i as ReplicaId, BufferId::new(1).unwrap(), base_text.clone()); + let mut buffer = Buffer::new( + ReplicaId::new(i as u16), + BufferId::new(1).unwrap(), + base_text.clone(), + ); buffer.history.group_interval = Duration::from_millis(rng.random_range(0..=200)); buffers.push(buffer); - replica_ids.push(i as u16); - network.add_peer(i as u16); + replica_ids.push(ReplicaId::new(i as u16)); + network.add_peer(ReplicaId::new(i as u16)); } log::info!("initial text: {:?}", base_text); @@ -759,7 +771,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) { 0..=50 if mutation_count != 0 => { let op = buffer.randomly_edit(&mut rng, 5).1; network.broadcast(buffer.replica_id, vec![op]); - log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text()); + log::info!("buffer {:?} text: {:?}", buffer.replica_id, buffer.text()); mutation_count -= 1; } 51..=70 if mutation_count != 0 => { @@ -771,7 +783,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) { let ops = network.receive(replica_id); if !ops.is_empty() { log::info!( - "peer {} applying {} ops from the network.", + "peer {:?} applying {} ops from the network.", replica_id, ops.len() ); @@ -792,7 +804,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) { assert_eq!( buffer.text(), first_buffer.text(), - "Replica {} text != Replica 0 text", + "Replica {:?} text != Replica 0 text", buffer.replica_id ); buffer.check_invariants(); diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 2eacef5ae037e0d45c53404f30d8c81bdedf4dc1..0516bf21c949db266aad025500f51aab9cec0958 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -12,7 +12,7 @@ mod undo_map; pub use anchor::*; use anyhow::{Context as _, Result}; -use clock::LOCAL_BRANCH_REPLICA_ID; +use clock::Lamport; pub use clock::ReplicaId; use collections::{HashMap, HashSet}; use locator::Locator; @@ -573,7 +573,7 @@ struct InsertionFragment { fragment_id: Locator, } -#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] struct InsertionFragmentKey { timestamp: clock::Lamport, split_offset: usize, @@ -709,7 +709,7 @@ impl FromIterator for LineIndent { } impl Buffer { - pub fn new(replica_id: u16, remote_id: BufferId, base_text: impl Into) -> Buffer { + pub fn new(replica_id: ReplicaId, remote_id: BufferId, base_text: impl Into) -> Buffer { let mut base_text = base_text.into(); let line_ending = LineEnding::detect(&base_text); LineEnding::normalize(&mut base_text); @@ -717,7 +717,7 @@ impl Buffer { } pub fn new_normalized( - replica_id: u16, + replica_id: ReplicaId, remote_id: BufferId, line_ending: LineEnding, normalized: Rope, @@ -731,10 +731,7 @@ impl Buffer { let visible_text = history.base_text.clone(); if !visible_text.is_empty() { - let insertion_timestamp = clock::Lamport { - replica_id: 0, - value: 1, - }; + let insertion_timestamp = clock::Lamport::new(ReplicaId::LOCAL); lamport_clock.observe(insertion_timestamp); version.observe(insertion_timestamp); let fragment_id = Locator::between(&Locator::min(), &Locator::max()); @@ -788,7 +785,7 @@ impl Buffer { history: History::new(self.base_text().clone()), deferred_ops: OperationQueue::new(), deferred_replicas: HashSet::default(), - lamport_clock: clock::Lamport::new(LOCAL_BRANCH_REPLICA_ID), + lamport_clock: clock::Lamport::new(ReplicaId::LOCAL_BRANCH), subscriptions: Default::default(), edit_id_resolvers: Default::default(), wait_for_version_txs: Default::default(), @@ -1254,7 +1251,7 @@ impl Buffer { for edit_id in edit_ids { let insertion_slice = InsertionSlice { edit_id: *edit_id, - insertion_id: clock::Lamport::default(), + insertion_id: clock::Lamport::MIN, range: 0..0, }; let slices = self @@ -1858,7 +1855,7 @@ impl Buffer { T: rand::Rng, { let mut edits = self.get_random_edits(rng, edit_count); - log::info!("mutating buffer {} with {:?}", self.replica_id, edits); + log::info!("mutating buffer {:?} with {:?}", self.replica_id, edits); let op = self.edit(edits.iter().cloned()); if let Operation::Edit(edit) = &op { @@ -1881,7 +1878,7 @@ impl Buffer { if let Some(entry) = self.history.undo_stack.choose(rng) { let transaction = entry.transaction.clone(); log::info!( - "undoing buffer {} transaction {:?}", + "undoing buffer {:?} transaction {:?}", self.replica_id, transaction ); @@ -2918,7 +2915,10 @@ impl InsertionFragment { impl sum_tree::ContextLessSummary for InsertionFragmentKey { fn zero() -> Self { - Default::default() + InsertionFragmentKey { + timestamp: Lamport::MIN, + split_offset: 0, + } } fn add_summary(&mut self, summary: &Self) { diff --git a/crates/text/src/undo_map.rs b/crates/text/src/undo_map.rs index 60b22a9edba70b65d30c60a0b9ca15b8f286cc85..2c2eba8de62ace68b5953b832a2a29be2317175e 100644 --- a/crates/text/src/undo_map.rs +++ b/crates/text/src/undo_map.rs @@ -1,4 +1,5 @@ use crate::UndoOperation; +use clock::Lamport; use std::cmp; use sum_tree::{Bias, SumTree}; @@ -24,7 +25,7 @@ impl sum_tree::KeyedItem for UndoMapEntry { } } -#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] struct UndoMapKey { edit_id: clock::Lamport, undo_id: clock::Lamport, @@ -32,7 +33,10 @@ struct UndoMapKey { impl sum_tree::ContextLessSummary for UndoMapKey { fn zero() -> Self { - Default::default() + UndoMapKey { + edit_id: Lamport::MIN, + undo_id: Lamport::MIN, + } } fn add_summary(&mut self, summary: &Self) { @@ -69,7 +73,7 @@ impl UndoMap { cursor.seek( &UndoMapKey { edit_id, - undo_id: Default::default(), + undo_id: Lamport::MIN, }, Bias::Left, ); @@ -93,7 +97,7 @@ impl UndoMap { cursor.seek( &UndoMapKey { edit_id, - undo_id: Default::default(), + undo_id: Lamport::MIN, }, Bias::Left, ); diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index 447dc0eeb5fe69aa8a936a5850e788d315a69bac..f889fb3b8218b18983c4509b653cf0e0ce863fcd 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -656,7 +656,7 @@ impl Worktree { pub fn replica_id(&self) -> ReplicaId { match self { - Worktree::Local(_) => 0, + Worktree::Local(_) => ReplicaId::LOCAL, Worktree::Remote(worktree) => worktree.replica_id, } } diff --git a/crates/zeta/src/zeta.rs b/crates/zeta/src/zeta.rs index 1d48571d7b06f35d82934122919e75bbbd087ffa..454a1526a9e8c6a75d47bda875feb6843b454a0d 100644 --- a/crates/zeta/src/zeta.rs +++ b/crates/zeta/src/zeta.rs @@ -1581,7 +1581,7 @@ fn guess_token_count(bytes: usize) -> usize { #[cfg(test)] mod tests { use client::test::FakeServer; - use clock::FakeSystemClock; + use clock::{FakeSystemClock, ReplicaId}; use cloud_api_types::{CreateLlmTokenResponse, LlmToken}; use gpui::TestAppContext; use http_client::FakeHttpClient; @@ -1839,7 +1839,7 @@ mod tests { let buffer = cx.new(|_cx| { Buffer::remote( language::BufferId::new(1).unwrap(), - 1, + ReplicaId::new(1), language::Capability::ReadWrite, "fn main() {\n println!(\"Hello\");\n}", )