// Copyright (c) 2019 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use core::future::Future;
use core::ops::ControlFlow::{Break, Continue};
use core::pin::Pin;
use core::task::{Context, Poll};
use std::io;

use futures::{ready, Sink, Stream};

use xmpp_parsers::{
    jid::Jid,
    sm,
    stream_error::{DefinedCondition, SentStreamError, StreamError},
    stream_features::StreamFeatures,
};

use crate::xmlstream::{ReadError, XmppStreamElement};
use crate::Stanza;

use super::negotiation::{NegotiationResult, NegotiationState};
use super::queue::{QueueEntry, StanzaState, TransmitQueue};
use super::stream_management::*;
use super::worker::{WorkerEvent, XmppStream, LOCAL_SHUTDOWN_TIMEOUT};

#[derive(PartialEq)]
pub(super) enum RxShutdownState {
    AwaitingFooter,
    AwaitingEof,
    Done,
}

fn local_error_for_stream_error(
    io_error: &mut Option<io::Error>,
    stream_error: &mut Option<StreamError>,
) -> io::Error {
    io_error
        .take()
        .or_else(|| {
            stream_error
                .take()
                .map(|x| io::Error::new(io::ErrorKind::InvalidData, SentStreamError(x)))
        })
        .unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "unknown local stream error generated",
            )
        })
}

/// Substate of the [`BackendStream::Connected`] state.
///
/// Having the substate and its logic in a separate type allows us to
/// circumvent problemns with moving data out of `&mut _` when transitioning
/// between substates.
pub(super) enum ConnectedState {
    /// The stream is still being negotiated.
    Negotiating {
        /// Current state within the negotiations
        substate: NegotiationState,
    },

    /// The stream is ready for transceiving.
    Ready {
        /// Stream management state, if any.
        sm_state: Option<SmState>,
    },

    SendStreamError {
        /// Stream error to send.
        ///
        /// `None` implies that we now only need to flush.
        stream_error: Option<StreamError>,

        /// I/O error to return to the caller once the flush is done.
        ///
        /// If `None`, an error will be synthesised.
        io_error: Option<io::Error>,

        /// Deadline until which the error must've been sent and the stream
        /// must've been shut down.
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    Failed {
        error: Option<io::Error>,
        sm_state: Option<SmState>,
    },

    /// A stream shutdown was initiated locally and we are flushing RX and TX
    /// queues.
    LocalShutdown {
        /// Keep track on whether we have closed the TX side yet.
        tx_closed: bool,

        /// Keep track on how shut down the receiving side is.
        rx_state: RxShutdownState,

        /// Deadline until which graceful shutdown must complete; if the
        /// deadline is exceeded (i.e. the contained Sleep future returns
        /// ready), the streams will be dropped (and thus closed by the OS).
        deadline: Pin<Box<tokio::time::Sleep>>,
    },

    /// The remote side closed the stream.
    RemoteShutdown {
        /// Keep the SM state for later resumption.
        sm_state: Option<SmState>,
    },

    /// Local shutdown has completed; this is a final state, as local shutdown
    /// signals an intent of stopping the stream forever.
    LocalShutdownComplete,
}

/// Enumeration of events happening while the stream has finished the
/// connection procedures and is established.
pub(super) enum ConnectedEvent {
    /// Event generated by the stream worker.
    Worker(WorkerEvent),

    /// The remote closed the stream orderly.
    RemoteShutdown { sm_state: Option<SmState> },

    /// We got disconnected through an error, either an I/O error
    /// or some kind of stream error.
    Disconnect {
        /// Stream management state for later resumption attempts.
        sm_state: Option<SmState>,

        /// The error which caused the disconnect. This is generally not none,
        /// but we cannot prove this at compile time because we have to take
        /// the error from a mutable (i.e. non-owned) place.
        error: Option<io::Error>,
    },

    /// A shutdown was requested by the local side of the stream.
    LocalShutdownRequested,
}

impl ConnectedState {
    fn to_stream_error_state(&mut self, stream_error: StreamError) {
        *self = Self::SendStreamError {
            stream_error: Some(stream_error),
            io_error: None,
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
        };
    }

