parsers: fix text field namespacing in StreamError

Saarko created

skip-changelog

Change summary

parsers/src/sm.rs                          |  20 +-
parsers/src/stream_error.rs                | 149 ++++++++++++++++++++++-
tokio-xmpp/src/stanzastream/connected.rs   |  28 +--
tokio-xmpp/src/stanzastream/negotiation.rs |  31 ++--
tokio-xmpp/src/stanzastream/worker.rs      |   6 
tokio-xmpp/src/xmlstream/tests.rs          |  39 ++---
6 files changed, 198 insertions(+), 75 deletions(-)

Detailed changes

parsers/src/sm.rs ๐Ÿ”—

@@ -159,17 +159,15 @@ pub struct HandledCountTooHigh {
 
 impl From<HandledCountTooHigh> for crate::stream_error::StreamError {
     fn from(other: HandledCountTooHigh) -> Self {
-        Self {
-            condition: crate::stream_error::DefinedCondition::UndefinedCondition,
-            text: Some((
-                None,
-                format!(
-                    "You acknowledged {} stanza(s), while I only sent {} so far.",
-                    other.h, other.send_count
-                ),
-            )),
-            application_specific: vec![other.into()],
-        }
+        Self::new(
+            crate::stream_error::DefinedCondition::UndefinedCondition,
+            "en",
+            format!(
+                "You acknowledged {} stanza(s), while I only sent {} so far.",
+                other.h, other.send_count
+            ),
+        )
+        .with_application_specific(vec![other.into()])
     }
 }
 

parsers/src/stream_error.rs ๐Ÿ”—

@@ -4,12 +4,13 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
+use alloc::collections::BTreeMap;
 use core::{error::Error, fmt};
 
 use minidom::Element;
 use xso::{AsXml, FromXml};
 
-use crate::ns;
+use crate::{message::Lang, ns};
 
 /// Enumeration of all stream error conditions as defined in [RFC 6120].
 ///
@@ -305,13 +306,12 @@ pub struct StreamError {
     #[xml(child)]
     pub condition: DefinedCondition,
 
-    /// Optional error text. The first part is the optional `xml:lang`
-    /// language tag, the second part is the actual text content.
-    #[xml(extract(default, fields(
-        lang(type_ = Option<String>, default),
+    /// Optional error text
+    #[xml(extract(n = .., name = "text", namespace = ns::XMPP_STREAMS, fields(
+        lang(type_ = Lang, default),
         text(type_ = String),
     )))]
-    pub text: Option<(Option<String>, String)>,
+    pub texts: BTreeMap<Lang, String>,
 
     /// Optional application-defined element which refines the specified
     /// [`Self::condition`].
