1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use core::time::Duration;
8
9use futures::{SinkExt, StreamExt};
10
11use xmpp_parsers::{
12 ns,
13 stream_error::{DefinedCondition, StreamError},
14 stream_features::StreamFeatures,
15};
16
17use super::*;
18
19#[derive(FromXml, AsXml, Debug)]
20#[xml(namespace = "urn:example", name = "data")]
21struct Data {
22 #[xml(text)]
23 contents: String,
24}
25
26#[tokio::test]
27async fn test_initiate_accept_stream() {
28 let (lhs, rhs) = tokio::io::duplex(65536);
29 let initiator = tokio::spawn(async move {
30 let mut stream = initiate_stream(
31 tokio::io::BufStream::new(lhs),
32 ns::JABBER_CLIENT,
33 StreamHeader {
34 from: Some("client".into()),
35 to: Some("server".into()),
36 id: Some("client-id".into()),
37 },
38 Timeouts::tight(),
39 )
40 .await?;
41 Ok::<_, io::Error>(stream.take_header())
42 });
43 let responder = tokio::spawn(async move {
44 let stream = accept_stream(
45 tokio::io::BufStream::new(rhs),
46 ns::JABBER_CLIENT,
47 Timeouts::tight(),
48 )
49 .await?;
50 assert_eq!(stream.header().from.unwrap(), "client");
51 assert_eq!(stream.header().to.unwrap(), "server");
52 assert_eq!(stream.header().id.unwrap(), "client-id");
53 stream
54 .send_header(StreamHeader {
55 from: Some("server".into()),
56 to: Some("client".into()),
57 id: Some("server-id".into()),
58 })
59 .await
60 });
61 responder.await.unwrap().expect("responder");
62 let server_header = initiator.await.unwrap().expect("initiator");
63 assert_eq!(server_header.from.unwrap(), "server");
64 assert_eq!(server_header.to.unwrap(), "client");
65 assert_eq!(server_header.id.unwrap(), "server-id");
66}
67
68#[tokio::test]
69async fn test_exchange_stream_features() {
70 let (lhs, rhs) = tokio::io::duplex(65536);
71 let initiator = tokio::spawn(async move {
72 let stream = initiate_stream(
73 tokio::io::BufStream::new(lhs),
74 ns::JABBER_CLIENT,
75 StreamHeader::default(),
76 Timeouts::tight(),
77 )
78 .await?;
79 let (features, _) = stream.recv_features::<Data>().await?;
80 Ok::<_, RecvFeaturesError>(features)
81 });
82 let responder = tokio::spawn(async move {
83 let stream = accept_stream(
84 tokio::io::BufStream::new(rhs),
85 ns::JABBER_CLIENT,
86 Timeouts::tight(),
87 )
88 .await?;
89 let stream = stream.send_header(StreamHeader::default()).await?;
90 stream
91 .send_features::<Data>(&StreamFeatures::default())
92 .await?;
93 Ok::<_, io::Error>(())
94 });
95 responder.await.unwrap().expect("responder failed");
96 let features = initiator.await.unwrap().expect("initiator failed");
97 assert_eq!(features, StreamFeatures::default());
98}
99
100#[tokio::test]
101async fn test_handle_early_stream_error() {
102 let (lhs, rhs) = tokio::io::duplex(65536);
103 let err = StreamError::new(DefinedCondition::InternalServerError, "en", "Test error");
104 let initiator = tokio::spawn(async move {
105 let stream = initiate_stream(
106 tokio::io::BufStream::new(lhs),
107 ns::JABBER_CLIENT,
108 StreamHeader::default(),
109 Timeouts::tight(),
110 )
111 .await?;
112 match stream.recv_features::<Data>().await {
113 Ok((v, ..)) => panic!("test expected stream error, got features {v:?}"),
114 Err(RecvFeaturesError::Io(e)) => Err(e),
115 Err(RecvFeaturesError::StreamError(e)) => Ok(e),
116 }
117 });
118 let responder = {
119 let err = err.clone();
120 tokio::spawn(async move {
121 let stream = accept_stream(
122 tokio::io::BufStream::new(rhs),
123 ns::JABBER_CLIENT,
124 Timeouts::tight(),
125 )
126 .await?;
127 let stream = stream.send_header(StreamHeader::default()).await?;
128 stream.send_error(&err).await?;
129 Ok::<_, io::Error>(())
130 })
131 };
132 responder.await.unwrap().expect("responder failed");
133 let received = initiator.await.unwrap().expect("initiator failed");
134 assert_eq!(received.0, err);
135}
136
137#[tokio::test]
138async fn test_exchange_data() {
139 let (lhs, rhs) = tokio::io::duplex(65536);
140
141 let initiator = tokio::spawn(async move {
142 let stream = initiate_stream(
143 tokio::io::BufStream::new(lhs),
144 ns::JABBER_CLIENT,
145 StreamHeader::default(),
146 Timeouts::tight(),
147 )
148 .await?;
149 let (_, mut stream) = stream.recv_features::<Data>().await?;
150 stream
151 .send(&Data {
152 contents: "hello".to_owned(),
153 })
154 .await?;
155 match stream.next().await {
156 Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
157 other => panic!("unexpected stream message: {:?}", other),
158 }
159 Ok::<_, RecvFeaturesError>(())
160 });
161
162 let responder = tokio::spawn(async move {
163 let stream = accept_stream(
164 tokio::io::BufStream::new(rhs),
165 ns::JABBER_CLIENT,
166 Timeouts::tight(),
167 )
168 .await?;
169 let stream = stream.send_header(StreamHeader::default()).await?;
170 let mut stream = stream
171 .send_features::<Data>(&StreamFeatures::default())
172 .await?;
173 stream
174 .send(&Data {
175 contents: "world!".to_owned(),
176 })
177 .await?;
178 match stream.next().await {
179 Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
180 other => panic!("unexpected stream message: {:?}", other),
181 }
182 Ok::<_, io::Error>(())
183 });
184
185 responder.await.unwrap().expect("responder failed");
186 initiator.await.unwrap().expect("initiator failed");
187}
188
189#[tokio::test]
190async fn test_clean_shutdown() {
191 let (lhs, rhs) = tokio::io::duplex(65536);
192
193 let initiator = tokio::spawn(async move {
194 let stream = initiate_stream(
195 tokio::io::BufStream::new(lhs),
196 ns::JABBER_CLIENT,
197 StreamHeader::default(),
198 Timeouts::tight(),
199 )
200 .await?;
201 let (_, mut stream) = stream.recv_features::<Data>().await?;
202 SinkExt::<&Data>::close(&mut stream).await?;
203 match stream.next().await {
204 Some(Err(ReadError::StreamFooterReceived)) => (),
205 other => panic!("unexpected stream message: {:?}", other),
206 }
207 Ok::<_, RecvFeaturesError>(())
208 });
209
210 let responder = tokio::spawn(async move {
211 let stream = accept_stream(
212 tokio::io::BufStream::new(rhs),
213 ns::JABBER_CLIENT,
214 Timeouts::tight(),
215 )
216 .await?;
217 let stream = stream.send_header(StreamHeader::default()).await?;
218 let mut stream = stream
219 .send_features::<Data>(&StreamFeatures::default())
220 .await?;
221 match stream.next().await {
222 Some(Err(ReadError::StreamFooterReceived)) => (),
223 other => panic!("unexpected stream message: {:?}", other),
224 }
225 SinkExt::<&Data>::close(&mut stream).await?;
226 Ok::<_, io::Error>(())
227 });
228
229 responder.await.unwrap().expect("responder failed");
230 initiator.await.unwrap().expect("initiator failed");
231}
232
233#[tokio::test]
234async fn test_exchange_data_stream_reset_and_shutdown() {
235 let (lhs, rhs) = tokio::io::duplex(65536);
236
237 let initiator = tokio::spawn(async move {
238 let stream = initiate_stream(
239 tokio::io::BufStream::new(lhs),
240 ns::JABBER_CLIENT,
241 StreamHeader::default(),
242 Timeouts::tight(),
243 )
244 .await?;
245 let (_, mut stream) = stream.recv_features::<Data>().await?;
246 stream
247 .send(&Data {
248 contents: "hello".to_owned(),
249 })
250 .await?;
251 match stream.next().await {
252 Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
253 other => panic!("unexpected stream message: {:?}", other),
254 }
255 let stream = stream
256 .initiate_reset()
257 .send_header(StreamHeader {
258 from: Some("client".into()),
259 to: Some("server".into()),
260 id: Some("client-id".into()),
261 })
262 .await?;
263 assert_eq!(stream.header().from.unwrap(), "server");
264 assert_eq!(stream.header().to.unwrap(), "client");
265 assert_eq!(stream.header().id.unwrap(), "server-id");
266
267 let (_, mut stream) = stream.recv_features::<Data>().await?;
268 stream
269 .send(&Data {
270 contents: "once more".to_owned(),
271 })
272 .await?;
273 SinkExt::<&Data>::close(&mut stream).await?;
274 match stream.next().await {
275 Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"),
276 other => panic!("unexpected stream message: {:?}", other),
277 }
278 match stream.next().await {
279 Some(Err(ReadError::StreamFooterReceived)) => (),
280 other => panic!("unexpected stream message: {:?}", other),
281 }
282 Ok::<_, RecvFeaturesError>(())
283 });
284
285 let responder = tokio::spawn(async move {
286 let stream = accept_stream(
287 tokio::io::BufStream::new(rhs),
288 ns::JABBER_CLIENT,
289 Timeouts::tight(),
290 )
291 .await?;
292 let stream = stream.send_header(StreamHeader::default()).await?;
293 let mut stream = stream
294 .send_features::<Data>(&StreamFeatures::default())
295 .await?;
296 match stream.next().await {
297 Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
298 other => panic!("unexpected stream message: {:?}", other),
299 }
300 let stream = stream
301 .accept_reset(&Data {
302 contents: "world!".to_owned(),
303 })
304 .await?;
305 assert_eq!(stream.header().from.unwrap(), "client");
306 assert_eq!(stream.header().to.unwrap(), "server");
307 assert_eq!(stream.header().id.unwrap(), "client-id");
308 let stream = stream
309 .send_header(StreamHeader {
310 from: Some("server".into()),
311 to: Some("client".into()),
312 id: Some("server-id".into()),
313 })
314 .await?;
315 let mut stream = stream
316 .send_features::<Data>(&StreamFeatures::default())
317 .await?;
318 stream
319 .send(&Data {
320 contents: "hello world!".to_owned(),
321 })
322 .await?;
323 match stream.next().await {
324 Some(Ok(Data { contents })) => assert_eq!(contents, "once more"),
325 other => panic!("unexpected stream message: {:?}", other),
326 }
327 SinkExt::<&Data>::close(&mut stream).await?;
328 match stream.next().await {
329 Some(Err(ReadError::StreamFooterReceived)) => (),
330 other => panic!("unexpected stream message: {:?}", other),
331 }
332 Ok::<_, io::Error>(())
333 });
334
335 responder.await.unwrap().expect("responder failed");
336 initiator.await.unwrap().expect("initiator failed");
337}
338
339#[tokio::test(start_paused = true)]
340async fn test_emits_soft_timeout_after_silence() {
341 let (lhs, rhs) = tokio::io::duplex(65536);
342
343 let client_timeouts = Timeouts {
344 read_timeout: Duration::new(300, 0),
345 response_timeout: Duration::new(15, 0),
346 };
347
348 // We do want to trigger only one set of timeouts, so we set the server
349 // timeouts much longer than the client timeouts
350 let server_timeouts = Timeouts {
351 read_timeout: Duration::new(900, 0),
352 response_timeout: Duration::new(15, 0),
353 };
354
355 let initiator = tokio::spawn(async move {
356 let stream = initiate_stream(
357 tokio::io::BufStream::new(lhs),
358 ns::JABBER_CLIENT,
359 StreamHeader::default(),
360 client_timeouts,
361 )
362 .await?;
363 let (_, mut stream) = stream.recv_features::<Data>().await?;
364 stream
365 .send(&Data {
366 contents: "hello".to_owned(),
367 })
368 .await?;
369 match stream.next().await {
370 Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
371 other => panic!("unexpected stream message: {:?}", other),
372 }
373 // Here we prove that the stream doesn't see any data and also does
374 // not see the SoftTimeout too early.
375 // (Well, not exactly a proof: We only check until half of the read
376 // timeout, because that was easy to write and I deem it good enough.)
377 match tokio::time::timeout(client_timeouts.read_timeout / 2, stream.next()).await {
378 Err(_) => (),
379 Ok(ev) => panic!("early stream message (before soft timeout): {:?}", ev),
380 };
381 // Now the next thing that happens is the soft timeout ...
382 match stream.next().await {
383 Some(Err(ReadError::SoftTimeout)) => (),
384 other => panic!("unexpected stream message: {:?}", other),
385 }
386 // Another check that the there is some time between soft and hard
387 // timeout.
388 match tokio::time::timeout(client_timeouts.response_timeout / 3, stream.next()).await {
389 Err(_) => (),
390 Ok(ev) => {
391 panic!("early stream message (before hard timeout): {:?}", ev);
392 }
393 };
394 // ... and thereafter the hard timeout in form of an I/O error.
395 match stream.next().await {
396 Some(Err(ReadError::HardError(e))) if e.kind() == io::ErrorKind::TimedOut => (),
397 other => panic!("unexpected stream message: {:?}", other),
398 }
399 Ok::<_, RecvFeaturesError>(())
400 });
401
402 let responder = tokio::spawn(async move {
403 let stream = accept_stream(
404 tokio::io::BufStream::new(rhs),
405 ns::JABBER_CLIENT,
406 server_timeouts,
407 )
408 .await?;
409 let stream = stream.send_header(StreamHeader::default()).await?;
410 let mut stream = stream
411 .send_features::<Data>(&StreamFeatures::default())
412 .await?;
413 stream
414 .send(&Data {
415 contents: "world!".to_owned(),
416 })
417 .await?;
418 match stream.next().await {
419 Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
420 other => panic!("unexpected stream message: {:?}", other),
421 }
422 match stream.next().await {
423 Some(Err(ReadError::HardError(e))) if e.kind() == io::ErrorKind::InvalidData => {
424 match e.downcast::<rxml::Error>() {
425 // the initiator closes the stream by dropping it once the
426 // timeout trips, so we get a hard eof here.
427 Ok(rxml::Error::InvalidEof(_)) => (),
428 other => panic!("unexpected error: {:?}", other),
429 }
430 }
431 other => panic!("unexpected stream message: {:?}", other),
432 }
433 Ok::<_, io::Error>(())
434 });
435
436 responder.await.unwrap().expect("responder failed");
437 initiator.await.unwrap().expect("initiator failed");
438}
439
440#[tokio::test]
441async fn test_can_receive_after_shutdown() {
442 let (lhs, rhs) = tokio::io::duplex(65536);
443
444 let initiator = tokio::spawn(async move {
445 let stream = initiate_stream(
446 tokio::io::BufStream::new(lhs),
447 ns::JABBER_CLIENT,
448 StreamHeader::default(),
449 Timeouts::tight(),
450 )
451 .await?;
452 let (_, mut stream) = stream.recv_features::<Data>().await?;
453 match stream.next().await {
454 Some(Err(ReadError::StreamFooterReceived)) => (),
455 other => panic!("unexpected stream message: {:?}", other),
456 }
457 match stream.next().await {
458 None => (),
459 other => panic!("unexpected stream message: {:?}", other),
460 }
461 stream
462 .send(&Data {
463 contents: "hello".to_owned(),
464 })
465 .await?;
466 stream
467 .send(&Data {
468 contents: "world!".to_owned(),
469 })
470 .await?;
471 <XmlStream<_, _> as SinkExt<&Data>>::close(&mut stream).await?;
472 Ok::<_, RecvFeaturesError>(())
473 });
474
475 let responder = tokio::spawn(async move {
476 let stream = accept_stream(
477 tokio::io::BufStream::new(rhs),
478 ns::JABBER_CLIENT,
479 Timeouts::tight(),
480 )
481 .await?;
482 let stream = stream.send_header(StreamHeader::default()).await?;
483 let mut stream = stream
484 .send_features::<Data>(&StreamFeatures::default())
485 .await?;
486 stream.shutdown().await?;
487 match stream.next().await {
488 Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
489 other => panic!("unexpected stream message: {:?}", other),
490 }
491 match stream.next().await {
492 Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
493 other => panic!("unexpected stream message: {:?}", other),
494 }
495 match stream.next().await {
496 Some(Err(ReadError::StreamFooterReceived)) => (),
497 other => panic!("unexpected stream message: {:?}", other),
498 }
499 match stream.next().await {
500 None => (),
501 other => panic!("unexpected stream message: {:?}", other),
502 }
503 Ok::<_, io::Error>(())
504 });
505
506 responder.await.unwrap().expect("responder failed");
507 initiator.await.unwrap().expect("initiator failed");
508}