Detailed changes
@@ -1,5 +1,8 @@
Version NEXT
XXXX-YY-ZZ [ RELEASER <admin@localhost> ]
+ * Breaking chagnes:
+ - Event::LeaveRoom, Event::LeaveAllRooms, and Event::JoinRooms have been removed.
+ Agent now handles MUC connection states internally. (!481)
* Fixes:
- Use tokio::sync::RwLock not std::sync::RwLock (!432)
- Agent::wait_for_events now return Vec<Event> and sets inner tokio_xmpp Client
@@ -55,24 +55,6 @@ async fn main() -> Result<(), Option<()>> {
Event::ChatMessage(_id, jid, body, time_info) => {
println!("Message from {} at {}: {}", jid, time_info.received, body.0);
}
- Event::JoinRoom(jid, conference) => {
- println!("Joining room {} ({:?})β¦", jid, conference.name);
- client
- .join_room(
- jid,
- conference.nick,
- conference.password,
- "en",
- "Yet another bot!",
- )
- .await;
- }
- Event::LeaveRoom(jid) => {
- println!("Leaving room {}β¦", jid);
- }
- Event::LeaveAllRooms => {
- println!("Leaving all roomsβ¦");
- }
Event::RoomJoined(jid) => {
println!("Joined room {}.", jid);
client
@@ -4,6 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -26,6 +27,10 @@ pub struct Agent<C: ServerConnector> {
pub(crate) node: String,
pub(crate) uploads: Vec<(String, Jid, PathBuf)>,
pub(crate) awaiting_disco_bookmarks_type: bool,
+ // Mapping of room->nick
+ pub(crate) rooms_joined: HashMap<BareJid, String>,
+ pub(crate) rooms_joining: HashMap<BareJid, String>,
+ pub(crate) rooms_leaving: HashMap<BareJid, String>,
}
impl<C: ServerConnector> Agent<C> {
@@ -58,11 +63,10 @@ impl<C: ServerConnector> Agent<C> {
pub async fn leave_room(
&mut self,
room_jid: BareJid,
- nickname: RoomNick,
lang: impl Into<String>,
status: impl Into<String>,
) {
- muc::room::leave_room(self, room_jid, nickname, lang, status).await
+ muc::room::leave_room(self, room_jid, lang, status).await
}
pub async fn send_message(
@@ -4,6 +4,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[cfg(any(feature = "starttls-rust", feature = "starttls-native"))]
@@ -181,6 +182,9 @@ impl<C: ServerConnector> ClientBuilder<'_, C> {
node,
uploads: Vec::new(),
awaiting_disco_bookmarks_type: false,
+ rooms_joined: HashMap::new(),
+ rooms_joining: HashMap::new(),
+ rooms_leaving: HashMap::new(),
}
}
}
@@ -7,7 +7,7 @@
use tokio_xmpp::jid::BareJid;
#[cfg(feature = "avatars")]
use tokio_xmpp::jid::Jid;
-use tokio_xmpp::parsers::{bookmarks2, message::Body, roster::Item as RosterItem};
+use tokio_xmpp::parsers::{message::Body, roster::Item as RosterItem};
use crate::{delay::StanzaTimeInfo, Error, Id, RoomNick};
@@ -26,9 +26,6 @@ pub enum Event {
/// - The [`Body`] is the message body.
/// - The [`StanzaTimeInfo`] about when message was received, and when the message was claimed sent.
ChatMessage(Id, BareJid, Body, StanzaTimeInfo),
- JoinRoom(BareJid, bookmarks2::Conference),
- LeaveRoom(BareJid),
- LeaveAllRooms,
RoomJoined(BareJid),
RoomLeft(BareJid),
RoomMessage(Id, BareJid, RoomNick, Body, StanzaTimeInfo),
@@ -29,7 +29,7 @@ pub async fn handle_iq_result<C: ServerConnector>(
events.push(Event::ContactAdded(item));
}
} else if payload.is("pubsub", ns::PUBSUB) {
- let new_events = pubsub::handle_iq_result(&from, payload);
+ let new_events = pubsub::handle_iq_result(&from, payload, agent).await;
events.extend(new_events);
} else if payload.is("slot", ns::HTTP_UPLOAD) {
let new_events = upload::receive::handle_upload_result(&from, id, payload, agent).await;
@@ -39,7 +39,7 @@ pub async fn handle_iq_result<C: ServerConnector>(
Ok(query) => {
for conf in query.storage.conferences {
let (jid, room) = conf.into_bookmarks2();
- events.push(Event::JoinRoom(jid, room));
+ agent.join_room(jid, room.nick, room.password, "", "").await;
}
}
Err(e) => {
@@ -13,7 +13,7 @@ use tokio_xmpp::{
},
};
-use crate::{Agent, RoomNick};
+use crate::Agent;
pub async fn join_room<C: ServerConnector>(
agent: &mut Agent<C>,
@@ -23,6 +23,18 @@ pub async fn join_room<C: ServerConnector>(
lang: &str,
status: &str,
) {
+ if agent.rooms_joining.contains_key(&room) {
+ // We are already joining
+ warn!("Requesting to join again room {room} which is already joining...");
+ return;
+ }
+
+ if !agent.rooms_joined.contains_key(&room) {
+ // We are already joined, cannot join
+ warn!("Requesting to join room {room} which is already joined...");
+ return;
+ }
+
let mut muc = Muc::new();
if let Some(password) = password {
muc = muc.with_password(password);
@@ -39,6 +51,8 @@ pub async fn join_room<C: ServerConnector>(
presence.add_payload(muc);
presence.set_status(String::from(lang), String::from(status));
let _ = agent.client.send_stanza(presence.into()).await;
+
+ agent.rooms_joining.insert(room, nick);
}
/// Send a "leave room" request to the server (specifically, an "unavailable" presence stanza).
@@ -57,21 +71,33 @@ pub async fn join_room<C: ServerConnector>(
/// # Arguments
///
/// * `room_jid`: The JID of the room to leave.
-/// * `nickname`: The nickname to use in the room.
/// * `lang`: The language of the status message.
/// * `status`: The status message to send.
pub async fn leave_room<C: ServerConnector>(
agent: &mut Agent<C>,
- room_jid: BareJid,
- nickname: RoomNick,
+ room: BareJid,
lang: impl Into<String>,
status: impl Into<String>,
) {
+ if agent.rooms_leaving.contains_key(&room) {
+ // We are already leaving
+ warn!("Requesting to leave again room {room} which is already leaving...");
+ return;
+ }
+
+ if !agent.rooms_joined.contains_key(&room) {
+ // We are not joined, cannot leave
+ warn!("Requesting to leave room {room} which is not joined...");
+ return;
+ }
+
+ // Get currently-used nickname
+ let nickname = agent.rooms_joined.get(&room).unwrap();
+
// XEP-0045 specifies that, to leave a room, the client must send a presence stanza
// with type="unavailable".
let mut presence = Presence::new(PresenceType::Unavailable).with_to(
- room_jid
- .with_resource_str(nickname.as_str())
+ room.with_resource_str(nickname)
.expect("Invalid room JID after adding resource part."),
);
@@ -85,4 +111,6 @@ pub async fn leave_room<C: ServerConnector>(
// Report any errors to the log.
error!("Failed to send leave room presence: {}", e);
}
+
+ agent.rooms_leaving.insert(room, nickname.to_string());
}
@@ -14,14 +14,14 @@ use crate::{Agent, Event};
/// Translate a `Presence` stanza into a list of higher-level `Event`s.
pub async fn handle_presence<C: ServerConnector>(
- _agent: &mut Agent<C>,
+ agent: &mut Agent<C>,
presence: Presence,
) -> Vec<Event> {
// Allocate an empty vector to store the events.
let mut events = vec![];
// Extract the JID of the sender (i.e. the one whose presence is being sent).
- let from = presence.from.unwrap().to_bare();
+ let from = presence.from.clone().unwrap().to_bare();
// Search through the payloads for a MUC user status.
@@ -38,10 +38,22 @@ pub async fn handle_presence<C: ServerConnector>(
match presence.type_ {
PresenceType::None => {
// According to https://xmpp.org/extensions/xep-0045.html#enter-pres, no type should be seen as "available".
+ if let Some(nick) = agent.rooms_joining.get(&from) {
+ agent.rooms_joined.insert(from.clone(), nick.to_string());
+ agent.rooms_joining.remove(&from);
+ } else {
+ warn!("Received self-presence from {} while the room was not marked as joining.", presence.from.unwrap());
+ }
events.push(Event::RoomJoined(from.clone()));
}
PresenceType::Unavailable => {
// According to https://xmpp.org/extensions/xep-0045.html#exit, the server will use type "unavailable" to notify the client that it has left the room/
+ if agent.rooms_leaving.contains_key(&from) {
+ agent.rooms_joined.remove(&from);
+ agent.rooms_leaving.remove(&from);
+ } else {
+ warn!("Received self-presence unavailable from {} while the room was not marked as leaving.", presence.from.unwrap());
+ }
events.push(Event::RoomLeft(from.clone()));
}
_ => unimplemented!("Presence type {:?}", presence.type_), // TODO: What to do here?
@@ -25,7 +25,11 @@ pub(crate) async fn handle_event<C: ServerConnector>(
elem: Element,
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] agent: &mut Agent<C>,
) -> Vec<Event> {
+ // We allow the useless mut warning for no-default-features,
+ // since for now only avatars pushes events here.
+ #[allow(unused_mut)]
let mut events = Vec::new();
+
let event = PubSubEvent::try_from(elem);
trace!("PubSub event: {:#?}", event);
match event {
@@ -46,9 +50,20 @@ pub(crate) async fn handle_event<C: ServerConnector>(
match bookmarks2::Conference::try_from(payload) {
Ok(conference) => {
if conference.autojoin {
- events.push(Event::JoinRoom(jid, conference));
+ if !agent.rooms_joined.contains_key(&jid) {
+ agent
+ .join_room(
+ jid.clone(),
+ conference.nick,
+ conference.password,
+ "",
+ "",
+ )
+ .await;
+ }
} else {
- events.push(Event::LeaveRoom(jid));
+ // So maybe another client of ours left the room... let's leave it too
+ agent.leave_room(jid.clone(), "", "").await;
}
}
Err(err) => println!("not bookmark: {}", err),
@@ -64,20 +79,18 @@ pub(crate) async fn handle_event<C: ServerConnector>(
assert_eq!(items.len(), 1);
let item = items.clone().pop().unwrap();
let jid = BareJid::from_str(&item.0).unwrap();
- events.push(Event::LeaveRoom(jid));
+
+ agent.leave_room(jid.clone(), "", "").await;
}
ref node => unimplemented!("node {}", node),
}
}
- Ok(PubSubEvent::Purge { node }) => {
- match node.0 {
- ref node if node == ns::BOOKMARKS2 => {
- // TODO: Check that our bare JID is the sender.
- events.push(Event::LeaveAllRooms);
- }
- ref node => unimplemented!("node {}", node),
+ Ok(PubSubEvent::Purge { node }) => match node.0 {
+ ref node if node == ns::BOOKMARKS2 => {
+ warn!("The bookmarks2 PEP node was deleted!");
}
- }
+ ref node => unimplemented!("node {}", node),
+ },
Err(e) => {
error!("Error parsing PubSub event: {}", e);
}
@@ -86,11 +99,16 @@ pub(crate) async fn handle_event<C: ServerConnector>(
events
}
-pub(crate) fn handle_iq_result(
+pub(crate) async fn handle_iq_result<C: ServerConnector>(
#[cfg_attr(not(feature = "avatars"), allow(unused_variables))] from: &Jid,
elem: Element,
+ agent: &mut Agent<C>,
) -> impl IntoIterator<Item = Event> {
+ // We allow the useless mut warning for no-default-features,
+ // since for now only avatars pushes events here.
+ #[allow(unused_mut)]
let mut events = Vec::new();
+
let pubsub = PubSub::try_from(elem).unwrap();
trace!("PubSub: {:#?}", pubsub);
if let PubSub::Items(items) = pubsub {
@@ -101,20 +119,54 @@ pub(crate) fn handle_iq_result(
events.extend(new_events);
}
ref node if node == ns::BOOKMARKS2 => {
- events.push(Event::LeaveAllRooms);
+ // Keep track of the new added/removed rooms in the bookmarks2 list.
+ // The rooms we joined which are no longer in the list should be left ASAP.
+ let mut new_room_list: Vec<BareJid> = Vec::new();
+
for item in items.items {
let item = item.0;
let jid = BareJid::from_str(&item.id.clone().unwrap().0).unwrap();
let payload = item.payload.clone().unwrap();
match bookmarks2::Conference::try_from(payload) {
Ok(conference) => {
+ // This room was either marked for join or leave, but it was still in the bookmarks.
+ // Keep track in new_room_list.
+ new_room_list.push(jid.clone());
+
if conference.autojoin {
- events.push(Event::JoinRoom(jid, conference));
+ if !agent.rooms_joined.contains_key(&jid) {
+ agent
+ .join_room(
+ jid.clone(),
+ conference.nick,
+ conference.password,
+ "",
+ "",
+ )
+ .await;
+ }
+ } else {
+ // Leave the room that is no longer autojoin
+ agent.leave_room(jid.clone(), "", "").await;
}
}
- Err(err) => panic!("Wrong payload type in bookmarks 2 item: {}", err),
+ Err(err) => {
+ warn!("Wrong payload type in bookmarks2 item: {}", err);
+ }
}
}
+
+ // Now we leave the rooms that are no longer in the bookmarks
+ let mut rooms_to_leave: Vec<BareJid> = Vec::new();
+ for (room, _nick) in &agent.rooms_joined {
+ if !new_room_list.contains(&room) {
+ rooms_to_leave.push(room.clone());
+ }
+ }
+
+ for room in rooms_to_leave {
+ agent.leave_room(room, "", "").await;
+ }
}
_ => unimplemented!(),
}