@@ -323,7 +323,7 @@ pub struct StreamError {
 impl fmt::Display for StreamError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         <DefinedCondition as fmt::Display>::fmt(&self.condition, f)?;
-        if let Some((_, ref text)) = self.text {
+        if let Some((_, text)) = self.get_best_text(vec!["en"]) {
             write!(f, " ({:?})", text)?
         }
         if let Some(cond) = self.application_specific.first() {
@@ -333,6 +333,87 @@ impl fmt::Display for StreamError {
     }
 }
 
+impl StreamError {
+    /// Create a new StreamError with condition, text, and language
+    pub fn new<S: Into<String>, L: Into<Lang>>(
+        condition: DefinedCondition,
+        lang: L,
+        text: S,
+    ) -> Self {
+        let mut texts = BTreeMap::new();
+        texts.insert(lang.into(), text.into());
+        Self {
+            condition,
+            texts,
+            application_specific: Vec::new(),
+        }
+    }
+
+    /// Add a text element with the specified language
+    pub fn add_text<L: Into<Lang>, S: Into<String>>(mut self, lang: L, text: S) -> Self {
+        self.texts.insert(lang.into(), text.into());
+        self
+    }
+
+    /// Append application specific element(s)
+    pub fn with_application_specific(mut self, application_specific: Vec<Element>) -> Self {
+        self.application_specific = application_specific;
+        self
+    }
+
+    /// Get the best matching text from a list of preferred languages.
+    ///
+    /// This follows the same logic as Message::get_best_body:
+    /// 1. First tries to find a match from the preferred languages list
+    /// 2. Falls back to empty language ("") if available
+    /// 3. Returns the first entry if no matches found
+    ///
+    /// Returns None if no text elements exist.
+    pub fn get_best_text(&self, preferred_langs: Vec<&str>) -> Option<(Lang, &String)> {
+        Self::get_best(&self.texts, preferred_langs)
+    }
+
+    /// Cloned variant of [`StreamError::get_best_text`]
+    pub fn get_best_text_cloned(&self, preferred_langs: Vec<&str>) -> Option<(Lang, String)> {
+        Self::get_best_cloned(&self.texts, preferred_langs)
+    }
+
+    // Private helper methods matching Message's pattern
+    fn get_best<'a, T>(
+        map: &'a BTreeMap<Lang, T>,
+        preferred_langs: Vec<&str>,
+    ) -> Option<(Lang, &'a T)> {
+        if map.is_empty() {
+            return None;
+        }
+        for lang in preferred_langs {
+            if let Some(value) = map.get(lang) {
+                return Some((Lang::from(lang), value));
+            }
+        }
+        if let Some(value) = map.get("") {
+            return Some((Lang::new(), value));
+        }
+        map.iter().map(|(lang, value)| (lang.clone(), value)).next()
+    }
+
+    fn get_best_cloned<T: ToOwned<Owned = T>>(
+        map: &BTreeMap<Lang, T>,
+        preferred_langs: Vec<&str>,
+    ) -> Option<(Lang, T)> {
+        if let Some((lang, item)) = Self::get_best::<T>(map, preferred_langs) {
+            Some((lang, item.to_owned()))
+        } else {
+            None
+        }
+    }
+
+    /// Check if the error has any text elements
+    pub fn has_text(&self) -> bool {
+        !self.texts.is_empty()
+    }
+}
+
 /// Wrapper around [`StreamError`] which implements [`core::error::Error`]
 /// with an appropriate error message.
 #[derive(FromXml, AsXml, Debug)]
@@ -378,4 +459,58 @@ mod tests {
         let err: StreamError = xso::from_bytes(doc.as_bytes()).unwrap();
         assert_eq!(err.condition, DefinedCondition::UndefinedCondition);
     }
+
+    #[test]
+    fn test_stream_error_with_text() {
+        let doc = br#"<stream:error xmlns:stream='http://etherx.jabber.org/streams'>
+            <system-shutdown xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>
+            <text xmlns='urn:ietf:params:xml:ns:xmpp-streams'>Server is shutting down for maintenance.</text>
+        </stream:error>"#;
+
+        let err: StreamError = xso::from_bytes(doc).unwrap();
+        assert_eq!(err.condition, DefinedCondition::SystemShutdown);
+        assert!(err.has_text());
+
+        let (lang, text) = err.get_best_text(vec![]).unwrap();
+        assert_eq!(text, "Server is shutting down for maintenance.");
+        assert_eq!(lang, "");
+    }
+
+    #[test]
+    fn test_stream_error_with_multiple_languages() {
+        let doc = br#"<stream:error xmlns:stream='http://etherx.jabber.org/streams'>
+            <policy-violation xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>
+            <text xmlns='urn:ietf:params:xml:ns:xmpp-streams' xml:lang='en'>Message too large</text>
+            <text xmlns='urn:ietf:params:xml:ns:xmpp-streams' xml:lang='de'>Nachricht zu lang</text>
+        </stream:error>"#;
+
+        let err: StreamError = xso::from_bytes(doc).unwrap();
+        assert_eq!(err.condition, DefinedCondition::PolicyViolation);
+
+        // Test German preference
+        let (lang, text) = err.get_best_text(vec!["de"]).unwrap();
+        assert_eq!(lang, "de");
+        assert_eq!(text, "Nachricht zu lang");
+
+        // Test English preference
+        let (lang, text) = err.get_best_text(vec!["en"]).unwrap();
+        assert_eq!(lang, "en");
+        assert_eq!(text, "Message too large");
+
+        // Test cloned variant
+        let (lang, text) = err.get_best_text_cloned(vec!["en"]).unwrap();
+        assert_eq!(lang, "en");
+        assert_eq!(text, "Message too large");
+    }
+
+    #[test]
+    fn test_stream_error_constructors() {
+        let err = StreamError::new(DefinedCondition::Reset, "en", "Connection reset");
+        let (lang, text) = err.get_best_text(vec!["en"]).unwrap();
+        assert_eq!(lang, "en");
+        assert_eq!(text, "Connection reset");
+
+        let err = err.add_text("de", "Verbindung zurรผckgesetzt");
+        assert_eq!(err.texts.len(), 2);
+    }
 }

