diff --git a/parsers/src/sm.rs b/parsers/src/sm.rs index bc68d81abfe53529bcf8107c69061e08bed14a83..8b0f3e1cc1c60363f87253e9006175f5ab67cbe2 100644 --- a/parsers/src/sm.rs +++ b/parsers/src/sm.rs @@ -180,6 +180,39 @@ impl From for crate::stream_error::StreamError { } } +/// Enum which allows parsing/serialising any XEP-0198 element. +#[derive(FromXml, AsXml, PartialEq, Debug, Clone)] +#[xml()] +pub enum Nonza { + /// Request to enable SM + #[xml(transparent)] + Enable(Enable), + + /// Successful SM enablement response + #[xml(transparent)] + Enabled(Enabled), + + /// Request to resume SM + #[xml(transparent)] + Resume(Resume), + + /// Sucessful SM resumption response + #[xml(transparent)] + Resumed(Resumed), + + /// Error response + #[xml(transparent)] + Failed(Failed), + + /// Acknowledgement + #[xml(transparent)] + Ack(A), + + /// Request for an acknowledgement + #[xml(transparent)] + Req(R), +} + #[cfg(test)] mod tests { use super::*; diff --git a/parsers/src/stream_features.rs b/parsers/src/stream_features.rs index 1dac25731964660980f852976f753cb9690c0b3d..3da2784d97c12d0f326a59146ad5d1bf9ca587b9 100644 --- a/parsers/src/stream_features.rs +++ b/parsers/src/stream_features.rs @@ -43,6 +43,10 @@ pub struct StreamFeatures { #[xml(child(default))] pub sasl_cb: Option, + /// Stream management feature + #[xml(child(default))] + pub stream_management: Option, + /// Other stream features advertised /// /// If some features you use end up here, you may want to contribute diff --git a/tokio-xmpp/src/client/login.rs b/tokio-xmpp/src/client/login.rs index 90aa0a34811c759f082991343e8cb379c78d31cf..48193f5101f9b033b18441eebe8f0f03239eb202 100644 --- a/tokio-xmpp/src/client/login.rs +++ b/tokio-xmpp/src/client/login.rs @@ -103,14 +103,13 @@ pub async fn auth( Err(AuthError::NoMechanism.into()) } -/// Log into an XMPP server as a client with a jid+pass -/// does channel binding if supported -pub async fn client_login( +/// Authenticate to an XMPP server, but do not bind a resource. +pub async fn client_auth( server: C, jid: Jid, password: String, timeouts: Timeouts, -) -> Result<(Option, StreamFeatures, XmppStream), Error> { +) -> Result<(StreamFeatures, XmppStream), Error> { let username = jid.node().unwrap().as_str(); let password = password; @@ -132,7 +131,18 @@ pub async fn client_login( id: None, }) .await?; - let (features, mut stream) = stream.recv_features().await?; + Ok(stream.recv_features().await?) 
+} + +/// Log into an XMPP server as a client with a jid+pass +/// does channel binding if supported +pub async fn client_login( + server: C, + jid: Jid, + password: String, + timeouts: Timeouts, +) -> Result<(Option, StreamFeatures, XmppStream), Error> { + let (features, mut stream) = client_auth(server, jid.clone(), password, timeouts).await?; // XmppStream bound to user session let full_jid = bind(&mut stream, &features, &jid).await?; diff --git a/tokio-xmpp/src/client/mod.rs b/tokio-xmpp/src/client/mod.rs index 851350a7f5656adfef3ff85b07b714dc38b9921e..dbd784bcb831d82791c9ddca378fe5a2739e703e 100644 --- a/tokio-xmpp/src/client/mod.rs +++ b/tokio-xmpp/src/client/mod.rs @@ -17,7 +17,7 @@ use crate::connect::StartTlsServerConnector; use crate::connect::TcpServerConnector; mod bind; -mod login; +pub(crate) mod login; mod stream; /// XMPP client connection and state diff --git a/tokio-xmpp/src/lib.rs b/tokio-xmpp/src/lib.rs index 5b00e42f4ffcb05435b23196f5180a8fe03632bd..beabf88eeb78fe6e03e8ee52305ff83d3af658d3 100644 --- a/tokio-xmpp/src/lib.rs +++ b/tokio-xmpp/src/lib.rs @@ -49,6 +49,7 @@ compile_error!( mod event; pub use event::{Event, Stanza}; pub mod connect; +pub mod stanzastream; pub mod xmlstream; mod client; diff --git a/tokio-xmpp/src/stanzastream/connected.rs b/tokio-xmpp/src/stanzastream/connected.rs new file mode 100644 index 0000000000000000000000000000000000000000..0438a0e5bf77617f197f9664159fda162c52f5b1 --- /dev/null +++ b/tokio-xmpp/src/stanzastream/connected.rs @@ -0,0 +1,836 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use core::future::Future; +use core::ops::ControlFlow::{Break, Continue}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::io; + +use futures::{ready, Sink, Stream}; + +use xmpp_parsers::{ + jid::Jid, + sm, + stream_error::{DefinedCondition, SentStreamError, StreamError}, + stream_features::StreamFeatures, +}; + +use crate::xmlstream::{ReadError, XmppStreamElement}; +use crate::Stanza; + +use super::negotiation::{NegotiationResult, NegotiationState}; +use super::queue::{QueueEntry, StanzaState, TransmitQueue}; +use super::stream_management::*; +use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT}; + +#[derive(PartialEq)] +pub(super) enum RxShutdownState { + AwaitingFooter, + AwaitingEof, + Done, +} + +fn local_error_for_stream_error( + io_error: &mut Option, + stream_error: &mut Option, +) -> io::Error { + io_error + .take() + .or_else(|| { + stream_error + .take() + .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x))) + }) + .unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "unknown local stream error generated", + ) + }) +} + +/// Substate of the [`BackendStream::Connected`] state. +/// +/// Having the substate and its logic in a separate type allows us to +/// circumvent problemns with moving data out of `&mut _` when transitioning +/// between substates. +pub(super) enum ConnectedState { + /// The stream is still being negotiated. + Negotiating { + /// Current state within the negotiations + substate: NegotiationState, + }, + + /// The stream is ready for transceiving. + Ready { + /// Stream management state, if any. + sm_state: Option, + }, + + SendStreamError { + /// Stream error to send. + /// + /// `None` implies that we now only need to flush. 
+ stream_error: Option, + + /// I/O error to return to the caller once the flush is done. + /// + /// If `None`, an error will be synthesised. + io_error: Option, + + /// Deadline until which the error must've been sent and the stream + /// must've been shut down. + deadline: Pin>, + }, + + Failed { + error: Option, + sm_state: Option, + }, + + /// A stream shutdown was initiated locally and we are flushing RX and TX + /// queues. + LocalShutdown { + /// Keep track on whether we have closed the TX side yet. + tx_closed: bool, + + /// Keep track on how shut down the receiving side is. + rx_state: RxShutdownState, + + /// Deadline until which graceful shutdown must complete; if the + /// deadline is exceeded (i.e. the contained Sleep future returns + /// ready), the streams will be dropped (and thus closed by the OS). + deadline: Pin>, + }, + + /// The remote side closed the stream. + RemoteShutdown { + /// Keep the SM state for later resumption. + sm_state: Option, + }, + + /// Local shutdown has completed; this is a final state, as local shutdown + /// signals an intent of stopping the stream forever. + LocalShutdownComplete, +} + +/// Enumeration of events happening while the stream has finished the +/// connection procedures and is established. +pub(super) enum ConnectedEvent { + /// Event generated by the stream worker. + Worker(WorkerEvent), + + /// The remote closed the stream orderly. + RemoteShutdown { sm_state: Option }, + + /// We got disconnected through an error, either an I/O error + /// or some kind of stream error. + Disconnect { + /// Stream management state for later resumption attempts. + sm_state: Option, + + /// The error which caused the disconnect. This is generally not none, + /// but we cannot prove this at compile time because we have to take + /// the error from a mutable (i.e. non-owned) place. + error: Option, + }, + + /// A shutdown was requested by the local side of the stream. + LocalShutdownRequested, +} + +impl ConnectedState { + fn to_stream_error_state(&mut self, stream_error: StreamError) { + *self = Self::SendStreamError { + stream_error: Some(stream_error), + io_error: None, + deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)), + }; + } + + fn to_failed_state(&mut self, error: io::Error, sm_state: Option) { + *self = Self::Failed { + error: Some(error), + sm_state, + }; + } + + fn poll_write_sm_req( + mut sm_state: Option<&mut SmState>, + mut stream: Pin<&mut XmppStream>, + cx: &mut Context<'_>, + ) -> Poll> { + if let Some(sm_state) = sm_state.as_mut() { + // Request is pending. + if sm_state.pending_req { + match ready!(>::poll_ready( + stream.as_mut(), + cx, + )) { + Ok(()) => (), + Err(e) => return Poll::Ready(Err(e)), + } + match stream + .as_mut() + .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R))) + { + Ok(()) => (), + Err(e) => { + // As the stream promised we would be able to + // send, this must be a problem with our + // (locally generated) nonza, i.e. this is + // fatal. + panic!("Failed to send SM Req nonza: {}", e); + } + } + sm_state.pending_req = false; + } + } + Poll::Ready(Ok(())) + } + + fn poll_writes_inner( + mut sm_state: Option<&mut SmState>, + mut stream: Pin<&mut XmppStream>, + transmit_queue: &mut TransmitQueue, + cx: &mut Context<'_>, + ) -> Poll> { + let mut depleted = false; + + // We prefer sending SM reqs before actual data. + // SM requests are used in response to soft timeouts as a way to + // trigger the remote side to send *something* to us. 
We may be + // sending a lot of data in bulk right now without ever expecting a + // response (or at least not anytime soon). In order to ensure that + // the server will in fact send a message to us soon, we have to send + // the SM request before anything else. + // + // Example scenario: Sending a bunch of MAM ``s over a slow, + // but low-latency link. The MAM response only triggers a response + // from the peer when everything has been transmitted, which may be + // longer than the stream timeout. + ready!(Self::poll_write_sm_req( + match sm_state { + None => None, + Some(ref mut v) => Some(v), + }, + stream.as_mut(), + cx + ))?; + + let mut transmitted = false; + // We prefer sending actual data before stream-management ACKs. + // While the other side may be waiting for our ACK, we are not obliged + // to send it straight away (XEP-0198 explicitly allows us to delay it + // for some implementation-defined time if we have stuff to send. Our + // implementation-defined time is "infinity"), so we try to make + // progress on real data. + loop { + // If either the queue has nothing for us or the stream isn't + // ready to send, we break out of the loop. We don't use ready! + // here because we may have SM ACKs to send. + let next = match transmit_queue.poll_next(cx) { + Poll::Ready(Some(v)) => v, + Poll::Ready(None) => { + // The transmit_queue is empty, so we set `depleted` to + // true in order to ensure that we return Ready if all SM + // acks also have been transmitted. + depleted = true; + break; + } + Poll::Pending => break, + }; + // If the stream isn't ready to send, none of the other things can + // be sent either, so we can use ready!. + match ready!(>::poll_ready( + stream.as_mut(), + cx + )) { + Ok(()) => (), + Err(e) => return Poll::Ready(Err(e)), + } + // We now either send the item or "die trying". It must + // be removed from the queue, because even if it fails to + // serialise, we don't want to reattempt sending it ( + // unless by SM resumption retransmission). + let next = next.take(); + match stream.as_mut().start_send(&next.stanza) { + Ok(()) => { + if let Some(sm_state) = sm_state.as_mut() { + sm_state.enqueue(next); + } + transmitted = true; + } + // Serialisation error, report back to the queue item. + Err(e) => { + next.token + .send_replace(StanzaState::Failed { error: e.into() }); + } + } + } + + if let Some(sm_state) = sm_state.as_mut() { + // We can set it to transmitted directly, because it has been + // cleared by the previous call to poll_write_sm_req. + sm_state.pending_req = transmitted; + ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?; + } + + // Now, if the stream will let us and we need to, we can tack + // on some SM ACKs. + if let Some(sm_state) = sm_state { + while sm_state.pending_acks > 0 { + match ready!(>::poll_ready( + stream.as_mut(), + cx, + )) { + Ok(()) => (), + Err(e) => return Poll::Ready(Err(e)), + } + match stream + .as_mut() + .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A { + h: sm_state.inbound_ctr(), + }))) { + Ok(()) => (), + Err(e) => { + // As the stream promised we would be able to + // send, this must be a problem with our + // (locally generated) nonza, i.e. this is + // fatal. + panic!("Failed to send SM Ack nonza: {}", e); + } + } + sm_state.pending_acks -= 1; + } + } + + // If we haven't transmitted, we may also not have polled + // the stream for readiness or flushing. We need to do + // that here to ensure progress. 
Even if our tx queue is + // empty, the tx buffer may be nonempty + match ready!(>::poll_flush( + stream.as_mut(), + cx + )) { + Ok(()) => (), + Err(e) => return Poll::Ready(Err(e)), + } + + // If we end up here, all data we currently have has been + // transmitted via the stream and the stream's tx buffers have + // been properly flushed. + if depleted { + // And here, we know that the transmit queue is closed, + // too. We return with success. + Poll::Ready(Ok(())) + } else { + // The transmit queue is still open, so more data could + // pour in to be transmitted. + Poll::Pending + } + } + + /// Drive the stream in transmit simplex mode. + /// + /// This will block forever (i.e. return [`Poll::Pending`] without + /// installing a waker) if the stream is currently being negotiated. + /// Otherwise, it will attempt to drain the `transmit_queue`. When the + /// queue is empty, `Ok(())` is returned. When write errors occur, this + /// will also block forever. + /// + /// If nothing is to be sent, but the stream could be used for sending, + /// this will drive the flush part of the inner stream. + /// + /// Any errors are reported on the next call to `poll`. + pub fn poll_writes( + &mut self, + stream: Pin<&mut XmppStream>, + transmit_queue: &mut TransmitQueue, + cx: &mut Context<'_>, + ) -> Poll<()> { + match self { + Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner( + sm_state.as_mut(), + stream, + transmit_queue, + cx + )) { + Ok(()) => Poll::Ready(()), + Err(e) => { + *self = Self::Failed { + error: Some(e), + sm_state: sm_state.take(), + }; + Poll::Pending + } + }, + + _ => Poll::Pending, + } + } + + /// Drive the stream in full-duplex mode. + /// + /// Stanzas from the `transmit_queue` are transmitted once the stream is + /// ready. 
+ /// + /// Returns: + /// - Poll::Pending if it blocks on the inner stream + /// - Poll::Ready(None) if it needs to be called again for a proper result + /// - Poll::Ready(Some(.)) when it has proper result + pub fn poll( + &mut self, + mut stream: Pin<&mut XmppStream>, + jid: &Jid, + features: &StreamFeatures, + transmit_queue: &mut TransmitQueue, + cx: &mut Context<'_>, + ) -> Poll> { + match self { + Self::Negotiating { ref mut substate } => { + match ready!(substate.advance(stream, jid, transmit_queue, cx)) { + Break(NegotiationResult::Disconnect { sm_state, error }) => { + self.to_failed_state(error, sm_state); + Poll::Ready(None) + } + Break(NegotiationResult::StreamReset { + sm_state, + bound_jid, + }) => { + *self = Self::Ready { sm_state }; + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset { + bound_jid, + features: features.clone(), + }))) + } + Break(NegotiationResult::StreamResumed { sm_state }) => { + *self = Self::Ready { + sm_state: Some(sm_state), + }; + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed))) + } + Break(NegotiationResult::StreamError { error }) => { + self.to_stream_error_state(error); + Poll::Ready(None) + } + Continue(Some(stanza)) => { + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza)))) + } + Continue(None) => Poll::Ready(None), + } + } + + Self::SendStreamError { + ref mut stream_error, + ref mut io_error, + ref mut deadline, + } => { + match stream.as_mut().poll_next(cx) { + Poll::Pending + | Poll::Ready(None) + | Poll::Ready(Some(Err(ReadError::StreamFooterReceived))) + | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (), + Poll::Ready(Some(Ok(ev))) => { + log::trace!("Discarding incoming data while sending stream error: {ev:?}") + } + Poll::Ready(Some(Err(ReadError::ParseError(e)))) => { + log::trace!("Ignoring parse error while sending stream error: {e}") + } + Poll::Ready(Some(Err(ReadError::HardError(e)))) => { + log::warn!("I/O error while sending stream error: {e}") + } + } + + match deadline.as_mut().poll(cx) { + Poll::Pending => (), + Poll::Ready(()) => { + log::debug!("Timeout while sending stream error. Discarding state."); + let error = local_error_for_stream_error(io_error, stream_error); + self.to_failed_state(error, None); + return Poll::Ready(None); + } + } + + // Cannot use ready! here because we have to consider the + // case where the other side is refusing to accept data + // because its outgoing buffer is too full. + if stream_error.is_some() { + match ready!(>::poll_ready( + stream.as_mut(), + cx + )) + .and_then(|()| { + // The take serves as transition to the next state. + let stream_error = stream_error.take().unwrap(); + let result = stream.as_mut().start_send(&stream_error); + *io_error = Some(local_error_for_stream_error( + io_error, + &mut Some(stream_error), + )); + result + }) { + Ok(()) => (), + Err(e) => { + log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission."); + let error = local_error_for_stream_error(io_error, stream_error); + self.to_failed_state(error, None); + return Poll::Ready(None); + } + } + } + + match ready!(>::poll_flush( + stream.as_mut(), + cx + )) { + Ok(()) => (), + Err(e) => { + log::debug!( + "Got I/O error while flushing stream error: {e}. Skipping flush.", + ); + } + } + log::trace!("Stream error send complete, transitioning to Failed state"); + *self = Self::Failed { + error: Some(local_error_for_stream_error(io_error, stream_error)), + // Do *not* resume after we caused a stream error. 
+ sm_state: None, + }; + + // Request the caller to call us again to get the + // actual error message. + Poll::Ready(None) + } + + Self::Ready { ref mut sm_state } => { + match Self::poll_writes_inner( + sm_state.as_mut(), + stream.as_mut(), + transmit_queue, + cx, + ) { + Poll::Pending => (), + Poll::Ready(Ok(())) => { + *self = Self::LocalShutdown { + rx_state: RxShutdownState::AwaitingFooter, + tx_closed: false, + deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)), + }; + return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested)); + } + Poll::Ready(Err(e)) => { + *self = Self::Failed { + error: Some(e), + sm_state: sm_state.take(), + }; + return Poll::Ready(None); + } + } + + let item = ready!(stream.poll_next(cx)); + // We switch to a TxOpen or non-connected state when we + // receive the stream footer, so reading `None` from the + // stream is always an unclean closure. + let item = item.unwrap_or_else(|| { + Err(ReadError::HardError(io::Error::new( + io::ErrorKind::UnexpectedEof, + "eof before stream footer", + ))) + }); + match item { + // Easy case, we got some data. + Ok(XmppStreamElement::Stanza(data)) => { + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data)))) + } + + Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => { + if let Some(sm_state) = sm_state { + match sm_state.remote_acked(ack.h) { + Ok(()) => Poll::Ready(None), + Err(e) => { + log::error!( + "Failed to process sent by the server: {e}", + ); + self.to_stream_error_state(e.into()); + return Poll::Ready(None); + } + } + } else { + log::debug!("Hmm... I got an from the peer, but I don't have a stream management state. I'm gonna ignore that..."); + Poll::Ready(None) + } + } + + Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => { + if let Some(sm_state) = sm_state { + match sm_state.pending_acks.checked_add(1) { + None => panic!("Too many pending ACKs, something is wrong."), + Some(v) => sm_state.pending_acks = v, + } + } else { + log::warn!("Got an from the peer, but we don't have any stream management state. Terminating stream with an error."); + self.to_stream_error_state(StreamError { + condition: DefinedCondition::UnsupportedStanzaType, + text: Some(( + None, + "received , but stream management is not enabled" + .to_owned(), + )), + application_specific: vec![], + }); + } + // No matter whether we "enqueued" an ACK for send or + // whether we just successfully read something from + // the stream, we have to request to be polled again + // right away. + Poll::Ready(None) + } + + Ok(other) => { + log::warn!( + "Received unsupported stream element: {other:?}. Emitting stream error.", + ); + self.to_stream_error_state(StreamError { + condition: DefinedCondition::UnsupportedStanzaType, + // TODO: figure out a good way to provide the + // sender with more information. + text: None, + application_specific: vec![], + }); + Poll::Ready(None) + } + + // Another easy case: Soft timeouts are passed through + // to the caller for handling. + Err(ReadError::SoftTimeout) => { + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout))) + } + + // Parse errors are also just passed through (and will + // likely cause us to send a stream error). + Err(ReadError::ParseError(e)) => { + Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e)))) + } + + // I/O errors cause the stream to be considerde + // broken; we drop it and send a Disconnect event with + // the error embedded. 
+ Err(ReadError::HardError(e)) => { + let sm_state = sm_state.take(); + Poll::Ready(Some(ConnectedEvent::Disconnect { + sm_state, + error: Some(e), + })) + } + + // Stream footer indicates the remote wants to shut this + // stream down. + // We transition into RemoteShutdown state which makes us + // emit a special event until the caller takes care of it. + Err(ReadError::StreamFooterReceived) => { + *self = Self::RemoteShutdown { + sm_state: sm_state.take(), + }; + + // Let us be called again immediately to emit the + // notification. + Poll::Ready(None) + } + } + } + + Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect { + error: error.take(), + sm_state: sm_state.take(), + })), + + Self::LocalShutdown { .. } | Self::LocalShutdownComplete => { + panic!("poll_next called in local shutdown"); + } + + Self::RemoteShutdown { ref mut sm_state } => { + Poll::Ready(Some(ConnectedEvent::RemoteShutdown { + sm_state: sm_state.take(), + })) + } + } + } + + pub(super) fn poll_close( + &mut self, + mut stream: Pin<&mut XmppStream>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match self { + // User initiates shutdown by local choice. + // The actual shutdown is driven by the poll_read function and we + // only get woken up via the close_poller waker. + Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => { + *self = Self::LocalShutdown { + rx_state: RxShutdownState::AwaitingFooter, + tx_closed: false, + deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)), + }; + } + + Self::Failed { error, .. } => match error.take() { + Some(error) => return Poll::Ready(Err(error)), + None => return Poll::Ready(Ok(())), + }, + + // If close is called while an attempt is made to send the + // stream error, we abort transmission. + Self::SendStreamError { .. } => { + log::debug!("close() called while stream error was being sent. Aborting transmission of stream error."); + *self = Self::LocalShutdown { + rx_state: RxShutdownState::AwaitingFooter, + tx_closed: false, + deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)), + }; + } + + // Wait for local shutdown (driven by poll_read) to complete. + Self::LocalShutdown { + ref mut deadline, + ref mut rx_state, + ref mut tx_closed, + } => { + match deadline.as_mut().poll(cx) { + Poll::Ready(()) => { + log::debug!("Dropping stream after shutdown timeout was exceeded."); + *self = Self::LocalShutdownComplete; + return Poll::Ready(Ok(())); + } + Poll::Pending => (), + } + + if !*tx_closed { + // We cannot use ready! here, because we want to poll the + // receiving side in parallel. + match stream.as_mut().poll_shutdown(cx) { + Poll::Pending => (), + Poll::Ready(Ok(())) => { + *tx_closed = true; + } + Poll::Ready(Err(e)) => { + log::debug!( + "Ignoring write error during local stream shutdown: {e}" + ); + *tx_closed = true; + } + } + } + + match rx_state { + RxShutdownState::Done => { + if !*tx_closed { + // poll_close() returned Poll::Pending, so we have to + // return that, too. + return Poll::Pending; + } + } + // We can use ready! here because the `poll_close` has + // happened already; we don't want to poll anything else + // anymore. 
+ _ => loop { + match ready!(stream.as_mut().poll_next(cx)) { + None => { + if *rx_state != RxShutdownState::AwaitingEof { + log::debug!("Ignoring early EOF during stream shutdown."); + } + *rx_state = RxShutdownState::Done; + break; + } + Some(Ok(data)) => { + log::debug!("Ignoring data received on stream during local shutdown: {data:?}"); + } + Some(Err(ReadError::SoftTimeout)) => (), + Some(Err(ReadError::HardError(e))) => { + *rx_state = RxShutdownState::Done; + log::debug!("Ignoring read error during local shutdown: {e}"); + break; + } + Some(Err(ReadError::ParseError(e))) => { + log::debug!( + "Ignoring parse error during local shutdown: {}", + e + ); + } + Some(Err(ReadError::StreamFooterReceived)) => match rx_state { + RxShutdownState::AwaitingFooter => { + *rx_state = RxShutdownState::AwaitingEof; + } + RxShutdownState::AwaitingEof => { + unreachable!("multiple stream footers?!") + } + RxShutdownState::Done => unreachable!(), + }, + } + }, + } + + if *tx_closed && *rx_state == RxShutdownState::Done { + // Now that everything is properly cleaned up on the + // xmlstream layer, we go through with closure. + ready!(>::poll_close( + stream.as_mut(), + cx + ))?; + // And now that's done, we can finally call it a day. + *self = Self::LocalShutdownComplete; + return Poll::Ready(Ok(())); + } else { + return Poll::Pending; + } + } + + Self::LocalShutdownComplete => return Poll::Ready(Ok(())), + } + } + } + + pub(super) fn start_send_stream_error(&mut self, error: StreamError) { + match self { + Self::LocalShutdownComplete + | Self::LocalShutdown { .. } + | Self::RemoteShutdown { .. } + | Self::Failed { .. } => { + log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding."); + return; + } + + Self::Ready { .. } | Self::Negotiating { .. } => {} + + Self::SendStreamError { .. } => { + log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one."); + return; + } + } + + *self = Self::SendStreamError { + deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)), + stream_error: Some(error), + io_error: None, + }; + } + + pub fn queue_sm_request(&mut self) -> bool { + match self { + Self::Ready { sm_state, .. } => { + if let Some(sm_state) = sm_state { + sm_state.pending_req = true; + true + } else { + false + } + } + _ => false, + } + } +} diff --git a/tokio-xmpp/src/stanzastream/error.rs b/tokio-xmpp/src/stanzastream/error.rs new file mode 100644 index 0000000000000000000000000000000000000000..cf91a98e1d8fb82799d0130d9b56cd574db908ad --- /dev/null +++ b/tokio-xmpp/src/stanzastream/error.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use xso::{AsXml, FromXml}; + +static NS: &str = "https://xmlns.xmpp.rs/stream-errors"; + +/// Represents a parse error. +/// +/// Details are found in the ``. 
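+///
+/// The element is attached as an application-specific condition to locally
+/// generated stream errors, along the lines of the following sketch
+/// (mirroring how the negotiation code uses it; the error text is an
+/// illustrative placeholder):
+///
+/// ```ignore
+/// use xmpp_parsers::stream_error::{DefinedCondition, StreamError};
+///
+/// let error = StreamError {
+///     condition: DefinedCondition::UndefinedCondition,
+///     text: Some((None, "failed to parse bind response".to_owned())),
+///     application_specific: vec![ParseError.into()],
+/// };
+/// ```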
+#[derive(FromXml, AsXml, Debug)] +#[xml(namespace = NS, name = "parse-error")] +pub struct ParseError; diff --git a/tokio-xmpp/src/stanzastream/mod.rs b/tokio-xmpp/src/stanzastream/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..9829fa800d14b8b36055d898a0f0da506d9aa72c --- /dev/null +++ b/tokio-xmpp/src/stanzastream/mod.rs @@ -0,0 +1,268 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +//! # Resilient stanza stream +//! +//! This module provides the [`StanzaStream`], which is the next level up from +//! the low-level [`XmlStream`][`crate::xmlstream::XmlStream`]. +//! +//! The stanza stream knows about XMPP and it most importantly knows how to +//! fix a broken connection with a reconnect and how to do this smoothly using +//! [XEP-0198 (Stream Management)](https://xmpp.org/extensions/xep-0198.html). +//! XEP-0198 is only used if the peer supports it. If the peer does not +//! support XEP-0198, automatic reconnects are still done, but with more +//! undetectable data loss. +//! +//! The main API entrypoint for the stanza stream is, unsurprisingly, +//! [`StanzaStream`]. + +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; + +// TODO: ensure that IDs are always set on stanzas. + +// TODO: figure out what to do with the mpsc::Sender on lossy +// stream reconnects. Keeping it may cause stanzas to be sent which weren't +// meant for that stream, replacing it is racy. + +use futures::{SinkExt, Stream}; + +use tokio::sync::{mpsc, oneshot}; + +use xmpp_parsers::{jid::Jid, stream_features::StreamFeatures}; + +use crate::connect::ServerConnector; +use crate::xmlstream::Timeouts; +use crate::Stanza; + +mod connected; +mod error; +mod negotiation; +mod queue; +mod stream_management; +mod worker; + +use self::queue::QueueEntry; +pub use self::queue::{StanzaStage, StanzaState, StanzaToken}; +pub use self::worker::{Connection, XmppStream}; +use self::worker::{StanzaStreamWorker, LOCAL_SHUTDOWN_TIMEOUT}; + +/// Event informing about the change of the [`StanzaStream`]'s status. +#[derive(Debug)] +pub enum StreamEvent { + /// The stream was (re-)established **with** loss of state. + Reset { + /// The new JID to which the stream is bound. + bound_jid: Jid, + + /// The features reported by the stream. + features: StreamFeatures, + }, + + /// The stream is currently inactive because a connection was lost. + /// + /// Resumption without loss of state is still possible. This event is + /// merely informative and may be used to prolong timeouts or inform the + /// user that the connection is currently unstable. + Suspended, + + /// The stream was reestablished **without** loss of state. + /// + /// This is merely informative. Potentially useful to prolong timeouts. + Resumed, +} + +/// Event emitted by the [`StanzaStream`]. +/// +/// Note that stream closure is not an explicit event, but the end of the +/// event stream itself. +#[derive(Debug)] +pub enum Event { + /// The streams connectivity status has changed. + Stream(StreamEvent), + + /// A stanza was received over the stream. + Stanza(Stanza), +} + +/// Frontend interface to a reliable, always-online stanza stream. 
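+///
+/// # Example
+///
+/// A minimal sketch of driving a [`StanzaStream`]. It assumes a suitable
+/// [`ServerConnector`][`crate::connect::ServerConnector`] value `server` as
+/// well as `jid`, `password` and `timeouts` values provided by the caller;
+/// the queue depth of 16 is an arbitrary example value.
+///
+/// ```ignore
+/// use futures::StreamExt;
+/// use tokio_xmpp::connect::ServerConnector;
+/// use tokio_xmpp::stanzastream::{Event, StanzaStream};
+/// use tokio_xmpp::xmlstream::Timeouts;
+/// use xmpp_parsers::jid::Jid;
+///
+/// async fn run<C: ServerConnector>(server: C, jid: Jid, password: String, timeouts: Timeouts) {
+///     let mut stream = StanzaStream::new_c2s(server, jid, password, timeouts, 16);
+///     while let Some(event) = stream.next().await {
+///         match event {
+///             // Connectivity changes (reset, suspended, resumed).
+///             Event::Stream(ev) => log::debug!("stream event: {ev:?}"),
+///             // Payload stanzas received from the peer.
+///             Event::Stanza(stanza) => log::debug!("received stanza: {stanza:?}"),
+///         }
+///     }
+///     // The event stream ending signals that the stream was closed.
+/// }
+/// ```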
+pub struct StanzaStream { + rx: mpsc::Receiver, + tx: mpsc::Sender, +} + +impl StanzaStream { + /// Establish a new client-to-server stream using the given + /// [`ServerConnector`]. + /// + /// `jid` and `password` must be the user account's credentials. `jid` may + /// either be a bare JID (to let the server choose a resource) or a full + /// JID (to request a specific resource from the server, with no guarantee + /// of succcess). + /// + /// `timeouts` controls the responsiveness to connection interruptions + /// on the underlying transports. Please see the [`Timeouts`] struct's + /// documentation for hints on how to correctly configure this. + /// + /// The `queue_depth` controls the sizes for the incoming and outgoing + /// stanza queues. If the size is exceeded, the corresponding direction + /// will block until the queues can be flushed. Note that the respective + /// reverse direction is not affected (i.e. if your outgoing queue is + /// full for example because of a slow server, you can still receive + /// data). + pub fn new_c2s( + server: C, + jid: Jid, + password: String, + timeouts: Timeouts, + queue_depth: usize, + ) -> Self { + let reconnector = Box::new( + move |_preferred_location: Option, slot: oneshot::Sender| { + let jid = jid.clone(); + let server = server.clone(); + let password = password.clone(); + tokio::spawn(async move { + const MAX_DELAY: Duration = Duration::new(30, 0); + let mut delay = Duration::new(1, 0); + loop { + log::debug!("Starting new connection as {}", jid); + match crate::client::login::client_auth( + server.clone(), + jid.clone(), + password.clone(), + timeouts, + ) + .await + { + Ok((features, stream)) => { + log::debug!("Connection as {} established", jid); + let stream = stream.box_stream(); + let Err(mut conn) = slot.send(Connection { + stream, + features, + identity: jid, + }) else { + // Send succeeded, we're done here. + return; + }; + + log::debug!("StanzaStream dropped, attempting graceful termination of fresh stream."); + // Send failed, i.e. the stanzastream is dead. Let's + // be polite and close this stream cleanly. + // We don't care whether that works, though, we + // just want to release the resources after a + // defined amount of time. + let _: Result<_, _> = tokio::time::timeout( + LOCAL_SHUTDOWN_TIMEOUT, + >::close(&mut conn.stream), + ) + .await; + return; + } + Err(e) => { + // TODO: auth errors should probably be fatal?? + log::error!("Failed to connect: {}. Retrying in {:?}.", e, delay); + tokio::time::sleep(delay).await; + delay = delay * 2; + if delay > MAX_DELAY { + delay = MAX_DELAY; + } + } + } + } + }); + }, + ); + Self::new(reconnector, queue_depth) + } + + /// Create a new stanza stream. + /// + /// Stanza streams operate using a `connector` which is responsible for + /// producing a new stream whenever necessary. It is the connector's + /// responsibility that: + /// + /// - It never fails to send to the channel it is given. If the connector + /// drops the channel, the `StanzaStream` will consider this fatal and + /// fail the stream. + /// + /// - All streams are authenticated and secured as necessary. + /// + /// - All streams are authenticated for the same entity. If the connector + /// were to provide streams for different identities, information leaks + /// could occur as queues from previous sessions are being flushed on + /// the new stream on a reconnect. 
+ /// + /// Most notably, the `connector` is **not** responsible for performing + /// resource binding: Resource binding is handled by the `StanzaStream`. + /// + /// `connector` will be called soon after `new()` was called to establish + /// the first underlying stream for the `StanzaStream`. + /// + /// The `queue_depth` controls the sizes for the incoming and outgoing + /// stanza queues. If the size is exceeded, the corresponding direction + /// will block until the queues can be flushed. Note that the respective + /// reverse direction is not affected (i.e. if your outgoing queue is + /// full for example because of a slow server, you can still receive + /// data). + pub fn new( + connector: Box, oneshot::Sender) + Send + 'static>, + queue_depth: usize, + ) -> Self { + // c2f = core to frontend, f2c = frontend to core + let (f2c_tx, c2f_rx) = StanzaStreamWorker::spawn(connector, queue_depth); + Self { + tx: f2c_tx, + rx: c2f_rx, + } + } + + async fn assert_send(&self, cmd: QueueEntry) { + match self.tx.send(cmd).await { + Ok(()) => (), + Err(_) => panic!("Stream closed or the stream's background workers have crashed."), + } + } + + /// Close the stream. + /// + /// This will initiate a clean shutdown of the stream and will prevent and + /// cancel any more reconnection attempts. + pub async fn close(mut self) { + drop(self.tx); // closes stream. + while let Some(ev) = self.rx.recv().await { + log::trace!("discarding event {:?} after stream closure", ev); + } + } + + /// Send a stanza via the stream. + /// + /// Note that completion of this function merely signals that the stanza + /// has been enqueued successfully: it may be stuck in the transmission + /// queue for quite a while if the stream is currently disconnected. The + /// transmission progress can be observed via the returned + /// [`StanzaToken`]. + /// + /// # Panics + /// + /// If the stream has failed catastrophically (i.e. due to a software + /// bug), this function may panic. + pub async fn send(&self, stanza: Box) -> StanzaToken { + let (queue_entry, token) = QueueEntry::tracked(stanza); + self.assert_send(queue_entry).await; + token + } +} + +impl Stream for StanzaStream { + type Item = Event; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.rx.poll_recv(cx) + } +} diff --git a/tokio-xmpp/src/stanzastream/negotiation.rs b/tokio-xmpp/src/stanzastream/negotiation.rs new file mode 100644 index 0000000000000000000000000000000000000000..f045ba241a4964def484e78e3b0b4f5dec36fdb5 --- /dev/null +++ b/tokio-xmpp/src/stanzastream/negotiation.rs @@ -0,0 +1,534 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ +use core::ops::ControlFlow::{self, Break, Continue}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use std::io; + +use futures::{ready, Sink, Stream}; + +use xmpp_parsers::{ + bind::{BindQuery, BindResponse}, + iq::{Iq, IqType}, + jid::{FullJid, Jid}, + sm, + stream_error::{DefinedCondition, StreamError}, + stream_features::StreamFeatures, +}; + +use crate::xmlstream::{ReadError, XmppStreamElement}; +use crate::Stanza; + +use super::queue::{QueueEntry, TransmitQueue}; +use super::stream_management::*; +use super::worker::{parse_error_to_stream_error, XmppStream}; + +static BIND_REQ_ID: &str = "resource-binding"; + +pub(super) enum NegotiationState { + /// Send request to enable or resume stream management. + SendSmRequest { + /// Stream management state to use. If present, resumption will be + /// attempted. Otherwise, a fresh session will be established. + sm_state: Option, + + /// If the stream has been freshly bound, we carry the bound JID along + /// with us. + bound_jid: Option, + }, + + /// Await the response to the SM enable/resume request. + ReceiveSmResponse { + /// State to use. + sm_state: Option, + + /// If the stream has been freshly bound, we carry the bound JID along + /// with us. + bound_jid: Option, + }, + + /// Send a new request to bind to a resource. + SendBindRequest { sm_supported: bool }, + + /// Receive the bind response. + ReceiveBindResponse { sm_supported: bool }, +} + +/// The ultimate result of a stream negotiation. +pub(super) enum NegotiationResult { + /// An unplanned disconnect happened or a stream error was received from + /// the remote party. + Disconnect { + /// Stream management state for a later resumption attempt. + sm_state: Option, + + /// I/O error which came along the disconnect. + error: io::Error, + }, + + /// The negotiation completed successfully, but the stream was reset (i.e. + /// stream management and all session state was lost). + StreamReset { + /// Stream management state. This may still be non-None if the new + /// stream has successfully negotiated stream management. + sm_state: Option, + + /// The JID to which the stream is now bound. + bound_jid: Jid, + }, + + /// The negotiation completed successfully and a previous session was + /// resumed. + StreamResumed { + /// Negotiated stream management state. + sm_state: SmState, + }, + + /// The negotiation failed and we need to emit a stream error. + /// + /// **Note:** Stream errors *received* from the peer are signalled using + /// [`Self::Disconnect`] instead, with an I/O error of kind `Other`. + StreamError { + /// Stream error to send to the remote party with details about the + /// failure. + error: StreamError, + }, +} + +impl NegotiationState { + pub fn new(features: &StreamFeatures, sm_state: Option) -> io::Result { + match sm_state { + Some(sm_state) => { + if features.stream_management.is_some() { + return Ok(Self::SendSmRequest { + sm_state: Some(sm_state), + bound_jid: None, + }); + } else { + log::warn!("Peer is not offering stream management anymore. Dropping state."); + } + } + None => (), + } + + if !features.can_bind() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Peer is not offering the bind feature. 
Cannot proceed with stream negotiation.", + )); + } + + Ok(Self::SendBindRequest { + sm_supported: features.stream_management.is_some(), + }) + } + + fn flush(stream: Pin<&mut XmppStream>, cx: &mut Context) -> ControlFlow { + match >::poll_flush(stream, cx) { + Poll::Pending | Poll::Ready(Ok(())) => Continue(()), + Poll::Ready(Err(error)) => Break(error), + } + } + + pub fn advance( + &mut self, + mut stream: Pin<&mut XmppStream>, + jid: &Jid, + transmit_queue: &mut TransmitQueue, + cx: &mut Context<'_>, + ) -> Poll>> { + // When sending requests, we need to wait for the stream to become + // ready to send and then send the corresponding request. + // Note that if this wasn't a fresh stream (which it always is!), + // doing it in this kind of simplex fashion could lead to deadlocks + // (because we are blockingly sending without attempting to receive: a + // peer could stop receiving from our side if their tx buffer was too + // full or smth). However, because this stream is fresh, we know that + // our tx buffers are empty enough that this will succeed quickly, so + // that we can proceed. + // TODO: define a deadline for negotiation. + match self { + Self::SendBindRequest { sm_supported } => { + match ready!(>::poll_ready( + stream.as_mut(), + cx + )) { + // We can send. + Ok(()) => (), + + // Stream broke horribly. + Err(error) => { + return Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: None, + error, + })) + } + }; + + let resource = jid.resource().map(|x| x.as_str().to_owned()); + let stanza = Iq::from_set(BIND_REQ_ID, BindQuery::new(resource)); + match stream.start_send(&stanza) { + Ok(()) => (), + Err(e) => panic!("failed to serialize BindQuery: {}", e), + }; + + *self = Self::ReceiveBindResponse { + sm_supported: *sm_supported, + }; + Poll::Ready(Continue(None)) + } + + Self::ReceiveBindResponse { sm_supported } => { + match Self::flush(stream.as_mut(), cx) { + Break(error) => { + return Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: None, + error, + })) + } + Continue(()) => (), + } + + let item = ready!(stream.poll_next(cx)); + let item = item.unwrap_or_else(|| { + Err(ReadError::HardError(io::Error::new( + io::ErrorKind::UnexpectedEof, + "eof before stream footer", + ))) + }); + + match item { + Ok(XmppStreamElement::Stanza(data)) => match data { + Stanza::Iq(iq) if iq.id == BIND_REQ_ID => { + let error = match iq.payload { + IqType::Result(Some(payload)) => { + match BindResponse::try_from(payload) { + Ok(v) => { + let bound_jid = v.into(); + if *sm_supported { + *self = Self::SendSmRequest { + sm_state: None, + bound_jid: Some(bound_jid), + }; + return Poll::Ready(Continue(None)); + } else { + return Poll::Ready(Break( + NegotiationResult::StreamReset { + sm_state: None, + bound_jid: Jid::from(bound_jid), + }, + )); + } + } + Err(e) => e.to_string(), + } + } + IqType::Result(None) => "Bind response has no payload".to_owned(), + _ => "Unexpected IQ type in response to bind request".to_owned(), + }; + log::warn!("Received IQ matching the bind request, but parsing failed ({error})! Emitting stream error."); + Poll::Ready(Break(NegotiationResult::StreamError { + error: StreamError { + condition: DefinedCondition::UndefinedCondition, + text: Some((None, error)), + application_specific: vec![super::error::ParseError.into()], + }, + })) + } + st => { + log::warn!("Received unexpected stanza before response to bind request: {st:?}. 
Dropping."); + Poll::Ready(Continue(None)) + } + }, + + Ok(XmppStreamElement::StreamError(error)) => { + log::debug!("Received stream:error, failing stream and discarding any stream management state."); + let error = io::Error::new(io::ErrorKind::Other, error); + transmit_queue.fail(&(&error).into()); + Poll::Ready(Break(NegotiationResult::Disconnect { + error, + sm_state: None, + })) + } + + Ok(other) => { + log::warn!("Received unsupported stream element during bind: {other:?}. Emitting stream error."); + Poll::Ready(Break(NegotiationResult::StreamError { + error: StreamError { + condition: DefinedCondition::UnsupportedStanzaType, + text: None, + application_specific: vec![], + }, + })) + } + + // Soft timeouts during negotiation are a bad sign + // (because we already prompted the server to send + // something and are waiting for it), but also nothing + // to write home about. + Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)), + + // Parse errors during negotiation cause an unconditional + // stream error. + Err(ReadError::ParseError(e)) => { + Poll::Ready(Break(NegotiationResult::StreamError { + error: parse_error_to_stream_error(e), + })) + } + + // I/O errors cause the stream to be considered + // broken; we drop it and send a Disconnect event with + // the error embedded. + Err(ReadError::HardError(error)) => { + Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: None, + error, + })) + } + + // Stream footer during negotation is really weird. + // We kill the stream immediately with an error + // (but allow preservation of the SM state). + Err(ReadError::StreamFooterReceived) => { + Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: None, + error: io::Error::new( + io::ErrorKind::InvalidData, + "stream footer received during negotation", + ), + })) + } + } + } + + Self::SendSmRequest { + sm_state, + bound_jid, + } => { + match ready!(>::poll_ready( + stream.as_mut(), + cx + )) { + // We can send. + Ok(()) => (), + + // Stream broke horribly. + Err(error) => { + return Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: sm_state.take(), + error, + })) + } + }; + + let nonza = if let Some((id, inbound_ctr)) = + sm_state.as_ref().and_then(|x| x.resume_info()) + { + // Attempt resumption + sm::Nonza::Resume(sm::Resume { + h: inbound_ctr, + previd: sm::StreamId(id.to_owned()), + }) + } else { + // Attempt enabling + sm::Nonza::Enable(sm::Enable { + max: None, + resume: true, + }) + }; + match stream.start_send(&XmppStreamElement::SM(nonza)) { + Ok(()) => (), + Err(e) => { + // We panic here, instead of returning an + // error, because after we confirmed via + // poll_ready that the stream is ready to + // send, the only error returned by start_send + // can be caused by our data. + panic!("Failed to send SM nonza: {}", e); + } + } + + *self = Self::ReceiveSmResponse { + sm_state: sm_state.take(), + bound_jid: bound_jid.take(), + }; + // Ask caller to poll us again immediately in order to + // start flushing the stream. + Poll::Ready(Continue(None)) + } + + Self::ReceiveSmResponse { + sm_state, + bound_jid, + } => { + match Self::flush(stream.as_mut(), cx) { + Break(error) => { + return Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: sm_state.take(), + error, + })) + } + Continue(()) => (), + } + + // So the difficulty here is that there's a possibility + // that we receive non-SM data while the SM negotiation + // is going on. 
+ + let item = ready!(stream.poll_next(cx)); + let item = item.unwrap_or_else(|| { + Err(ReadError::HardError(io::Error::new( + io::ErrorKind::UnexpectedEof, + "eof before stream footer", + ))) + }); + match item { + // Pre-SM data. Note that we mustn't count this while we + // are still in negotiating state: we transit to + // [`Self::Ready`] immediately after we got the + // `` or ``, and before we got that, + // counting inbound stanzas is definitely wrong (see e.g. + // aioxmpp commit 796aa32). + Ok(XmppStreamElement::Stanza(data)) => Poll::Ready(Continue(Some(data))), + + Ok(XmppStreamElement::SM(sm::Nonza::Enabled(enabled))) => { + if sm_state.is_some() { + // Okay, the peer violated the stream management + // protocol here (or we have a bug). + log::warn!( + "Received , but we also have previous SM state. One of us has a bug here (us or the peer) and I'm not sure which it is. If you can reproduce this, please re-run with trace loglevel and provide the logs. Attempting to proceed with a fresh session.", + ); + } + // We must emit Reset here because this is a + // fresh stream and we did not resume. + Poll::Ready(Break(NegotiationResult::StreamReset { + sm_state: Some(enabled.into()), + bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(), + })) + } + + Ok(XmppStreamElement::SM(sm::Nonza::Resumed(resumed))) => match sm_state.take() + { + Some(mut sm_state) => { + // Yay! + match sm_state.resume(resumed.h) { + Ok(to_retransmit) => transmit_queue.requeue_all(to_retransmit), + Err(e) => { + // We kill the stream with an error + log::error!("Resumption failed: {e}"); + return Poll::Ready(Break(NegotiationResult::StreamError { + error: e.into(), + })); + } + } + Poll::Ready(Break(NegotiationResult::StreamResumed { sm_state })) + } + None => { + // Okay, the peer violated the stream management + // protocol here (or we have a bug). + // Unlike the + // received-enabled-but-attempted-to-resume + // situation, we do not have enough information to + // proceed without having the stream break soon. + // (If we proceeded without a SM state, we would + // have the stream die as soon as the peer + // requests our counters). + // We thus terminate the stream with an error. + // We must emit Reset here because this is a fresh + // stream and we did not resume. + Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: None, + error: io::Error::new(io::ErrorKind::InvalidData, "Peer replied to request with response"), + })) + } + }, + + Ok(XmppStreamElement::SM(sm::Nonza::Failed(failed))) => match sm_state { + Some(sm_state) => { + log::debug!("Received in response to resumption request. Discarding SM data and attempting to renegotiate."); + if let Some(h) = failed.h { + // This is only an optimization anyway, so + // we can also just ignore this. + let _: Result<_, _> = sm_state.remote_acked(h); + } + *self = Self::SendBindRequest { sm_supported: true }; + Poll::Ready(Continue(None)) + } + None => { + log::warn!("Received in response to enable request. Proceeding without stream management."); + + // We must emit Reset here because this is a + // fresh stream and we did not resume. 
+ Poll::Ready(Break(NegotiationResult::StreamReset { + bound_jid: bound_jid.take().expect("State machine error: no bound_jid available in SM negotiation.").into(), + sm_state: None, + })) + } + }, + + Ok(XmppStreamElement::StreamError(error)) => { + log::debug!("Received stream error, failing stream and discarding any stream management state."); + let error = io::Error::new(io::ErrorKind::Other, error); + transmit_queue.fail(&(&error).into()); + Poll::Ready(Break(NegotiationResult::Disconnect { + error, + sm_state: None, + })) + } + + Ok(other) => { + log::warn!("Received unsupported stream element during negotiation: {other:?}. Emitting stream error."); + Poll::Ready(Break(NegotiationResult::StreamError { + error: StreamError { + condition: DefinedCondition::UnsupportedStanzaType, + text: None, + application_specific: vec![], + }, + })) + } + + // Soft timeouts during negotiation are a bad sign + // (because we already prompted the server to send + // something and are waiting for it), but also nothing + // to write home about. + Err(ReadError::SoftTimeout) => Poll::Ready(Continue(None)), + + // Parse errors during negotiation cause an unconditional + // stream error. + Err(ReadError::ParseError(e)) => { + Poll::Ready(Break(NegotiationResult::StreamError { + error: parse_error_to_stream_error(e), + })) + } + + // I/O errors cause the stream to be considered + // broken; we drop it and send a Disconnect event with + // the error embedded. + Err(ReadError::HardError(error)) => { + Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: sm_state.take(), + error, + })) + } + + // Stream footer during negotation is really weird. + // We kill the stream immediately with an error + // (but allow preservation of the SM state). + Err(ReadError::StreamFooterReceived) => { + Poll::Ready(Break(NegotiationResult::Disconnect { + sm_state: sm_state.take(), + error: io::Error::new( + io::ErrorKind::InvalidData, + "stream footer received during negotation", + ), + })) + } + } + } + } + } +} diff --git a/tokio-xmpp/src/stanzastream/queue.rs b/tokio-xmpp/src/stanzastream/queue.rs new file mode 100644 index 0000000000000000000000000000000000000000..ffb0900154a063be26839ef8fc2a93379ed59b3a --- /dev/null +++ b/tokio-xmpp/src/stanzastream/queue.rs @@ -0,0 +1,356 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. 
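+
+//! Queue and stanza-tracking types used by the
+//! [`StanzaStream`][`super::StanzaStream`].
+//!
+//! The publicly re-exported pieces are [`StanzaToken`], [`StanzaState`] and
+//! [`StanzaStage`], which let callers observe how far a submitted stanza has
+//! progressed. A sketch, assuming an already established `StanzaStream`
+//! named `stream` and some prepared boxed `stanza`:
+//!
+//! ```ignore
+//! use tokio_xmpp::stanzastream::{StanzaStage, StanzaState};
+//!
+//! let mut token = stream.send(Box::new(stanza)).await;
+//! // Wait until the stanza has at least been written to the transport; on
+//! // XEP-0198 streams, `StanzaStage::Acked` can be awaited instead.
+//! match token.wait_for(StanzaStage::Sent).await {
+//!     Some(state) => log::debug!("stanza reached state {state:?}"),
+//!     None => log::warn!("stanza left tracking before being sent"),
+//! }
+//! ```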
+ +use core::cmp::Ordering; +use core::fmt; +use core::task::{Context, Poll}; +use std::collections::VecDeque; +use std::io; + +use futures::ready; + +use tokio::sync::{mpsc, watch}; + +use crate::Stanza; + +#[derive(Debug, Clone)] +pub struct OpaqueIoError { + kind: io::ErrorKind, + message: String, +} + +impl OpaqueIoError { + pub fn kind(&self) -> io::ErrorKind { + self.kind + } + + pub fn into_io_error(self) -> io::Error { + io::Error::new(self.kind, self.message) + } + + pub fn to_io_error(&self) -> io::Error { + io::Error::new(self.kind, self.message.clone()) + } +} + +impl From for OpaqueIoError { + fn from(other: io::Error) -> Self { + >::from(&other) + } +} + +impl From<&io::Error> for OpaqueIoError { + fn from(other: &io::Error) -> Self { + Self { + kind: other.kind(), + message: other.to_string(), + } + } +} + +impl fmt::Display for OpaqueIoError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&self.message) + } +} + +impl core::error::Error for OpaqueIoError {} + +/// The five stages of stanza transmission. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +pub enum StanzaStage { + /// The stanza is in the transmit queue, but has not been serialised or + /// sent to the stream yet. + Queued, + + /// The stanza was successfully serialised and put into the transmit + /// buffers. + Sent, + + /// The stanza has been acked by the peer using XEP-0198 or comparable + /// means. + /// + /// **Note:** This state is only ever reached on streams where XEP-0198 + /// was succesfully negotiated. + Acked, + + /// Stanza transmission or serialisation failed. + Failed, + + /// The stanza was dropped from the transmit queue before it could be + /// sent. + /// + /// This may happen if the stream breaks in a fatal, panick-y way. + Dropped, +} + +impl From<&StanzaState> for StanzaStage { + fn from(other: &StanzaState) -> Self { + match other { + StanzaState::Queued => Self::Queued, + StanzaState::Sent { .. } => Self::Sent, + StanzaState::Acked { .. } => Self::Acked, + StanzaState::Failed { .. } => Self::Failed, + StanzaState::Dropped => Self::Dropped, + } + } +} + +impl PartialEq for StanzaState { + fn eq(&self, other: &StanzaStage) -> bool { + StanzaStage::from(self).eq(other) + } +} + +impl PartialEq for StanzaStage { + fn eq(&self, other: &StanzaState) -> bool { + self.eq(&Self::from(other)) + } +} + +impl PartialOrd for StanzaState { + fn partial_cmp(&self, other: &StanzaStage) -> Option { + StanzaStage::from(self).partial_cmp(other) + } +} + +impl PartialOrd for StanzaStage { + fn partial_cmp(&self, other: &StanzaState) -> Option { + self.partial_cmp(&Self::from(other)) + } +} + +/// State of a stanza in transit to the peer. +#[derive(Debug, Clone)] +pub enum StanzaState { + /// The stanza has been enqueued in the local queue but not sent yet. + Queued, + + /// The stanza has been sent to the server, but there is no proof that it + /// has been received by the server yet. + Sent { + /* + /// The time from when the stanza was enqueued until the time it was + /// sent on the stream. + queue_delay: Duration, + */ + }, + + /// Confirmation that the stanza has been seen by the server has been + /// received. + Acked { + /* + /// The time from when the stanza was enqueued until the time it was + /// sent on the stream. + queue_delay: Duration, + + /// The time between sending the stanza on the stream and receiving + /// confirmation from the server. + ack_delay: Duration, + */ + }, + + /// Sending the stanza has failed in a non-recoverable manner. 
+ Failed { + /// The error which caused the sending to fail. + error: OpaqueIoError, + }, + + /// The stanza was dropped out of the queue for unspecified reasons, + /// such as the stream breaking in a fatal, panick-y way. + Dropped, +} + +/// Track stanza transmission through the +/// [`StanzaStream`][`super::StanzaStream`] up to the peer. +#[derive(Clone)] +pub struct StanzaToken { + inner: watch::Receiver, +} + +impl StanzaToken { + /// Wait for the stanza transmission to reach the given state. + /// + /// If the stanza is removed from tracking before that state is reached, + /// `None` is returned. + pub async fn wait_for(&mut self, state: StanzaStage) -> Option { + self.inner + .wait_for(|st| *st >= state) + .await + .map(|x| x.clone()) + .ok() + } + + /// Read the current transmission state. + pub fn state(&self) -> StanzaState { + self.inner.borrow().clone() + } +} + +pub(super) struct QueueEntry { + pub stanza: Box, + pub token: watch::Sender, +} + +impl QueueEntry { + pub fn untracked(st: Box) -> Self { + Self::tracked(st).0 + } + + pub fn tracked(st: Box) -> (Self, StanzaToken) { + let (tx, rx) = watch::channel(StanzaState::Queued); + let token = StanzaToken { inner: rx }; + ( + QueueEntry { + stanza: st, + token: tx, + }, + token, + ) + } +} + +/// Reference to a transmit queue entry. +/// +/// On drop, the entry is returned to the queue. +pub(super) struct TransmitQueueRef<'x, T> { + q: &'x mut VecDeque, +} + +impl<'x, T> TransmitQueueRef<'x, T> { + /// Take the item out of the queue. + pub fn take(self) -> T { + // Unwrap: when this type is created, a check is made that the queue + // actually has a front item and because we borrow, that also cannot + // change. + self.q.pop_front().unwrap() + } +} + +/// A transmit queue coupled to an [`mpsc::Receiver`]. +/// +/// The transmit queue will by default only allow one element to reside in the +/// queue outside the inner `Receiver`: the main queueing happens inside the +/// receiver and is governed by its queue depth and associated backpressure. +/// +/// However, the queue does allow prepending elements to the front, which is +/// useful for retransmitting items. +pub(super) struct TransmitQueue { + inner: mpsc::Receiver, + peek: VecDeque, +} + +impl TransmitQueue { + /// Create a new transmission queue around an existing mpsc receiver. + pub fn wrap(ch: mpsc::Receiver) -> Self { + Self { + inner: ch, + peek: VecDeque::with_capacity(1), + } + } + + /// Create a new mpsc channel and wrap the receiving side in a + /// transmission queue + pub fn channel(depth: usize) -> (mpsc::Sender, Self) { + let (tx, rx) = mpsc::channel(depth); + (tx, Self::wrap(rx)) + } + + /// Poll the queue for the next item to transmit. + pub fn poll_next(&mut self, cx: &mut Context) -> Poll>> { + if self.peek.len() > 0 { + // Cannot use `if let Some(.) = .` here because of a borrowchecker + // restriction. If the reference is created before the branch is + // entered, it will think it needs to be borrowed until the end + // of the function (and that will conflict with the mutable + // borrow we do for `self.peek.push_back` below). + // See also https://github.com/rust-lang/rust/issues/54663. + return Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek })); + } else { + // The target size for the queue is 1, effectively acting as an + // Option. In some cases, we need more than one, but that is + // always only a temporary burst (e.g. SM resumption + // retransmissions), so we release the memory as soon as possible + // after that. 
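// A self-contained sketch (not part of the patch) of the watch-channel
// technique behind QueueEntry/StanzaToken above: the queue keeps the Sender
// and publishes state transitions with send_replace(), while cloned tokens
// observe them via wait_for(). `Stage` is an illustrative stand-in; this
// assumes a tokio version that provides watch::Receiver::wait_for.
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq, PartialOrd)]
enum Stage {
    Queued,
    Sent,
    Acked,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Stage::Queued);

    tokio::spawn(async move {
        tx.send_replace(Stage::Sent);
        tx.send_replace(Stage::Acked);
    });

    // Mirrors StanzaToken::wait_for: resolve once at least `Stage::Sent`
    // has been reached, regardless of intermediate updates.
    let seen = rx.wait_for(|st| *st >= Stage::Sent).await.unwrap().clone();
    println!("stanza reached stage {seen:?}");
}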
+ // Even though the target size is 1, we don't want to be pedantic + // about this and we don't want to reallocate often. Some short + // bursts are ok, and given that the stanzas inside QueueEntry + // elements (the main use case for this type) are boxed anyway, + // the size of the elements is rather small. + if self.peek.capacity() > 32 { + // We do not use shrink_to here, because we are *certain* that + // we won't need a larger capacity any time soon, and + // allocators may avoid moving data around. + let mut new = VecDeque::new(); + core::mem::swap(&mut self.peek, &mut new); + } + } + match ready!(self.inner.poll_recv(cx)) { + None => Poll::Ready(None), + Some(v) => { + self.peek.push_back(v); + Poll::Ready(Some(TransmitQueueRef { q: &mut self.peek })) + } + } + } + + /// Requeue a sequence of items to the front of the queue. + /// + /// This function preserves ordering of the elements in `iter`, meaning + /// that the first item from `iter` is going to be the next item yielded + /// by `poll_take` or `poll_peek`. + pub fn requeue_all>(&mut self, iter: I) { + let iter = iter.into_iter(); + let to_reserve = iter.size_hint().1.unwrap_or(iter.size_hint().0); + self.peek.reserve(to_reserve); + let mut n = 0; + for item in iter { + self.peek.push_front(item); + n += 1; + } + // Now we need to revert the order: we pushed the elements to the + // front, so if we now read back from the front via poll_peek or + // poll_take, that will cause them to be read in reverse order. The + // following loop fixes that. + for i in 0..(n / 2) { + let j = n - (i + 1); + self.peek.swap(i, j); + } + } + + /// Enqueues an item to be sent after all items in the *local* queue, but + /// *before* all items which are still inside the inner `mpsc` channel. + pub fn enqueue(&mut self, item: T) { + self.peek.push_back(item); + } + + /// Return true if the sender side of the queue is closed. + /// + /// Note that there may still be items which can be retrieved from the + /// queue even though it has been closed. + pub fn is_closed(&self) -> bool { + self.inner.is_closed() + } +} + +impl TransmitQueue { + /// Fail all currently queued items with the given error. + /// + /// Future items will not be affected. + pub fn fail(&mut self, error: &OpaqueIoError) { + for item in self.peek.drain(..) { + item.token.send_replace(StanzaState::Failed { + error: error.clone(), + }); + } + while let Ok(item) = self.inner.try_recv() { + item.token.send_replace(StanzaState::Failed { + error: error.clone(), + }); + } + self.peek.shrink_to(1); + } +} diff --git a/tokio-xmpp/src/stanzastream/stream_management.rs b/tokio-xmpp/src/stanzastream/stream_management.rs new file mode 100644 index 0000000000000000000000000000000000000000..26b0e1c8c5aea49f3fe0291f9180f4787b79bfd5 --- /dev/null +++ b/tokio-xmpp/src/stanzastream/stream_management.rs @@ -0,0 +1,256 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use core::fmt; +use std::collections::{vec_deque, VecDeque}; + +use xmpp_parsers::sm; + +use super::queue::{QueueEntry, StanzaState}; + +#[derive(Debug)] +pub(super) enum SmResumeInfo { + NotResumable, + Resumable { + /// XEP-0198 stream ID + id: String, + + /// Preferred IP and port for resumption as indicated by the peer. + // TODO: pass this to the reconnection logic. 
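// A tiny standalone demo (not part of the patch) of the ordering fix in
// requeue_all above: pushing 1, 2, 3 to the front of a VecDeque yields
// 3, 2, 1 at the head, and the in-place swap loop restores the original
// 1, 2, 3 order without any extra allocation.
use std::collections::VecDeque;

fn main() {
    let mut q: VecDeque<u32> = VecDeque::from([4, 5]);
    let items = [1, 2, 3];
    let n = items.len();
    for item in items {
        q.push_front(item);
    }
    // q is now [3, 2, 1, 4, 5]; reverse the first n elements.
    for i in 0..(n / 2) {
        q.swap(i, n - (i + 1));
    }
    assert_eq!(q, VecDeque::from([1, 2, 3, 4, 5]));
}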
+ #[allow(dead_code)] + location: Option, + }, +} + +/// State for stream management +pub(super) struct SmState { + /// Last value seen from the remote stanza counter. + outbound_base: u32, + + /// Counter for received stanzas + inbound_ctr: u32, + + /// Number of `` we still need to send. + /// + /// Acks cannot always be sent right away (if our tx buffer is full), and + /// instead of cluttering our outbound queue or something with them, we + /// just keep a counter of unsanswered ``. The stream will process + /// these in due time. + pub(super) pending_acks: usize, + + /// Flag indicating that a `` request should be sent. + pub(super) pending_req: bool, + + /// Information about resumability of the stream + resumption: SmResumeInfo, + + /// Unacked stanzas in the order they were sent + // We use a VecDeque here because that has better performance + // characteristics with the ringbuffer-type usage we're seeing here: + // we push stuff to the back, and then drain it from the front. Vec would + // have to move all the data around all the time, while VecDeque will just + // move some pointers around. + unacked_stanzas: VecDeque, +} + +impl fmt::Debug for SmState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("SmState") + .field("outbound_base", &self.outbound_base) + .field("inbound_ctr", &self.inbound_ctr) + .field("resumption", &self.resumption) + .field("len(unacked_stanzas)", &self.unacked_stanzas.len()) + .finish() + } +} + +#[derive(Debug)] +pub(super) enum SmError { + RemoteAckedMoreStanzas { + local_base: u32, + queue_len: u32, + remote_ctr: u32, + }, + RemoteAckWentBackwards { + local_base: u32, + // NOTE: this is not needed to fully specify the error, but it's + // needed to generate a `` from Self. + queue_len: u32, + remote_ctr: u32, + }, +} + +impl From for xmpp_parsers::stream_error::StreamError { + fn from(other: SmError) -> Self { + let (h, send_count) = match other { + SmError::RemoteAckedMoreStanzas { + local_base, + queue_len, + remote_ctr, + } => (remote_ctr, local_base.wrapping_add(queue_len)), + SmError::RemoteAckWentBackwards { + local_base, + queue_len, + remote_ctr, + } => (remote_ctr, local_base.wrapping_add(queue_len)), + }; + xmpp_parsers::sm::HandledCountTooHigh { h, send_count }.into() + } +} + +impl fmt::Display for SmError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::RemoteAckedMoreStanzas { + local_base, + queue_len, + remote_ctr, + } => { + let local_tip = local_base.wrapping_add(*queue_len); + write!(f, "remote acked more stanzas than we sent: remote counter = {}. queue covers range {}..<{}", remote_ctr, local_base, local_tip) + } + Self::RemoteAckWentBackwards { + local_base, + remote_ctr, + .. + } => { + write!(f, "remote acked less stanzas than before: remote counter = {}, local queue starts at {}", remote_ctr, local_base) + } + } + } +} + +impl SmState { + /// Mark a stanza as sent and keep it in the stream management queue. + pub fn enqueue(&mut self, entry: QueueEntry) { + // This may seem like an arbitrary limit, but there's some thought + // in this. + // First, the SM counters go up to u32 at most and then wrap around. + // That means that any queue size larger than u32 would immediately + // cause ambiguities when resuming. + // Second, there's RFC 1982 "Serial Number Arithmetic". It is used for + // example in DNS for the serial number and it has thoughts on how to + // use counters which wrap around at some point. 
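// A standalone illustration (not part of the patch) of the RFC 1982
// "serial number arithmetic" rule discussed in the surrounding comment:
// with mod-2^32 counters, a wrapped difference above half the number space
// is treated as negative, i.e. the remote counter went backwards rather
// than far ahead. `acked_delta` is an illustrative helper, not crate code.
fn acked_delta(local_base: u32, remote_ctr: u32) -> Option<u32> {
    let delta = remote_ctr.wrapping_sub(local_base);
    if delta > u32::MAX / 2 {
        None // "negative" difference: the ack went backwards
    } else {
        Some(delta) // number of newly acked stanzas
    }
}

fn main() {
    // The counter wrapped around, but the difference is small: forward movement.
    assert_eq!(acked_delta(u32::MAX - 1, 2), Some(4));
    // The remote counter is behind our base: treated as going backwards.
    assert_eq!(acked_delta(10, 5), None);
}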
The document proposes + // that if the (wrapped) difference between two numbers is larger than + // half the number space, you should consider it as a negative + // difference. + // + // Hence the ambiguity already starts at u32::MAX / 2, so we limit the + // queue to one less than that. + const MAX_QUEUE_SIZE: usize = (u32::MAX / 2 - 1) as usize; + if self.unacked_stanzas.len() >= MAX_QUEUE_SIZE { + // We don't bother with an error return here. u32::MAX / 2 stanzas + // in the queue is fatal in any circumstance I can fathom (also, + // we have no way to return this error to the + // [`StanzaStream::send`] call anyway). + panic!("Too many pending stanzas."); + } + + self.unacked_stanzas.push_back(entry); + log::trace!( + "Stored stanza in SmState. We are now at {} unacked stanzas.", + self.unacked_stanzas.len() + ); + } + + /// Process resumption. + /// + /// Updates the internal state according to the received remote counter. + /// Returns an iterator which yields the queue entries which need to be + /// retransmitted. + pub fn resume(&mut self, h: u32) -> Result, SmError> { + self.remote_acked(h)?; + // Return the entire leftover queue. We cannot receive acks for them, + // unless they are retransmitted, because the peer has not seen them + // yet (they got lost in the previous unclean disconnect). + Ok(self.unacked_stanzas.drain(..)) + } + + /// Process remote `` + pub fn remote_acked(&mut self, h: u32) -> Result<(), SmError> { + log::debug!("remote_acked: {self:?}::remote_acked({h})"); + // XEP-0198 specifies that counters are mod 2^32, which is handy when + // you use u32 data types :-). + let to_drop = h.wrapping_sub(self.outbound_base) as usize; + if to_drop > 0 { + log::trace!("remote_acked: need to drop {to_drop} stanzas"); + if to_drop as usize > self.unacked_stanzas.len() { + if to_drop as u32 > u32::MAX / 2 { + // If we look at the stanza counter values as RFC 1982 + // values, a wrapping difference greater than half the + // number space indicates a negative difference, i.e. + // h went backwards. + return Err(SmError::RemoteAckWentBackwards { + local_base: self.outbound_base, + queue_len: self.unacked_stanzas.len() as u32, + remote_ctr: h, + }); + } else { + return Err(SmError::RemoteAckedMoreStanzas { + local_base: self.outbound_base, + queue_len: self.unacked_stanzas.len() as u32, + remote_ctr: h, + }); + } + } + for entry in self.unacked_stanzas.drain(..to_drop) { + entry.token.send_replace(StanzaState::Acked {}); + } + self.outbound_base = h; + log::debug!("remote_acked: remote acked {to_drop} stanzas"); + Ok(()) + } else { + log::trace!("remote_acked: no stanzas to drop"); + Ok(()) + } + } + + /// Get the current inbound counter. + #[inline(always)] + pub fn inbound_ctr(&self) -> u32 { + self.inbound_ctr + } + + /// Get the info necessary for resumption. + /// + /// Returns the stream ID and the current inbound counter if resumption is + /// available and None otherwise. + pub fn resume_info(&self) -> Option<(&str, u32)> { + match self.resumption { + SmResumeInfo::Resumable { ref id, .. } => Some((id, self.inbound_ctr)), + SmResumeInfo::NotResumable => None, + } + } +} + +/// Initialize stream management state +impl From for SmState { + fn from(other: sm::Enabled) -> Self { + let resumption = if other.resume { + match other.id { + Some(id) => SmResumeInfo::Resumable { + location: other.location, + id: id.0, + }, + None => { + log::warn!("peer replied with , but without an ID! 
cannot make this stream resumable."); + SmResumeInfo::NotResumable + } + } + } else { + SmResumeInfo::NotResumable + }; + + Self { + outbound_base: 0, + inbound_ctr: 0, + pending_acks: 0, + pending_req: false, + resumption, + unacked_stanzas: VecDeque::new(), + } + } +} diff --git a/tokio-xmpp/src/stanzastream/worker.rs b/tokio-xmpp/src/stanzastream/worker.rs new file mode 100644 index 0000000000000000000000000000000000000000..6e821a1fcda63fd58e694dd844344a1418bed2c1 --- /dev/null +++ b/tokio-xmpp/src/stanzastream/worker.rs @@ -0,0 +1,612 @@ +// Copyright (c) 2019 Emmanuel Gil Peyrot +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::time::Duration; +use std::io; + +use rand::{thread_rng, Rng}; + +use futures::{ready, SinkExt, StreamExt}; + +use tokio::{ + sync::{mpsc, oneshot}, + time::Instant, +}; + +use xmpp_parsers::{ + iq, + jid::Jid, + ping, + stream_error::{DefinedCondition, StreamError}, + stream_features::StreamFeatures, +}; + +use crate::connect::AsyncReadAndWrite; +use crate::xmlstream::{ReadError, XmppStreamElement}; +use crate::Stanza; + +use super::connected::{ConnectedEvent, ConnectedState}; +use super::negotiation::NegotiationState; +use super::queue::{QueueEntry, TransmitQueue}; +use super::stream_management::SmState; +use super::{Event, StreamEvent}; + +/// Convenience alias for [`XmlStreams`][`crate::xmlstream::XmlStream`] which +/// may be used with [`StanzaStream`][`super::StanzaStream`]. +pub type XmppStream = + crate::xmlstream::XmlStream, XmppStreamElement>; + +/// Underlying connection for a [`StanzaStream`][`super::StanzaStream`]. +pub struct Connection { + /// The stream to use to send and receive XMPP data. + pub stream: XmppStream, + + /// The stream features offered by the peer. + pub features: StreamFeatures, + + /// The identity to which this stream belongs. + /// + /// Note that connectors must not return bound streams. However, the Jid + /// may still be a full jid in order to request a specific resource at + /// bind time. If `identity` is a bare JID, the peer will assign the + /// resource. + pub identity: Jid, +} + +// Allow for up to 10s for local shutdown. +// TODO: make this configurable maybe? +pub(super) static LOCAL_SHUTDOWN_TIMEOUT: Duration = Duration::new(10, 0); +pub(super) static REMOTE_SHUTDOWN_TIMEOUT: Duration = Duration::new(5, 0); +pub(super) static PING_PROBE_ID_PREFIX: &str = "xmpp-rs-stanzastream-liveness-probe"; + +pub(super) enum Never {} + +pub(super) enum WorkerEvent { + /// The stream was reset and can now be used for rx/tx. + Reset { + bound_jid: Jid, + features: StreamFeatures, + }, + + /// The stream has been resumed successfully. + Resumed, + + /// Data received successfully. + Stanza(Stanza), + + /// Failed to parse pieces from the stream. + ParseError(xso::error::Error), + + /// Soft timeout noted by the underlying XmppStream. + SoftTimeout, + + /// Stream disonnected. + Disconnected { + /// Slot for a new connection. + slot: oneshot::Sender, + + /// Set to None if the stream was cleanly closed by the remote side. + error: Option, + }, + + /// The reconnection backend dropped the connection channel. + ReconnectAborted, +} + +enum WorkerStream { + /// Pending connection. + Connecting { + /// Optional contents of an [`WorkerEvent::Disconnect`] to emit. 
+ notify: Option<(oneshot::Sender, Option)>, + + /// Receiver slot for the next connection. + slot: oneshot::Receiver, + + /// Straem management state from a previous connection. + sm_state: Option, + }, + + /// Connection available. + Connected { + stream: XmppStream, + substate: ConnectedState, + features: StreamFeatures, + identity: Jid, + }, + + /// Disconnected permanently by local choice. + Terminated, +} + +impl WorkerStream { + fn disconnect(&mut self, sm_state: Option, error: Option) -> WorkerEvent { + let (tx, rx) = oneshot::channel(); + *self = Self::Connecting { + notify: None, + slot: rx, + sm_state, + }; + WorkerEvent::Disconnected { slot: tx, error } + } + + fn poll_duplex( + self: Pin<&mut Self>, + transmit_queue: &mut TransmitQueue, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + loop { + match this { + // Disconnected cleanly (terminal state), signal end of + // stream. + Self::Terminated => return Poll::Ready(None), + + // In the progress of reconnecting, wait for reconnection to + // complete and then switch states. + Self::Connecting { + notify, + slot, + sm_state, + } => { + if let Some((slot, error)) = notify.take() { + return Poll::Ready(Some(WorkerEvent::Disconnected { slot, error })); + } + + match ready!(Pin::new(slot).poll(cx)) { + Ok(Connection { + stream, + features, + identity, + }) => { + let substate = ConnectedState::Negotiating { + // We panic here, but that is ok-ish, because + // that will "only" crash the worker and thus + // the stream, and that is kind of exactly + // what we want. + substate: NegotiationState::new(&features, sm_state.take()) + .expect("Non-negotiable stream"), + }; + *this = Self::Connected { + substate, + stream, + features, + identity, + }; + } + Err(_) => { + // The sender was dropped. This is fatal. + *this = Self::Terminated; + return Poll::Ready(Some(WorkerEvent::ReconnectAborted)); + } + } + } + + Self::Connected { + stream, + identity, + substate, + features, + } => { + match ready!(substate.poll( + Pin::new(stream), + identity, + &features, + transmit_queue, + cx + )) { + // continue looping if the substate did not produce a result. + None => (), + + // produced an event to emit. + Some(ConnectedEvent::Worker(v)) => { + match v { + // Capture the JID from a stream reset to + // update our state. + WorkerEvent::Reset { ref bound_jid, .. } => { + *identity = bound_jid.clone(); + } + _ => (), + } + return Poll::Ready(Some(v)); + } + + // stream broke or closed somehow. + Some(ConnectedEvent::Disconnect { sm_state, error }) => { + return Poll::Ready(Some(this.disconnect(sm_state, error))); + } + + Some(ConnectedEvent::RemoteShutdown { sm_state }) => { + let error = io::Error::new( + io::ErrorKind::ConnectionAborted, + "peer closed the XML stream", + ); + let (tx, rx) = oneshot::channel(); + let mut new_state = Self::Connecting { + notify: None, + slot: rx, + sm_state, + }; + core::mem::swap(this, &mut new_state); + match new_state { + Self::Connected { stream, .. } => { + tokio::spawn(shutdown_stream_by_remote_choice( + stream, + REMOTE_SHUTDOWN_TIMEOUT, + )); + } + _ => unreachable!(), + } + + return Poll::Ready(Some(WorkerEvent::Disconnected { + slot: tx, + error: Some(error), + })); + } + + Some(ConnectedEvent::LocalShutdownRequested) => { + // We don't switch to "terminated" here, but we + // return "end of stream" nontheless. + return Poll::Ready(None); + } + } + } + } + } + } + + /// Poll the stream write-only. 
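// A hypothetical, self-contained sketch (not part of the patch) of the
// reconnection handshake used above: the worker creates a oneshot channel,
// hands the Sender out inside WorkerEvent::Disconnected, and parks in the
// `Connecting` state until the Receiver yields a fresh connection.
// `FakeConnection` is a stand-in; assumes tokio with rt/macros/sync features.
use tokio::sync::oneshot;

#[derive(Debug)]
struct FakeConnection(&'static str);

#[tokio::main]
async fn main() {
    let (slot_tx, slot_rx) = oneshot::channel::<FakeConnection>();

    // The reconnect backend runs elsewhere and eventually fills the slot.
    tokio::spawn(async move {
        let _ = slot_tx.send(FakeConnection("tcp://example.org:5222"));
    });

    // The worker's `Connecting` state simply awaits the slot; a dropped
    // sender (Err) is what maps to WorkerEvent::ReconnectAborted above.
    match slot_rx.await {
        Ok(conn) => println!("reconnected via {conn:?}"),
        Err(_) => println!("reconnect aborted"),
    }
}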
+ /// + /// This never completes, not even if the `transmit_queue` is empty and + /// its sender has been dropped, unless a write error occurs. + /// + /// The use case behind this is to run his in parallel to a blocking + /// operation which should only block the receive side, but not the + /// transmit side of the stream. + /// + /// Calling this and `poll_duplex` from different tasks in parallel will + /// cause havoc. + /// + /// Any errors are reported on the next call to `poll_duplex`. + fn poll_writes( + &mut self, + transmit_queue: &mut TransmitQueue, + cx: &mut Context, + ) -> Poll { + match self { + Self::Terminated | Self::Connecting { .. } => Poll::Pending, + Self::Connected { + substate, stream, .. + } => { + ready!(substate.poll_writes(Pin::new(stream), transmit_queue, cx)); + Poll::Pending + } + } + } + + fn start_send_stream_error(&mut self, error: StreamError) { + match self { + // If we are not connected or still connecting, we feign success + // and enter the Terminated state. + Self::Terminated | Self::Connecting { .. } => { + *self = Self::Terminated; + } + + Self::Connected { substate, .. } => substate.start_send_stream_error(error), + } + } + + fn poll_close(&mut self, cx: &mut Context) -> Poll> { + match self { + Self::Terminated => Poll::Ready(Ok(())), + Self::Connecting { .. } => { + *self = Self::Terminated; + Poll::Ready(Ok(())) + } + Self::Connected { + substate, stream, .. + } => { + let result = ready!(substate.poll_close(Pin::new(stream), cx)); + *self = Self::Terminated; + Poll::Ready(result) + } + } + } + + fn drive_duplex<'a>( + &'a mut self, + transmit_queue: &'a mut TransmitQueue, + ) -> DriveDuplex<'a> { + DriveDuplex { + stream: Pin::new(self), + queue: transmit_queue, + } + } + + fn drive_writes<'a>( + &'a mut self, + transmit_queue: &'a mut TransmitQueue, + ) -> DriveWrites<'a> { + DriveWrites { + stream: Pin::new(self), + queue: transmit_queue, + } + } + + fn close(&mut self) -> Close { + Close { + stream: Pin::new(self), + } + } + + /// Enqueue a ``, if stream management is enabled. + /// + /// Multiple calls to `send_sm_request` may cause only a single `` + /// to be sent. + /// + /// Returns true if stream management is enabled and a request could be + /// queued or deduplicated with a previous request. + fn queue_sm_request(&mut self) -> bool { + match self { + Self::Terminated | Self::Connecting { .. } => false, + Self::Connected { substate, .. 
} => substate.queue_sm_request(), + } + } +} + +struct DriveDuplex<'x> { + stream: Pin<&'x mut WorkerStream>, + queue: &'x mut TransmitQueue, +} + +impl<'x> Future for DriveDuplex<'x> { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + this.stream.as_mut().poll_duplex(this.queue, cx) + } +} + +struct DriveWrites<'x> { + stream: Pin<&'x mut WorkerStream>, + queue: &'x mut TransmitQueue, +} + +impl<'x> Future for DriveWrites<'x> { + type Output = Never; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + this.stream.as_mut().poll_writes(this.queue, cx) + } +} + +struct Close<'x> { + stream: Pin<&'x mut WorkerStream>, +} + +impl<'x> Future for Close<'x> { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + this.stream.as_mut().poll_close(cx) + } +} + +pub(super) fn parse_error_to_stream_error(e: xso::error::Error) -> StreamError { + use xso::error::Error; + let condition = match e { + Error::XmlError(_) => DefinedCondition::NotWellFormed, + Error::TextParseError(_) | Error::Other(_) => DefinedCondition::InvalidXml, + Error::TypeMismatch => DefinedCondition::UnsupportedStanzaType, + }; + StreamError { + condition, + text: Some((None, e.to_string())), + application_specific: vec![], + } +} + +/// Worker system for a [`StanzaStream`]. +pub(super) struct StanzaStreamWorker { + reconnector: Box, oneshot::Sender) + Send + 'static>, + frontend_tx: mpsc::Sender, + stream: WorkerStream, + transmit_queue: TransmitQueue, +} + +macro_rules! send_or_break { + ($value:expr => $permit:ident in $ch:expr, $txq:expr => $stream:expr$(,)?) => { + if let Some(permit) = $permit.take() { + log::trace!("stanza received, passing to frontend via permit"); + permit.send($value); + } else { + log::trace!("no permit for received stanza available, blocking on channel send while handling writes"); + tokio::select! { + // drive_writes never completes: I/O errors are reported on + // the next call to drive_duplex(), which makes it ideal for + // use in parallel to $ch.send(). + result = $stream.drive_writes(&mut $txq) => { match result {} }, + result = $ch.send($value) => match result { + Err(_) => break, + Ok(()) => (), + }, + } + } + }; +} + +impl StanzaStreamWorker { + pub fn spawn( + mut reconnector: Box< + dyn FnMut(Option, oneshot::Sender) + Send + 'static, + >, + queue_depth: usize, + ) -> (mpsc::Sender, mpsc::Receiver) { + let (conn_tx, conn_rx) = oneshot::channel(); + reconnector(None, conn_tx); + // c2f = core to frontend + let (c2f_tx, c2f_rx) = mpsc::channel(queue_depth); + // f2c = frontend to core + let (f2c_tx, transmit_queue) = TransmitQueue::channel(queue_depth); + let mut worker = StanzaStreamWorker { + reconnector, + frontend_tx: c2f_tx, + stream: WorkerStream::Connecting { + slot: conn_rx, + sm_state: None, + notify: None, + }, + transmit_queue, + }; + tokio::spawn(async move { worker.run().await }); + (f2c_tx, c2f_rx) + } + + pub async fn run(&mut self) { + // TODO: consider moving this into SmState somehow, i.e. run a kind + // of fake stream management exploiting the sequentiality requirement + // from RFC 6120. + // NOTE: we use a random starting value here to avoid clashes with + // other application code. 
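// A standalone sketch (not part of the patch) of the wrapper-Future pattern
// used by DriveDuplex / DriveWrites / Close above: the state machine stays
// poll-based (`poll_tick` here, `poll_duplex` in the real code), and a tiny
// named Future adapts it so the worker loop can `.await` it or feed it into
// `tokio::select!`. All names below are illustrative stand-ins.
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

struct Machine {
    n: u32,
}

impl Machine {
    // Poll-style core. It self-wakes when returning Pending so this example
    // stays runnable without any external wakeup source.
    fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        self.n += 1;
        if self.n >= 3 {
            Poll::Ready(self.n)
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    // Adapter constructor, analogous to WorkerStream::drive_duplex().
    fn tick(&mut self) -> Tick<'_> {
        Tick { inner: self }
    }
}

struct Tick<'x> {
    inner: &'x mut Machine,
}

impl<'x> Future for Tick<'x> {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut();
        this.inner.poll_tick(cx)
    }
}

#[tokio::main]
async fn main() {
    let mut m = Machine { n: 0 };
    println!("machine finished at n = {}", m.tick().await);
}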
+ let mut ping_probe_ctr: u64 = thread_rng().gen(); + + // We use mpsc::Sender permits (check the docs on + // [`tokio::sync::mpsc::Sender::reserve`]) as a way to avoid blocking + // on the `frontend_tx` whenever possible. + // + // We always try to have a permit available. If we have a permit + // available, any event we receive from the stream can be sent to + // the frontend tx without blocking. If we do not have a permit + // available, the code generated by the send_or_break macro will + // use the normal Sender::send coroutine function, but will also + // service stream writes in parallel (putting backpressure on the + // sender while not blocking writes on our end). + let mut permit = None; + loop { + tokio::select! { + new_permit = self.frontend_tx.reserve(), if permit.is_none() && !self.frontend_tx.is_closed() => match new_permit { + Ok(new_permit) => permit = Some(new_permit), + // Receiver side dropped… That is stream closure, so we + // shut everything down and exit. + Err(_) => break, + }, + ev = self.stream.drive_duplex(&mut self.transmit_queue) => { + let Some(ev) = ev else { + // Stream terminated by local choice. Exit. + break; + }; + match ev { + WorkerEvent::Reset { bound_jid, features } => send_or_break!( + Event::Stream(StreamEvent::Reset { bound_jid, features }) => permit in self.frontend_tx, + self.transmit_queue => self.stream, + ), + WorkerEvent::Disconnected { slot, error } => { + send_or_break!( + Event::Stream(StreamEvent::Suspended) => permit in self.frontend_tx, + self.transmit_queue => self.stream, + ); + if let Some(error) = error { + log::debug!("Backend stream got disconnected because of an I/O error: {error}. Attempting reconnect."); + } else { + log::debug!("Backend stream got disconnected for an unknown reason. Attempting reconnect."); + } + if self.frontend_tx.is_closed() || self.transmit_queue.is_closed() { + log::debug!("Immediately aborting reconnect because the frontend is gone."); + break; + } + (self.reconnector)(None, slot); + } + WorkerEvent::Resumed => send_or_break!( + Event::Stream(StreamEvent::Resumed) => permit in self.frontend_tx, + self.transmit_queue => self.stream, + ), + WorkerEvent::Stanza(stanza) => send_or_break!( + Event::Stanza(stanza) => permit in self.frontend_tx, + self.transmit_queue => self.stream, + ), + WorkerEvent::ParseError(e) => { + log::error!("Parse error on stream: {e}"); + self.stream.start_send_stream_error(parse_error_to_stream_error(e)); + // We are not break-ing here, because drive_duplex + // is sending the error. + } + WorkerEvent::SoftTimeout => { + if self.stream.queue_sm_request() { + log::debug!("SoftTimeout tripped: enqueued "); + } else { + log::debug!("SoftTimeout tripped. Stream Management is not enabled, enqueueing ping IQ"); + ping_probe_ctr = ping_probe_ctr.wrapping_add(1); + // We can leave to/from blank because those + // are not needed to send a ping to the peer. + // (At least that holds true on c2s streams. + // On s2s, things are more complicated anyway + // due to how bidi works.) 
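// A self-contained sketch (not part of the patch) of the reserve()/Permit
// pattern used by the loop above: capacity on the frontend channel is
// acquired ahead of time, so a later event can be forwarded without
// awaiting, and backpressure is only felt while acquiring the next permit.
// Assumes tokio with rt/macros/sync features.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<&'static str>(1);

    // Acquire capacity *before* there is anything to send…
    let permit = tx.reserve().await.expect("receiver alive");

    // …so that, once the event arrives, forwarding it cannot block.
    permit.send("stanza received");

    assert_eq!(rx.recv().await, Some("stanza received"));
}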
+ self.transmit_queue.enqueue(QueueEntry::untracked(Box::new(iq::Iq::from_get( + format!("{}-{}", PING_PROBE_ID_PREFIX, ping_probe_ctr), + ping::Ping, + ).into()))); + } + } + WorkerEvent::ReconnectAborted => { + panic!("Backend was unable to handle reconnect request."); + } + } + }, + } + } + match self.stream.close().await { + Ok(()) => log::debug!("Stream closed successfully"), + Err(e) => log::debug!("Stream closure failed: {e}"), + } + } +} + +async fn shutdown_stream_by_remote_choice(mut stream: XmppStream, timeout: Duration) { + let deadline = Instant::now() + timeout; + match tokio::time::timeout_at( + deadline, + >::close(&mut stream), + ) + .await + { + // We don't really care about success or failure here. + Ok(_) => (), + // .. but if we run in a timeout, we exit here right away. + Err(_) => { + log::debug!("Giving up on clean stream shutdown after timeout elapsed."); + return; + } + } + let timeout = tokio::time::sleep_until(deadline); + tokio::pin!(timeout); + loop { + tokio::select! { + _ = &mut timeout => { + log::debug!("Giving up on clean stream shutdown after timeout elapsed."); + break; + } + ev = stream.next() => match ev { + None => break, + Some(Ok(data)) => { + log::debug!("Ignoring data on stream during shutdown: {data:?}"); + break; + } + Some(Err(ReadError::HardError(e))) => { + log::debug!("Ignoring stream I/O error during shutdown: {e}"); + break; + } + Some(Err(ReadError::SoftTimeout)) => (), + Some(Err(ReadError::ParseError(_))) => (), + Some(Err(ReadError::StreamFooterReceived)) => (), + } + } + } +} diff --git a/tokio-xmpp/src/xmlstream/mod.rs b/tokio-xmpp/src/xmlstream/mod.rs index 002fbdc0cd4d39531555bda94b26457134fc1043..27c51ea32b2474509f940ee5f43fe08abab8a3bd 100644 --- a/tokio-xmpp/src/xmlstream/mod.rs +++ b/tokio-xmpp/src/xmlstream/mod.rs @@ -7,7 +7,8 @@ //! # RFC 6120 XML Streams //! //! **Note:** The XML stream is a low-level API which you should probably not -//! use directly. +//! use directly. You may be looking for +//! [`StanzaStream`][`crate::stanzastream::StanzaStream`] instead. //! //! Establishing an XML stream is always a multi-stage process due to how //! stream negotiation works. Based on the values sent by the initiator in the @@ -239,6 +240,10 @@ pin_project_lite::pin_project! { /// [RFC 6120](https://tools.ietf.org/html/rfc6120) XML stream, where the /// payload consists of items of type `T` implementing [`FromXml`] and /// [`AsXml`]. + /// + /// **Note:** The XML stream is a low-level API which you should probably + /// not use directly. You may be looking for + /// [`StanzaStream`][`crate::stanzastream::StnazaStream`] instead. pub struct XmlStream { #[pin] inner: RawXmlStream, diff --git a/tokio-xmpp/src/xmlstream/xmpp.rs b/tokio-xmpp/src/xmlstream/xmpp.rs index 0be11852f2e3dbccf87782dce3cc13893d2886ac..600b181d5f2fbc8fe059ee86fe64b853f5a91c6d 100644 --- a/tokio-xmpp/src/xmlstream/xmpp.rs +++ b/tokio-xmpp/src/xmlstream/xmpp.rs @@ -6,7 +6,7 @@ use xso::{AsXml, FromXml}; -use xmpp_parsers::{component, sasl, starttls, stream_error::ReceivedStreamError}; +use xmpp_parsers::{component, sasl, sm, starttls, stream_error::ReceivedStreamError}; use crate::Stanza; @@ -33,4 +33,8 @@ pub enum XmppStreamElement { /// Stream error received #[xml(transparent)] StreamError(ReceivedStreamError), + + /// XEP-0198 nonzas + #[xml(transparent)] + SM(sm::Nonza), }
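// A self-contained sketch (not part of the patch) of the deadline handling
// in shutdown_stream_by_remote_choice above: a single Instant is computed
// once and shared by timeout_at() and sleep_until(), so the close attempt
// and the follow-up drain loop together stay within one shutdown timeout
// instead of each getting a fresh budget. The sleeps stand in for the real
// close/drain I/O; assumes tokio with rt/macros/time features.
use tokio::time::{sleep, sleep_until, timeout_at, Duration, Instant};

#[tokio::main]
async fn main() {
    let deadline = Instant::now() + Duration::from_millis(200);

    // Phase 1: bounded attempt at a clean close (stands in for SinkExt::close).
    let _ = timeout_at(deadline, sleep(Duration::from_millis(50))).await;

    // Phase 2: drain with whatever budget is left; the same deadline applies.
    let timeout = sleep_until(deadline);
    tokio::pin!(timeout);
    tokio::select! {
        _ = &mut timeout => println!("gave up on draining at the deadline"),
        _ = sleep(Duration::from_millis(500)) => println!("drained before the deadline"),
    }
}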