1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6import androidx.annotation.NonNull;
7import eu.siacs.conversations.Config;
8import eu.siacs.conversations.R;
9import eu.siacs.conversations.entities.Account;
10import eu.siacs.conversations.entities.Conversation;
11import eu.siacs.conversations.entities.Conversational;
12import eu.siacs.conversations.entities.ReceiptRequest;
13import eu.siacs.conversations.generator.AbstractGenerator;
14import eu.siacs.conversations.xml.Element;
15import eu.siacs.conversations.xmpp.Jid;
16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
17import eu.siacs.conversations.xmpp.mam.MamReference;
18import im.conversations.android.xmpp.model.stanza.Iq;
19import im.conversations.android.xmpp.model.stanza.Message;
20import java.math.BigInteger;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.HashSet;
24import java.util.Iterator;
25import java.util.List;
26
27public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
28
29 private final XmppConnectionService mXmppConnectionService;
30
31 private final HashSet<Query> queries = new HashSet<>();
32 private final ArrayList<Query> pendingQueries = new ArrayList<>();
33
34 public enum Version {
35 MAM_0("urn:xmpp:mam:0", true),
36 MAM_1("urn:xmpp:mam:1", false),
37 MAM_2("urn:xmpp:mam:2", false);
38
39 public final boolean legacy;
40 public final String namespace;
41
42 Version(String namespace, boolean legacy) {
43 this.namespace = namespace;
44 this.legacy = legacy;
45 }
46
47 public static Version get(Account account) {
48 return get(account, null);
49 }
50
51 public static Version get(Account account, Conversation conversation) {
52 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
53 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
54 } else {
55 return get(conversation.getMucOptions().getFeatures());
56 }
57 }
58
59 private static Version get(final Collection<String> features) {
60 final Version[] values = values();
61 for (int i = values.length - 1; i >= 0; --i) {
62 for (String feature : features) {
63 if (values[i].namespace.equals(feature)) {
64 return values[i];
65 }
66 }
67 }
68 return MAM_0;
69 }
70
71 public static boolean has(final Collection<String> features) {
72 for (String feature : features) {
73 for (Version version : values()) {
74 if (version.namespace.equals(feature)) {
75 return true;
76 }
77 }
78 }
79 return false;
80 }
81
82 public static Element findResult(Message packet) {
83 for (Version version : values()) {
84 Element result = packet.findChild("result", version.namespace);
85 if (result != null) {
86 return result;
87 }
88 }
89 return null;
90 }
91 }
92
93 MessageArchiveService(final XmppConnectionService service) {
94 this.mXmppConnectionService = service;
95 }
96
97 private void catchup(final Account account) {
98 synchronized (this.queries) {
99 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
100 Query query = iterator.next();
101 if (query.getAccount() == account) {
102 iterator.remove();
103 }
104 }
105 }
106 MamReference mamReference =
107 MamReference.max(
108 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
109 mXmppConnectionService.databaseBackend.getLastClearDate(account));
110 mamReference =
111 MamReference.max(
112 mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
113 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114 final Query query;
115 if (mamReference.getTimestamp() == 0) {
116 return;
117 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119 List<Conversation> conversations = mXmppConnectionService.getConversations();
120 for (Conversation conversation : conversations) {
121 if (conversation.getMode() == Conversation.MODE_SINGLE
122 && conversation.getAccount() == account
123 && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
124 this.query(conversation, startCatchup, true);
125 }
126 }
127 query = new Query(account, new MamReference(startCatchup), 0);
128 } else {
129 query = new Query(account, mamReference, 0);
130 }
131 synchronized (this.queries) {
132 this.queries.add(query);
133 }
134 this.execute(query);
135 }
136
137 public void catchupMUC(final Conversation conversation) {
138 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
139 && conversation.countMessages() == 0) {
140 query(conversation, new MamReference(0), 0, true);
141 } else {
142 query(conversation, conversation.getLastMessageTransmitted(), 0, true);
143 }
144 }
145
146 public Query query(final Conversation conversation) {
147 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
148 && conversation.countMessages() == 0) {
149 return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
150 } else {
151 return query(
152 conversation,
153 conversation.getLastMessageTransmitted(),
154 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
155 false);
156 }
157 }
158
159 public boolean isCatchingUp(Conversation conversation) {
160 final Account account = conversation.getAccount();
161 if (account.getXmppConnection().isWaitingForSmCatchup()) {
162 return true;
163 } else {
164 synchronized (this.queries) {
165 for (Query query : this.queries) {
166 if (query.getAccount() == account
167 && query.isCatchup()
168 && ((conversation.getMode() == Conversation.MODE_SINGLE
169 && query.getWith() == null)
170 || query.getConversation() == conversation)) {
171 return true;
172 }
173 }
174 }
175 return false;
176 }
177 }
178
179 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180 return this.query(
181 conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182 }
183
184 public Query query(
185 Conversation conversation, MamReference start, long end, boolean allowCatchup) {
186 synchronized (this.queries) {
187 final Query query;
188 final MamReference startActual = start;
189 if (start.getTimestamp() == 0) {
190 query = new Query(conversation, startActual, end, false);
191 query.reference = conversation.getFirstMamReference();
192 } else {
193 if (allowCatchup) {
194 MamReference maxCatchup =
195 MamReference.max(
196 startActual,
197 System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
198 if (maxCatchup.greaterThan(startActual)) {
199 Query reverseCatchup =
200 new Query(
201 conversation,
202 startActual,
203 maxCatchup.getTimestamp(),
204 false);
205 this.queries.add(reverseCatchup);
206 this.execute(reverseCatchup);
207 }
208 query = new Query(conversation, maxCatchup, end, true);
209 } else {
210 query = new Query(conversation, startActual, end, false);
211 }
212 }
213 if (end != 0 && start.greaterThan(end)) {
214 return null;
215 }
216 this.queries.add(query);
217 this.execute(query);
218 return query;
219 }
220 }
221
222 public void executePendingQueries(final Account account) {
223 final List<Query> pending = new ArrayList<>();
224 synchronized (this.pendingQueries) {
225 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
226 Query query = iterator.next();
227 if (query.getAccount() == account) {
228 pending.add(query);
229 iterator.remove();
230 }
231 }
232 }
233 for (Query query : pending) {
234 this.execute(query);
235 }
236 }
237
238 private void execute(final Query query) {
239 final Account account = query.getAccount();
240 if (account.getStatus() == Account.State.ONLINE) {
241 final Conversation conversation = query.getConversation();
242 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
243 throw new IllegalStateException(
244 "Attempted to run MAM query for archived conversation");
245 }
246 Log.d(
247 Config.LOGTAG,
248 account.getJid().asBareJid().toString() + ": running mam query " + query);
249 final Iq packet =
250 this.mXmppConnectionService
251 .getIqGenerator()
252 .queryMessageArchiveManagement(query);
253 this.mXmppConnectionService.sendIqPacket(
254 account,
255 packet,
256 (p) -> {
257 final Element fin = p.findChild("fin", query.version.namespace);
258 if (p.getType() == Iq.Type.TIMEOUT) {
259 synchronized (this.queries) {
260 this.queries.remove(query);
261 if (query.hasCallback()) {
262 query.callback(false);
263 }
264 }
265 } else if (p.getType() == Iq.Type.RESULT && fin != null) {
266 final boolean running;
267 synchronized (this.queries) {
268 running = this.queries.contains(query);
269 }
270 if (running) {
271 processFin(query, fin);
272 } else {
273 Log.d(
274 Config.LOGTAG,
275 account.getJid().asBareJid()
276 + ": ignoring MAM iq result because query had been"
277 + " killed");
278 }
279 } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
280 // do nothing
281 } else {
282 Log.d(
283 Config.LOGTAG,
284 account.getJid().asBareJid().toString()
285 + ": error executing mam: "
286 + p);
287 try {
288 finalizeQuery(query, true);
289 } catch (final IllegalStateException e) {
290 // ignored
291 }
292 }
293 });
294 } else {
295 synchronized (this.pendingQueries) {
296 this.pendingQueries.add(query);
297 }
298 }
299 }
300
301 private void finalizeQuery(final Query query, boolean done) {
302 synchronized (this.queries) {
303 if (!this.queries.remove(query)) {
304 throw new IllegalStateException("Unable to remove query from queries");
305 }
306 }
307 final Conversation conversation = query.getConversation();
308 if (conversation != null) {
309 conversation.sort();
310 conversation.setHasMessagesLeftOnServer(!done);
311 final var displayState = conversation.getDisplayState();
312 if (displayState != null) {
313 mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
314 }
315 } else {
316 for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
317 if (tmp.getAccount() == query.getAccount()) {
318 tmp.sort();
319 final var displayState = tmp.getDisplayState();
320 if (displayState != null) {
321 mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
322 }
323 }
324 }
325 }
326 if (query.hasCallback()) {
327 query.callback(done);
328 } else {
329 this.mXmppConnectionService.updateConversationUi();
330 }
331 }
332
333 public boolean inCatchup(Account account) {
334 synchronized (this.queries) {
335 for (Query query : queries) {
336 if (query.account == account && query.isCatchup() && query.getWith() == null) {
337 return true;
338 }
339 }
340 }
341 return false;
342 }
343
344 public boolean isCatchupInProgress(Conversation conversation) {
345 synchronized (this.queries) {
346 for (Query query : queries) {
347 if (query.account == conversation.getAccount() && query.isCatchup()) {
348 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
349 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
350 || (conversation.getJid().asBareJid().equals(with))) {
351 return true;
352 }
353 }
354 }
355 }
356 return false;
357 }
358
359 boolean queryInProgress(
360 Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
361 synchronized (this.queries) {
362 for (Query query : queries) {
363 if (query.conversation == conversation) {
364 if (!query.hasCallback() && callback != null) {
365 query.setCallback(callback);
366 }
367 return true;
368 }
369 }
370 return false;
371 }
372 }
373
374 public boolean queryInProgress(Conversation conversation) {
375 return queryInProgress(conversation, null);
376 }
377
378 public void processFinLegacy(Element fin, Jid from) {
379 Query query = findQuery(fin.getAttribute("queryid"));
380 if (query != null && query.validFrom(from)) {
381 processFin(query, fin);
382 }
383 }
384
385 private void processFin(Query query, Element fin) {
386 boolean complete = fin.getAttributeAsBoolean("complete");
387 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
388 Element last = set == null ? null : set.findChild("last");
389 String count = set == null ? null : set.findChildContent("count");
390 Element first = set == null ? null : set.findChild("first");
391 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
392 boolean abort =
393 (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
394 || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
395 if (query.getConversation() != null) {
396 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
397 }
398 if (complete || relevant == null || abort) {
399 // TODO: FIX done logic to look at complete. using count is probably unreliable because
400 // it can be ommited and doesn’t work with paging.
401 boolean done;
402 if (query.isCatchup()) {
403 done = false;
404 } else {
405 if (count != null) {
406 try {
407 done = Integer.parseInt(count) <= query.getTotalCount();
408 } catch (NumberFormatException e) {
409 done = false;
410 }
411 } else {
412 done = query.getTotalCount() == 0;
413 }
414 }
415 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
416 this.finalizeQuery(query, done);
417
418 Log.d(
419 Config.LOGTAG,
420 query.getAccount().getJid().asBareJid()
421 + ": finished mam after "
422 + query.getTotalCount()
423 + "("
424 + query.getActualMessageCount()
425 + ") messages. messages left="
426 + !done
427 + " count="
428 + count);
429 if (query.isCatchup() && query.getActualMessageCount() > 0) {
430 mXmppConnectionService
431 .getNotificationService()
432 .finishBacklog(true, query.getAccount());
433 }
434 if (query.isCatchup() && query.getPagingOrder() == PagingOrder.NORMAL && !complete && query.getConversation() != null) {
435 // Going forward we stopped without completing due to limits
436 // So we don't have the most recent messages yet
437 synchronized (this.queries) {
438 final var q = new Query(query.getConversation(), new MamReference(System.currentTimeMillis() - Config.MAM_MIN_CATCHUP), 0, true, PagingOrder.REVERSE);
439 this.queries.add(q);
440 this.execute(q);
441 }
442 }
443 processPostponed(query);
444 } else {
445 final Query nextQuery;
446 if (query.getPagingOrder() == PagingOrder.NORMAL) {
447 nextQuery = query.next(last == null ? null : last.getContent());
448 } else {
449 nextQuery = query.prev(first == null ? null : first.getContent());
450 }
451 this.execute(nextQuery);
452 this.finalizeQuery(query, false);
453 synchronized (this.queries) {
454 this.queries.add(nextQuery);
455 }
456 }
457 }
458
459 void kill(final Conversation conversation) {
460 final ArrayList<Query> toBeKilled = new ArrayList<>();
461 synchronized (this.pendingQueries) {
462 for (final Iterator<Query> iterator = this.pendingQueries.iterator();
463 iterator.hasNext(); ) {
464 final Query query = iterator.next();
465 if (query.getConversation() == conversation) {
466 iterator.remove();
467 Log.d(
468 Config.LOGTAG,
469 conversation.getAccount().getJid().asBareJid()
470 + ": killed pending MAM query for archived conversation");
471 }
472 }
473 }
474 synchronized (this.queries) {
475 for (final Query q : queries) {
476 if (q.conversation == conversation) {
477 toBeKilled.add(q);
478 }
479 }
480 }
481 for (final Query q : toBeKilled) {
482 kill(q);
483 }
484 }
485
486 private void kill(Query query) {
487 Log.d(
488 Config.LOGTAG,
489 query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
490 query.callback = null;
491 this.finalizeQuery(query, false);
492 if (query.isCatchup() && query.getActualMessageCount() > 0) {
493 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
494 }
495 this.processPostponed(query);
496 }
497
498 private void processPostponed(Query query) {
499 query.account.getAxolotlService().processPostponed();
500 query.pendingReceiptRequests.removeAll(query.receiptRequests);
501 Log.d(
502 Config.LOGTAG,
503 query.getAccount().getJid().asBareJid()
504 + ": found "
505 + query.pendingReceiptRequests.size()
506 + " pending receipt requests");
507 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
508 while (iterator.hasNext()) {
509 ReceiptRequest rr = iterator.next();
510 mXmppConnectionService.sendMessagePacket(
511 query.account,
512 mXmppConnectionService
513 .getMessageGenerator()
514 .received(query.account, rr.getJid(), rr.getId()));
515 iterator.remove();
516 }
517 }
518
519 public Query findQuery(String id) {
520 if (id == null) {
521 return null;
522 }
523 synchronized (this.queries) {
524 for (Query query : this.queries) {
525 if (query.getQueryId().equals(id)) {
526 return query;
527 }
528 }
529 return null;
530 }
531 }
532
533 @Override
534 public void onAdvancedStreamFeaturesAvailable(Account account) {
535 if (account.getXmppConnection() != null
536 && account.getXmppConnection().getFeatures().mam()) {
537 this.catchup(account);
538 }
539 }
540
541 public enum PagingOrder {
542 NORMAL,
543 REVERSE
544 }
545
546 public class Query {
547 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
548 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
549 private int totalCount = 0;
550 private int actualCount = 0;
551 private int actualInThisQuery = 0;
552 private long start;
553 private final long end;
554 private final String queryId;
555 private String reference = null;
556 private final Account account;
557 private Conversation conversation;
558 private PagingOrder pagingOrder = PagingOrder.NORMAL;
559 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
560 private boolean catchup = true;
561 public final Version version;
562
563 Query(Conversation conversation, MamReference start, long end, boolean catchup, PagingOrder order) {
564 this(conversation, start, end, catchup);
565 this.pagingOrder = order;
566 }
567
568 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
569 this(
570 conversation.getAccount(),
571 Version.get(conversation.getAccount(), conversation),
572 catchup ? start : start.timeOnly(),
573 end);
574 this.conversation = conversation;
575 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
576 this.catchup = catchup;
577 }
578
579 Query(Account account, MamReference start, long end) {
580 this(account, Version.get(account), start, end);
581 }
582
583 Query(Account account, Version version, MamReference start, long end) {
584 this.account = account;
585 if (start.getReference() != null) {
586 this.reference = start.getReference();
587 } else {
588 this.start = start.getTimestamp();
589 }
590 this.end = end;
591 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
592 this.version = version;
593 }
594
595 private Query page(String reference) {
596 Query query =
597 new Query(
598 this.account,
599 this.version,
600 new MamReference(this.start, reference),
601 this.end);
602 query.conversation = conversation;
603 query.totalCount = totalCount;
604 query.actualCount = actualCount;
605 query.pendingReceiptRequests = pendingReceiptRequests;
606 query.receiptRequests = receiptRequests;
607 query.callback = callback;
608 query.catchup = catchup;
609 return query;
610 }
611
612 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
613 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
614 this.receiptRequests.add(receiptRequest);
615 }
616 }
617
618 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
619 this.pendingReceiptRequests.add(receiptRequest);
620 }
621
622 public boolean isLegacy() {
623 return version.legacy;
624 }
625
626 public boolean safeToExtractTrueCounterpart() {
627 return muc() && !isLegacy();
628 }
629
630 public Query next(String reference) {
631 Query query = page(reference);
632 query.pagingOrder = PagingOrder.NORMAL;
633 return query;
634 }
635
636 Query prev(String reference) {
637 Query query = page(reference);
638 query.pagingOrder = PagingOrder.REVERSE;
639 return query;
640 }
641
642 public String getReference() {
643 return reference;
644 }
645
646 public PagingOrder getPagingOrder() {
647 return this.pagingOrder;
648 }
649
650 public String getQueryId() {
651 return queryId;
652 }
653
654 public Jid getWith() {
655 return conversation == null ? null : conversation.getJid().asBareJid();
656 }
657
658 public boolean muc() {
659 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
660 }
661
662 public long getStart() {
663 return start;
664 }
665
666 public boolean isCatchup() {
667 return catchup;
668 }
669
670 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
671 this.callback = callback;
672 }
673
674 public void callback(boolean done) {
675 if (this.callback != null) {
676 this.callback.onMoreMessagesLoaded(actualCount, conversation);
677 if (done) {
678 this.callback.informUser(R.string.no_more_history_on_server);
679 }
680 }
681 }
682
683 public long getEnd() {
684 return end;
685 }
686
687 public Conversation getConversation() {
688 return conversation;
689 }
690
691 public Account getAccount() {
692 return this.account;
693 }
694
695 public void incrementMessageCount() {
696 this.totalCount++;
697 }
698
699 public void incrementActualMessageCount() {
700 this.actualInThisQuery++;
701 this.actualCount++;
702 }
703
704 int getTotalCount() {
705 return this.totalCount;
706 }
707
708 int getActualMessageCount() {
709 return this.actualCount;
710 }
711
712 public int getActualInThisQuery() {
713 return this.actualInThisQuery;
714 }
715
716 public boolean validFrom(Jid from) {
717 if (muc()) {
718 return getWith().equals(from);
719 } else {
720 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
721 }
722 }
723
724 @NonNull
725 @Override
726 public String toString() {
727 StringBuilder builder = new StringBuilder();
728 if (this.muc()) {
729 builder.append("to=");
730 builder.append(this.getWith().toString());
731 } else {
732 builder.append("with=");
733 if (this.getWith() == null) {
734 builder.append("*");
735 } else {
736 builder.append(getWith().toString());
737 }
738 }
739 if (this.start != 0) {
740 builder.append(", start=");
741 builder.append(AbstractGenerator.getTimestamp(this.start));
742 }
743 if (this.end != 0) {
744 builder.append(", end=");
745 builder.append(AbstractGenerator.getTimestamp(this.end));
746 }
747 builder.append(", order=").append(pagingOrder.toString());
748 if (this.reference != null) {
749 if (this.pagingOrder == PagingOrder.NORMAL) {
750 builder.append(", after=");
751 } else {
752 builder.append(", before=");
753 }
754 builder.append(this.reference);
755 }
756 builder.append(", catchup=").append(catchup);
757 builder.append(", ns=").append(version.namespace);
758 return builder.toString();
759 }
760
761 boolean hasCallback() {
762 return this.callback != null;
763 }
764
765 public boolean isImplausibleFrom(final Jid from) {
766 if (muc()) {
767 if (from == null) {
768 return true;
769 }
770 return !from.asBareJid().equals(getWith());
771 } else {
772 return false;
773 }
774 }
775 }
776}