tokio-xmpp/src/stanzastream/connected.rs ๐Ÿ”—

@@ -572,15 +572,11 @@ impl ConnectedState {
                             }
                         } else {
                             log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
-                            self.to_stream_error_state(StreamError {
-                                condition: DefinedCondition::UnsupportedStanzaType,
-                                text: Some((
-                                    None,
-                                    "received <sm:r/>, but stream management is not enabled"
-                                        .to_owned(),
-                                )),
-                                application_specific: vec![],
-                            });
+                            self.to_stream_error_state(StreamError::new(
+                                DefinedCondition::UnsupportedStanzaType,
+                                "en",
+                                "received <sm:r/>, but stream management is not enabled".to_owned(),
+                            ));
                         }
                         // No matter whether we "enqueued" an ACK for send or
                         // whether we just successfully read something from
@@ -593,13 +589,13 @@ impl ConnectedState {
                         log::warn!(
                             "Received unsupported stream element: {other:?}. Emitting stream error.",
                         );
-                        self.to_stream_error_state(StreamError {
-                            condition: DefinedCondition::UnsupportedStanzaType,
-                            // TODO: figure out a good way to provide the
-                            // sender with more information.
-                            text: None,
-                            application_specific: vec![],
-                        });
+                        // TODO: figure out a good way to provide the sender
+                        // with more information.
+                        self.to_stream_error_state(StreamError::new(
+                            DefinedCondition::UnsupportedStanzaType,
+                            "en",
+                            format!("Unsupported stream element: {other:?}"),
+                        ));
                         Poll::Ready(None)
                     }
 

tokio-xmpp/src/stanzastream/negotiation.rs ๐Ÿ”—

