MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
  4
  5import android.util.Log;
  6import androidx.annotation.NonNull;
  7import eu.siacs.conversations.Config;
  8import eu.siacs.conversations.R;
  9import eu.siacs.conversations.entities.Account;
 10import eu.siacs.conversations.entities.Conversation;
 11import eu.siacs.conversations.entities.Conversational;
 12import eu.siacs.conversations.entities.ReceiptRequest;
 13import eu.siacs.conversations.generator.AbstractGenerator;
 14import eu.siacs.conversations.xml.Element;
 15import eu.siacs.conversations.xmpp.Jid;
 16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 17import eu.siacs.conversations.xmpp.mam.MamReference;
 18import im.conversations.android.xmpp.model.stanza.Iq;
 19import im.conversations.android.xmpp.model.stanza.Message;
 20import java.math.BigInteger;
 21import java.util.ArrayList;
 22import java.util.Collection;
 23import java.util.HashSet;
 24import java.util.Iterator;
 25import java.util.List;
 26
 27public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 28
 29    private final XmppConnectionService mXmppConnectionService;
 30
 31    private final HashSet<Query> queries = new HashSet<>();
 32    private final ArrayList<Query> pendingQueries = new ArrayList<>();
 33
 34    public enum Version {
 35        MAM_0("urn:xmpp:mam:0", true),
 36        MAM_1("urn:xmpp:mam:1", false),
 37        MAM_2("urn:xmpp:mam:2", false);
 38
 39        public final boolean legacy;
 40        public final String namespace;
 41
 42        Version(String namespace, boolean legacy) {
 43            this.namespace = namespace;
 44            this.legacy = legacy;
 45        }
 46
 47        public static Version get(Account account) {
 48            return get(account, null);
 49        }
 50
 51        public static Version get(Account account, Conversation conversation) {
 52            if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
 53                return get(account.getXmppConnection().getFeatures().getAccountFeatures());
 54            } else {
 55                return get(conversation.getMucOptions().getFeatures());
 56            }
 57        }
 58
 59        private static Version get(final Collection<String> features) {
 60            final Version[] values = values();
 61            for (int i = values.length - 1; i >= 0; --i) {
 62                for (String feature : features) {
 63                    if (values[i].namespace.equals(feature)) {
 64                        return values[i];
 65                    }
 66                }
 67            }
 68            return MAM_0;
 69        }
 70
 71        public static boolean has(final Collection<String> features) {
 72            for (String feature : features) {
 73                for (Version version : values()) {
 74                    if (version.namespace.equals(feature)) {
 75                        return true;
 76                    }
 77                }
 78            }
 79            return false;
 80        }
 81
 82        public static Element findResult(Message packet) {
 83            for (Version version : values()) {
 84                Element result = packet.findChild("result", version.namespace);
 85                if (result != null) {
 86                    return result;
 87                }
 88            }
 89            return null;
 90        }
 91    }
 92
 93    MessageArchiveService(final XmppConnectionService service) {
 94        this.mXmppConnectionService = service;
 95    }
 96
 97    private void catchup(final Account account) {
 98        synchronized (this.queries) {
 99            for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
100                Query query = iterator.next();
101                if (query.getAccount() == account) {
102                    iterator.remove();
103                }
104            }
105        }
106        MamReference mamReference =
107                MamReference.max(
108                        mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
109                        mXmppConnectionService.databaseBackend.getLastClearDate(account));
110        mamReference =
111                MamReference.max(
112                        mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
113        long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114        final Query query;
115        if (mamReference.getTimestamp() == 0) {
116            return;
117        } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118            long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119            List<Conversation> conversations = mXmppConnectionService.getConversations();
120            for (Conversation conversation : conversations) {
121                if (conversation.getMode() == Conversation.MODE_SINGLE
122                        && conversation.getAccount() == account
123                        && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
124                    this.query(conversation, startCatchup, true);
125                }
126            }
127            query = new Query(account, new MamReference(startCatchup), 0);
128        } else {
129            query = new Query(account, mamReference, 0);
130        }
131        synchronized (this.queries) {
132            this.queries.add(query);
133        }
134        this.execute(query);
135    }
136
137    void catchupMUC(final Conversation conversation) {
138        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
139                && conversation.countMessages() == 0) {
140            query(conversation, new MamReference(0), 0, true);
141        } else {
142            query(conversation, conversation.getLastMessageTransmitted(), 0, true);
143        }
144    }
145
146    public Query query(final Conversation conversation) {
147        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
148                && conversation.countMessages() == 0) {
149            return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
150        } else {
151            return query(
152                    conversation,
153                    conversation.getLastMessageTransmitted(),
154                    conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
155                    false);
156        }
157    }
158
159    public boolean isCatchingUp(Conversation conversation) {
160        final Account account = conversation.getAccount();
161        if (account.getXmppConnection().isWaitingForSmCatchup()) {
162            return true;
163        } else {
164            synchronized (this.queries) {
165                for (Query query : this.queries) {
166                    if (query.getAccount() == account
167                            && query.isCatchup()
168                            && ((conversation.getMode() == Conversation.MODE_SINGLE
169                                            && query.getWith() == null)
170                                    || query.getConversation() == conversation)) {
171                        return true;
172                    }
173                }
174            }
175            return false;
176        }
177    }
178
179    public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180        return this.query(
181                conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182    }
183
184    public Query query(
185            Conversation conversation, MamReference start, long end, boolean allowCatchup) {
186        synchronized (this.queries) {
187            final Query query;
188            final MamReference startActual =
189                    MamReference.max(
190                            start, mXmppConnectionService.getAutomaticMessageDeletionDate());
191            if (start.getTimestamp() == 0) {
192                query = new Query(conversation, startActual, end, false);
193                query.reference = conversation.getFirstMamReference();
194            } else {
195                if (allowCatchup) {
196                    MamReference maxCatchup =
197                            MamReference.max(
198                                    startActual,
199                                    System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
200                    if (maxCatchup.greaterThan(startActual)) {
201                        Query reverseCatchup =
202                                new Query(
203                                        conversation,
204                                        startActual,
205                                        maxCatchup.getTimestamp(),
206                                        false);
207                        this.queries.add(reverseCatchup);
208                        this.execute(reverseCatchup);
209                    }
210                    query = new Query(conversation, maxCatchup, end, true);
211                } else {
212                    query = new Query(conversation, startActual, end, false);
213                }
214            }
215            if (end != 0 && start.greaterThan(end)) {
216                return null;
217            }
218            this.queries.add(query);
219            this.execute(query);
220            return query;
221        }
222    }
223
224    public void executePendingQueries(final Account account) {
225        final List<Query> pending = new ArrayList<>();
226        synchronized (this.pendingQueries) {
227            for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
228                Query query = iterator.next();
229                if (query.getAccount() == account) {
230                    pending.add(query);
231                    iterator.remove();
232                }
233            }
234        }
235        for (Query query : pending) {
236            this.execute(query);
237        }
238    }
239
240    private void execute(final Query query) {
241        final Account account = query.getAccount();
242        if (account.getStatus() == Account.State.ONLINE) {
243            final Conversation conversation = query.getConversation();
244            if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
245                throw new IllegalStateException(
246                        "Attempted to run MAM query for archived conversation");
247            }
248            Log.d(
249                    Config.LOGTAG,
250                    account.getJid().asBareJid().toString() + ": running mam query " + query);
251            final Iq packet =
252                    this.mXmppConnectionService
253                            .getIqGenerator()
254                            .queryMessageArchiveManagement(query);
255            this.mXmppConnectionService.sendIqPacket(
256                    account,
257                    packet,
258                    (p) -> {
259                        final Element fin = p.findChild("fin", query.version.namespace);
260                        if (p.getType() == Iq.Type.TIMEOUT) {
261                            synchronized (this.queries) {
262                                this.queries.remove(query);
263                                if (query.hasCallback()) {
264                                    query.callback(false);
265                                }
266                            }
267                        } else if (p.getType() == Iq.Type.RESULT && fin != null) {
268                            final boolean running;
269                            synchronized (this.queries) {
270                                running = this.queries.contains(query);
271                            }
272                            if (running) {
273                                processFin(query, fin);
274                            } else {
275                                Log.d(
276                                        Config.LOGTAG,
277                                        account.getJid().asBareJid()
278                                                + ": ignoring MAM iq result because query had been"
279                                                + " killed");
280                            }
281                        } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
282                            // do nothing
283                        } else {
284                            Log.d(
285                                    Config.LOGTAG,
286                                    account.getJid().asBareJid().toString()
287                                            + ": error executing mam: "
288                                            + p);
289                            try {
290                                finalizeQuery(query, true);
291                            } catch (final IllegalStateException e) {
292                                // ignored
293                            }
294                        }
295                    });
296        } else {
297            synchronized (this.pendingQueries) {
298                this.pendingQueries.add(query);
299            }
300        }
301    }
302
303    private void finalizeQuery(final Query query, boolean done) {
304        synchronized (this.queries) {
305            if (!this.queries.remove(query)) {
306                throw new IllegalStateException("Unable to remove query from queries");
307            }
308        }
309        final Conversation conversation = query.getConversation();
310        if (conversation != null) {
311            conversation.sort();
312            conversation.setHasMessagesLeftOnServer(!done);
313            final var displayState = conversation.getDisplayState();
314            if (displayState != null) {
315                mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
316            }
317        } else {
318            for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
319                if (tmp.getAccount() == query.getAccount()) {
320                    tmp.sort();
321                    final var displayState = tmp.getDisplayState();
322                    if (displayState != null) {
323                        mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
324                    }
325                }
326            }
327        }
328        if (query.hasCallback()) {
329            query.callback(done);
330        } else {
331            this.mXmppConnectionService.updateConversationUi();
332        }
333    }
334
335    public boolean inCatchup(Account account) {
336        synchronized (this.queries) {
337            for (Query query : queries) {
338                if (query.account == account && query.isCatchup() && query.getWith() == null) {
339                    return true;
340                }
341            }
342        }
343        return false;
344    }
345
346    public boolean isCatchupInProgress(Conversation conversation) {
347        synchronized (this.queries) {
348            for (Query query : queries) {
349                if (query.account == conversation.getAccount() && query.isCatchup()) {
350                    final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
351                    if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
352                            || (conversation.getJid().asBareJid().equals(with))) {
353                        return true;
354                    }
355                }
356            }
357        }
358        return false;
359    }
360
361    boolean queryInProgress(
362            Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
363        synchronized (this.queries) {
364            for (Query query : queries) {
365                if (query.conversation == conversation) {
366                    if (!query.hasCallback() && callback != null) {
367                        query.setCallback(callback);
368                    }
369                    return true;
370                }
371            }
372            return false;
373        }
374    }
375
376    public boolean queryInProgress(Conversation conversation) {
377        return queryInProgress(conversation, null);
378    }
379
380    public void processFinLegacy(Element fin, Jid from) {
381        Query query = findQuery(fin.getAttribute("queryid"));
382        if (query != null && query.validFrom(from)) {
383            processFin(query, fin);
384        }
385    }
386
387    private void processFin(Query query, Element fin) {
388        boolean complete = fin.getAttributeAsBoolean("complete");
389        Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
390        Element last = set == null ? null : set.findChild("last");
391        String count = set == null ? null : set.findChildContent("count");
392        Element first = set == null ? null : set.findChild("first");
393        Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
394        boolean abort =
395                (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
396                        || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
397        if (query.getConversation() != null) {
398            query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
399        }
400        if (complete || relevant == null || abort) {
401            // TODO: FIX done logic to look at complete. using count is probably unreliable because
402            // it can be ommited and doesn’t work with paging.
403            boolean done;
404            if (query.isCatchup()) {
405                done = false;
406            } else {
407                if (count != null) {
408                    try {
409                        done = Integer.parseInt(count) <= query.getTotalCount();
410                    } catch (NumberFormatException e) {
411                        done = false;
412                    }
413                } else {
414                    done = query.getTotalCount() == 0;
415                }
416            }
417            done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
418            this.finalizeQuery(query, done);
419
420            Log.d(
421                    Config.LOGTAG,
422                    query.getAccount().getJid().asBareJid()
423                            + ": finished mam after "
424                            + query.getTotalCount()
425                            + "("
426                            + query.getActualMessageCount()
427                            + ") messages. messages left="
428                            + !done
429                            + " count="
430                            + count);
431            if (query.isCatchup() && query.getActualMessageCount() > 0) {
432                mXmppConnectionService
433                        .getNotificationService()
434                        .finishBacklog(true, query.getAccount());
435            }
436            processPostponed(query);
437        } else {
438            final Query nextQuery;
439            if (query.getPagingOrder() == PagingOrder.NORMAL) {
440                nextQuery = query.next(last == null ? null : last.getContent());
441            } else {
442                nextQuery = query.prev(first == null ? null : first.getContent());
443            }
444            this.execute(nextQuery);
445            this.finalizeQuery(query, false);
446            synchronized (this.queries) {
447                this.queries.add(nextQuery);
448            }
449        }
450    }
451
452    void kill(final Conversation conversation) {
453        final ArrayList<Query> toBeKilled = new ArrayList<>();
454        synchronized (this.pendingQueries) {
455            for (final Iterator<Query> iterator = this.pendingQueries.iterator();
456                    iterator.hasNext(); ) {
457                final Query query = iterator.next();
458                if (query.getConversation() == conversation) {
459                    iterator.remove();
460                    Log.d(
461                            Config.LOGTAG,
462                            conversation.getAccount().getJid().asBareJid()
463                                    + ": killed pending MAM query for archived conversation");
464                }
465            }
466        }
467        synchronized (this.queries) {
468            for (final Query q : queries) {
469                if (q.conversation == conversation) {
470                    toBeKilled.add(q);
471                }
472            }
473        }
474        for (final Query q : toBeKilled) {
475            kill(q);
476        }
477    }
478
479    private void kill(Query query) {
480        Log.d(
481                Config.LOGTAG,
482                query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
483        query.callback = null;
484        this.finalizeQuery(query, false);
485        if (query.isCatchup() && query.getActualMessageCount() > 0) {
486            mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
487        }
488        this.processPostponed(query);
489    }
490
491    private void processPostponed(Query query) {
492        query.account.getAxolotlService().processPostponed();
493        query.pendingReceiptRequests.removeAll(query.receiptRequests);
494        Log.d(
495                Config.LOGTAG,
496                query.getAccount().getJid().asBareJid()
497                        + ": found "
498                        + query.pendingReceiptRequests.size()
499                        + " pending receipt requests");
500        Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
501        while (iterator.hasNext()) {
502            ReceiptRequest rr = iterator.next();
503            mXmppConnectionService.sendMessagePacket(
504                    query.account,
505                    mXmppConnectionService
506                            .getMessageGenerator()
507                            .received(query.account, rr.getJid(), rr.getId()));
508            iterator.remove();
509        }
510    }
511
512    public Query findQuery(String id) {
513        if (id == null) {
514            return null;
515        }
516        synchronized (this.queries) {
517            for (Query query : this.queries) {
518                if (query.getQueryId().equals(id)) {
519                    return query;
520                }
521            }
522            return null;
523        }
524    }
525
526    @Override
527    public void onAdvancedStreamFeaturesAvailable(Account account) {
528        if (account.getXmppConnection() != null
529                && account.getXmppConnection().getFeatures().mam()) {
530            this.catchup(account);
531        }
532    }
533
534    public enum PagingOrder {
535        NORMAL,
536        REVERSE
537    }
538
539    public class Query {
540        private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
541        private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
542        private int totalCount = 0;
543        private int actualCount = 0;
544        private int actualInThisQuery = 0;
545        private long start;
546        private final long end;
547        private final String queryId;
548        private String reference = null;
549        private final Account account;
550        private Conversation conversation;
551        private PagingOrder pagingOrder = PagingOrder.NORMAL;
552        private XmppConnectionService.OnMoreMessagesLoaded callback = null;
553        private boolean catchup = true;
554        public final Version version;
555
556        Query(Conversation conversation, MamReference start, long end, boolean catchup) {
557            this(
558                    conversation.getAccount(),
559                    Version.get(conversation.getAccount(), conversation),
560                    catchup ? start : start.timeOnly(),
561                    end);
562            this.conversation = conversation;
563            this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
564            this.catchup = catchup;
565        }
566
567        Query(Account account, MamReference start, long end) {
568            this(account, Version.get(account), start, end);
569        }
570
571        Query(Account account, Version version, MamReference start, long end) {
572            this.account = account;
573            if (start.getReference() != null) {
574                this.reference = start.getReference();
575            } else {
576                this.start = start.getTimestamp();
577            }
578            this.end = end;
579            this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
580            this.version = version;
581        }
582
583        private Query page(String reference) {
584            Query query =
585                    new Query(
586                            this.account,
587                            this.version,
588                            new MamReference(this.start, reference),
589                            this.end);
590            query.conversation = conversation;
591            query.totalCount = totalCount;
592            query.actualCount = actualCount;
593            query.pendingReceiptRequests = pendingReceiptRequests;
594            query.receiptRequests = receiptRequests;
595            query.callback = callback;
596            query.catchup = catchup;
597            return query;
598        }
599
600        public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
601            if (!this.pendingReceiptRequests.remove(receiptRequest)) {
602                this.receiptRequests.add(receiptRequest);
603            }
604        }
605
606        public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
607            this.pendingReceiptRequests.add(receiptRequest);
608        }
609
610        public boolean isLegacy() {
611            return version.legacy;
612        }
613
614        public boolean safeToExtractTrueCounterpart() {
615            return muc() && !isLegacy();
616        }
617
618        public Query next(String reference) {
619            Query query = page(reference);
620            query.pagingOrder = PagingOrder.NORMAL;
621            return query;
622        }
623
624        Query prev(String reference) {
625            Query query = page(reference);
626            query.pagingOrder = PagingOrder.REVERSE;
627            return query;
628        }
629
630        public String getReference() {
631            return reference;
632        }
633
634        public PagingOrder getPagingOrder() {
635            return this.pagingOrder;
636        }
637
638        public String getQueryId() {
639            return queryId;
640        }
641
642        public Jid getWith() {
643            return conversation == null ? null : conversation.getJid().asBareJid();
644        }
645
646        public boolean muc() {
647            return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
648        }
649
650        public long getStart() {
651            return start;
652        }
653
654        public boolean isCatchup() {
655            return catchup;
656        }
657
658        public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
659            this.callback = callback;
660        }
661
662        public void callback(boolean done) {
663            if (this.callback != null) {
664                this.callback.onMoreMessagesLoaded(actualCount, conversation);
665                if (done) {
666                    this.callback.informUser(R.string.no_more_history_on_server);
667                }
668            }
669        }
670
671        public long getEnd() {
672            return end;
673        }
674
675        public Conversation getConversation() {
676            return conversation;
677        }
678
679        public Account getAccount() {
680            return this.account;
681        }
682
683        public void incrementMessageCount() {
684            this.totalCount++;
685        }
686
687        public void incrementActualMessageCount() {
688            this.actualInThisQuery++;
689            this.actualCount++;
690        }
691
692        int getTotalCount() {
693            return this.totalCount;
694        }
695
696        int getActualMessageCount() {
697            return this.actualCount;
698        }
699
700        public int getActualInThisQuery() {
701            return this.actualInThisQuery;
702        }
703
704        public boolean validFrom(Jid from) {
705            if (muc()) {
706                return getWith().equals(from);
707            } else {
708                return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
709            }
710        }
711
712        @NonNull
713        @Override
714        public String toString() {
715            StringBuilder builder = new StringBuilder();
716            if (this.muc()) {
717                builder.append("to=");
718                builder.append(this.getWith().toString());
719            } else {
720                builder.append("with=");
721                if (this.getWith() == null) {
722                    builder.append("*");
723                } else {
724                    builder.append(getWith().toString());
725                }
726            }
727            if (this.start != 0) {
728                builder.append(", start=");
729                builder.append(AbstractGenerator.getTimestamp(this.start));
730            }
731            if (this.end != 0) {
732                builder.append(", end=");
733                builder.append(AbstractGenerator.getTimestamp(this.end));
734            }
735            builder.append(", order=").append(pagingOrder.toString());
736            if (this.reference != null) {
737                if (this.pagingOrder == PagingOrder.NORMAL) {
738                    builder.append(", after=");
739                } else {
740                    builder.append(", before=");
741                }
742                builder.append(this.reference);
743            }
744            builder.append(", catchup=").append(catchup);
745            builder.append(", ns=").append(version.namespace);
746            return builder.toString();
747        }
748
749        boolean hasCallback() {
750            return this.callback != null;
751        }
752    }
753}