MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
  4
  5import android.util.Log;
  6import androidx.annotation.NonNull;
  7import eu.siacs.conversations.Config;
  8import eu.siacs.conversations.R;
  9import eu.siacs.conversations.entities.Account;
 10import eu.siacs.conversations.entities.Conversation;
 11import eu.siacs.conversations.entities.Conversational;
 12import eu.siacs.conversations.entities.ReceiptRequest;
 13import eu.siacs.conversations.generator.AbstractGenerator;
 14import eu.siacs.conversations.xml.Element;
 15import eu.siacs.conversations.xmpp.Jid;
 16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 17import eu.siacs.conversations.xmpp.mam.MamReference;
 18import im.conversations.android.xmpp.model.stanza.Iq;
 19import im.conversations.android.xmpp.model.stanza.Message;
 20import java.math.BigInteger;
 21import java.util.ArrayList;
 22import java.util.HashSet;
 23import java.util.Iterator;
 24import java.util.List;
 25
 26public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 27
 28    private final XmppConnectionService mXmppConnectionService;
 29
 30    private final HashSet<Query> queries = new HashSet<>();
 31    private final ArrayList<Query> pendingQueries = new ArrayList<>();
 32
 33    public enum Version {
 34        MAM_0("urn:xmpp:mam:0", true),
 35        MAM_1("urn:xmpp:mam:1", false),
 36        MAM_2("urn:xmpp:mam:2", false);
 37
 38        public final boolean legacy;
 39        public final String namespace;
 40
 41        Version(String namespace, boolean legacy) {
 42            this.namespace = namespace;
 43            this.legacy = legacy;
 44        }
 45
 46        public static Version get(Account account) {
 47            return get(account, null);
 48        }
 49
 50        public static Version get(Account account, Conversation conversation) {
 51            if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
 52                return get(account.getXmppConnection().getFeatures().getAccountFeatures());
 53            } else {
 54                return get(conversation.getMucOptions().getFeatures());
 55            }
 56        }
 57
 58        private static Version get(List<String> features) {
 59            final Version[] values = values();
 60            for (int i = values.length - 1; i >= 0; --i) {
 61                for (String feature : features) {
 62                    if (values[i].namespace.equals(feature)) {
 63                        return values[i];
 64                    }
 65                }
 66            }
 67            return MAM_0;
 68        }
 69
 70        public static boolean has(List<String> features) {
 71            for (String feature : features) {
 72                for (Version version : values()) {
 73                    if (version.namespace.equals(feature)) {
 74                        return true;
 75                    }
 76                }
 77            }
 78            return false;
 79        }
 80
 81        public static Element findResult(Message packet) {
 82            for (Version version : values()) {
 83                Element result = packet.findChild("result", version.namespace);
 84                if (result != null) {
 85                    return result;
 86                }
 87            }
 88            return null;
 89        }
 90    }
 91
 92    MessageArchiveService(final XmppConnectionService service) {
 93        this.mXmppConnectionService = service;
 94    }
 95
 96    private void catchup(final Account account) {
 97        synchronized (this.queries) {
 98            for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
 99                Query query = iterator.next();
100                if (query.getAccount() == account) {
101                    iterator.remove();
102                }
103            }
104        }
105        MamReference mamReference =
106                MamReference.max(
107                        mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
108                        mXmppConnectionService.databaseBackend.getLastClearDate(account));
109        mamReference =
110                MamReference.max(
111                        mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
112        long endCatchup = account.getXmppConnection().getLastSessionEstablished();
113        final Query query;
114        if (mamReference.getTimestamp() == 0) {
115            return;
116        } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
117            long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
118            List<Conversation> conversations = mXmppConnectionService.getConversations();
119            for (Conversation conversation : conversations) {
120                if (conversation.getMode() == Conversation.MODE_SINGLE
121                        && conversation.getAccount() == account
122                        && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
123                    this.query(conversation, startCatchup, true);
124                }
125            }
126            query = new Query(account, new MamReference(startCatchup), 0);
127        } else {
128            query = new Query(account, mamReference, 0);
129        }
130        synchronized (this.queries) {
131            this.queries.add(query);
132        }
133        this.execute(query);
134    }
135
136    void catchupMUC(final Conversation conversation) {
137        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
138                && conversation.countMessages() == 0) {
139            query(conversation, new MamReference(0), 0, true);
140        } else {
141            query(conversation, conversation.getLastMessageTransmitted(), 0, true);
142        }
143    }
144
145    public Query query(final Conversation conversation) {
146        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
147                && conversation.countMessages() == 0) {
148            return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
149        } else {
150            return query(
151                    conversation,
152                    conversation.getLastMessageTransmitted(),
153                    conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
154                    false);
155        }
156    }
157
158    public boolean isCatchingUp(Conversation conversation) {
159        final Account account = conversation.getAccount();
160        if (account.getXmppConnection().isWaitingForSmCatchup()) {
161            return true;
162        } else {
163            synchronized (this.queries) {
164                for (Query query : this.queries) {
165                    if (query.getAccount() == account
166                            && query.isCatchup()
167                            && ((conversation.getMode() == Conversation.MODE_SINGLE
168                                            && query.getWith() == null)
169                                    || query.getConversation() == conversation)) {
170                        return true;
171                    }
172                }
173            }
174            return false;
175        }
176    }
177
178    public Query query(final Conversation conversation, long end, boolean allowCatchup) {
179        return this.query(
180                conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
181    }
182
183    public Query query(
184            Conversation conversation, MamReference start, long end, boolean allowCatchup) {
185        synchronized (this.queries) {
186            final Query query;
187            final MamReference startActual = start;
188            if (start.getTimestamp() == 0) {
189                query = new Query(conversation, startActual, end, false);
190                query.reference = conversation.getFirstMamReference();
191            } else {
192                if (allowCatchup) {
193                    MamReference maxCatchup =
194                            MamReference.max(
195                                    startActual,
196                                    System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
197                    if (maxCatchup.greaterThan(startActual)) {
198                        Query reverseCatchup =
199                                new Query(
200                                        conversation,
201                                        startActual,
202                                        maxCatchup.getTimestamp(),
203                                        false);
204                        this.queries.add(reverseCatchup);
205                        this.execute(reverseCatchup);
206                    }
207                    query = new Query(conversation, maxCatchup, end, true);
208                } else {
209                    query = new Query(conversation, startActual, end, false);
210                }
211            }
212            if (end != 0 && start.greaterThan(end)) {
213                return null;
214            }
215            this.queries.add(query);
216            this.execute(query);
217            return query;
218        }
219    }
220
221    void executePendingQueries(final Account account) {
222        final List<Query> pending = new ArrayList<>();
223        synchronized (this.pendingQueries) {
224            for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
225                Query query = iterator.next();
226                if (query.getAccount() == account) {
227                    pending.add(query);
228                    iterator.remove();
229                }
230            }
231        }
232        for (Query query : pending) {
233            this.execute(query);
234        }
235    }
236
237    private void execute(final Query query) {
238        final Account account = query.getAccount();
239        if (account.getStatus() == Account.State.ONLINE) {
240            final Conversation conversation = query.getConversation();
241            if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
242                throw new IllegalStateException(
243                        "Attempted to run MAM query for archived conversation");
244            }
245            Log.d(
246                    Config.LOGTAG,
247                    account.getJid().asBareJid().toString() + ": running mam query " + query);
248            final Iq packet =
249                    this.mXmppConnectionService
250                            .getIqGenerator()
251                            .queryMessageArchiveManagement(query);
252            this.mXmppConnectionService.sendIqPacket(
253                    account,
254                    packet,
255                    (p) -> {
256                        final Element fin = p.findChild("fin", query.version.namespace);
257                        if (p.getType() == Iq.Type.TIMEOUT) {
258                            synchronized (this.queries) {
259                                this.queries.remove(query);
260                                if (query.hasCallback()) {
261                                    query.callback(false);
262                                }
263                            }
264                        } else if (p.getType() == Iq.Type.RESULT && fin != null) {
265                            final boolean running;
266                            synchronized (this.queries) {
267                                running = this.queries.contains(query);
268                            }
269                            if (running) {
270                                processFin(query, fin);
271                            } else {
272                                Log.d(
273                                        Config.LOGTAG,
274                                        account.getJid().asBareJid()
275                                                + ": ignoring MAM iq result because query had been"
276                                                + " killed");
277                            }
278                        } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
279                            // do nothing
280                        } else {
281                            Log.d(
282                                    Config.LOGTAG,
283                                    account.getJid().asBareJid().toString()
284                                            + ": error executing mam: "
285                                            + p);
286                            try {
287                                finalizeQuery(query, true);
288                            } catch (final IllegalStateException e) {
289                                // ignored
290                            }
291                        }
292                    });
293        } else {
294            synchronized (this.pendingQueries) {
295                this.pendingQueries.add(query);
296            }
297        }
298    }
299
300    private void finalizeQuery(final Query query, boolean done) {
301        synchronized (this.queries) {
302            if (!this.queries.remove(query)) {
303                throw new IllegalStateException("Unable to remove query from queries");
304            }
305        }
306        final Conversation conversation = query.getConversation();
307        if (conversation != null) {
308            conversation.sort();
309            conversation.setHasMessagesLeftOnServer(!done);
310            final var displayState = conversation.getDisplayState();
311            if (displayState != null) {
312                mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
313            }
314        } else {
315            for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
316                if (tmp.getAccount() == query.getAccount()) {
317                    tmp.sort();
318                    final var displayState = tmp.getDisplayState();
319                    if (displayState != null) {
320                        mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
321                    }
322                }
323            }
324        }
325        if (query.hasCallback()) {
326            query.callback(done);
327        } else {
328            this.mXmppConnectionService.updateConversationUi();
329        }
330    }
331
332    public boolean inCatchup(Account account) {
333        synchronized (this.queries) {
334            for (Query query : queries) {
335                if (query.account == account && query.isCatchup() && query.getWith() == null) {
336                    return true;
337                }
338            }
339        }
340        return false;
341    }
342
343    public boolean isCatchupInProgress(Conversation conversation) {
344        synchronized (this.queries) {
345            for (Query query : queries) {
346                if (query.account == conversation.getAccount() && query.isCatchup()) {
347                    final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
348                    if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
349                            || (conversation.getJid().asBareJid().equals(with))) {
350                        return true;
351                    }
352                }
353            }
354        }
355        return false;
356    }
357
358    boolean queryInProgress(
359            Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
360        synchronized (this.queries) {
361            for (Query query : queries) {
362                if (query.conversation == conversation) {
363                    if (!query.hasCallback() && callback != null) {
364                        query.setCallback(callback);
365                    }
366                    return true;
367                }
368            }
369            return false;
370        }
371    }
372
373    public boolean queryInProgress(Conversation conversation) {
374        return queryInProgress(conversation, null);
375    }
376
377    public void processFinLegacy(Element fin, Jid from) {
378        Query query = findQuery(fin.getAttribute("queryid"));
379        if (query != null && query.validFrom(from)) {
380            processFin(query, fin);
381        }
382    }
383
384    private void processFin(Query query, Element fin) {
385        boolean complete = fin.getAttributeAsBoolean("complete");
386        Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
387        Element last = set == null ? null : set.findChild("last");
388        String count = set == null ? null : set.findChildContent("count");
389        Element first = set == null ? null : set.findChild("first");
390        Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
391        boolean abort =
392                (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
393                        || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
394        if (query.getConversation() != null) {
395            query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
396        }
397        if (complete || relevant == null || abort) {
398            // TODO: FIX done logic to look at complete. using count is probably unreliable because
399            // it can be ommited and doesn’t work with paging.
400            boolean done;
401            if (query.isCatchup()) {
402                done = false;
403            } else {
404                if (count != null) {
405                    try {
406                        done = Integer.parseInt(count) <= query.getTotalCount();
407                    } catch (NumberFormatException e) {
408                        done = false;
409                    }
410                } else {
411                    done = query.getTotalCount() == 0;
412                }
413            }
414            done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
415            this.finalizeQuery(query, done);
416
417            Log.d(
418                    Config.LOGTAG,
419                    query.getAccount().getJid().asBareJid()
420                            + ": finished mam after "
421                            + query.getTotalCount()
422                            + "("
423                            + query.getActualMessageCount()
424                            + ") messages. messages left="
425                            + !done
426                            + " count="
427                            + count);
428            if (query.isCatchup() && query.getActualMessageCount() > 0) {
429                mXmppConnectionService
430                        .getNotificationService()
431                        .finishBacklog(true, query.getAccount());
432            }
433            if (query.isCatchup() && query.getPagingOrder() == PagingOrder.NORMAL && !complete && query.getConversation() != null) {
434                // Going forward we stopped without completing due to limits
435                // So we don't have the most recent messages yet
436                synchronized (this.queries) {
437                    final var q = new Query(query.getConversation(), new MamReference(System.currentTimeMillis() - Config.MAM_MIN_CATCHUP), 0, true, PagingOrder.REVERSE);
438                    this.queries.add(q);
439                    this.execute(q);
440                }
441            }
442            processPostponed(query);
443        } else {
444            final Query nextQuery;
445            if (query.getPagingOrder() == PagingOrder.NORMAL) {
446                nextQuery = query.next(last == null ? null : last.getContent());
447            } else {
448                nextQuery = query.prev(first == null ? null : first.getContent());
449            }
450            this.execute(nextQuery);
451            this.finalizeQuery(query, false);
452            synchronized (this.queries) {
453                this.queries.add(nextQuery);
454            }
455        }
456    }
457
458    void kill(final Conversation conversation) {
459        final ArrayList<Query> toBeKilled = new ArrayList<>();
460        synchronized (this.pendingQueries) {
461            for (final Iterator<Query> iterator = this.pendingQueries.iterator();
462                    iterator.hasNext(); ) {
463                final Query query = iterator.next();
464                if (query.getConversation() == conversation) {
465                    iterator.remove();
466                    Log.d(
467                            Config.LOGTAG,
468                            conversation.getAccount().getJid().asBareJid()
469                                    + ": killed pending MAM query for archived conversation");
470                }
471            }
472        }
473        synchronized (this.queries) {
474            for (final Query q : queries) {
475                if (q.conversation == conversation) {
476                    toBeKilled.add(q);
477                }
478            }
479        }
480        for (final Query q : toBeKilled) {
481            kill(q);
482        }
483    }
484
485    private void kill(Query query) {
486        Log.d(
487                Config.LOGTAG,
488                query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
489        query.callback = null;
490        this.finalizeQuery(query, false);
491        if (query.isCatchup() && query.getActualMessageCount() > 0) {
492            mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
493        }
494        this.processPostponed(query);
495    }
496
497    private void processPostponed(Query query) {
498        query.account.getAxolotlService().processPostponed();
499        query.pendingReceiptRequests.removeAll(query.receiptRequests);
500        Log.d(
501                Config.LOGTAG,
502                query.getAccount().getJid().asBareJid()
503                        + ": found "
504                        + query.pendingReceiptRequests.size()
505                        + " pending receipt requests");
506        Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
507        while (iterator.hasNext()) {
508            ReceiptRequest rr = iterator.next();
509            mXmppConnectionService.sendMessagePacket(
510                    query.account,
511                    mXmppConnectionService
512                            .getMessageGenerator()
513                            .received(query.account, rr.getJid(), rr.getId()));
514            iterator.remove();
515        }
516    }
517
518    public Query findQuery(String id) {
519        if (id == null) {
520            return null;
521        }
522        synchronized (this.queries) {
523            for (Query query : this.queries) {
524                if (query.getQueryId().equals(id)) {
525                    return query;
526                }
527            }
528            return null;
529        }
530    }
531
532    @Override
533    public void onAdvancedStreamFeaturesAvailable(Account account) {
534        if (account.getXmppConnection() != null
535                && account.getXmppConnection().getFeatures().mam()) {
536            this.catchup(account);
537        }
538    }
539
540    public enum PagingOrder {
541        NORMAL,
542        REVERSE
543    }
544
545    public class Query {
546        private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
547        private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
548        private int totalCount = 0;
549        private int actualCount = 0;
550        private int actualInThisQuery = 0;
551        private long start;
552        private final long end;
553        private final String queryId;
554        private String reference = null;
555        private final Account account;
556        private Conversation conversation;
557        private PagingOrder pagingOrder = PagingOrder.NORMAL;
558        private XmppConnectionService.OnMoreMessagesLoaded callback = null;
559        private boolean catchup = true;
560        public final Version version;
561
562        Query(Conversation conversation, MamReference start, long end, boolean catchup, PagingOrder order) {
563            this(conversation, start, end, catchup);
564            this.pagingOrder = order;
565        }
566
567        Query(Conversation conversation, MamReference start, long end, boolean catchup) {
568            this(
569                    conversation.getAccount(),
570                    Version.get(conversation.getAccount(), conversation),
571                    catchup ? start : start.timeOnly(),
572                    end);
573            this.conversation = conversation;
574            this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
575            this.catchup = catchup;
576        }
577
578        Query(Account account, MamReference start, long end) {
579            this(account, Version.get(account), start, end);
580        }
581
582        Query(Account account, Version version, MamReference start, long end) {
583            this.account = account;
584            if (start.getReference() != null) {
585                this.reference = start.getReference();
586            } else {
587                this.start = start.getTimestamp();
588            }
589            this.end = end;
590            this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
591            this.version = version;
592        }
593
594        private Query page(String reference) {
595            Query query =
596                    new Query(
597                            this.account,
598                            this.version,
599                            new MamReference(this.start, reference),
600                            this.end);
601            query.conversation = conversation;
602            query.totalCount = totalCount;
603            query.actualCount = actualCount;
604            query.pendingReceiptRequests = pendingReceiptRequests;
605            query.receiptRequests = receiptRequests;
606            query.callback = callback;
607            query.catchup = catchup;
608            return query;
609        }
610
611        public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
612            if (!this.pendingReceiptRequests.remove(receiptRequest)) {
613                this.receiptRequests.add(receiptRequest);
614            }
615        }
616
617        public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
618            this.pendingReceiptRequests.add(receiptRequest);
619        }
620
621        public boolean isLegacy() {
622            return version.legacy;
623        }
624
625        public boolean safeToExtractTrueCounterpart() {
626            return muc() && !isLegacy();
627        }
628
629        public Query next(String reference) {
630            Query query = page(reference);
631            query.pagingOrder = PagingOrder.NORMAL;
632            return query;
633        }
634
635        Query prev(String reference) {
636            Query query = page(reference);
637            query.pagingOrder = PagingOrder.REVERSE;
638            return query;
639        }
640
641        public String getReference() {
642            return reference;
643        }
644
645        public PagingOrder getPagingOrder() {
646            return this.pagingOrder;
647        }
648
649        public String getQueryId() {
650            return queryId;
651        }
652
653        public Jid getWith() {
654            return conversation == null ? null : conversation.getJid().asBareJid();
655        }
656
657        public boolean muc() {
658            return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
659        }
660
661        public long getStart() {
662            return start;
663        }
664
665        public boolean isCatchup() {
666            return catchup;
667        }
668
669        public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
670            this.callback = callback;
671        }
672
673        public void callback(boolean done) {
674            if (this.callback != null) {
675                this.callback.onMoreMessagesLoaded(actualCount, conversation);
676                if (done) {
677                    this.callback.informUser(R.string.no_more_history_on_server);
678                }
679            }
680        }
681
682        public long getEnd() {
683            return end;
684        }
685
686        public Conversation getConversation() {
687            return conversation;
688        }
689
690        public Account getAccount() {
691            return this.account;
692        }
693
694        public void incrementMessageCount() {
695            this.totalCount++;
696        }
697
698        public void incrementActualMessageCount() {
699            this.actualInThisQuery++;
700            this.actualCount++;
701        }
702
703        int getTotalCount() {
704            return this.totalCount;
705        }
706
707        int getActualMessageCount() {
708            return this.actualCount;
709        }
710
711        public int getActualInThisQuery() {
712            return this.actualInThisQuery;
713        }
714
715        public boolean validFrom(Jid from) {
716            if (muc()) {
717                return getWith().equals(from);
718            } else {
719                return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
720            }
721        }
722
723        @NonNull
724        @Override
725        public String toString() {
726            StringBuilder builder = new StringBuilder();
727            if (this.muc()) {
728                builder.append("to=");
729                builder.append(this.getWith().toString());
730            } else {
731                builder.append("with=");
732                if (this.getWith() == null) {
733                    builder.append("*");
734                } else {
735                    builder.append(getWith().toString());
736                }
737            }
738            if (this.start != 0) {
739                builder.append(", start=");
740                builder.append(AbstractGenerator.getTimestamp(this.start));
741            }
742            if (this.end != 0) {
743                builder.append(", end=");
744                builder.append(AbstractGenerator.getTimestamp(this.end));
745            }
746            builder.append(", order=").append(pagingOrder.toString());
747            if (this.reference != null) {
748                if (this.pagingOrder == PagingOrder.NORMAL) {
749                    builder.append(", after=");
750                } else {
751                    builder.append(", before=");
752                }
753                builder.append(this.reference);
754            }
755            builder.append(", catchup=").append(catchup);
756            builder.append(", ns=").append(version.namespace);
757            return builder.toString();
758        }
759
760        boolean hasCallback() {
761            return this.callback != null;
762        }
763    }
764}