@@ -232,11 +232,12 @@ impl NegotiationState {
                             };
                             log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error.");
                             Poll::Ready(Break(NegotiationResult::StreamError {
-                                error: StreamError {
-                                    condition: DefinedCondition::UndefinedCondition,
-                                    text: Some((None, error)),
-                                    application_specific: vec![super::error::ParseError.into()],
-                                },
+                                error: StreamError::new(
+                                    DefinedCondition::UndefinedCondition,
+                                    "en",
+                                    error,
+                                )
+                                .with_application_specific(vec![super::error::ParseError.into()]),
                             }))
                         }
                         st => {
@@ -258,11 +259,11 @@ impl NegotiationState {
                     Ok(other) => {
                         log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error.");
                         Poll::Ready(Break(NegotiationResult::StreamError {
-                            error: StreamError {
-                                condition: DefinedCondition::UnsupportedStanzaType,
-                                text: None,
-                                application_specific: vec![],
-                            },
+                            error: StreamError::new(
+                                DefinedCondition::UnsupportedStanzaType,
+                                "en",
+                                format!("Unsupported stream element during bind: {other:?}"),
+                            ),
                         }))
                     }
 
@@ -483,11 +484,11 @@ impl NegotiationState {
                     Ok(other) => {
                         log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error.");
                         Poll::Ready(Break(NegotiationResult::StreamError {
-                            error: StreamError {
-                                condition: DefinedCondition::UnsupportedStanzaType,
-                                text: None,
-                                application_specific: vec![],
-                            },
+                            error: StreamError::new(
+                                DefinedCondition::UnsupportedStanzaType,
+                                "en",
+                                format!("Unsupported stream element during negotiation: {other:?}"),
+                            ),
                         }))
                     }
 

tokio-xmpp/src/stanzastream/worker.rs ๐Ÿ”—

@@ -401,11 +401,7 @@ pub(super) fn parse_error_to_stream_error(e: xso::error::Error) -> StreamError {
         Error::TextParseError(_) | Error::Other(_) => DefinedCondition::InvalidXml,
         Error::TypeMismatch => DefinedCondition::UnsupportedStanzaType,
     };
-    StreamError {
-        condition,
-        text: Some((None, e.to_string())),
-        application_specific: vec![],
-    }
+    StreamError::new(condition, "en", e.to_string())
 }
 
 /// Worker system for a [`StanzaStream`].

tokio-xmpp/src/xmlstream/tests.rs ๐Ÿ”—

@@ -9,6 +9,7 @@ use core::time::Duration;
 use futures::{SinkExt, StreamExt};
 
 use xmpp_parsers::{
+    ns,
     stream_error::{DefinedCondition, StreamError},
     stream_features::StreamFeatures,
 };
@@ -28,7 +29,7 @@ async fn test_initiate_accept_stream() {
     let initiator = tokio::spawn(async move {
         let mut stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader {
                 from: Some("client".into()),
                 to: Some("server".into()),
@@ -42,7 +43,7 @@ async fn test_initiate_accept_stream() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;
@@ -70,7 +71,7 @@ async fn test_exchange_stream_features() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -81,7 +82,7 @@ async fn test_exchange_stream_features() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;
@@ -99,15 +100,11 @@ async fn test_exchange_stream_features() {
 #[tokio::test]
 async fn test_handle_early_stream_error() {
     let (lhs, rhs) = tokio::io::duplex(65536);
-    let err = StreamError {
-        condition: DefinedCondition::InternalServerError,
-        text: None,
-        application_specific: Vec::new(),
-    };
+    let err = StreamError::new(DefinedCondition::InternalServerError, "en", "Test error");
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -123,7 +120,7 @@ async fn test_handle_early_stream_error() {
         tokio::spawn(async move {
             let stream = accept_stream(
                 tokio::io::BufStream::new(rhs),
-                "jabber:client",
+                ns::JABBER_CLIENT,
                 Timeouts::tight(),
             )
             .await?;
@@ -144,7 +141,7 @@ async fn test_exchange_data() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -165,7 +162,7 @@ async fn test_exchange_data() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;
@@ -196,7 +193,7 @@ async fn test_clean_shutdown() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -213,7 +210,7 @@ async fn test_clean_shutdown() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;
@@ -240,7 +237,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -288,7 +285,7 @@ async fn test_exchange_data_stream_reset_and_shutdown() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;
@@ -358,7 +355,7 @@ async fn test_emits_soft_timeout_after_silence() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             client_timeouts,
         )
@@ -405,7 +402,7 @@ async fn test_emits_soft_timeout_after_silence() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             server_timeouts,
         )
         .await?;
@@ -447,7 +444,7 @@ async fn test_can_receive_after_shutdown() {
     let initiator = tokio::spawn(async move {
         let stream = initiate_stream(
             tokio::io::BufStream::new(lhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             StreamHeader::default(),
             Timeouts::tight(),
         )
@@ -478,7 +475,7 @@ async fn test_can_receive_after_shutdown() {
     let responder = tokio::spawn(async move {
         let stream = accept_stream(
             tokio::io::BufStream::new(rhs),
-            "jabber:client",
+            ns::JABBER_CLIENT,
             Timeouts::tight(),
         )
         .await?;