fail pending messages on policy violation. fixes #3735

Daniel Gultsch created

Change summary

src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java | 51 +++-
1 file changed, 35 insertions(+), 16 deletions(-)

Detailed changes

src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java 🔗

@@ -111,7 +111,7 @@ public class XmppConnection implements Runnable {
         public void onIqPacketReceived(Account account, IqPacket packet) {
             if (packet.getType() == IqPacket.TYPE.RESULT) {
                 account.setOption(Account.OPTION_REGISTER, false);
-                Log.d(Config.LOGTAG, account.getJid().asBareJid()+": successfully registered new account on server");
+                Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": successfully registered new account on server");
                 throw new StateChangingError(Account.State.REGISTRATION_SUCCESSFUL);
             } else {
                 final List<String> PASSWORD_TOO_WEAK_MSGS = Arrays.asList(
@@ -272,7 +272,7 @@ public class XmppConnection implements Runnable {
                 final int port = account.getPort();
                 final boolean directTls = Resolver.useDirectTls(port);
 
-                Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": connect to " + destination + " via Tor. directTls="+directTls);
+                Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": connect to " + destination + " via Tor. directTls=" + directTls);
                 localSocket = SocksSocketFactory.createSocketOverTor(destination, port);
 
                 if (directTls) {
@@ -302,7 +302,7 @@ public class XmppConnection implements Runnable {
                     return;
                 }
                 if (results.size() == 0) {
-                    Log.e(Config.LOGTAG,account.getJid().asBareJid()+": Resolver results were empty");
+                    Log.e(Config.LOGTAG, account.getJid().asBareJid() + ": Resolver results were empty");
                     return;
                 }
                 final Resolver.Result storedBackupResult;
@@ -325,7 +325,7 @@ public class XmppConnection implements Runnable {
                         // if tls is true, encryption is implied and must not be started
                         features.encryptionEnabled = result.isDirectTls();
                         verifiedHostname = result.isAuthenticated() ? result.getHostname().toString() : null;
-                        Log.d(Config.LOGTAG,"verified hostname "+verifiedHostname);
+                        Log.d(Config.LOGTAG, "verified hostname " + verifiedHostname);
                         final InetSocketAddress addr;
                         if (result.getIp() != null) {
                             addr = new InetSocketAddress(result.getIp(), result.getPort());
@@ -383,7 +383,7 @@ public class XmppConnection implements Runnable {
             this.changeStatus(Account.State.SERVER_NOT_FOUND);
         } catch (final SocksSocketFactory.SocksProxyNotFoundException e) {
             this.changeStatus(Account.State.TOR_NOT_AVAILABLE);
-        } catch (final IOException | XmlPullParserException  e) {
+        } catch (final IOException | XmlPullParserException e) {
             Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": " + e.getMessage());
             this.changeStatus(Account.State.OFFLINE);
             this.attempt = Math.max(0, this.attempt - 1);
@@ -590,7 +590,7 @@ public class XmppConnection implements Runnable {
                     if (mWaitingForSmCatchup.compareAndSet(true, false)) {
                         final int messageCount = mSmCatchupMessageCounter.get();
                         final int pendingIQs = packetCallbacks.size();
-                        Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": SM catchup complete (messages=" + messageCount + ", pending IQs="+pendingIQs+")");
+                        Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": SM catchup complete (messages=" + messageCount + ", pending IQs=" + pendingIQs + ")");
                         accountUiNeedsRefresh = true;
                         if (messageCount > 0) {
                             mXmppConnectionService.getNotificationService().finishBacklog(true, account);
@@ -835,7 +835,7 @@ public class XmppConnection implements Runnable {
             if (isSecure) {
                 register();
             } else {
-                Log.d(Config.LOGTAG,account.getJid().asBareJid()+": unable to find STARTTLS for registration process "+ XmlHelper.printElementNames(this.streamFeatures));
+                Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": unable to find STARTTLS for registration process " + XmlHelper.printElementNames(this.streamFeatures));
                 throw new StateChangingException(Account.State.INCOMPATIBLE_SERVER);
             }
         } else if (!this.streamFeatures.hasChild("register") && account.isOptionSet(Account.OPTION_REGISTER)) {
@@ -854,7 +854,7 @@ public class XmppConnection implements Runnable {
             if (this.streamFeatures.hasChild("bind") && isSecure) {
                 sendBindRequest();
             } else {
-                Log.d(Config.LOGTAG,account.getJid().asBareJid()+": unable to find bind feature "+ XmlHelper.printElementNames(this.streamFeatures));
+                Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": unable to find bind feature " + XmlHelper.printElementNames(this.streamFeatures));
                 throw new StateChangingException(Account.State.INCOMPATIBLE_SERVER);
             }
         }
@@ -893,7 +893,7 @@ public class XmppConnection implements Runnable {
             }
             tagWriter.writeElement(auth);
         } else {
-            Log.d(Config.LOGTAG,account.getJid().asBareJid()+": unable to find supported SASL mechanism in "+mechanisms);
+            Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": unable to find supported SASL mechanism in " + mechanisms);
             throw new StateChangingException(Account.State.INCOMPATIBLE_SERVER);
         }
     }
@@ -918,7 +918,7 @@ public class XmppConnection implements Runnable {
                     sendRegistryRequest();
                 } else {
                     final Element error = response.getError();
-                    Log.d(Config.LOGTAG,account.getJid().asBareJid()+": failed to pre auth. "+error);
+                    Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": failed to pre auth. " + error);
                     throw new StateChangingError(Account.State.REGISTRATION_INVALID_TOKEN);
                 }
             }, true);
