1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A message type that can be wrapped in, and extracted from, an [`Envelope`].
///
/// No implementations are written out in this file; presumably they are
/// generated by the `messages!` invocation further down — confirm against the
/// macro definition in the parent module.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through
    /// `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` reports
    /// whether this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// * `id` - identifier assigned to the envelope.
    /// * `responding_to` - id of the message this one answers, if any.
    /// * `original_sender_id` - peer that originally sent the message, if any.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Extracts a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably returns `None` when the envelope carries a
    /// different payload type — confirm against the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Marker type naming the kind of entity the message targets.
    type Entity;
    /// Identifier of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
38
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// Message type paired with this request as its response.
    type Response: EnvelopedMessage;
}
42
/// Type-erased view of a `TypedEnvelope<T>`, allowing envelopes with different
/// payload types to be handled through one trait object.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload message type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload message type (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into `Box<dyn Any>` for owned downcasting.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// Peer that originally sent the message, if recorded on the envelope.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// Connection the envelope was received from.
    fn sender_id(&self) -> ConnectionId;
    /// Identifier of the message carried by the envelope.
    fn message_id(&self) -> u32;
}
53
/// Priority class attached to every message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// The standard value-type traits are derived so that priorities can be
/// copied, compared with `==`, hashed, and printed in diagnostics.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum MessagePriority {
    /// Messages for which `AnyTypedEnvelope::is_background` returns `false`.
    Foreground,
    /// Messages for which `AnyTypedEnvelope::is_background` returns `true`.
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// `PeerId` consists of two `u32` fields (`owner_id`, `id`), so it can be
// copied bit-for-bit.
impl Copy for PeerId {}

