tests.rs

  1// Copyright (c) 2024 Jonas Schäfer <jonas@zombofant.net>
  2//
  3// This Source Code Form is subject to the terms of the Mozilla Public
  4// License, v. 2.0. If a copy of the MPL was not distributed with this
  5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7use core::time::Duration;
  8
  9use futures::{SinkExt, StreamExt};
 10
 11use xmpp_parsers::{
 12    ns,
 13    stream_error::{DefinedCondition, StreamError},
 14    stream_features::StreamFeatures,
 15};
 16
 17use super::*;
 18
 19#[derive(FromXml, AsXml, Debug)]
 20#[xml(namespace = "urn:example", name = "data")]
 21struct Data {
 22    #[xml(text)]
 23    contents: String,
 24}
 25
 26#[tokio::test]
 27async fn test_initiate_accept_stream() {
 28    let (lhs, rhs) = tokio::io::duplex(65536);
 29    let initiator = tokio::spawn(async move {
 30        let mut stream = initiate_stream(
 31            tokio::io::BufStream::new(lhs),
 32            ns::JABBER_CLIENT,
 33            StreamHeader {
 34                from: Some("client".into()),
 35                to: Some("server".into()),
 36                id: Some("client-id".into()),
 37            },
 38            Timeouts::tight(),
 39        )
 40        .await?;
 41        Ok::<_, io::Error>(stream.take_header())
 42    });
 43    let responder = tokio::spawn(async move {
 44        let stream = accept_stream(
 45            tokio::io::BufStream::new(rhs),
 46            ns::JABBER_CLIENT,
 47            Timeouts::tight(),
 48        )
 49        .await?;
 50        assert_eq!(stream.header().from.unwrap(), "client");
 51        assert_eq!(stream.header().to.unwrap(), "server");
 52        assert_eq!(stream.header().id.unwrap(), "client-id");
 53        stream
 54            .send_header(StreamHeader {
 55                from: Some("server".into()),
 56                to: Some("client".into()),
 57                id: Some("server-id".into()),
 58            })
 59            .await
 60    });
 61    responder.await.unwrap().expect("responder");
 62    let server_header = initiator.await.unwrap().expect("initiator");
 63    assert_eq!(server_header.from.unwrap(), "server");
 64    assert_eq!(server_header.to.unwrap(), "client");
 65    assert_eq!(server_header.id.unwrap(), "server-id");
 66}
 67
 68#[tokio::test]
 69async fn test_exchange_stream_features() {
 70    let (lhs, rhs) = tokio::io::duplex(65536);
 71    let initiator = tokio::spawn(async move {
 72        let stream = initiate_stream(
 73            tokio::io::BufStream::new(lhs),
 74            ns::JABBER_CLIENT,
 75            StreamHeader::default(),
 76            Timeouts::tight(),
 77        )
 78        .await?;
 79        let (features, _) = stream.recv_features::<Data>().await?;
 80        Ok::<_, RecvFeaturesError>(features)
 81    });
 82    let responder = tokio::spawn(async move {
 83        let stream = accept_stream(
 84            tokio::io::BufStream::new(rhs),
 85            ns::JABBER_CLIENT,
 86            Timeouts::tight(),
 87        )
 88        .await?;
 89        let stream = stream.send_header(StreamHeader::default()).await?;
 90        stream
 91            .send_features::<Data>(&StreamFeatures::default())
 92            .await?;
 93        Ok::<_, io::Error>(())
 94    });
 95    responder.await.unwrap().expect("responder failed");
 96    let features = initiator.await.unwrap().expect("initiator failed");
 97    assert_eq!(features, StreamFeatures::default());
 98}
 99
