MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
  4
  5import android.util.Log;
  6import androidx.annotation.NonNull;
  7import eu.siacs.conversations.Config;
  8import eu.siacs.conversations.R;
  9import eu.siacs.conversations.entities.Account;
 10import eu.siacs.conversations.entities.Conversation;
 11import eu.siacs.conversations.entities.Conversational;
 12import eu.siacs.conversations.entities.ReceiptRequest;
 13import eu.siacs.conversations.generator.AbstractGenerator;
 14import eu.siacs.conversations.xml.Element;
 15import eu.siacs.conversations.xmpp.Jid;
 16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 17import eu.siacs.conversations.xmpp.mam.MamReference;
 18import im.conversations.android.xmpp.model.stanza.Iq;
 19import im.conversations.android.xmpp.model.stanza.Message;
 20import java.math.BigInteger;
 21import java.util.ArrayList;
 22import java.util.Collection;
 23import java.util.HashSet;
 24import java.util.Iterator;
 25import java.util.List;
 26
 27public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 28
 29    private final XmppConnectionService mXmppConnectionService;
 30
 31    private final HashSet<Query> queries = new HashSet<>();
 32    private final ArrayList<Query> pendingQueries = new ArrayList<>();
 33
 34    public enum Version {
 35        MAM_0("urn:xmpp:mam:0", true),
 36        MAM_1("urn:xmpp:mam:1", false),
 37        MAM_2("urn:xmpp:mam:2", false);
 38
 39        public final boolean legacy;
 40        public final String namespace;
 41
 42        Version(String namespace, boolean legacy) {
 43            this.namespace = namespace;
 44            this.legacy = legacy;
 45        }
 46
 47        public static Version get(Account account) {
 48            return get(account, null);
 49        }
 50
 51        public static Version get(Account account, Conversation conversation) {
 52            if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
 53                return get(account.getXmppConnection().getFeatures().getAccountFeatures());
 54            } else {
 55                return get(conversation.getMucOptions().getFeatures());
 56            }
 57        }
 58
 59        private static Version get(final Collection<String> features) {
 60            final Version[] values = values();
 61            for (int i = values.length - 1; i >= 0; --i) {
 62                for (String feature : features) {
 63                    if (values[i].namespace.equals(feature)) {
 64                        return values[i];
 65                    }
 66                }
 67            }
 68            return MAM_0;
 69        }
 70
 71        public static boolean has(final Collection<String> features) {
 72            for (String feature : features) {
 73                for (Version version : values()) {
 74                    if (version.namespace.equals(feature)) {
 75                        return true;
 76                    }
 77                }
 78            }
 79            return false;
 80        }
 81
 82        public static Element findResult(Message packet) {
 83            for (Version version : values()) {
 84                Element result = packet.findChild("result", version.namespace);
 85                if (result != null) {
 86                    return result;
 87                }
 88            }
 89            return null;
 90        }
 91    }
 92
 93    MessageArchiveService(final XmppConnectionService service) {
 94        this.mXmppConnectionService = service;
 95    }
 96
 97    private void catchup(final Account account) {
 98        synchronized (this.queries) {
 99            for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
100                Query query = iterator.next();
101                if (query.getAccount() == account) {
102                    iterator.remove();
103                }
104            }
105        }
106        MamReference mamReference =
107                MamReference.max(
108                        mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
109                        mXmppConnectionService.databaseBackend.getLastClearDate(account));
110        mamReference =
111                MamReference.max(
112                        mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
113        long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114        final Query query;
115        if (mamReference.getTimestamp() == 0) {
116            return;
117        } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118            long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119            List<Conversation> conversations = mXmppConnectionService.getConversations();
120            for (Conversation conversation : conversations) {
121                if (conversation.getMode() == Conversation.MODE_SINGLE
122                        && conversation.getAccount() == account
123                        && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
124                    this.query(conversation, startCatchup, true);
125                }
126            }
127            query = new Query(account, new MamReference(startCatchup), 0);
128        } else {
129            query = new Query(account, mamReference, 0);
130        }
131        synchronized (this.queries) {
132            this.queries.add(query);
133        }
134        this.execute(query);
135    }
136
137    public void catchupMUC(final Conversation conversation) {
138        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
139                && conversation.countMessages() == 0) {
140            query(conversation, new MamReference(0), 0, true);
141        } else {
142            query(conversation, conversation.getLastMessageTransmitted(), 0, true);
143        }
144    }
145
146    public Query query(final Conversation conversation) {
147        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
148                && conversation.countMessages() == 0) {
149            return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
150        } else {
151            return query(
152                    conversation,
153                    conversation.getLastMessageTransmitted(),
154                    conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
155                    false);
156        }
157    }
158
159    public boolean isCatchingUp(Conversation conversation) {
160        final Account account = conversation.getAccount();
161        if (account.getXmppConnection().isWaitingForSmCatchup()) {
162            return true;
163        } else {
164            synchronized (this.queries) {
165                for (Query query : this.queries) {
166                    if (query.getAccount() == account
167                            && query.isCatchup()
168                            && ((conversation.getMode() == Conversation.MODE_SINGLE
169                                            && query.getWith() == null)
170                                    || query.getConversation() == conversation)) {
171                        return true;
172                    }
173                }
174            }
175            return false;
176        }
177    }
178
179    public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180        return this.query(
181                conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182    }
183
184    public Query query(
185            Conversation conversation, MamReference start, long end, boolean allowCatchup) {
186        synchronized (this.queries) {
187            final Query query;
188            final MamReference startActual = start;
189            if (start.getTimestamp() == 0) {
190                query = new Query(conversation, startActual, end, false);
191                query.reference = conversation.getFirstMamReference();
192            } else {
193                if (allowCatchup) {
194                    MamReference maxCatchup =
195                            MamReference.max(
196                                    startActual,
197                                    System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
198                    if (maxCatchup.greaterThan(startActual)) {
199                        Query reverseCatchup =
200                                new Query(
201                                        conversation,
202                                        startActual,
203                                        maxCatchup.getTimestamp(),
204                                        false);
205                        this.queries.add(reverseCatchup);
206                        this.execute(reverseCatchup);
207                    }
208                    query = new Query(conversation, maxCatchup, end, true);
209                } else {
210                    query = new Query(conversation, startActual, end, false);
211                }
212            }
213            if (end != 0 && start.greaterThan(end)) {
214                return null;
215            }
216            this.queries.add(query);
217            this.execute(query);
218            return query;
219        }
220    }
221
222    public void executePendingQueries(final Account account) {
223        final List<Query> pending = new ArrayList<>();
224        synchronized (this.pendingQueries) {
225            for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
226                Query query = iterator.next();
227                if (query.getAccount() == account) {
228                    pending.add(query);
229                    iterator.remove();
230                }
231            }
232        }
233        for (Query query : pending) {
234            this.execute(query);
235        }
236    }
237
238    private void execute(final Query query) {
239        final Account account = query.getAccount();
240        if (account.getStatus() == Account.State.ONLINE) {
241            final Conversation conversation = query.getConversation();
242            if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
243                throw new IllegalStateException(
244                        "Attempted to run MAM query for archived conversation");
245            }
246            Log.d(
247                    Config.LOGTAG,
248                    account.getJid().asBareJid().toString() + ": running mam query " + query);
249            final Iq packet =
250                    this.mXmppConnectionService
251                            .getIqGenerator()
252                            .queryMessageArchiveManagement(query);
253            this.mXmppConnectionService.sendIqPacket(
254                    account,
255                    packet,
256                    (p) -> {
257                        final Element fin = p.findChild("fin", query.version.namespace);
258                        if (p.getType() == Iq.Type.TIMEOUT) {
259                            synchronized (this.queries) {
260                                this.queries.remove(query);
261                                if (query.hasCallback()) {
262                                    query.callback(false);
263                                }
264                            }
265                        } else if (p.getType() == Iq.Type.RESULT && fin != null) {
266                            final boolean running;
267                            synchronized (this.queries) {
268                                running = this.queries.contains(query);
269                            }
270                            if (running) {
271                                processFin(query, fin);
272                            } else {
273                                Log.d(
274                                        Config.LOGTAG,
275                                        account.getJid().asBareJid()
276                                                + ": ignoring MAM iq result because query had been"
277                                                + " killed");
278                            }
279                        } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
280                            // do nothing
281                        } else {
282                            Log.d(
283                                    Config.LOGTAG,
284                                    account.getJid().asBareJid().toString()
285                                            + ": error executing mam: "
286                                            + p);
287                            try {
288                                finalizeQuery(query, true);
289                            } catch (final IllegalStateException e) {
290                                // ignored
291                            }
292                        }
293                    });
294        } else {
295            synchronized (this.pendingQueries) {
296                this.pendingQueries.add(query);
297            }
298        }
299    }
300
301    private void finalizeQuery(final Query query, boolean done) {
302        synchronized (this.queries) {
303            if (!this.queries.remove(query)) {
304                throw new IllegalStateException("Unable to remove query from queries");
305            }
306        }
307        final Conversation conversation = query.getConversation();
308        if (conversation != null) {
309            conversation.sort();
310            conversation.setHasMessagesLeftOnServer(!done);
311            final var displayState = conversation.getDisplayState();
312            if (displayState != null) {
313                mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
314            }
315        } else {
316            for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
317                if (tmp.getAccount() == query.getAccount()) {
318                    tmp.sort();
319                    final var displayState = tmp.getDisplayState();
320                    if (displayState != null) {
321                        mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
322                    }
323                }
324            }
325        }
326        if (query.hasCallback()) {
327            query.callback(done);
328        } else {
329            this.mXmppConnectionService.updateConversationUi();
330        }
331    }
332
333    public boolean inCatchup(Account account) {
334        synchronized (this.queries) {
335            for (Query query : queries) {
336                if (query.account == account && query.isCatchup() && query.getWith() == null) {
337                    return true;
338                }
339            }
340        }
341        return false;
342    }
343
344    public boolean isCatchupInProgress(Conversation conversation) {
345        synchronized (this.queries) {
346            for (Query query : queries) {
347                if (query.account == conversation.getAccount() && query.isCatchup()) {
348                    final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
349                    if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
350                            || (conversation.getJid().asBareJid().equals(with))) {
351                        return true;
352                    }
353                }
354            }
355        }
356        return false;
357    }
358
359    boolean queryInProgress(
360            Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
361        synchronized (this.queries) {
362            for (Query query : queries) {
363                if (query.conversation == conversation) {
364                    if (!query.hasCallback() && callback != null) {
365                        query.setCallback(callback);
366                    }
367                    return true;
368                }
369            }
370            return false;
371        }
372    }
373
374    public boolean queryInProgress(Conversation conversation) {
375        return queryInProgress(conversation, null);
376    }
377
378    public void processFinLegacy(Element fin, Jid from) {
379        Query query = findQuery(fin.getAttribute("queryid"));
380        if (query != null && query.validFrom(from)) {
381            processFin(query, fin);
382        }
383    }
384
385    private void processFin(Query query, Element fin) {
386        boolean complete = fin.getAttributeAsBoolean("complete");
387        Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
388        Element last = set == null ? null : set.findChild("last");
389        String count = set == null ? null : set.findChildContent("count");
390        Element first = set == null ? null : set.findChild("first");
391        Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
392        boolean abort =
393                (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
394                        || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
395        if (query.getConversation() != null) {
396            query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
397        }
398        if (complete || relevant == null || abort) {
399            // TODO: FIX done logic to look at complete. using count is probably unreliable because
400            // it can be ommited and doesn’t work with paging.
401            boolean done;
402            if (query.isCatchup()) {
403                done = false;
404            } else {
405                if (count != null) {
406                    try {
407                        done = Integer.parseInt(count) <= query.getTotalCount();
408                    } catch (NumberFormatException e) {
409                        done = false;
410                    }
411                } else {
412                    done = query.getTotalCount() == 0;
413                }
414            }
415            done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
416            this.finalizeQuery(query, done);
417
418            Log.d(
419                    Config.LOGTAG,
420                    query.getAccount().getJid().asBareJid()
421                            + ": finished mam after "
422                            + query.getTotalCount()
423                            + "("
424                            + query.getActualMessageCount()
425                            + ") messages. messages left="
426                            + !done
427                            + " count="
428                            + count);
429            if (query.isCatchup() && query.getActualMessageCount() > 0) {
430                mXmppConnectionService
431                        .getNotificationService()
432                        .finishBacklog(true, query.getAccount());
433            }
434            if (query.isCatchup() && query.getPagingOrder() == PagingOrder.NORMAL && !complete && query.getConversation() != null) {
435                // Going forward we stopped without completing due to limits
436                // So we don't have the most recent messages yet
437                synchronized (this.queries) {
438                    final var q = new Query(query.getConversation(), new MamReference(System.currentTimeMillis() - Config.MAM_MIN_CATCHUP), 0, true, PagingOrder.REVERSE);
439                    this.queries.add(q);
440                    this.execute(q);
441                }
442            }
443            processPostponed(query);
444        } else {
445            final Query nextQuery;
446            if (query.getPagingOrder() == PagingOrder.NORMAL) {
447                nextQuery = query.next(last == null ? null : last.getContent());
448            } else {
449                nextQuery = query.prev(first == null ? null : first.getContent());
450            }
451            this.execute(nextQuery);
452            this.finalizeQuery(query, false);
453            synchronized (this.queries) {
454                this.queries.add(nextQuery);
455            }
456        }
457    }
458
459    void kill(final Conversation conversation) {
460        final ArrayList<Query> toBeKilled = new ArrayList<>();
461        synchronized (this.pendingQueries) {
462            for (final Iterator<Query> iterator = this.pendingQueries.iterator();
463                    iterator.hasNext(); ) {
464                final Query query = iterator.next();
465                if (query.getConversation() == conversation) {
466                    iterator.remove();
467                    Log.d(
468                            Config.LOGTAG,
469                            conversation.getAccount().getJid().asBareJid()
470                                    + ": killed pending MAM query for archived conversation");
471                }
472            }
473        }
474        synchronized (this.queries) {
475            for (final Query q : queries) {
476                if (q.conversation == conversation) {
477                    toBeKilled.add(q);
478                }
479            }
480        }
481        for (final Query q : toBeKilled) {
482            kill(q);
483        }
484    }
485
486    private void kill(Query query) {
487        Log.d(
488                Config.LOGTAG,
489                query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
490        query.callback = null;
491        this.finalizeQuery(query, false);
492        if (query.isCatchup() && query.getActualMessageCount() > 0) {
493            mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
494        }
495        this.processPostponed(query);
496    }
497
498    private void processPostponed(Query query) {
499        query.account.getAxolotlService().processPostponed();
500        query.pendingReceiptRequests.removeAll(query.receiptRequests);
501        Log.d(
502                Config.LOGTAG,
503                query.getAccount().getJid().asBareJid()
504                        + ": found "
505                        + query.pendingReceiptRequests.size()
506                        + " pending receipt requests");
507        Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
508        while (iterator.hasNext()) {
509            ReceiptRequest rr = iterator.next();
510            mXmppConnectionService.sendMessagePacket(
511                    query.account,
512                    mXmppConnectionService
513                            .getMessageGenerator()
514                            .received(query.account, rr.getJid(), rr.getId()));
515            iterator.remove();
516        }
517    }
518
519    public Query findQuery(String id) {
520        if (id == null) {
521            return null;
522        }
523        synchronized (this.queries) {
524            for (Query query : this.queries) {
525                if (query.getQueryId().equals(id)) {
526                    return query;
527                }
528            }
529            return null;
530        }
531    }
532
533    @Override
534    public void onAdvancedStreamFeaturesAvailable(Account account) {
535        if (account.getXmppConnection() != null
536                && account.getXmppConnection().getFeatures().mam()) {
537            this.catchup(account);
538        }
539    }
540
541    public enum PagingOrder {
542        NORMAL,
543        REVERSE
544    }
545
546    public class Query {
547        private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
548        private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
549        private int totalCount = 0;
550        private int actualCount = 0;
551        private int actualInThisQuery = 0;
552        private long start;
553        private final long end;
554        private final String queryId;
555        private String reference = null;
556        private final Account account;
557        private Conversation conversation;
558        private PagingOrder pagingOrder = PagingOrder.NORMAL;
559        private XmppConnectionService.OnMoreMessagesLoaded callback = null;
560        private boolean catchup = true;
561        public final Version version;
562
563        Query(Conversation conversation, MamReference start, long end, boolean catchup, PagingOrder order) {
564            this(conversation, start, end, catchup);
565            this.pagingOrder = order;
566        }
567
568        Query(Conversation conversation, MamReference start, long end, boolean catchup) {
569            this(
570                    conversation.getAccount(),
571                    Version.get(conversation.getAccount(), conversation),
572                    catchup ? start : start.timeOnly(),
573                    end);
574            this.conversation = conversation;
575            this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
576            this.catchup = catchup;
577        }
578
579        Query(Account account, MamReference start, long end) {
580            this(account, Version.get(account), start, end);
581        }
582
583        Query(Account account, Version version, MamReference start, long end) {
584            this.account = account;
585            if (start.getReference() != null) {
586                this.reference = start.getReference();
587            } else {
588                this.start = start.getTimestamp();
589            }
590            this.end = end;
591            this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
592            this.version = version;
593        }
594
595        private Query page(String reference) {
596            Query query =
597                    new Query(
598                            this.account,
599                            this.version,
600                            new MamReference(this.start, reference),
601                            this.end);
602            query.conversation = conversation;
603            query.totalCount = totalCount;
604            query.actualCount = actualCount;
605            query.pendingReceiptRequests = pendingReceiptRequests;
606            query.receiptRequests = receiptRequests;
607            query.callback = callback;
608            query.catchup = catchup;
609            return query;
610        }
611
612        public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
613            if (!this.pendingReceiptRequests.remove(receiptRequest)) {
614                this.receiptRequests.add(receiptRequest);
615            }
616        }
617
618        public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
619            this.pendingReceiptRequests.add(receiptRequest);
620        }
621
622        public boolean isLegacy() {
623            return version.legacy;
624        }
625
626        public boolean safeToExtractTrueCounterpart() {
627            return muc() && !isLegacy();
628        }
629
630        public Query next(String reference) {
631            Query query = page(reference);
632            query.pagingOrder = PagingOrder.NORMAL;
633            return query;
634        }
635
636        Query prev(String reference) {
637            Query query = page(reference);
638            query.pagingOrder = PagingOrder.REVERSE;
639            return query;
640        }
641
642        public String getReference() {
643            return reference;
644        }
645
646        public PagingOrder getPagingOrder() {
647            return this.pagingOrder;
648        }
649
650        public String getQueryId() {
651            return queryId;
652        }
653
654        public Jid getWith() {
655            return conversation == null ? null : conversation.getJid().asBareJid();
656        }
657
658        public boolean muc() {
659            return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
660        }
661
662        public long getStart() {
663            return start;
664        }
665
666        public boolean isCatchup() {
667            return catchup;
668        }
669
670        public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
671            this.callback = callback;
672        }
673
674        public void callback(boolean done) {
675            if (this.callback != null) {
676                this.callback.onMoreMessagesLoaded(actualCount, conversation);
677                if (done) {
678                    this.callback.informUser(R.string.no_more_history_on_server);
679                }
680            }
681        }
682
683        public long getEnd() {
684            return end;
685        }
686
687        public Conversation getConversation() {
688            return conversation;
689        }
690
691        public Account getAccount() {
692            return this.account;
693        }
694
695        public void incrementMessageCount() {
696            this.totalCount++;
697        }
698
699        public void incrementActualMessageCount() {
700            this.actualInThisQuery++;
701            this.actualCount++;
702        }
703
704        int getTotalCount() {
705            return this.totalCount;
706        }
707
708        int getActualMessageCount() {
709            return this.actualCount;
710        }
711
712        public int getActualInThisQuery() {
713            return this.actualInThisQuery;
714        }
715
716        public boolean validFrom(Jid from) {
717            if (muc()) {
718                return getWith().equals(from);
719            } else {
720                return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
721            }
722        }
723
724        @NonNull
725        @Override
726        public String toString() {
727            StringBuilder builder = new StringBuilder();
728            if (this.muc()) {
729                builder.append("to=");
730                builder.append(this.getWith().toString());
731            } else {
732                builder.append("with=");
733                if (this.getWith() == null) {
734                    builder.append("*");
735                } else {
736                    builder.append(getWith().toString());
737                }
738            }
739            if (this.start != 0) {
740                builder.append(", start=");
741                builder.append(AbstractGenerator.getTimestamp(this.start));
742            }
743            if (this.end != 0) {
744                builder.append(", end=");
745                builder.append(AbstractGenerator.getTimestamp(this.end));
746            }
747            builder.append(", order=").append(pagingOrder.toString());
748            if (this.reference != null) {
749                if (this.pagingOrder == PagingOrder.NORMAL) {
750                    builder.append(", after=");
751                } else {
752                    builder.append(", before=");
753                }
754                builder.append(this.reference);
755            }
756            builder.append(", catchup=").append(catchup);
757            builder.append(", ns=").append(version.namespace);
758            return builder.toString();
759        }
760
761        boolean hasCallback() {
762            return this.callback != null;
763        }
764
765        public boolean isImplausibleFrom(final Jid from) {
766            if (muc()) {
767                if (from == null) {
768                    return true;
769                }
770                return !from.asBareJid().equals(getWith());
771            } else {
772                return false;
773            }
774        }
775    }
776}