1use super::{
2 proto,
3 user::{User, UserStore},
4 Client, Status, Subscription, TypedEnvelope,
5};
6use anyhow::{anyhow, Context, Result};
7use futures::lock::Mutex;
8use gpui::{
9 AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
10};
11use postage::prelude::Stream;
12use rand::prelude::*;
13use std::{
14 collections::{HashMap, HashSet},
15 mem,
16 ops::Range,
17 sync::Arc,
18};
19use sum_tree::{Bias, SumTree};
20use time::OffsetDateTime;
21use util::{post_inc, ResultExt as _, TryFutureExt};
22
/// Tracks the channels offered by the server and hands out [`Channel`] models for them.
///
/// The list follows the client's connection status: it is filled in when the
/// client connects and cleared when the client signs out (see `ChannelList::new`).
pub struct ChannelList {
    /// Channels from the most recent `GetChannels` response; `None` until a
    /// response has been received and again after signing out.
    available_channels: Option<Vec<ChannelDetails>>,
    /// Weak handles to the channel models created by `get_channel`, keyed by channel id.
    channels: HashMap<u64, WeakModelHandle<Channel>>,
    /// RPC client used to fetch the channel list and handed to every `Channel`.
    client: Arc<Client>,
    /// User store handed to every `Channel`.
    user_store: ModelHandle<UserStore>,
    /// Handle to the status-watching task spawned in `new`.
    _task: Task<Option<()>>,
}
30
/// Identifying information for one channel, converted from `proto::Channel`.
#[derive(Clone, Debug, PartialEq)]
pub struct ChannelDetails {
    /// Channel id, copied from `proto::Channel::id`.
    pub id: u64,
    /// Channel name, copied from `proto::Channel::name`.
    pub name: String,
}
36
/// A joined chat channel together with the messages loaded so far.
pub struct Channel {
    /// Id and name of this channel.
    details: ChannelDetails,
    /// Loaded messages, ordered by `ChannelMessageId` (saved messages first,
    /// pending ones last).
    messages: SumTree<ChannelMessage>,
    /// Set from the `done` flag of join / history responses; once `true`,
    /// `load_more_messages` no longer requests older messages.
    loaded_all_messages: bool,
    /// Counter used to mint the next `ChannelMessageId::Pending` id.
    next_pending_message_id: usize,
    /// Used to look up the current user and the senders of received messages.
    user_store: ModelHandle<UserStore>,
    /// RPC client used for all requests concerning this channel.
    rpc: Arc<Client>,
    /// Held by `send_message` while its request is in flight, so that
    /// outgoing sends are issued one at a time.
    outgoing_messages_lock: Arc<Mutex<()>>,
    /// Source of the random nonces attached to outgoing messages.
    rng: StdRng,
    /// Registration returned by `add_model_for_remote_entity` in `new`.
    _subscription: Subscription,
}
48
/// A single chat message, either confirmed by the server or still pending.
#[derive(Clone, Debug)]
pub struct ChannelMessage {
    /// `Saved` for messages received from the server, `Pending` for messages
    /// created locally by `Channel::send_message`.
    pub id: ChannelMessageId,
    /// Message text.
    pub body: String,
    /// Time of the message: taken from the server payload for saved messages,
    /// the local UTC clock for pending ones.
    pub timestamp: OffsetDateTime,
    /// The user who sent the message.
    pub sender: Arc<User>,
    /// Random value generated when sending; `Channel::insert_messages` uses it
    /// to match an incoming saved message with its pending counterpart.
    pub nonce: u128,
}
57
/// Identifier of a message inside a [`Channel`].
///
/// The derived ordering compares variants in declaration order first, so every
/// `Saved` id sorts before every `Pending` id. `Channel::pending_messages`
/// relies on this when it seeks to `Pending(0)`; do not reorder the variants.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ChannelMessageId {
    /// Id carried by a message received from the server.
    Saved(u64),
    /// Locally generated id of a message that has been sent but not yet confirmed.
    Pending(usize),
}
63
/// `SumTree` summary for a run of [`ChannelMessage`]s.
#[derive(Clone, Debug, Default)]
pub struct ChannelMessageSummary {
    /// Id of the last message covered by this summary.
    max_id: ChannelMessageId,
    /// Number of messages covered by this summary.
    count: usize,
}
69
/// Seek dimension that counts messages, used to address messages by index.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct Count(usize);
72
/// Event type of [`ChannelList`]; it has no variants, so no events can be emitted.
pub enum ChannelListEvent {}
74
/// Events emitted by a [`Channel`].
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelEvent {
    /// The messages at indices `old_range` were replaced by `new_count`
    /// messages (a pure insertion has an empty `old_range`, a pure removal
    /// has `new_count == 0`).
    MessagesUpdated {
        /// Index range, in the previous message list, that was replaced.
        old_range: Range<usize>,
        /// Number of messages now occupying that position.
        new_count: usize,
    },
}
82
/// Makes `ChannelList` a gpui `Entity` whose event type is [`ChannelListEvent`].
impl Entity for ChannelList {
    type Event = ChannelListEvent;
}
86
87impl ChannelList {
88 pub fn new(
89 user_store: ModelHandle<UserStore>,
90 rpc: Arc<Client>,
91 cx: &mut ModelContext<Self>,
92 ) -> Self {
93 let _task = cx.spawn_weak(|this, mut cx| {
94 let rpc = rpc.clone();
95 async move {
96 let mut status = rpc.status();
97 while let Some((status, this)) = status.recv().await.zip(this.upgrade(&cx)) {
98 match status {
99 Status::Connected { .. } => {
100 let response = rpc
101 .request(proto::GetChannels {})
102 .await
103 .context("failed to fetch available channels")?;
104 this.update(&mut cx, |this, cx| {
105 this.available_channels =
106 Some(response.channels.into_iter().map(Into::into).collect());
107
108 let mut to_remove = Vec::new();
109 for (channel_id, channel) in &this.channels {
110 if let Some(channel) = channel.upgrade(cx) {
111 channel.update(cx, |channel, cx| channel.rejoin(cx))
112 } else {
113 to_remove.push(*channel_id);
114 }
115 }
116
117 for channel_id in to_remove {
118 this.channels.remove(&channel_id);
119 }
120 cx.notify();
121 });
122 }
123 Status::SignedOut { .. } => {
124 this.update(&mut cx, |this, cx| {
125 this.available_channels = None;
126 this.channels.clear();
127 cx.notify();
128 });
129 }
130 _ => {}
131 }
132 }
133 Ok(())
134 }
135 .log_err()
136 });
137
138 Self {
139 available_channels: None,
140 channels: Default::default(),
141 user_store,
142 client: rpc,
143 _task,
144 }
145 }
146
147 pub fn available_channels(&self) -> Option<&[ChannelDetails]> {
148 self.available_channels.as_deref()
149 }
150
151 pub fn get_channel(
152 &mut self,
153 id: u64,
154 cx: &mut MutableAppContext,
155 ) -> Option<ModelHandle<Channel>> {
156 if let Some(channel) = self.channels.get(&id).and_then(|c| c.upgrade(cx)) {
157 return Some(channel);
158 }
159
160 let channels = self.available_channels.as_ref()?;
161 let details = channels.iter().find(|details| details.id == id)?.clone();
162 let channel = cx.add_model(|cx| {
163 Channel::new(details, self.user_store.clone(), self.client.clone(), cx)
164 });
165 self.channels.insert(id, channel.downgrade());
166 Some(channel)
167 }
168}
169
170impl Entity for Channel {
171 type Event = ChannelEvent;
172
173 fn release(&mut self, _: &mut MutableAppContext) {
174 self.rpc
175 .send(proto::LeaveChannel {
176 channel_id: self.details.id,
177 })
178 .log_err();
179 }
180}
181
182impl Channel {
183 pub fn init(rpc: &Arc<Client>) {
184 rpc.add_model_message_handler(Self::handle_message_sent);
185 }
186
187 pub fn new(
188 details: ChannelDetails,
189 user_store: ModelHandle<UserStore>,
190 rpc: Arc<Client>,
191 cx: &mut ModelContext<Self>,
192 ) -> Self {
193 let _subscription = rpc.add_model_for_remote_entity(details.id, cx);
194
195 {
196 let user_store = user_store.clone();
197 let rpc = rpc.clone();
198 let channel_id = details.id;
199 cx.spawn(|channel, mut cx| {
200 async move {
201 let response = rpc.request(proto::JoinChannel { channel_id }).await?;
202 let messages =
203 messages_from_proto(response.messages, &user_store, &mut cx).await?;
204 let loaded_all_messages = response.done;
205
206 channel.update(&mut cx, |channel, cx| {
207 channel.insert_messages(messages, cx);
208 channel.loaded_all_messages = loaded_all_messages;
209 });
210
211 Ok(())
212 }
213 .log_err()
214 })
215 .detach();
216 }
217
218 Self {
219 details,
220 user_store,
221 rpc,
222 outgoing_messages_lock: Default::default(),
223 messages: Default::default(),
224 loaded_all_messages: false,
225 next_pending_message_id: 0,
226 rng: StdRng::from_entropy(),
227 _subscription,
228 }
229 }
230
231 pub fn name(&self) -> &str {
232 &self.details.name
233 }
234
235 pub fn send_message(
236 &mut self,
237 body: String,
238 cx: &mut ModelContext<Self>,
239 ) -> Result<Task<Result<()>>> {
240 if body.is_empty() {
241 Err(anyhow!("message body can't be empty"))?;
242 }
243
244 let current_user = self
245 .user_store
246 .read(cx)
247 .current_user()
248 .ok_or_else(|| anyhow!("current_user is not present"))?;
249
250 let channel_id = self.details.id;
251 let pending_id = ChannelMessageId::Pending(post_inc(&mut self.next_pending_message_id));
252 let nonce = self.rng.gen();
253 self.insert_messages(
254 SumTree::from_item(
255 ChannelMessage {
256 id: pending_id,
257 body: body.clone(),
258 sender: current_user,
259 timestamp: OffsetDateTime::now_utc(),
260 nonce,
261 },
262 &(),
263 ),
264 cx,
265 );
266 let user_store = self.user_store.clone();
267 let rpc = self.rpc.clone();
268 let outgoing_messages_lock = self.outgoing_messages_lock.clone();
269 Ok(cx.spawn(|this, mut cx| async move {
270 let outgoing_message_guard = outgoing_messages_lock.lock().await;
271 let request = rpc.request(proto::SendChannelMessage {
272 channel_id,
273 body,
274 nonce: Some(nonce.into()),
275 });
276 let response = request.await?;
277 drop(outgoing_message_guard);
278 let message = ChannelMessage::from_proto(
279 response.message.ok_or_else(|| anyhow!("invalid message"))?,
280 &user_store,
281 &mut cx,
282 )
283 .await?;
284 this.update(&mut cx, |this, cx| {
285 this.insert_messages(SumTree::from_item(message, &()), cx);
286 Ok(())
287 })
288 }))
289 }
290
291 pub fn load_more_messages(&mut self, cx: &mut ModelContext<Self>) -> bool {
292 if !self.loaded_all_messages {
293 let rpc = self.rpc.clone();
294 let user_store = self.user_store.clone();
295 let channel_id = self.details.id;
296 if let Some(before_message_id) =
297 self.messages.first().and_then(|message| match message.id {
298 ChannelMessageId::Saved(id) => Some(id),
299 ChannelMessageId::Pending(_) => None,
300 })
301 {
302 cx.spawn(|this, mut cx| {
303 async move {
304 let response = rpc
305 .request(proto::GetChannelMessages {
306 channel_id,
307 before_message_id,
308 })
309 .await?;
310 let loaded_all_messages = response.done;
311 let messages =
312 messages_from_proto(response.messages, &user_store, &mut cx).await?;
313 this.update(&mut cx, |this, cx| {
314 this.loaded_all_messages = loaded_all_messages;
315 this.insert_messages(messages, cx);
316 });
317 Ok(())
318 }
319 .log_err()
320 })
321 .detach();
322 return true;
323 }
324 }
325 false
326 }
327
328 pub fn rejoin(&mut self, cx: &mut ModelContext<Self>) {
329 let user_store = self.user_store.clone();
330 let rpc = self.rpc.clone();
331 let channel_id = self.details.id;
332 cx.spawn(|this, mut cx| {
333 async move {
334 let response = rpc.request(proto::JoinChannel { channel_id }).await?;
335 let messages = messages_from_proto(response.messages, &user_store, &mut cx).await?;
336 let loaded_all_messages = response.done;
337
338 let pending_messages = this.update(&mut cx, |this, cx| {
339 if let Some((first_new_message, last_old_message)) =
340 messages.first().zip(this.messages.last())
341 {
342 if first_new_message.id > last_old_message.id {
343 let old_messages = mem::take(&mut this.messages);
344 cx.emit(ChannelEvent::MessagesUpdated {
345 old_range: 0..old_messages.summary().count,
346 new_count: 0,
347 });
348 this.loaded_all_messages = loaded_all_messages;
349 }
350 }
351
352 this.insert_messages(messages, cx);
353 if loaded_all_messages {
354 this.loaded_all_messages = loaded_all_messages;
355 }
356
357 this.pending_messages().cloned().collect::<Vec<_>>()
358 });
359
360 for pending_message in pending_messages {
361 let request = rpc.request(proto::SendChannelMessage {
362 channel_id,
363 body: pending_message.body,
364 nonce: Some(pending_message.nonce.into()),
365 });
366 let response = request.await?;
367 let message = ChannelMessage::from_proto(
368 response.message.ok_or_else(|| anyhow!("invalid message"))?,
369 &user_store,
370 &mut cx,
371 )
372 .await?;
373 this.update(&mut cx, |this, cx| {
374 this.insert_messages(SumTree::from_item(message, &()), cx);
375 });
376 }
377
378 Ok(())
379 }
380 .log_err()
381 })
382 .detach();
383 }
384
385 pub fn message_count(&self) -> usize {
386 self.messages.summary().count
387 }
388
389 pub fn messages(&self) -> &SumTree<ChannelMessage> {
390 &self.messages
391 }
392
393 pub fn message(&self, ix: usize) -> &ChannelMessage {
394 let mut cursor = self.messages.cursor::<Count>();
395 cursor.seek(&Count(ix), Bias::Right, &());
396 cursor.item().unwrap()
397 }
398
399 pub fn messages_in_range(&self, range: Range<usize>) -> impl Iterator<Item = &ChannelMessage> {
400 let mut cursor = self.messages.cursor::<Count>();
401 cursor.seek(&Count(range.start), Bias::Right, &());
402 cursor.take(range.len())
403 }
404
405 pub fn pending_messages(&self) -> impl Iterator<Item = &ChannelMessage> {
406 let mut cursor = self.messages.cursor::<ChannelMessageId>();
407 cursor.seek(&ChannelMessageId::Pending(0), Bias::Left, &());
408 cursor
409 }
410
411 async fn handle_message_sent(
412 this: ModelHandle<Self>,
413 message: TypedEnvelope<proto::ChannelMessageSent>,
414 _: Arc<Client>,
415 mut cx: AsyncAppContext,
416 ) -> Result<()> {
417 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
418 let message = message
419 .payload
420 .message
421 .ok_or_else(|| anyhow!("empty message"))?;
422
423 let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
424 this.update(&mut cx, |this, cx| {
425 this.insert_messages(SumTree::from_item(message, &()), cx)
426 });
427
428 Ok(())
429 }
430
431 fn insert_messages(&mut self, messages: SumTree<ChannelMessage>, cx: &mut ModelContext<Self>) {
432 if let Some((first_message, last_message)) = messages.first().zip(messages.last()) {
433 let nonces = messages
434 .cursor::<()>()
435 .map(|m| m.nonce)
436 .collect::<HashSet<_>>();
437
438 let mut old_cursor = self.messages.cursor::<(ChannelMessageId, Count)>();
439 let mut new_messages = old_cursor.slice(&first_message.id, Bias::Left, &());
440 let start_ix = old_cursor.start().1 .0;
441 let removed_messages = old_cursor.slice(&last_message.id, Bias::Right, &());
442 let removed_count = removed_messages.summary().count;
443 let new_count = messages.summary().count;
444 let end_ix = start_ix + removed_count;
445
446 new_messages.push_tree(messages, &());
447
448 let mut ranges = Vec::<Range<usize>>::new();
449 if new_messages.last().unwrap().is_pending() {
450 new_messages.push_tree(old_cursor.suffix(&()), &());
451 } else {
452 new_messages.push_tree(
453 old_cursor.slice(&ChannelMessageId::Pending(0), Bias::Left, &()),
454 &(),
455 );
456
457 while let Some(message) = old_cursor.item() {
458 let message_ix = old_cursor.start().1 .0;
459 if nonces.contains(&message.nonce) {
460 if ranges.last().map_or(false, |r| r.end == message_ix) {
461 ranges.last_mut().unwrap().end += 1;
462 } else {
463 ranges.push(message_ix..message_ix + 1);
464 }
465 } else {
466 new_messages.push(message.clone(), &());
467 }
468 old_cursor.next(&());
469 }
470 }
471
472 drop(old_cursor);
473 self.messages = new_messages;
474
475 for range in ranges.into_iter().rev() {
476 cx.emit(ChannelEvent::MessagesUpdated {
477 old_range: range,
478 new_count: 0,
479 });
480 }
481 cx.emit(ChannelEvent::MessagesUpdated {
482 old_range: start_ix..end_ix,
483 new_count,
484 });
485 cx.notify();
486 }
487 }
488}
489
490async fn messages_from_proto(
491 proto_messages: Vec<proto::ChannelMessage>,
492 user_store: &ModelHandle<UserStore>,
493 cx: &mut AsyncAppContext,
494) -> Result<SumTree<ChannelMessage>> {
495 let unique_user_ids = proto_messages
496 .iter()
497 .map(|m| m.sender_id)
498 .collect::<HashSet<_>>()
499 .into_iter()
500 .collect();
501 user_store
502 .update(cx, |user_store, cx| {
503 user_store.get_users(unique_user_ids, cx)
504 })
505 .await?;
506
507 let mut messages = Vec::with_capacity(proto_messages.len());
508 for message in proto_messages {
509 messages.push(ChannelMessage::from_proto(message, user_store, cx).await?);
510 }
511 let mut result = SumTree::new();
512 result.extend(messages, &());
513 Ok(result)
514}
515
516impl From<proto::Channel> for ChannelDetails {
517 fn from(message: proto::Channel) -> Self {
518 Self {
519 id: message.id,
520 name: message.name,
521 }
522 }
523}
524
525impl ChannelMessage {
526 pub async fn from_proto(
527 message: proto::ChannelMessage,
528 user_store: &ModelHandle<UserStore>,
529 cx: &mut AsyncAppContext,
530 ) -> Result<Self> {
531 let sender = user_store
532 .update(cx, |user_store, cx| {
533 user_store.get_user(message.sender_id, cx)
534 })
535 .await?;
536 Ok(ChannelMessage {
537 id: ChannelMessageId::Saved(message.id),
538 body: message.body,
539 timestamp: OffsetDateTime::from_unix_timestamp(message.timestamp as i64)?,
540 sender,
541 nonce: message
542 .nonce
543 .ok_or_else(|| anyhow!("nonce is required"))?
544 .into(),
545 })
546 }
547
548 pub fn is_pending(&self) -> bool {
549 matches!(self.id, ChannelMessageId::Pending(_))
550 }
551}
552
553impl sum_tree::Item for ChannelMessage {
554 type Summary = ChannelMessageSummary;
555
556 fn summary(&self) -> Self::Summary {
557 ChannelMessageSummary {
558 max_id: self.id,
559 count: 1,
560 }
561 }
562}
563
564impl Default for ChannelMessageId {
565 fn default() -> Self {
566 Self::Saved(0)
567 }
568}
569
570impl sum_tree::Summary for ChannelMessageSummary {
571 type Context = ();
572
573 fn add_summary(&mut self, summary: &Self, _: &()) {
574 self.max_id = summary.max_id;
575 self.count += summary.count;
576 }
577}
578
579impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for ChannelMessageId {
580 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
581 debug_assert!(summary.max_id > *self);
582 *self = summary.max_id;
583 }
584}
585
586impl<'a> sum_tree::Dimension<'a, ChannelMessageSummary> for Count {
587 fn add_summary(&mut self, summary: &'a ChannelMessageSummary, _: &()) {
588 self.0 += summary.count;
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::{FakeHttpClient, FakeServer};
    use gpui::TestAppContext;

    /// Drives a `ChannelList` and a `Channel` against a `FakeServer`: fetches
    /// the channel list, joins a channel, receives a new message and loads
    /// older messages, checking the emitted `ChannelEvent`s and the resulting
    /// message contents at each step.
    #[gpui::test]
    async fn test_channel_messages(cx: &mut TestAppContext) {
        cx.foreground().forbid_parking();

        let user_id = 5;
        let http_client = FakeHttpClient::with_404_response();
        let client = cx.update(|cx| Client::new(http_client.clone(), cx));
        let server = FakeServer::for_client(user_id, &client, cx).await;

        Channel::init(&client);
        let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

        let channel_list = cx.add_model(|cx| ChannelList::new(user_store, client.clone(), cx));
        channel_list.read_with(cx, |list, _| assert_eq!(list.available_channels(), None));

        // Get the available channels.
        let get_channels = server.receive::<proto::GetChannels>().await.unwrap();
        server
            .respond(
                get_channels.receipt(),
                proto::GetChannelsResponse {
                    channels: vec![proto::Channel {
                        id: 5,
                        name: "the-channel".to_string(),
                    }],
                },
            )
            .await;
        channel_list.next_notification(cx).await;
        channel_list.read_with(cx, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: 5,
                    name: "the-channel".into(),
                }]
            )
        });

        // A request for user 5 arrives (presumably the user store loading the
        // current user); answer it.
        let get_users = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(get_users.payload.user_ids, vec![5]);
        server
            .respond(
                get_users.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 5,
                        github_login: "nathansobo".into(),
                        avatar_url: "http://avatar.com/nathansobo".into(),
                    }],
                },
            )
            .await;

        // Join a channel and populate its existing messages.
        let channel = channel_list
            .update(cx, |list, cx| {
                let channel_id = list.available_channels().unwrap()[0].id;
                list.get_channel(channel_id, cx)
            })
            .unwrap();
        channel.read_with(cx, |channel, _| assert!(channel.messages().is_empty()));
        let join_channel = server.receive::<proto::JoinChannel>().await.unwrap();
        server
            .respond(
                join_channel.receipt(),
                proto::JoinChannelResponse {
                    messages: vec![
                        proto::ChannelMessage {
                            id: 10,
                            body: "a".into(),
                            timestamp: 1000,
                            sender_id: 5,
                            nonce: Some(1.into()),
                        },
                        proto::ChannelMessage {
                            id: 11,
                            body: "b".into(),
                            timestamp: 1001,
                            sender_id: 6,
                            nonce: Some(2.into()),
                        },
                    ],
                    done: false,
                },
            )
            .await;

        // Client requests all users for the received messages; only user 6 is
        // asked for, since user 5 was already loaded above.
        let mut get_users = server.receive::<proto::GetUsers>().await.unwrap();
        get_users.payload.user_ids.sort();
        assert_eq!(get_users.payload.user_ids, vec![6]);
        server
            .respond(
                get_users.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 6,
                        github_login: "maxbrunsfeld".into(),
                        avatar_url: "http://avatar.com/maxbrunsfeld".into(),
                    }],
                },
            )
            .await;

        // The two joined messages are inserted into the empty channel.
        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "a".into()),
                    ("maxbrunsfeld".into(), "b".into())
                ]
            );
        });

        // Receive a new message.
        server.send(proto::ChannelMessageSent {
            channel_id: channel.read_with(cx, |channel, _| channel.details.id),
            message: Some(proto::ChannelMessage {
                id: 12,
                body: "c".into(),
                timestamp: 1002,
                sender_id: 7,
                nonce: Some(3.into()),
            }),
        });

        // Client requests user for message since they haven't seen them yet
        let get_users = server.receive::<proto::GetUsers>().await.unwrap();
        assert_eq!(get_users.payload.user_ids, vec![7]);
        server
            .respond(
                get_users.receipt(),
                proto::UsersResponse {
                    users: vec![proto::User {
                        id: 7,
                        github_login: "as-cii".into(),
                        avatar_url: "http://avatar.com/as-cii".into(),
                    }],
                },
            )
            .await;

        // The new message is appended after the two existing ones.
        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 2..2,
                new_count: 1,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(2..3)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[("as-cii".into(), "c".into())]
            )
        });

        // Scroll up to view older messages.
        channel.update(cx, |channel, cx| {
            assert!(channel.load_more_messages(cx));
        });
        let get_messages = server.receive::<proto::GetChannelMessages>().await.unwrap();
        assert_eq!(get_messages.payload.channel_id, 5);
        assert_eq!(get_messages.payload.before_message_id, 10);
        server
            .respond(
                get_messages.receipt(),
                proto::GetChannelMessagesResponse {
                    done: true,
                    messages: vec![
                        proto::ChannelMessage {
                            id: 8,
                            body: "y".into(),
                            timestamp: 998,
                            sender_id: 5,
                            nonce: Some(4.into()),
                        },
                        proto::ChannelMessage {
                            id: 9,
                            body: "z".into(),
                            timestamp: 999,
                            sender_id: 6,
                            nonce: Some(5.into()),
                        },
                    ],
                },
            )
            .await;

        // The older messages are inserted in front of the existing ones.
        assert_eq!(
            channel.next_event(cx).await,
            ChannelEvent::MessagesUpdated {
                old_range: 0..0,
                new_count: 2,
            }
        );
        channel.read_with(cx, |channel, _| {
            assert_eq!(
                channel
                    .messages_in_range(0..2)
                    .map(|message| (message.sender.github_login.clone(), message.body.clone()))
                    .collect::<Vec<_>>(),
                &[
                    ("nathansobo".into(), "y".into()),
                    ("maxbrunsfeld".into(), "z".into())
                ]
            );
        });
    }
}