100#[tokio::test]
101async fn test_handle_early_stream_error() {
102    let (lhs, rhs) = tokio::io::duplex(65536);
103    let err = StreamError::new(DefinedCondition::InternalServerError, "en", "Test error");
104    let initiator = tokio::spawn(async move {
105        let stream = initiate_stream(
106            tokio::io::BufStream::new(lhs),
107            ns::JABBER_CLIENT,
108            StreamHeader::default(),
109            Timeouts::tight(),
110        )
111        .await?;
112        match stream.recv_features::<Data>().await {
113            Ok((v, ..)) => panic!("test expected stream error, got features {v:?}"),
114            Err(RecvFeaturesError::Io(e)) => Err(e),
115            Err(RecvFeaturesError::StreamError(e)) => Ok(e),
116        }
117    });
118    let responder = {
119        let err = err.clone();
120        tokio::spawn(async move {
121            let stream = accept_stream(
122                tokio::io::BufStream::new(rhs),
123                ns::JABBER_CLIENT,
124                Timeouts::tight(),
125            )
126            .await?;
127            let stream = stream.send_header(StreamHeader::default()).await?;
128            stream.send_error(&err).await?;
129            Ok::<_, io::Error>(())
130        })
131    };
132    responder.await.unwrap().expect("responder failed");
133    let received = initiator.await.unwrap().expect("initiator failed");
134    assert_eq!(received.0, err);
135}
136
137#[tokio::test]
138async fn test_exchange_data() {
139    let (lhs, rhs) = tokio::io::duplex(65536);
140
141    let initiator = tokio::spawn(async move {
142        let stream = initiate_stream(
143            tokio::io::BufStream::new(lhs),
144            ns::JABBER_CLIENT,
145            StreamHeader::default(),
146            Timeouts::tight(),
147        )
148        .await?;
149        let (_, mut stream) = stream.recv_features::<Data>().await?;
150        stream
151            .send(&Data {
152                contents: "hello".to_owned(),
153            })
154            .await?;
155        match stream.next().await {
156            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
157            other => panic!("unexpected stream message: {:?}", other),
158        }
159        Ok::<_, RecvFeaturesError>(())
160    });
161
162    let responder = tokio::spawn(async move {
163        let stream = accept_stream(
164            tokio::io::BufStream::new(rhs),
165            ns::JABBER_CLIENT,
166            Timeouts::tight(),
167        )
168        .await?;
169        let stream = stream.send_header(StreamHeader::default()).await?;
170        let mut stream = stream
171            .send_features::<Data>(&StreamFeatures::default())
172            .await?;
173        stream
174            .send(&Data {
175                contents: "world!".to_owned(),
176            })
177            .await?;
178        match stream.next().await {
179            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
180            other => panic!("unexpected stream message: {:?}", other),
181        }
182        Ok::<_, io::Error>(())
183    });
184
185    responder.await.unwrap().expect("responder failed");
186    initiator.await.unwrap().expect("initiator failed");
187}
188
189#[tokio::test]
190async fn test_clean_shutdown() {
191    let (lhs, rhs) = tokio::io::duplex(65536);
192
193    let initiator = tokio::spawn(async move {
194        let stream = initiate_stream(
195            tokio::io::BufStream::new(lhs),
196            ns::JABBER_CLIENT,
197            StreamHeader::default(),
198            Timeouts::tight(),
199        )
200        .await?;
201        let (_, mut stream) = stream.recv_features::<Data>().await?;
202        SinkExt::<&Data>::close(&mut stream).await?;
203        match stream.next().await {
204            Some(Err(ReadError::StreamFooterReceived)) => (),
205            other => panic!("unexpected stream message: {:?}", other),
206        }
207        Ok::<_, RecvFeaturesError>(())
208    });
209
210    let responder = tokio::spawn(async move {
211        let stream = accept_stream(
212            tokio::io::BufStream::new(rhs),
213            ns::JABBER_CLIENT,
214            Timeouts::tight(),
215        )
216        .await?;
217        let stream = stream.send_header(StreamHeader::default()).await?;
218        let mut stream = stream
219            .send_features::<Data>(&StreamFeatures::default())
220            .await?;
221        match stream.next().await {
222            Some(Err(ReadError::StreamFooterReceived)) => (),
223            other => panic!("unexpected stream message: {:?}", other),
224        }
225        SinkExt::<&Data>::close(&mut stream).await?;
226        Ok::<_, io::Error>(())
227    });
228
229    responder.await.unwrap().expect("responder failed");
230    initiator.await.unwrap().expect("initiator failed");
231}
232
233#[tokio::test]
234async fn test_exchange_data_stream_reset_and_shutdown() {
235    let (lhs, rhs) = tokio::io::duplex(65536);
236
237    let initiator = tokio::spawn(async move {
238        let stream = initiate_stream(
239            tokio::io::BufStream::new(lhs),
240            ns::JABBER_CLIENT,
241            StreamHeader::default(),
242            Timeouts::tight(),
243        )
244        .await?;
245        let (_, mut stream) = stream.recv_features::<Data>().await?;
246        stream
247            .send(&Data {
248                contents: "hello".to_owned(),
249            })
250            .await?;
251        match stream.next().await {
252            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
253            other => panic!("unexpected stream message: {:?}", other),
254        }
255        let stream = stream
256            .initiate_reset()
257            .send_header(StreamHeader {
258                from: Some("client".into()),
259                to: Some("server".into()),
260                id: Some("client-id".into()),
261            })
262            .await?;
263        assert_eq!(stream.header().from.unwrap(), "server");
264        assert_eq!(stream.header().to.unwrap(), "client");
265        assert_eq!(stream.header().id.unwrap(), "server-id");
266
267        let (_, mut stream) = stream.recv_features::<Data>().await?;
268        stream
269            .send(&Data {
270                contents: "once more".to_owned(),
271            })
272            .await?;
273        SinkExt::<&Data>::close(&mut stream).await?;
274        match stream.next().await {
275            Some(Ok(Data { contents })) => assert_eq!(contents, "hello world!"),
276            other => panic!("unexpected stream message: {:?}", other),
277        }
278        match stream.next().await {
279            Some(Err(ReadError::StreamFooterReceived)) => (),
280            other => panic!("unexpected stream message: {:?}", other),
281        }
282        Ok::<_, RecvFeaturesError>(())
283    });
284
285    let responder = tokio::spawn(async move {
286        let stream = accept_stream(
287            tokio::io::BufStream::new(rhs),
288            ns::JABBER_CLIENT,
289            Timeouts::tight(),
290        )
291        .await?;
292        let stream = stream.send_header(StreamHeader::default()).await?;
293        let mut stream = stream
294            .send_features::<Data>(&StreamFeatures::default())
295            .await?;
296        match stream.next().await {
297            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
298            other => panic!("unexpected stream message: {:?}", other),
299        }
300        let stream = stream
301            .accept_reset(&Data {
302                contents: "world!".to_owned(),
303            })
304            .await?;
305        assert_eq!(stream.header().from.unwrap(), "client");
306        assert_eq!(stream.header().to.unwrap(), "server");
307        assert_eq!(stream.header().id.unwrap(), "client-id");
308        let stream = stream
309            .send_header(StreamHeader {
310                from: Some("server".into()),
311                to: Some("client".into()),
312                id: Some("server-id".into()),
313            })
314            .await?;
315        let mut stream = stream
316            .send_features::<Data>(&StreamFeatures::default())
317            .await?;
318        stream
319            .send(&Data {
320                contents: "hello world!".to_owned(),
321            })
322            .await?;
323        match stream.next().await {
324            Some(Ok(Data { contents })) => assert_eq!(contents, "once more"),
325            other => panic!("unexpected stream message: {:?}", other),
326        }
327        SinkExt::<&Data>::close(&mut stream).await?;
328        match stream.next().await {
329            Some(Err(ReadError::StreamFooterReceived)) => (),
330            other => panic!("unexpected stream message: {:?}", other),
331        }
332        Ok::<_, io::Error>(())
333    });
334
335    responder.await.unwrap().expect("responder failed");
336    initiator.await.unwrap().expect("initiator failed");
337}
338
339#[tokio::test(start_paused = true)]
340async fn test_emits_soft_timeout_after_silence() {
341    let (lhs, rhs) = tokio::io::duplex(65536);
342
343    let client_timeouts = Timeouts {
344        read_timeout: Duration::new(300, 0),
345        response_timeout: Duration::new(15, 0),
346    };
347
348    // We do want to trigger only one set of timeouts, so we set the server
349    // timeouts much longer than the client timeouts
350    let server_timeouts = Timeouts {
351        read_timeout: Duration::new(900, 0),
352        response_timeout: Duration::new(15, 0),
353    };
354
355    let initiator = tokio::spawn(async move {
356        let stream = initiate_stream(
357            tokio::io::BufStream::new(lhs),
358            ns::JABBER_CLIENT,
359            StreamHeader::default(),
360            client_timeouts,
361        )
362        .await?;
363        let (_, mut stream) = stream.recv_features::<Data>().await?;
364        stream
365            .send(&Data {
366                contents: "hello".to_owned(),
367            })
368            .await?;
369        match stream.next().await {
370            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
371            other => panic!("unexpected stream message: {:?}", other),
372        }
373        // Here we prove that the stream doesn't see any data and also does
374        // not see the SoftTimeout too early.
375        // (Well, not exactly a proof: We only check until half of the read
376        // timeout, because that was easy to write and I deem it good enough.)
377        match tokio::time::timeout(client_timeouts.read_timeout / 2, stream.next()).await {
378            Err(_) => (),
379            Ok(ev) => panic!("early stream message (before soft timeout): {:?}", ev),
380        };
381        // Now the next thing that happens is the soft timeout ...
382        match stream.next().await {
383            Some(Err(ReadError::SoftTimeout)) => (),
384            other => panic!("unexpected stream message: {:?}", other),
385        }
386        // Another check that the there is some time between soft and hard
387        // timeout.
388        match tokio::time::timeout(client_timeouts.response_timeout / 3, stream.next()).await {
389            Err(_) => (),
390            Ok(ev) => {
391                panic!("early stream message (before hard timeout): {:?}", ev);
392            }
393        };
394        // ... and thereafter the hard timeout in form of an I/O error.
395        match stream.next().await {
396            Some(Err(ReadError::HardError(e))) if e.kind() == io::ErrorKind::TimedOut => (),
397            other => panic!("unexpected stream message: {:?}", other),
398        }
399        Ok::<_, RecvFeaturesError>(())
400    });
401
402    let responder = tokio::spawn(async move {
403        let stream = accept_stream(
404            tokio::io::BufStream::new(rhs),
405            ns::JABBER_CLIENT,
406            server_timeouts,
407        )
408        .await?;
409        let stream = stream.send_header(StreamHeader::default()).await?;
410        let mut stream = stream
411            .send_features::<Data>(&StreamFeatures::default())
412            .await?;
413        stream
414            .send(&Data {
415                contents: "world!".to_owned(),
416            })
417            .await?;
418        match stream.next().await {
419            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
420            other => panic!("unexpected stream message: {:?}", other),
421        }
422        match stream.next().await {
423            Some(Err(ReadError::HardError(e))) if e.kind() == io::ErrorKind::InvalidData => {
424                match e.downcast::<rxml::Error>() {
425                    // the initiator closes the stream by dropping it once the
426                    // timeout trips, so we get a hard eof here.
427                    Ok(rxml::Error::InvalidEof(_)) => (),
428                    other => panic!("unexpected error: {:?}", other),
429                }
430            }
431            other => panic!("unexpected stream message: {:?}", other),
432        }
433        Ok::<_, io::Error>(())
434    });
435
436    responder.await.unwrap().expect("responder failed");
437    initiator.await.unwrap().expect("initiator failed");
438}
439
440#[tokio::test]
441async fn test_can_receive_after_shutdown() {
442    let (lhs, rhs) = tokio::io::duplex(65536);
443
444    let initiator = tokio::spawn(async move {
445        let stream = initiate_stream(
446            tokio::io::BufStream::new(lhs),
447            ns::JABBER_CLIENT,
448            StreamHeader::default(),
449            Timeouts::tight(),
450        )
451        .await?;
452        let (_, mut stream) = stream.recv_features::<Data>().await?;
453        match stream.next().await {
454            Some(Err(ReadError::StreamFooterReceived)) => (),
455            other => panic!("unexpected stream message: {:?}", other),
456        }
457        match stream.next().await {
458            None => (),
459            other => panic!("unexpected stream message: {:?}", other),
460        }
461        stream
462            .send(&Data {
463                contents: "hello".to_owned(),
464            })
465            .await?;
466        stream
467            .send(&Data {
468                contents: "world!".to_owned(),
469            })
470            .await?;
471        <XmlStream<_, _> as SinkExt<&Data>>::close(&mut stream).await?;
472        Ok::<_, RecvFeaturesError>(())
473    });
474
475    let responder = tokio::spawn(async move {
476        let stream = accept_stream(
477            tokio::io::BufStream::new(rhs),
478            ns::JABBER_CLIENT,
479            Timeouts::tight(),
480        )
481        .await?;
482        let stream = stream.send_header(StreamHeader::default()).await?;
483        let mut stream = stream
484            .send_features::<Data>(&StreamFeatures::default())
485            .await?;
486        stream.shutdown().await?;
487        match stream.next().await {
488            Some(Ok(Data { contents })) => assert_eq!(contents, "hello"),
489            other => panic!("unexpected stream message: {:?}", other),
490        }
491        match stream.next().await {
492            Some(Ok(Data { contents })) => assert_eq!(contents, "world!"),
493            other => panic!("unexpected stream message: {:?}", other),
494        }
495        match stream.next().await {
496            Some(Err(ReadError::StreamFooterReceived)) => (),
497            other => panic!("unexpected stream message: {:?}", other),
498        }
499        match stream.next().await {
500            None => (),
501            other => panic!("unexpected stream message: {:?}", other),
502        }
503        Ok::<_, io::Error>(())
504    });
505
506    responder.await.unwrap().expect("responder failed");
507    initiator.await.unwrap().expect("initiator failed");
508}