experimental: wait for SM catchup before triggering notifications

Daniel Gultsch created

wait for a first SM ACK before calculating if we need to trigger any notifications
might have to be changed to an XEP-0199 ping later on. for now sending <r/> after resume seems to work outfine

Change summary

src/main/java/eu/siacs/conversations/parser/MessageParser.java |  9 +
src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java  | 21 ++++
2 files changed, 27 insertions(+), 3 deletions(-)

Detailed changes

src/main/java/eu/siacs/conversations/parser/MessageParser.java 🔗

@@ -606,10 +606,13 @@ public class MessageParser extends AbstractParser implements OnMessagePacketRece
 			if (message.trusted() && message.treatAsDownloadable() != Message.Decision.NEVER && manager.getAutoAcceptFileSize() > 0) {
 				manager.createNewDownloadConnection(message);
 			} else if (notify) {
-				if (query == null) {
-					mXmppConnectionService.getNotificationService().push(message);
-				} else if (query.isCatchup()) { // mam catchup
+				if (query != null && query.isCatchup()) {
+					mXmppConnectionService.getNotificationService().pushFromBacklog(message);
+				} else if (account.getXmppConnection().isWaitingForSmCatchup()) {
+					account.getXmppConnection().incrementSmCatchupMessageCounter();
 					mXmppConnectionService.getNotificationService().pushFromBacklog(message);
+				} else {
+					mXmppConnectionService.getNotificationService().push(message);
 				}
 			}
 		} else if (!packet.hasChild("body")){ //no body

src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java 🔗

@@ -120,6 +120,8 @@ public class XmppConnection implements Runnable {
 	private long lastDiscoStarted = 0;
 	private AtomicInteger mPendingServiceDiscoveries = new AtomicInteger(0);
 	private AtomicBoolean mWaitForDisco = new AtomicBoolean(true);
+	private AtomicBoolean mWaitingForSmCatchup = new AtomicBoolean(false);
+	private AtomicInteger mSmCatchupMessageCounter = new AtomicInteger(0);
 	private boolean mInteractive = false;
 	private int attempt = 0;
 	private final Hashtable<String, Pair<IqPacket, OnIqPacketReceived>> packetCallbacks = new Hashtable<>();
@@ -239,9 +241,18 @@ public class XmppConnection implements Runnable {
 		this.lastConnect = SystemClock.elapsedRealtime();
 		this.lastPingSent = SystemClock.elapsedRealtime();
 		this.lastDiscoStarted = Long.MAX_VALUE;
+		this.mWaitingForSmCatchup.set(false);
 		this.changeStatus(Account.State.CONNECTING);
 	}
 
+	public boolean isWaitingForSmCatchup() {
+		return mWaitingForSmCatchup.get();
+	}
+
+	public void incrementSmCatchupMessageCounter() {
+		this.mSmCatchupMessageCounter.incrementAndGet();
+	}
+
 	protected void connect() {
 		if (mXmppConnectionService.areMessagesInitialized()) {
 			mXmppConnectionService.resetSendingToWaiting(account);
@@ -613,6 +624,13 @@ public class XmppConnection implements Runnable {
 				final AckPacket ack = new AckPacket(this.stanzasReceived, smVersion);
 				tagWriter.writeStanzaAsync(ack);
 			} else if (nextTag.isStart("a")) {
+				if (mWaitingForSmCatchup.compareAndSet(true,false)) {
+					int count = mSmCatchupMessageCounter.get();
+					Log.d(Config.LOGTAG,account.getJid().toBareJid()+": SM catchup complete ("+count+")");
+					if (count > 0) {
+						mXmppConnectionService.getNotificationService().finishBacklog(true,account);
+					}
+				}
 				final Element ack = tagReader.readElement(nextTag);
 				lastPacketReceived = SystemClock.elapsedRealtime();
 				try {
@@ -839,7 +857,10 @@ public class XmppConnection implements Runnable {
 				Log.d(Config.LOGTAG,account.getJid().toBareJid()+": resuming after stanza #"+stanzasReceived);
 			}
 			final ResumePacket resume = new ResumePacket(this.streamId, stanzasReceived, smVersion);
+			this.mSmCatchupMessageCounter.set(0);
+			this.mWaitingForSmCatchup.set(true);
 			this.tagWriter.writeStanzaAsync(resume);
+			this.tagWriter.writeStanzaAsync(new RequestPacket(smVersion));
 		} else if (needsBinding) {
 			if (this.streamFeatures.hasChild("bind")) {
 				sendBindRequest();