// Marker impl only. The `PartialEq` impl it builds on is not written in this
// file (presumably derived in the generated `zed.messages.rs` — confirm).
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Registry of every message type, each paired with the name of a
// `MessagePriority` variant.
// NOTE(review): the `messages!` macro is imported from the parent module and
// is not visible here; presumably it implements `EnvelopedMessage` for each
// listed type using the given priority — confirm against its definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateRemoteProject, Background),
    (CreateRemoteProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareRemoteProject, Foreground),
    (JoinRemoteProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
    (RemoteProjectsUpdate, Foreground),
    (ValidateRemoteProjectRequest, Background),
    (DeleteDevServer, Foreground)
);
324
// Pairs each request message type (first element) with the message type sent
// back in reply (second element). Requests with no dedicated reply use `Ack`.
// NOTE(review): the `request_messages!` macro is not visible here; presumably
// it implements `RequestMessage` with `Response` set to the second element —
// confirm against its definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateRemoteProject, CreateRemoteProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareRemoteProject, ShareProjectResponse),
    (JoinRemoteProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (ValidateRemoteProjectRequest, Ack),
    (MultiLspQuery, MultiLspQueryResponse),
    (DeleteDevServer, Ack),
);
427
// Message types that are addressed to a project.
// NOTE(review): the `entity_messages!` macro is not visible here; presumably
// the leading `{project_id, ShareProject}` names the field read by
// `EntityMessage::remote_entity_id` and the `Entity` type — confirm against
// its definition.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
483
// Message types that are addressed to a channel.
// NOTE(review): presumably `{channel_id, Channel}` names the id field and the
// `Entity` type for these messages — confirm against the macro definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
493
// Byte-size units.
const KIB: usize = 1024;
const MIB: usize = KIB * 1024;
// Capacity ceiling passed to `Vec::shrink_to` on `MessageStream`'s scratch
// buffer after each envelope is written or read, so that a single large
// message does not keep a large allocation alive.
const MAX_BUFFER_LEN: usize = MIB;
497
/// A stream of protobuf messages.
///
/// Wraps a WebSocket transport `S`. `write` is available when `S` is a sink of
/// WebSocket messages and `read` when `S` is a stream of them.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused between calls: holds the protobuf-encoded bytes
    /// of an envelope before compression (`write`) or after decompression
    /// (`read`).
    encoding_buffer: Vec<u8>,
}
503
/// A unit of traffic on a [`MessageStream`].
// `Envelope` is much larger than the unit variants, hence the lint allowance.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, sent as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
511
512impl<S> MessageStream<S> {
513 pub fn new(stream: S) -> Self {
514 Self {
515 stream,
516 encoding_buffer: Vec::new(),
517 }
518 }
519
520 pub fn inner_mut(&mut self) -> &mut S {
521 &mut self.stream
522 }
523}
524
525impl<S> MessageStream<S>
526where
527 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
528{
529 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
530 #[cfg(any(test, feature = "test-support"))]
531 const COMPRESSION_LEVEL: i32 = -7;
532
533 #[cfg(not(any(test, feature = "test-support")))]
534 const COMPRESSION_LEVEL: i32 = 4;
535
536 match message {
537 Message::Envelope(message) => {
538 self.encoding_buffer.reserve(message.encoded_len());
539 message
540 .encode(&mut self.encoding_buffer)
541 .map_err(io::Error::from)?;
542 let buffer =
543 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
544 .unwrap();
545
546 self.encoding_buffer.clear();
547 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
548 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
549 }
550 Message::Ping => {
551 self.stream
552 .send(WebSocketMessage::Ping(Default::default()))
553 .await?;
554 }
555 Message::Pong => {
556 self.stream
557 .send(WebSocketMessage::Pong(Default::default()))
558 .await?;
559 }
560 }
561
562 Ok(())
563 }
564}
565
566impl<S> MessageStream<S>
567where
568 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
569{
570 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
571 while let Some(bytes) = self.stream.next().await {
572 let received_at = Instant::now();
573 match bytes? {
574 WebSocketMessage::Binary(bytes) => {
575 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
576 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
577 .map_err(io::Error::from)?;
578
579 self.encoding_buffer.clear();
580 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
581 return Ok((Message::Envelope(envelope), received_at));
582 }
583 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
584 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
585 WebSocketMessage::Close(_) => break,
586 _ => {}
587 }
588 }
589 Err(anyhow!("connection closed"))
590 }
591}
592
593impl From<Timestamp> for SystemTime {
594 fn from(val: Timestamp) -> Self {
595 UNIX_EPOCH
596 .checked_add(Duration::new(val.seconds, val.nanos))
597 .unwrap()
598 }
599}
600
601impl From<SystemTime> for Timestamp {
602 fn from(time: SystemTime) -> Self {
603 let duration = time.duration_since(UNIX_EPOCH).unwrap();
604 Self {
605 seconds: duration.as_secs(),
606 nanos: duration.subsec_nanos(),
607 }
608 }
609}
610
611impl From<u128> for Nonce {
612 fn from(nonce: u128) -> Self {
613 let upper_half = (nonce >> 64) as u64;
614 let lower_half = nonce as u64;
615 Self {
616 upper_half,
617 lower_half,
618 }
619 }
620}
621
622impl From<Nonce> for u128 {
623 fn from(nonce: Nonce) -> Self {
624 let upper_half = (nonce.upper_half as u128) << 64;
625 let lower_half = nonce.lower_half as u128;
626 upper_half | lower_half
627 }
628}
629
630pub fn split_worktree_update(
631 mut message: UpdateWorktree,
632 max_chunk_size: usize,
633) -> impl Iterator<Item = UpdateWorktree> {
634 let mut done_files = false;
635
636 let mut repository_map = message
637 .updated_repositories
638 .into_iter()
639 .map(|repo| (repo.work_directory_id, repo))
640 .collect::<HashMap<_, _>>();
641
642 iter::from_fn(move || {
643 if done_files {
644 return None;
645 }
646
647 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
648 let updated_entries: Vec<_> = message
649 .updated_entries
650 .drain(..updated_entries_chunk_size)
651 .collect();
652
653 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
654 let removed_entries = message
655 .removed_entries
656 .drain(..removed_entries_chunk_size)
657 .collect();
658
659 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
660
661 let mut updated_repositories = Vec::new();
662
663 if !repository_map.is_empty() {
664 for entry in &updated_entries {
665 if let Some(repo) = repository_map.remove(&entry.id) {
666 updated_repositories.push(repo)
667 }
668 }
669 }
670
671 let removed_repositories = if done_files {
672 mem::take(&mut message.removed_repositories)
673 } else {
674 Default::default()
675 };
676
677 if done_files {
678 updated_repositories.extend(mem::take(&mut repository_map).into_values());
679 }
680
681 Some(UpdateWorktree {
682 project_id: message.project_id,
683 worktree_id: message.worktree_id,
684 root_name: message.root_name.clone(),
685 abs_path: message.abs_path.clone(),
686 updated_entries,
687 removed_entries,
688 scan_id: message.scan_id,
689 is_last_update: done_files && message.is_last_update,
690 updated_repositories,
691 removed_repositories,
692 })
693 })
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope whose payload is an `UpdateWorktree` with the given
    /// root name and every other field at its default.
    fn worktree_envelope(root_name: String) -> Message {
        let update = UpdateWorktree {
            root_name,
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never retain more than `MAX_BUFFER_LEN` bytes
    /// of capacity after a write or a read, however large the message was.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));

        // A small message, then one several megabytes long.
        for repeat_count in [10, 1000000] {
            sink.write(worktree_envelope("abcdefg".repeat(repeat_count)))
                .await
                .unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        // Read both messages back through the receiving half.
        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `PeerId::as_u64` followed by `PeerId::from_u64` must be the identity,
    /// including at the `u32::MAX` boundary of either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}