1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
23 const NAME: &'static str;
24 const PRIORITY: MessagePriority;
25 fn into_envelope(
26 self,
27 id: u32,
28 responding_to: Option<u32>,
29 original_sender_id: Option<PeerId>,
30 ) -> Envelope;
31 fn from_envelope(envelope: Envelope) -> Option<Self>;
32}
33
34pub trait EntityMessage: EnvelopedMessage {
35 type Entity;
36 fn remote_entity_id(&self) -> u64;
37}
38
39pub trait RequestMessage: EnvelopedMessage {
40 type Response: EnvelopedMessage;
41}
42
43pub trait AnyTypedEnvelope: 'static + Send + Sync {
44 fn payload_type_id(&self) -> TypeId;
45 fn payload_type_name(&self) -> &'static str;
46 fn as_any(&self) -> &dyn Any;
47 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
48 fn is_background(&self) -> bool;
49 fn original_sender_id(&self) -> Option<PeerId>;
50 fn sender_id(&self) -> ConnectionId;
51 fn message_id(&self) -> u32;
52}
53
54pub enum MessagePriority {
55 Foreground,
56 Background,
57}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
136messages!(
137 (Ack, Foreground),
138 (AckBufferOperation, Background),
139 (AckChannelMessage, Background),
140 (AddNotification, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Background),
143 (ApplyCodeActionResponse, Background),
144 (ApplyCompletionAdditionalEdits, Background),
145 (ApplyCompletionAdditionalEditsResponse, Background),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (Call, Foreground),
149 (CallCanceled, Foreground),
150 (CancelCall, Foreground),
151 (ChannelMessageSent, Foreground),
152 (ChannelMessageUpdate, Foreground),
153 (CompleteWithLanguageModel, Background),
154 (ComputeEmbeddings, Background),
155 (ComputeEmbeddingsResponse, Background),
156 (CopyProjectEntry, Foreground),
157 (CountTokensWithLanguageModel, Background),
158 (CountTokensResponse, Background),
159 (CreateBufferForPeer, Foreground),
160 (CreateChannel, Foreground),
161 (CreateChannelResponse, Foreground),
162 (CreateProjectEntry, Foreground),
163 (CreateRoom, Foreground),
164 (CreateRoomResponse, Foreground),
165 (DeclineCall, Foreground),
166 (DeleteChannel, Foreground),
167 (DeleteNotification, Foreground),
168 (UpdateNotification, Foreground),
169 (DeleteProjectEntry, Foreground),
170 (EndStream, Foreground),
171 (Error, Foreground),
172 (ExpandProjectEntry, Foreground),
173 (ExpandProjectEntryResponse, Foreground),
174 (Follow, Foreground),
175 (FollowResponse, Foreground),
176 (FormatBuffers, Foreground),
177 (FormatBuffersResponse, Foreground),
178 (FuzzySearchUsers, Foreground),
179 (GetCachedEmbeddings, Background),
180 (GetCachedEmbeddingsResponse, Background),
181 (GetChannelMembers, Foreground),
182 (GetChannelMembersResponse, Foreground),
183 (GetChannelMessages, Background),
184 (GetChannelMessagesById, Background),
185 (GetChannelMessagesResponse, Background),
186 (GetCodeActions, Background),
187 (GetCodeActionsResponse, Background),
188 (GetCompletions, Background),
189 (GetCompletionsResponse, Background),
190 (GetDefinition, Background),
191 (GetDefinitionResponse, Background),
192 (GetDocumentHighlights, Background),
193 (GetDocumentHighlightsResponse, Background),
194 (GetHover, Background),
195 (GetHoverResponse, Background),
196 (GetNotifications, Foreground),
197 (GetNotificationsResponse, Foreground),
198 (GetPrivateUserInfo, Foreground),
199 (GetPrivateUserInfoResponse, Foreground),
200 (GetProjectSymbols, Background),
201 (GetProjectSymbolsResponse, Background),
202 (GetReferences, Background),
203 (GetReferencesResponse, Background),
204 (GetTypeDefinition, Background),
205 (GetTypeDefinitionResponse, Background),
206 (GetImplementation, Background),
207 (GetImplementationResponse, Background),
208 (GetUsers, Foreground),
209 (Hello, Foreground),
210 (IncomingCall, Foreground),
211 (InlayHints, Background),
212 (InlayHintsResponse, Background),
213 (InviteChannelMember, Foreground),
214 (JoinChannel, Foreground),
215 (JoinChannelBuffer, Foreground),
216 (JoinChannelBufferResponse, Foreground),
217 (JoinChannelChat, Foreground),
218 (JoinChannelChatResponse, Foreground),
219 (JoinProject, Foreground),
220 (JoinHostedProject, Foreground),
221 (JoinProjectResponse, Foreground),
222 (JoinRoom, Foreground),
223 (JoinRoomResponse, Foreground),
224 (LanguageModelResponse, Background),
225 (LeaveChannelBuffer, Background),
226 (LeaveChannelChat, Foreground),
227 (LeaveProject, Foreground),
228 (LeaveRoom, Foreground),
229 (MarkNotificationRead, Foreground),
230 (MoveChannel, Foreground),
231 (OnTypeFormatting, Background),
232 (OnTypeFormattingResponse, Background),
233 (OpenBufferById, Background),
234 (OpenBufferByPath, Background),
235 (OpenBufferForSymbol, Background),
236 (OpenBufferForSymbolResponse, Background),
237 (OpenBufferResponse, Background),
238 (PerformRename, Background),
239 (PerformRenameResponse, Background),
240 (Ping, Foreground),
241 (PrepareRename, Background),
242 (PrepareRenameResponse, Background),
243 (ProjectEntryResponse, Foreground),
244 (RefreshInlayHints, Foreground),
245 (RejoinChannelBuffers, Foreground),
246 (RejoinChannelBuffersResponse, Foreground),
247 (RejoinRoom, Foreground),
248 (RejoinRoomResponse, Foreground),
249 (ReloadBuffers, Foreground),
250 (ReloadBuffersResponse, Foreground),
251 (RemoveChannelMember, Foreground),
252 (RemoveChannelMessage, Foreground),
253 (UpdateChannelMessage, Foreground),
254 (RemoveContact, Foreground),
255 (RemoveProjectCollaborator, Foreground),
256 (RenameChannel, Foreground),
257 (RenameChannelResponse, Foreground),
258 (RenameProjectEntry, Foreground),
259 (RequestContact, Foreground),
260 (ResolveCompletionDocumentation, Background),
261 (ResolveCompletionDocumentationResponse, Background),
262 (ResolveInlayHint, Background),
263 (ResolveInlayHintResponse, Background),
264 (RespondToChannelInvite, Foreground),
265 (RespondToContactRequest, Foreground),
266 (RoomUpdated, Foreground),
267 (SaveBuffer, Foreground),
268 (SetChannelMemberRole, Foreground),
269 (SetChannelVisibility, Foreground),
270 (SearchProject, Background),
271 (SearchProjectResponse, Background),
272 (SendChannelMessage, Background),
273 (SendChannelMessageResponse, Background),
274 (ShareProject, Foreground),
275 (ShareProjectResponse, Foreground),
276 (ShowContacts, Foreground),
277 (StartLanguageServer, Foreground),
278 (SynchronizeBuffers, Foreground),
279 (SynchronizeBuffersResponse, Foreground),
280 (Test, Foreground),
281 (Unfollow, Foreground),
282 (UnshareProject, Foreground),
283 (UpdateBuffer, Foreground),
284 (UpdateBufferFile, Foreground),
285 (UpdateChannelBuffer, Foreground),
286 (UpdateChannelBufferCollaborators, Foreground),
287 (UpdateChannels, Foreground),
288 (UpdateUserChannels, Foreground),
289 (UpdateContacts, Foreground),
290 (UpdateDiagnosticSummary, Foreground),
291 (UpdateDiffBase, Foreground),
292 (UpdateFollowers, Foreground),
293 (UpdateInviteInfo, Foreground),
294 (UpdateLanguageServer, Foreground),
295 (UpdateParticipantLocation, Foreground),
296 (UpdateProject, Foreground),
297 (UpdateProjectCollaborator, Foreground),
298 (UpdateWorktree, Foreground),
299 (UpdateWorktreeSettings, Foreground),
300 (UsersResponse, Foreground),
301 (LspExtExpandMacro, Background),
302 (LspExtExpandMacroResponse, Background),
303 (SetRoomParticipantRole, Foreground),
304 (BlameBuffer, Foreground),
305 (BlameBufferResponse, Foreground),
306 (CreateRemoteProject, Background),
307 (CreateRemoteProjectResponse, Foreground),
308 (CreateDevServer, Foreground),
309 (CreateDevServerResponse, Foreground),
310 (DevServerInstructions, Foreground),
311 (ShutdownDevServer, Foreground),
312 (ReconnectDevServer, Foreground),
313 (ReconnectDevServerResponse, Foreground),
314 (ShareRemoteProject, Foreground),
315 (JoinRemoteProject, Foreground),
316 (RejoinRemoteProjects, Foreground),
317 (RejoinRemoteProjectsResponse, Foreground),
318 (MultiLspQuery, Background),
319 (MultiLspQueryResponse, Background),
320 (RemoteProjectsUpdate, Foreground),
321 (ValidateRemoteProjectRequest, Background),
322 (DeleteDevServer, Foreground),
323 (OpenNewBuffer, Foreground)
324);
325
326request_messages!(
327 (ApplyCodeAction, ApplyCodeActionResponse),
328 (
329 ApplyCompletionAdditionalEdits,
330 ApplyCompletionAdditionalEditsResponse
331 ),
332 (Call, Ack),
333 (CancelCall, Ack),
334 (CopyProjectEntry, ProjectEntryResponse),
335 (CompleteWithLanguageModel, LanguageModelResponse),
336 (ComputeEmbeddings, ComputeEmbeddingsResponse),
337 (CountTokensWithLanguageModel, CountTokensResponse),
338 (CreateChannel, CreateChannelResponse),
339 (CreateProjectEntry, ProjectEntryResponse),
340 (CreateRoom, CreateRoomResponse),
341 (DeclineCall, Ack),
342 (DeleteChannel, Ack),
343 (DeleteProjectEntry, ProjectEntryResponse),
344 (ExpandProjectEntry, ExpandProjectEntryResponse),
345 (Follow, FollowResponse),
346 (FormatBuffers, FormatBuffersResponse),
347 (FuzzySearchUsers, UsersResponse),
348 (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
349 (GetChannelMembers, GetChannelMembersResponse),
350 (GetChannelMessages, GetChannelMessagesResponse),
351 (GetChannelMessagesById, GetChannelMessagesResponse),
352 (GetCodeActions, GetCodeActionsResponse),
353 (GetCompletions, GetCompletionsResponse),
354 (GetDefinition, GetDefinitionResponse),
355 (GetImplementation, GetImplementationResponse),
356 (GetDocumentHighlights, GetDocumentHighlightsResponse),
357 (GetHover, GetHoverResponse),
358 (GetNotifications, GetNotificationsResponse),
359 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
360 (GetProjectSymbols, GetProjectSymbolsResponse),
361 (GetReferences, GetReferencesResponse),
362 (GetTypeDefinition, GetTypeDefinitionResponse),
363 (GetUsers, UsersResponse),
364 (IncomingCall, Ack),
365 (InlayHints, InlayHintsResponse),
366 (InviteChannelMember, Ack),
367 (JoinChannel, JoinRoomResponse),
368 (JoinChannelBuffer, JoinChannelBufferResponse),
369 (JoinChannelChat, JoinChannelChatResponse),
370 (JoinHostedProject, JoinProjectResponse),
371 (JoinProject, JoinProjectResponse),
372 (JoinRoom, JoinRoomResponse),
373 (LeaveChannelBuffer, Ack),
374 (LeaveRoom, Ack),
375 (MarkNotificationRead, Ack),
376 (MoveChannel, Ack),
377 (OnTypeFormatting, OnTypeFormattingResponse),
378 (OpenBufferById, OpenBufferResponse),
379 (OpenBufferByPath, OpenBufferResponse),
380 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
381 (OpenNewBuffer, OpenBufferResponse),
382 (PerformRename, PerformRenameResponse),
383 (Ping, Ack),
384 (PrepareRename, PrepareRenameResponse),
385 (RefreshInlayHints, Ack),
386 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
387 (RejoinRoom, RejoinRoomResponse),
388 (ReloadBuffers, ReloadBuffersResponse),
389 (RemoveChannelMember, Ack),
390 (RemoveChannelMessage, Ack),
391 (UpdateChannelMessage, Ack),
392 (RemoveContact, Ack),
393 (RenameChannel, RenameChannelResponse),
394 (RenameProjectEntry, ProjectEntryResponse),
395 (RequestContact, Ack),
396 (
397 ResolveCompletionDocumentation,
398 ResolveCompletionDocumentationResponse
399 ),
400 (ResolveInlayHint, ResolveInlayHintResponse),
401 (RespondToChannelInvite, Ack),
402 (RespondToContactRequest, Ack),
403 (SaveBuffer, BufferSaved),
404 (SearchProject, SearchProjectResponse),
405 (SendChannelMessage, SendChannelMessageResponse),
406 (SetChannelMemberRole, Ack),
407 (SetChannelVisibility, Ack),
408 (ShareProject, ShareProjectResponse),
409 (SynchronizeBuffers, SynchronizeBuffersResponse),
410 (Test, Test),
411 (UpdateBuffer, Ack),
412 (UpdateParticipantLocation, Ack),
413 (UpdateProject, Ack),
414 (UpdateWorktree, Ack),
415 (LspExtExpandMacro, LspExtExpandMacroResponse),
416 (SetRoomParticipantRole, Ack),
417 (BlameBuffer, BlameBufferResponse),
418 (CreateRemoteProject, CreateRemoteProjectResponse),
419 (CreateDevServer, CreateDevServerResponse),
420 (ShutdownDevServer, Ack),
421 (ShareRemoteProject, ShareProjectResponse),
422 (JoinRemoteProject, JoinProjectResponse),
423 (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
424 (ReconnectDevServer, ReconnectDevServerResponse),
425 (ValidateRemoteProjectRequest, Ack),
426 (MultiLspQuery, MultiLspQueryResponse),
427 (DeleteDevServer, Ack),
428);
429
430entity_messages!(
431 {project_id, ShareProject},
432 AddProjectCollaborator,
433 ApplyCodeAction,
434 ApplyCompletionAdditionalEdits,
435 BlameBuffer,
436 BufferReloaded,
437 BufferSaved,
438 CopyProjectEntry,
439 CreateBufferForPeer,
440 CreateProjectEntry,
441 DeleteProjectEntry,
442 ExpandProjectEntry,
443 FormatBuffers,
444 GetCodeActions,
445 GetCompletions,
446 GetDefinition,
447 GetImplementation,
448 GetDocumentHighlights,
449 GetHover,
450 GetProjectSymbols,
451 GetReferences,
452 GetTypeDefinition,
453 InlayHints,
454 JoinProject,
455 LeaveProject,
456 MultiLspQuery,
457 OnTypeFormatting,
458 OpenNewBuffer,
459 OpenBufferById,
460 OpenBufferByPath,
461 OpenBufferForSymbol,
462 PerformRename,
463 PrepareRename,
464 RefreshInlayHints,
465 ReloadBuffers,
466 RemoveProjectCollaborator,
467 RenameProjectEntry,
468 ResolveCompletionDocumentation,
469 ResolveInlayHint,
470 SaveBuffer,
471 SearchProject,
472 StartLanguageServer,
473 SynchronizeBuffers,
474 UnshareProject,
475 UpdateBuffer,
476 UpdateBufferFile,
477 UpdateDiagnosticSummary,
478 UpdateDiffBase,
479 UpdateLanguageServer,
480 UpdateProject,
481 UpdateProjectCollaborator,
482 UpdateWorktree,
483 UpdateWorktreeSettings,
484 LspExtExpandMacro,
485);
486
487entity_messages!(
488 {channel_id, Channel},
489 ChannelMessageSent,
490 ChannelMessageUpdate,
491 RemoveChannelMessage,
492 UpdateChannelMessage,
493 UpdateChannelBuffer,
494 UpdateChannelBufferCollaborators,
495);
496
497const KIB: usize = 1024;
498const MIB: usize = KIB * 1024;
499const MAX_BUFFER_LEN: usize = MIB;
500
501/// A stream of protobuf messages.
502pub struct MessageStream<S> {
503 stream: S,
504 encoding_buffer: Vec<u8>,
505}
506
507#[allow(clippy::large_enum_variant)]
508#[derive(Debug)]
509pub enum Message {
510 Envelope(Envelope),
511 Ping,
512 Pong,
513}
514
515impl<S> MessageStream<S> {
516 pub fn new(stream: S) -> Self {
517 Self {
518 stream,
519 encoding_buffer: Vec::new(),
520 }
521 }
522
523 pub fn inner_mut(&mut self) -> &mut S {
524 &mut self.stream
525 }
526}
527
528impl<S> MessageStream<S>
529where
530 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
531{
532 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
533 #[cfg(any(test, feature = "test-support"))]
534 const COMPRESSION_LEVEL: i32 = -7;
535
536 #[cfg(not(any(test, feature = "test-support")))]
537 const COMPRESSION_LEVEL: i32 = 4;
538
539 match message {
540 Message::Envelope(message) => {
541 self.encoding_buffer.reserve(message.encoded_len());
542 message
543 .encode(&mut self.encoding_buffer)
544 .map_err(io::Error::from)?;
545 let buffer =
546 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
547 .unwrap();
548
549 self.encoding_buffer.clear();
550 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
551 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
552 }
553 Message::Ping => {
554 self.stream
555 .send(WebSocketMessage::Ping(Default::default()))
556 .await?;
557 }
558 Message::Pong => {
559 self.stream
560 .send(WebSocketMessage::Pong(Default::default()))
561 .await?;
562 }
563 }
564
565 Ok(())
566 }
567}
568
569impl<S> MessageStream<S>
570where
571 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
572{
573 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
574 while let Some(bytes) = self.stream.next().await {
575 let received_at = Instant::now();
576 match bytes? {
577 WebSocketMessage::Binary(bytes) => {
578 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
579 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
580 .map_err(io::Error::from)?;
581
582 self.encoding_buffer.clear();
583 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
584 return Ok((Message::Envelope(envelope), received_at));
585 }
586 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
587 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
588 WebSocketMessage::Close(_) => break,
589 _ => {}
590 }
591 }
592 Err(anyhow!("connection closed"))
593 }
594}
595
596impl From<Timestamp> for SystemTime {
597 fn from(val: Timestamp) -> Self {
598 UNIX_EPOCH
599 .checked_add(Duration::new(val.seconds, val.nanos))
600 .unwrap()
601 }
602}
603
604impl From<SystemTime> for Timestamp {
605 fn from(time: SystemTime) -> Self {
606 let duration = time.duration_since(UNIX_EPOCH).unwrap();
607 Self {
608 seconds: duration.as_secs(),
609 nanos: duration.subsec_nanos(),
610 }
611 }
612}
613
614impl From<u128> for Nonce {
615 fn from(nonce: u128) -> Self {
616 let upper_half = (nonce >> 64) as u64;
617 let lower_half = nonce as u64;
618 Self {
619 upper_half,
620 lower_half,
621 }
622 }
623}
624
625impl From<Nonce> for u128 {
626 fn from(nonce: Nonce) -> Self {
627 let upper_half = (nonce.upper_half as u128) << 64;
628 let lower_half = nonce.lower_half as u128;
629 upper_half | lower_half
630 }
631}
632
633pub fn split_worktree_update(
634 mut message: UpdateWorktree,
635 max_chunk_size: usize,
636) -> impl Iterator<Item = UpdateWorktree> {
637 let mut done_files = false;
638
639 let mut repository_map = message
640 .updated_repositories
641 .into_iter()
642 .map(|repo| (repo.work_directory_id, repo))
643 .collect::<HashMap<_, _>>();
644
645 iter::from_fn(move || {
646 if done_files {
647 return None;
648 }
649
650 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
651 let updated_entries: Vec<_> = message
652 .updated_entries
653 .drain(..updated_entries_chunk_size)
654 .collect();
655
656 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
657 let removed_entries = message
658 .removed_entries
659 .drain(..removed_entries_chunk_size)
660 .collect();
661
662 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
663
664 let mut updated_repositories = Vec::new();
665
666 if !repository_map.is_empty() {
667 for entry in &updated_entries {
668 if let Some(repo) = repository_map.remove(&entry.id) {
669 updated_repositories.push(repo)
670 }
671 }
672 }
673
674 let removed_repositories = if done_files {
675 mem::take(&mut message.removed_repositories)
676 } else {
677 Default::default()
678 };
679
680 if done_files {
681 updated_repositories.extend(mem::take(&mut repository_map).into_values());
682 }
683
684 Some(UpdateWorktree {
685 project_id: message.project_id,
686 worktree_id: message.worktree_id,
687 root_name: message.root_name.clone(),
688 abs_path: message.abs_path.clone(),
689 updated_entries,
690 removed_entries,
691 scan_id: message.scan_id,
692 is_last_update: done_files && message.is_last_update,
693 updated_repositories,
694 removed_repositories,
695 })
696 })
697}
698
699#[cfg(test)]
700mod tests {
701 use super::*;
702
703 #[gpui::test]
704 async fn test_buffer_size() {
705 let (tx, rx) = futures::channel::mpsc::unbounded();
706 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
707 sink.write(Message::Envelope(Envelope {
708 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
709 root_name: "abcdefg".repeat(10),
710 ..Default::default()
711 })),
712 ..Default::default()
713 }))
714 .await
715 .unwrap();
716 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
717 sink.write(Message::Envelope(Envelope {
718 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
719 root_name: "abcdefg".repeat(1000000),
720 ..Default::default()
721 })),
722 ..Default::default()
723 }))
724 .await
725 .unwrap();
726 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
727
728 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
729 stream.read().await.unwrap();
730 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
731 stream.read().await.unwrap();
732 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
733 }
734
735 #[gpui::test]
736 fn test_converting_peer_id_from_and_to_u64() {
737 let peer_id = PeerId {
738 owner_id: 10,
739 id: 3,
740 };
741 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
742 let peer_id = PeerId {
743 owner_id: u32::MAX,
744 id: 3,
745 };
746 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
747 let peer_id = PeerId {
748 owner_id: 10,
749 id: u32::MAX,
750 };
751 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
752 let peer_id = PeerId {
753 owner_id: u32::MAX,
754 id: u32::MAX,
755 };
756 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
757 }
758}