@@ -1241,10 +1241,10 @@ public class XmppConnection implements Runnable {
         IqPacket request = new IqPacket(IqPacket.TYPE.GET);
         request.addChild("prefs", MessageArchiveService.Version.MAM_2.namespace);
         sendIqPacket(request, (account, response) -> {
-           if (response.getType() == IqPacket.TYPE.RESULT) {
-               Element prefs = response.findChild("prefs", MessageArchiveService.Version.MAM_2.namespace);
-               isMamPreferenceAlways = "always".equals(prefs == null ? null : prefs.getAttribute("default"));
-           }
+            if (response.getType() == IqPacket.TYPE.RESULT) {
+                Element prefs = response.findChild("prefs", MessageArchiveService.Version.MAM_2.namespace);
+                isMamPreferenceAlways = "always".equals(prefs == null ? null : prefs.getAttribute("default"));
+            }
         });
     }
 
@@ -1338,7 +1338,8 @@ public class XmppConnection implements Runnable {
         } else if (streamError.hasChild("policy-violation")) {
             this.lastConnect = SystemClock.elapsedRealtime();
             final String text = streamError.findChildContent("text");
-            Log.d(Config.LOGTAG,account.getJid().asBareJid()+": policy violation. "+text);
+            Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": policy violation. " + text);
+            failPendingMessages(text);
             throw new StateChangingException(Account.State.POLICY_VIOLATION);
         } else {
             Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": stream error " + streamError.toString());
@@ -1346,6 +1347,24 @@ public class XmppConnection implements Runnable {
         }
     }
 
+    private void failPendingMessages(final String error) {
+        synchronized (this.mStanzaQueue) {
+            for (int i = 0; i < mStanzaQueue.size(); ++i) {
+                final AbstractAcknowledgeableStanza stanza = mStanzaQueue.valueAt(i);
+                if (stanza instanceof MessagePacket) {
+                    final MessagePacket packet = (MessagePacket) stanza;
+                    final String id = packet.getId();
+                    final Jid to = packet.getTo();
+                    mXmppConnectionService.markMessage(account,
+                            to.asBareJid(),
+                            id,
+                            Message.STATUS_SEND_FAILED,
+                            error);
+                }
+            }
+        }
+    }
+
     private void sendStartStream() throws IOException {
         final Tag stream = Tag.start("stream:stream");
         stream.setAttribute("to", account.getServer());
@@ -1903,7 +1922,7 @@ public class XmppConnection implements Runnable {
         }
 
         public boolean externalServiceDiscovery() {
-            return hasDiscoFeature(account.getDomain(),Namespace.EXTERNAL_SERVICE_DISCOVERY);
+            return hasDiscoFeature(account.getDomain(), Namespace.EXTERNAL_SERVICE_DISCOVERY);
         }
     }
 }