    fn to_failed_state(&mut self, error: io::Error, sm_state: Option<SmState>) {
        *self = Self::Failed {
            error: Some(error),
            sm_state,
        };
    }

    fn poll_write_sm_req(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        if let Some(sm_state) = sm_state.as_mut() {
            // Request is pending.
            if sm_state.pending_req {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Req(sm::R)))
                {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Req nonza: {}", e);
                    }
                }
                sm_state.pending_req = false;
            }
        }
        Poll::Ready(Ok(()))
    }

    fn poll_writes_inner(
        mut sm_state: Option<&mut SmState>,
        mut stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        let mut depleted = false;

        // We prefer sending SM reqs before actual data.
        // SM requests are used in response to soft timeouts as a way to
        // trigger the remote side to send *something* to us. We may be
        // sending a lot of data in bulk right now without ever expecting a
        // response (or at least not anytime soon). In order to ensure that
        // the server will in fact send a message to us soon, we have to send
        // the SM request before anything else.
        //
        // Example scenario: Sending a bunch of MAM `<message/>`s over a slow,
        // but low-latency link. The MAM response only triggers a response
        // from the peer when everything has been transmitted, which may be
        // longer than the stream timeout.
        ready!(Self::poll_write_sm_req(
            sm_state.as_mut().map(|x| x as &mut SmState),
            stream.as_mut(),
            cx
        ))?;

        let mut transmitted = false;
        // We prefer sending actual data before stream-management ACKs.
        // While the other side may be waiting for our ACK, we are not obliged
        // to send it straight away (XEP-0198 explicitly allows us to delay it
        // for some implementation-defined time if we have stuff to send. Our
        // implementation-defined time is "infinity"), so we try to make
        // progress on real data.
        loop {
            // If either the queue has nothing for us or the stream isn't
            // ready to send, we break out of the loop. We don't use ready!
            // here because we may have SM ACKs to send.
            let next = match transmit_queue.poll_next(cx) {
                Poll::Ready(Some(v)) => v,
                Poll::Ready(None) => {
                    // The transmit_queue is empty, so we set `depleted` to
                    // true in order to ensure that we return Ready if all SM
                    // acks also have been transmitted.
                    depleted = true;
                    break;
                }
                Poll::Pending => break,
            };
            // If the stream isn't ready to send, none of the other things can
            // be sent either, so we can use ready!.
            match ready!(<XmppStream as Sink<&Stanza>>::poll_ready(
                stream.as_mut(),
                cx
            )) {
                Ok(()) => (),
                Err(e) => return Poll::Ready(Err(e)),
            }
            // We now either send the item or "die trying". It must
            // be removed from the queue, because even if it fails to
            // serialise, we don't want to reattempt sending it (
            // unless by SM resumption retransmission).
            let next = next.take();
            match stream.as_mut().start_send(&next.stanza) {
                Ok(()) => {
                    next.token.send_replace(StanzaState::Sent {});
                    if let Some(sm_state) = sm_state.as_mut() {
                        sm_state.enqueue(next);
                    }
                    transmitted = true;
                }
                // Serialisation error, report back to the queue item.
                Err(e) => {
                    next.token
                        .send_replace(StanzaState::Failed { error: e.into() });
                }
            }
        }

        if let Some(sm_state) = sm_state.as_mut() {
            // We can set it to transmitted directly, because it has been
            // cleared by the previous call to poll_write_sm_req.
            sm_state.pending_req = transmitted;
            ready!(Self::poll_write_sm_req(Some(sm_state), stream.as_mut(), cx))?;
        }

        // Now, if the stream will let us and we need to, we can tack
        // on some SM ACKs.
        if let Some(sm_state) = sm_state {
            while sm_state.pending_acks > 0 {
                match ready!(<XmppStream as Sink<&XmppStreamElement>>::poll_ready(
                    stream.as_mut(),
                    cx,
                )) {
                    Ok(()) => (),
                    Err(e) => return Poll::Ready(Err(e)),
                }
                match stream
                    .as_mut()
                    .start_send(&XmppStreamElement::SM(sm::Nonza::Ack(sm::A {
                        h: sm_state.inbound_ctr(),
                    }))) {
                    Ok(()) => (),
                    Err(e) => {
                        // As the stream promised we would be able to
                        // send, this must be a problem with our
                        // (locally generated) nonza, i.e. this is
                        // fatal.
                        panic!("Failed to send SM Ack nonza: {}", e);
                    }
                }
                sm_state.pending_acks -= 1;
            }
        }

        // If we haven't transmitted, we may also not have polled
        // the stream for readiness or flushing. We need to do
        // that here to ensure progress. Even if our tx queue is
        // empty, the tx buffer may be nonempty
        match ready!(<XmppStream as Sink<&Stanza>>::poll_flush(
            stream.as_mut(),
            cx
        )) {
            Ok(()) => (),
            Err(e) => return Poll::Ready(Err(e)),
        }

        // If we end up here, all data we currently have has been
        // transmitted via the stream and the stream's tx buffers have
        // been properly flushed.
        if depleted {
            // And here, we know that the transmit queue is closed,
            // too. We return with success.
            Poll::Ready(Ok(()))
        } else {
            // The transmit queue is still open, so more data could
            // pour in to be transmitted.
            Poll::Pending
        }
    }

    /// Drive the stream in transmit simplex mode.
    ///
    /// This will block forever (i.e. return [`Poll::Pending`] without
    /// installing a waker) if the stream is currently being negotiated.
    /// Otherwise, it will attempt to drain the `transmit_queue`. When the
    /// queue is empty, `Ok(())` is returned. When write errors occur, this
    /// will also block forever.
    ///
    /// If nothing is to be sent, but the stream could be used for sending,
    /// this will drive the flush part of the inner stream.
    ///
    /// Any errors are reported on the next call to `poll`.
    pub fn poll_writes(
        &mut self,
        stream: Pin<&mut XmppStream>,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<()> {
        match self {
            Self::Ready { sm_state, .. } => match ready!(Self::poll_writes_inner(
                sm_state.as_mut(),
                stream,
                transmit_queue,
                cx
            )) {
                Ok(()) => Poll::Ready(()),
                Err(e) => {
                    *self = Self::Failed {
                        error: Some(e),
                        sm_state: sm_state.take(),
                    };
                    Poll::Pending
                }
            },

            _ => Poll::Pending,
        }
    }

    /// Drive the stream in full-duplex mode.
    ///
    /// Stanzas from the `transmit_queue` are transmitted once the stream is
    /// ready.
    ///
    /// Returns:
    /// - Poll::Pending if it blocks on the inner stream
    /// - Poll::Ready(None) if it needs to be called again for a proper result
    /// - Poll::Ready(Some(.)) when it has proper result
    pub fn poll(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        jid: &Jid,
        features: &StreamFeatures,
        transmit_queue: &mut TransmitQueue<QueueEntry>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<ConnectedEvent>> {
        match self {
            Self::Negotiating { ref mut substate } => {
                match ready!(substate.advance(stream, jid, transmit_queue, cx)) {
                    Break(NegotiationResult::Disconnect { sm_state, error }) => {
                        self.to_failed_state(error, sm_state);
                        Poll::Ready(None)
                    }
                    Break(NegotiationResult::StreamReset {
                        sm_state,
                        bound_jid,
                    }) => {
                        *self = Self::Ready { sm_state };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Reset {
                            bound_jid,
                            features: features.clone(),
                        })))
                    }
                    Break(NegotiationResult::StreamResumed { sm_state }) => {
                        *self = Self::Ready {
                            sm_state: Some(sm_state),
                        };
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Resumed)))
                    }
                    Break(NegotiationResult::StreamError { error }) => {
                        self.to_stream_error_state(error);
                        Poll::Ready(None)
                    }
                    Continue(Some(stanza)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(stanza))))
                    }
                    Continue(None) => Poll::Ready(None),
                }
            }

            Self::SendStreamError {
                ref mut stream_error,
                ref mut io_error,
                ref mut deadline,
            } => {
                match stream.as_mut().poll_next(cx) {
                    Poll::Pending
                    | Poll::Ready(None)
                    | Poll::Ready(Some(Err(ReadError::StreamFooterReceived)))
                    | Poll::Ready(Some(Err(ReadError::SoftTimeout))) => (),
                    Poll::Ready(Some(Ok(ev))) => {
                        log::trace!("Discarding incoming data while sending stream error: {ev:?}")
                    }
                    Poll::Ready(Some(Err(ReadError::ParseError(e)))) => {
                        log::trace!("Ignoring parse error while sending stream error: {e}")
                    }
                    Poll::Ready(Some(Err(ReadError::HardError(e)))) => {
                        log::warn!("I/O error while sending stream error: {e}")
                    }
                }

                match deadline.as_mut().poll(cx) {
                    Poll::Pending => (),
                    Poll::Ready(()) => {
                        log::debug!("Timeout while sending stream error. Discarding state.");
                        let error = local_error_for_stream_error(io_error, stream_error);
                        self.to_failed_state(error, None);
                        return Poll::Ready(None);
                    }
                }

                // Cannot use ready! here because we have to consider the
                // case where the other side is refusing to accept data
                // because its outgoing buffer is too full.
                if stream_error.is_some() {
                    match ready!(<XmppStream as Sink<&StreamError>>::poll_ready(
                        stream.as_mut(),
                        cx
                    ))
                    .and_then(|()| {
                        // The take serves as transition to the next state.
                        let stream_error = stream_error.take().unwrap();
                        let result = stream.as_mut().start_send(&stream_error);
                        *io_error = Some(local_error_for_stream_error(
                            io_error,
                            &mut Some(stream_error),
                        ));
                        result
                    }) {
                        Ok(()) => (),
                        Err(e) => {
                            log::debug!("Got I/O error while sending stream error: {e}. Skipping error transmission.");
                            let error = local_error_for_stream_error(io_error, stream_error);
                            self.to_failed_state(error, None);
                            return Poll::Ready(None);
                        }
                    }
                }

                match ready!(<XmppStream as Sink<&StreamError>>::poll_flush(
                    stream.as_mut(),
                    cx
                )) {
                    Ok(()) => (),
                    Err(e) => {
                        log::debug!(
                            "Got I/O error while flushing stream error: {e}. Skipping flush.",
                        );
                    }
                }
                log::trace!("Stream error send complete, transitioning to Failed state");
                *self = Self::Failed {
                    error: Some(local_error_for_stream_error(io_error, stream_error)),
                    // Do *not* resume after we caused a stream error.
                    sm_state: None,
                };

                // Request the caller to call us again to get the
                // actual error message.
                Poll::Ready(None)
            }

            Self::Ready { ref mut sm_state } => {
                match Self::poll_writes_inner(
                    sm_state.as_mut(),
                    stream.as_mut(),
                    transmit_queue,
                    cx,
                ) {
                    Poll::Pending => (),
                    Poll::Ready(Ok(())) => {
                        *self = Self::LocalShutdown {
                            rx_state: RxShutdownState::AwaitingFooter,
                            tx_closed: false,
                            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                        };
                        return Poll::Ready(Some(ConnectedEvent::LocalShutdownRequested));
                    }
                    Poll::Ready(Err(e)) => {
                        *self = Self::Failed {
                            error: Some(e),
                            sm_state: sm_state.take(),
                        };
                        return Poll::Ready(None);
                    }
                }

                let item = ready!(stream.poll_next(cx));
                // We switch to a TxOpen or non-connected state when we
                // receive the stream footer, so reading `None` from the
                // stream is always an unclean closure.
                let item = item.unwrap_or_else(|| {
                    Err(ReadError::HardError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "eof before stream footer",
                    )))
                });
                match item {
                    // Easy case, we got some data.
                    Ok(XmppStreamElement::Stanza(data)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::Stanza(data))))
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Ack(ack))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.remote_acked(ack.h) {
                                Ok(()) => Poll::Ready(None),
                                Err(e) => {
                                    log::error!(
                                        "Failed to process <sm:a/> sent by the server: {e}",
                                    );
                                    self.to_stream_error_state(e.into());
                                    Poll::Ready(None)
                                }
                            }
                        } else {
                            log::debug!("Hmm... I got an <sm:a/> from the peer, but I don't have a stream management state. I'm gonna ignore that...");
                            Poll::Ready(None)
                        }
                    }

                    Ok(XmppStreamElement::SM(sm::Nonza::Req(_))) => {
                        if let Some(sm_state) = sm_state {
                            match sm_state.pending_acks.checked_add(1) {
                                None => panic!("Too many pending ACKs, something is wrong."),
                                Some(v) => sm_state.pending_acks = v,
                            }
                        } else {
                            log::warn!("Got an <sm:r/> from the peer, but we don't have any stream management state. Terminating stream with an error.");
                            self.to_stream_error_state(StreamError::new(
                                DefinedCondition::UnsupportedStanzaType,
                                "en",
                                "received <sm:r/>, but stream management is not enabled".to_owned(),
                            ));
                        }
                        // No matter whether we "enqueued" an ACK for send or
                        // whether we just successfully read something from
                        // the stream, we have to request to be polled again
                        // right away.
                        Poll::Ready(None)
                    }

                    Ok(other) => {
                        log::warn!(
                            "Received unsupported stream element: {other:?}. Emitting stream error.",
                        );
                        // TODO: figure out a good way to provide the sender
                        // with more information.
                        self.to_stream_error_state(StreamError::new(
                            DefinedCondition::UnsupportedStanzaType,
                            "en",
                            format!("Unsupported stream element: {other:?}"),
                        ));
                        Poll::Ready(None)
                    }

                    // Another easy case: Soft timeouts are passed through
                    // to the caller for handling.
                    Err(ReadError::SoftTimeout) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::SoftTimeout)))
                    }

                    // Parse errors are also just passed through (and will
                    // likely cause us to send a stream error).
                    Err(ReadError::ParseError(e)) => {
                        Poll::Ready(Some(ConnectedEvent::Worker(WorkerEvent::ParseError(e))))
                    }

                    // I/O errors cause the stream to be considerde
                    // broken; we drop it and send a Disconnect event with
                    // the error embedded.
                    Err(ReadError::HardError(e)) => {
                        let sm_state = sm_state.take();
                        Poll::Ready(Some(ConnectedEvent::Disconnect {
                            sm_state,
                            error: Some(e),
                        }))
                    }

                    // Stream footer indicates the remote wants to shut this
                    // stream down.
                    // We transition into RemoteShutdown state which makes us
                    // emit a special event until the caller takes care of it.
                    Err(ReadError::StreamFooterReceived) => {
                        *self = Self::RemoteShutdown {
                            sm_state: sm_state.take(),
                        };

                        // Let us be called again immediately to emit the
                        // notification.
                        Poll::Ready(None)
                    }
                }
            }

            Self::Failed { sm_state, error } => Poll::Ready(Some(ConnectedEvent::Disconnect {
                error: error.take(),
                sm_state: sm_state.take(),
            })),

            Self::LocalShutdown { .. } | Self::LocalShutdownComplete => {
                panic!("poll_next called in local shutdown");
            }

            Self::RemoteShutdown { ref mut sm_state } => {
                Poll::Ready(Some(ConnectedEvent::RemoteShutdown {
                    sm_state: sm_state.take(),
                }))
            }
        }
    }

    pub(super) fn poll_close(
        &mut self,
        mut stream: Pin<&mut XmppStream>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), io::Error>> {
        loop {
            match self {
                // User initiates shutdown by local choice.
                // The actual shutdown is driven by the poll_read function and we
                // only get woken up via the close_poller waker.
                Self::Ready { .. } | Self::RemoteShutdown { .. } | Self::Negotiating { .. } => {
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                Self::Failed { error, .. } => match error.take() {
                    Some(error) => return Poll::Ready(Err(error)),
                    None => return Poll::Ready(Ok(())),
                },

                // If close is called while an attempt is made to send the
                // stream error, we abort transmission.
                Self::SendStreamError { .. } => {
                    log::debug!("close() called while stream error was being sent. Aborting transmission of stream error.");
                    *self = Self::LocalShutdown {
                        rx_state: RxShutdownState::AwaitingFooter,
                        tx_closed: false,
                        deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
                    };
                }

                // Wait for local shutdown (driven by poll_read) to complete.
                Self::LocalShutdown {
                    ref mut deadline,
                    ref mut rx_state,
                    ref mut tx_closed,
                } => {
                    match deadline.as_mut().poll(cx) {
                        Poll::Ready(()) => {
                            log::debug!("Dropping stream after shutdown timeout was exceeded.");
                            *self = Self::LocalShutdownComplete;
                            return Poll::Ready(Ok(()));
                        }
                        Poll::Pending => (),
                    }

                    if !*tx_closed {
                        // We cannot use ready! here, because we want to poll the
                        // receiving side in parallel.
                        match stream.as_mut().poll_shutdown(cx) {
                            Poll::Pending => (),
                            Poll::Ready(Ok(())) => {
                                *tx_closed = true;
                            }
                            Poll::Ready(Err(e)) => {
                                log::debug!(
                                    "Ignoring write error during local stream shutdown: {e}"
                                );
                                *tx_closed = true;
                            }
                        }
                    }

                    match rx_state {
                        RxShutdownState::Done => {
                            if !*tx_closed {
                                // poll_close() returned Poll::Pending, so we have to
                                // return that, too.
                                return Poll::Pending;
                            }
                        }
                        // We can use ready! here because the `poll_close` has
                        // happened already; we don't want to poll anything else
                        // anymore.
                        _ => loop {
                            match ready!(stream.as_mut().poll_next(cx)) {
                                None => {
                                    if *rx_state != RxShutdownState::AwaitingEof {
                                        log::debug!("Ignoring early EOF during stream shutdown.");
                                    }
                                    *rx_state = RxShutdownState::Done;
                                    break;
                                }
                                Some(Ok(data)) => {
                                    log::debug!("Ignoring data received on stream during local shutdown: {data:?}");
                                }
                                Some(Err(ReadError::SoftTimeout)) => (),
                                Some(Err(ReadError::HardError(e))) => {
                                    *rx_state = RxShutdownState::Done;
                                    log::debug!("Ignoring read error during local shutdown: {e}");
                                    break;
                                }
                                Some(Err(ReadError::ParseError(e))) => {
                                    log::debug!(
                                        "Ignoring parse error during local shutdown: {}",
                                        e
                                    );
                                }
                                Some(Err(ReadError::StreamFooterReceived)) => match rx_state {
                                    RxShutdownState::AwaitingFooter => {
                                        *rx_state = RxShutdownState::AwaitingEof;
                                    }
                                    RxShutdownState::AwaitingEof => {
                                        unreachable!("multiple stream footers?!")
                                    }
                                    RxShutdownState::Done => unreachable!(),
                                },
                            }
                        },
                    }

                    if *tx_closed && *rx_state == RxShutdownState::Done {
                        // Now that everything is properly cleaned up on the
                        // xmlstream layer, we go through with closure.
                        ready!(<XmppStream as Sink<&Stanza>>::poll_close(
                            stream.as_mut(),
                            cx
                        ))?;
                        // And now that's done, we can finally call it a day.
                        *self = Self::LocalShutdownComplete;
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Pending;
                    }
                }

                Self::LocalShutdownComplete => return Poll::Ready(Ok(())),
            }
        }
    }

    pub(super) fn start_send_stream_error(&mut self, error: StreamError) {
        match self {
            Self::LocalShutdownComplete
            | Self::LocalShutdown { .. }
            | Self::RemoteShutdown { .. }
            | Self::Failed { .. } => {
                log::debug!("Request to send stream error ({error}), but we are already shutting down or have already failed. Discarding.");
                return;
            }

            Self::Ready { .. } | Self::Negotiating { .. } => {}

            Self::SendStreamError { .. } => {
                log::debug!("Request to send stream error ({error}) while transmission of another stream error is already in progress. Discarding the new one.");
                return;
            }
        }

        *self = Self::SendStreamError {
            deadline: Box::pin(tokio::time::sleep(LOCAL_SHUTDOWN_TIMEOUT)),
            stream_error: Some(error),
            io_error: None,
        };
    }

    pub fn queue_sm_request(&mut self) -> bool {
        match self {
            Self::Ready {
                sm_state: Some(sm_state),
                ..
            } => {
                sm_state.pending_req = true;
                true
            }
            _ => false,
        }
    }
}
