MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
  4
  5import android.util.Log;
  6import androidx.annotation.NonNull;
  7import eu.siacs.conversations.Config;
  8import eu.siacs.conversations.R;
  9import eu.siacs.conversations.entities.Account;
 10import eu.siacs.conversations.entities.Conversation;
 11import eu.siacs.conversations.entities.Conversational;
 12import eu.siacs.conversations.entities.ReceiptRequest;
 13import eu.siacs.conversations.generator.AbstractGenerator;
 14import eu.siacs.conversations.xml.Element;
 15import eu.siacs.conversations.xmpp.Jid;
 16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 17import eu.siacs.conversations.xmpp.mam.MamReference;
 18import im.conversations.android.xmpp.model.stanza.Iq;
 19import im.conversations.android.xmpp.model.stanza.Message;
 20import java.math.BigInteger;
 21import java.util.ArrayList;
 22import java.util.HashSet;
 23import java.util.Iterator;
 24import java.util.List;
 25
 26public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 27
 28    private final XmppConnectionService mXmppConnectionService;
 29
 30    private final HashSet<Query> queries = new HashSet<>();
 31    private final ArrayList<Query> pendingQueries = new ArrayList<>();
 32
 33    public enum Version {
 34        MAM_0("urn:xmpp:mam:0", true),
 35        MAM_1("urn:xmpp:mam:1", false),
 36        MAM_2("urn:xmpp:mam:2", false);
 37
 38        public final boolean legacy;
 39        public final String namespace;
 40
 41        Version(String namespace, boolean legacy) {
 42            this.namespace = namespace;
 43            this.legacy = legacy;
 44        }
 45
 46        public static Version get(Account account) {
 47            return get(account, null);
 48        }
 49
 50        public static Version get(Account account, Conversation conversation) {
 51            if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
 52                return get(account.getXmppConnection().getFeatures().getAccountFeatures());
 53            } else {
 54                return get(conversation.getMucOptions().getFeatures());
 55            }
 56        }
 57
 58        private static Version get(List<String> features) {
 59            final Version[] values = values();
 60            for (int i = values.length - 1; i >= 0; --i) {
 61                for (String feature : features) {
 62                    if (values[i].namespace.equals(feature)) {
 63                        return values[i];
 64                    }
 65                }
 66            }
 67            return MAM_0;
 68        }
 69
 70        public static boolean has(List<String> features) {
 71            for (String feature : features) {
 72                for (Version version : values()) {
 73                    if (version.namespace.equals(feature)) {
 74                        return true;
 75                    }
 76                }
 77            }
 78            return false;
 79        }
 80
 81        public static Element findResult(Message packet) {
 82            for (Version version : values()) {
 83                Element result = packet.findChild("result", version.namespace);
 84                if (result != null) {
 85                    return result;
 86                }
 87            }
 88            return null;
 89        }
 90    }
 91
 92    MessageArchiveService(final XmppConnectionService service) {
 93        this.mXmppConnectionService = service;
 94    }
 95
 96    private void catchup(final Account account) {
 97        synchronized (this.queries) {
 98            for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
 99                Query query = iterator.next();
100                if (query.getAccount() == account) {
101                    iterator.remove();
102                }
103            }
104        }
105        MamReference mamReference =
106                MamReference.max(
107                        mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
108                        mXmppConnectionService.databaseBackend.getLastClearDate(account));
109        mamReference =
110                MamReference.max(
111                        mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
112        long endCatchup = account.getXmppConnection().getLastSessionEstablished();
113        final Query query;
114        if (mamReference.getTimestamp() == 0) {
115            return;
116        } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
117            long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
118            List<Conversation> conversations = mXmppConnectionService.getConversations();
119            for (Conversation conversation : conversations) {
120                if (conversation.getMode() == Conversation.MODE_SINGLE
121                        && conversation.getAccount() == account
122                        && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
123                    this.query(conversation, startCatchup, true);
124                }
125            }
126            query = new Query(account, new MamReference(startCatchup), 0);
127        } else {
128            query = new Query(account, mamReference, 0);
129        }
130        synchronized (this.queries) {
131            this.queries.add(query);
132        }
133        this.execute(query);
134    }
135
136    void catchupMUC(final Conversation conversation) {
137        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
138                && conversation.countMessages() == 0) {
139            query(conversation, new MamReference(0), 0, true);
140        } else {
141            query(conversation, conversation.getLastMessageTransmitted(), 0, true);
142        }
143    }
144
145    public Query query(final Conversation conversation) {
146        if (conversation.getLastMessageTransmitted().getTimestamp() < 0
147                && conversation.countMessages() == 0) {
148            return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
149        } else {
150            return query(
151                    conversation,
152                    conversation.getLastMessageTransmitted(),
153                    conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
154                    false);
155        }
156    }
157
158    public boolean isCatchingUp(Conversation conversation) {
159        final Account account = conversation.getAccount();
160        if (account.getXmppConnection().isWaitingForSmCatchup()) {
161            return true;
162        } else {
163            synchronized (this.queries) {
164                for (Query query : this.queries) {
165                    if (query.getAccount() == account
166                            && query.isCatchup()
167                            && ((conversation.getMode() == Conversation.MODE_SINGLE
168                                            && query.getWith() == null)
169                                    || query.getConversation() == conversation)) {
170                        return true;
171                    }
172                }
173            }
174            return false;
175        }
176    }
177
178    public Query query(final Conversation conversation, long end, boolean allowCatchup) {
179        return this.query(
180                conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
181    }
182
183    public Query query(
184            Conversation conversation, MamReference start, long end, boolean allowCatchup) {
185        synchronized (this.queries) {
186            final Query query;
187            final MamReference startActual =
188                    MamReference.max(
189                            start, mXmppConnectionService.getAutomaticMessageDeletionDate());
190            if (start.getTimestamp() == 0) {
191                query = new Query(conversation, startActual, end, false);
192                query.reference = conversation.getFirstMamReference();
193            } else {
194                if (allowCatchup) {
195                    MamReference maxCatchup =
196                            MamReference.max(
197                                    startActual,
198                                    System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
199                    if (maxCatchup.greaterThan(startActual)) {
200                        Query reverseCatchup =
201                                new Query(
202                                        conversation,
203                                        startActual,
204                                        maxCatchup.getTimestamp(),
205                                        false);
206                        this.queries.add(reverseCatchup);
207                        this.execute(reverseCatchup);
208                    }
209                    query = new Query(conversation, maxCatchup, end, true);
210                } else {
211                    query = new Query(conversation, startActual, end, false);
212                }
213            }
214            if (end != 0 && start.greaterThan(end)) {
215                return null;
216            }
217            this.queries.add(query);
218            this.execute(query);
219            return query;
220        }
221    }
222
223    void executePendingQueries(final Account account) {
224        final List<Query> pending = new ArrayList<>();
225        synchronized (this.pendingQueries) {
226            for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
227                Query query = iterator.next();
228                if (query.getAccount() == account) {
229                    pending.add(query);
230                    iterator.remove();
231                }
232            }
233        }
234        for (Query query : pending) {
235            this.execute(query);
236        }
237    }
238
239    private void execute(final Query query) {
240        final Account account = query.getAccount();
241        if (account.getStatus() == Account.State.ONLINE) {
242            final Conversation conversation = query.getConversation();
243            if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
244                throw new IllegalStateException(
245                        "Attempted to run MAM query for archived conversation");
246            }
247            Log.d(
248                    Config.LOGTAG,
249                    account.getJid().asBareJid().toString() + ": running mam query " + query);
250            final Iq packet =
251                    this.mXmppConnectionService
252                            .getIqGenerator()
253                            .queryMessageArchiveManagement(query);
254            this.mXmppConnectionService.sendIqPacket(
255                    account,
256                    packet,
257                    (p) -> {
258                        final Element fin = p.findChild("fin", query.version.namespace);
259                        if (p.getType() == Iq.Type.TIMEOUT) {
260                            synchronized (this.queries) {
261                                this.queries.remove(query);
262                                if (query.hasCallback()) {
263                                    query.callback(false);
264                                }
265                            }
266                        } else if (p.getType() == Iq.Type.RESULT && fin != null) {
267                            final boolean running;
268                            synchronized (this.queries) {
269                                running = this.queries.contains(query);
270                            }
271                            if (running) {
272                                processFin(query, fin);
273                            } else {
274                                Log.d(
275                                        Config.LOGTAG,
276                                        account.getJid().asBareJid()
277                                                + ": ignoring MAM iq result because query had been"
278                                                + " killed");
279                            }
280                        } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
281                            // do nothing
282                        } else {
283                            Log.d(
284                                    Config.LOGTAG,
285                                    account.getJid().asBareJid().toString()
286                                            + ": error executing mam: "
287                                            + p);
288                            try {
289                                finalizeQuery(query, true);
290                            } catch (final IllegalStateException e) {
291                                // ignored
292                            }
293                        }
294                    });
295        } else {
296            synchronized (this.pendingQueries) {
297                this.pendingQueries.add(query);
298            }
299        }
300    }
301
302    private void finalizeQuery(final Query query, boolean done) {
303        synchronized (this.queries) {
304            if (!this.queries.remove(query)) {
305                throw new IllegalStateException("Unable to remove query from queries");
306            }
307        }
308        final Conversation conversation = query.getConversation();
309        if (conversation != null) {
310            conversation.sort();
311            conversation.setHasMessagesLeftOnServer(!done);
312            final var displayState = conversation.getDisplayState();
313            if (displayState != null) {
314                mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
315            }
316        } else {
317            for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
318                if (tmp.getAccount() == query.getAccount()) {
319                    tmp.sort();
320                    final var displayState = tmp.getDisplayState();
321                    if (displayState != null) {
322                        mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
323                    }
324                }
325            }
326        }
327        if (query.hasCallback()) {
328            query.callback(done);
329        } else {
330            this.mXmppConnectionService.updateConversationUi();
331        }
332    }
333
334    public boolean inCatchup(Account account) {
335        synchronized (this.queries) {
336            for (Query query : queries) {
337                if (query.account == account && query.isCatchup() && query.getWith() == null) {
338                    return true;
339                }
340            }
341        }
342        return false;
343    }
344
345    public boolean isCatchupInProgress(Conversation conversation) {
346        synchronized (this.queries) {
347            for (Query query : queries) {
348                if (query.account == conversation.getAccount() && query.isCatchup()) {
349                    final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
350                    if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
351                            || (conversation.getJid().asBareJid().equals(with))) {
352                        return true;
353                    }
354                }
355            }
356        }
357        return false;
358    }
359
360    boolean queryInProgress(
361            Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
362        synchronized (this.queries) {
363            for (Query query : queries) {
364                if (query.conversation == conversation) {
365                    if (!query.hasCallback() && callback != null) {
366                        query.setCallback(callback);
367                    }
368                    return true;
369                }
370            }
371            return false;
372        }
373    }
374
375    public boolean queryInProgress(Conversation conversation) {
376        return queryInProgress(conversation, null);
377    }
378
379    public void processFinLegacy(Element fin, Jid from) {
380        Query query = findQuery(fin.getAttribute("queryid"));
381        if (query != null && query.validFrom(from)) {
382            processFin(query, fin);
383        }
384    }
385
386    private void processFin(Query query, Element fin) {
387        boolean complete = fin.getAttributeAsBoolean("complete");
388        Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
389        Element last = set == null ? null : set.findChild("last");
390        String count = set == null ? null : set.findChildContent("count");
391        Element first = set == null ? null : set.findChild("first");
392        Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
393        boolean abort =
394                (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
395                        || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
396        if (query.getConversation() != null) {
397            query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
398        }
399        if (complete || relevant == null || abort) {
400            // TODO: FIX done logic to look at complete. using count is probably unreliable because
401            // it can be ommited and doesn’t work with paging.
402            boolean done;
403            if (query.isCatchup()) {
404                done = false;
405            } else {
406                if (count != null) {
407                    try {
408                        done = Integer.parseInt(count) <= query.getTotalCount();
409                    } catch (NumberFormatException e) {
410                        done = false;
411                    }
412                } else {
413                    done = query.getTotalCount() == 0;
414                }
415            }
416            done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
417            this.finalizeQuery(query, done);
418
419            Log.d(
420                    Config.LOGTAG,
421                    query.getAccount().getJid().asBareJid()
422                            + ": finished mam after "
423                            + query.getTotalCount()
424                            + "("
425                            + query.getActualMessageCount()
426                            + ") messages. messages left="
427                            + !done
428                            + " count="
429                            + count);
430            if (query.isCatchup() && query.getActualMessageCount() > 0) {
431                mXmppConnectionService
432                        .getNotificationService()
433                        .finishBacklog(true, query.getAccount());
434            }
435            processPostponed(query);
436        } else {
437            final Query nextQuery;
438            if (query.getPagingOrder() == PagingOrder.NORMAL) {
439                nextQuery = query.next(last == null ? null : last.getContent());
440            } else {
441                nextQuery = query.prev(first == null ? null : first.getContent());
442            }
443            this.execute(nextQuery);
444            this.finalizeQuery(query, false);
445            synchronized (this.queries) {
446                this.queries.add(nextQuery);
447            }
448        }
449    }
450
451    void kill(final Conversation conversation) {
452        final ArrayList<Query> toBeKilled = new ArrayList<>();
453        synchronized (this.pendingQueries) {
454            for (final Iterator<Query> iterator = this.pendingQueries.iterator();
455                    iterator.hasNext(); ) {
456                final Query query = iterator.next();
457                if (query.getConversation() == conversation) {
458                    iterator.remove();
459                    Log.d(
460                            Config.LOGTAG,
461                            conversation.getAccount().getJid().asBareJid()
462                                    + ": killed pending MAM query for archived conversation");
463                }
464            }
465        }
466        synchronized (this.queries) {
467            for (final Query q : queries) {
468                if (q.conversation == conversation) {
469                    toBeKilled.add(q);
470                }
471            }
472        }
473        for (final Query q : toBeKilled) {
474            kill(q);
475        }
476    }
477
478    private void kill(Query query) {
479        Log.d(
480                Config.LOGTAG,
481                query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
482        query.callback = null;
483        this.finalizeQuery(query, false);
484        if (query.isCatchup() && query.getActualMessageCount() > 0) {
485            mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
486        }
487        this.processPostponed(query);
488    }
489
490    private void processPostponed(Query query) {
491        query.account.getAxolotlService().processPostponed();
492        query.pendingReceiptRequests.removeAll(query.receiptRequests);
493        Log.d(
494                Config.LOGTAG,
495                query.getAccount().getJid().asBareJid()
496                        + ": found "
497                        + query.pendingReceiptRequests.size()
498                        + " pending receipt requests");
499        Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
500        while (iterator.hasNext()) {
501            ReceiptRequest rr = iterator.next();
502            mXmppConnectionService.sendMessagePacket(
503                    query.account,
504                    mXmppConnectionService
505                            .getMessageGenerator()
506                            .received(query.account, rr.getJid(), rr.getId()));
507            iterator.remove();
508        }
509    }
510
511    public Query findQuery(String id) {
512        if (id == null) {
513            return null;
514        }
515        synchronized (this.queries) {
516            for (Query query : this.queries) {
517                if (query.getQueryId().equals(id)) {
518                    return query;
519                }
520            }
521            return null;
522        }
523    }
524
525    @Override
526    public void onAdvancedStreamFeaturesAvailable(Account account) {
527        if (account.getXmppConnection() != null
528                && account.getXmppConnection().getFeatures().mam()) {
529            this.catchup(account);
530        }
531    }
532
533    public enum PagingOrder {
534        NORMAL,
535        REVERSE
536    }
537
538    public class Query {
539        private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
540        private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
541        private int totalCount = 0;
542        private int actualCount = 0;
543        private int actualInThisQuery = 0;
544        private long start;
545        private final long end;
546        private final String queryId;
547        private String reference = null;
548        private final Account account;
549        private Conversation conversation;
550        private PagingOrder pagingOrder = PagingOrder.NORMAL;
551        private XmppConnectionService.OnMoreMessagesLoaded callback = null;
552        private boolean catchup = true;
553        public final Version version;
554
555        Query(Conversation conversation, MamReference start, long end, boolean catchup) {
556            this(
557                    conversation.getAccount(),
558                    Version.get(conversation.getAccount(), conversation),
559                    catchup ? start : start.timeOnly(),
560                    end);
561            this.conversation = conversation;
562            this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
563            this.catchup = catchup;
564        }
565
566        Query(Account account, MamReference start, long end) {
567            this(account, Version.get(account), start, end);
568        }
569
570        Query(Account account, Version version, MamReference start, long end) {
571            this.account = account;
572            if (start.getReference() != null) {
573                this.reference = start.getReference();
574            } else {
575                this.start = start.getTimestamp();
576            }
577            this.end = end;
578            this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
579            this.version = version;
580        }
581
582        private Query page(String reference) {
583            Query query =
584                    new Query(
585                            this.account,
586                            this.version,
587                            new MamReference(this.start, reference),
588                            this.end);
589            query.conversation = conversation;
590            query.totalCount = totalCount;
591            query.actualCount = actualCount;
592            query.pendingReceiptRequests = pendingReceiptRequests;
593            query.receiptRequests = receiptRequests;
594            query.callback = callback;
595            query.catchup = catchup;
596            return query;
597        }
598
599        public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
600            if (!this.pendingReceiptRequests.remove(receiptRequest)) {
601                this.receiptRequests.add(receiptRequest);
602            }
603        }
604
605        public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
606            this.pendingReceiptRequests.add(receiptRequest);
607        }
608
609        public boolean isLegacy() {
610            return version.legacy;
611        }
612
613        public boolean safeToExtractTrueCounterpart() {
614            return muc() && !isLegacy();
615        }
616
617        public Query next(String reference) {
618            Query query = page(reference);
619            query.pagingOrder = PagingOrder.NORMAL;
620            return query;
621        }
622
623        Query prev(String reference) {
624            Query query = page(reference);
625            query.pagingOrder = PagingOrder.REVERSE;
626            return query;
627        }
628
629        public String getReference() {
630            return reference;
631        }
632
633        public PagingOrder getPagingOrder() {
634            return this.pagingOrder;
635        }
636
637        public String getQueryId() {
638            return queryId;
639        }
640
641        public Jid getWith() {
642            return conversation == null ? null : conversation.getJid().asBareJid();
643        }
644
645        public boolean muc() {
646            return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
647        }
648
649        public long getStart() {
650            return start;
651        }
652
653        public boolean isCatchup() {
654            return catchup;
655        }
656
657        public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
658            this.callback = callback;
659        }
660
661        public void callback(boolean done) {
662            if (this.callback != null) {
663                this.callback.onMoreMessagesLoaded(actualCount, conversation);
664                if (done) {
665                    this.callback.informUser(R.string.no_more_history_on_server);
666                }
667            }
668        }
669
670        public long getEnd() {
671            return end;
672        }
673
674        public Conversation getConversation() {
675            return conversation;
676        }
677
678        public Account getAccount() {
679            return this.account;
680        }
681
682        public void incrementMessageCount() {
683            this.totalCount++;
684        }
685
686        public void incrementActualMessageCount() {
687            this.actualInThisQuery++;
688            this.actualCount++;
689        }
690
691        int getTotalCount() {
692            return this.totalCount;
693        }
694
695        int getActualMessageCount() {
696            return this.actualCount;
697        }
698
699        public int getActualInThisQuery() {
700            return this.actualInThisQuery;
701        }
702
703        public boolean validFrom(Jid from) {
704            if (muc()) {
705                return getWith().equals(from);
706            } else {
707                return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
708            }
709        }
710
711        @NonNull
712        @Override
713        public String toString() {
714            StringBuilder builder = new StringBuilder();
715            if (this.muc()) {
716                builder.append("to=");
717                builder.append(this.getWith().toString());
718            } else {
719                builder.append("with=");
720                if (this.getWith() == null) {
721                    builder.append("*");
722                } else {
723                    builder.append(getWith().toString());
724                }
725            }
726            if (this.start != 0) {
727                builder.append(", start=");
728                builder.append(AbstractGenerator.getTimestamp(this.start));
729            }
730            if (this.end != 0) {
731                builder.append(", end=");
732                builder.append(AbstractGenerator.getTimestamp(this.end));
733            }
734            builder.append(", order=").append(pagingOrder.toString());
735            if (this.reference != null) {
736                if (this.pagingOrder == PagingOrder.NORMAL) {
737                    builder.append(", after=");
738                } else {
739                    builder.append(", before=");
740                }
741                builder.append(this.reference);
742            }
743            builder.append(", catchup=").append(catchup);
744            builder.append(", ns=").append(version.namespace);
745            return builder.toString();
746        }
747
748        boolean hasCallback() {
749            return this.callback != null;
750        }
751    }
752}