1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6import androidx.annotation.NonNull;
7import eu.siacs.conversations.Config;
8import eu.siacs.conversations.R;
9import eu.siacs.conversations.entities.Account;
10import eu.siacs.conversations.entities.Conversation;
11import eu.siacs.conversations.entities.Conversational;
12import eu.siacs.conversations.entities.ReceiptRequest;
13import eu.siacs.conversations.generator.AbstractGenerator;
14import eu.siacs.conversations.xml.Element;
15import eu.siacs.conversations.xmpp.Jid;
16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
17import eu.siacs.conversations.xmpp.mam.MamReference;
18import im.conversations.android.xmpp.model.stanza.Iq;
19import im.conversations.android.xmpp.model.stanza.Message;
20import java.math.BigInteger;
21import java.util.ArrayList;
22import java.util.Collection;
23import java.util.HashSet;
24import java.util.Iterator;
25import java.util.List;
26
27public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
28
29 private final XmppConnectionService mXmppConnectionService;
30
31 private final HashSet<Query> queries = new HashSet<>();
32 private final ArrayList<Query> pendingQueries = new ArrayList<>();
33
34 public enum Version {
35 MAM_0("urn:xmpp:mam:0", true),
36 MAM_1("urn:xmpp:mam:1", false),
37 MAM_2("urn:xmpp:mam:2", false);
38
39 public final boolean legacy;
40 public final String namespace;
41
42 Version(String namespace, boolean legacy) {
43 this.namespace = namespace;
44 this.legacy = legacy;
45 }
46
47 public static Version get(Account account) {
48 return get(account, null);
49 }
50
51 public static Version get(Account account, Conversation conversation) {
52 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
53 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
54 } else {
55 return get(conversation.getMucOptions().getFeatures());
56 }
57 }
58
59 private static Version get(final Collection<String> features) {
60 final Version[] values = values();
61 for (int i = values.length - 1; i >= 0; --i) {
62 for (String feature : features) {
63 if (values[i].namespace.equals(feature)) {
64 return values[i];
65 }
66 }
67 }
68 return MAM_0;
69 }
70
71 public static boolean has(final Collection<String> features) {
72 for (String feature : features) {
73 for (Version version : values()) {
74 if (version.namespace.equals(feature)) {
75 return true;
76 }
77 }
78 }
79 return false;
80 }
81
82 public static Element findResult(Message packet) {
83 for (Version version : values()) {
84 Element result = packet.findChild("result", version.namespace);
85 if (result != null) {
86 return result;
87 }
88 }
89 return null;
90 }
91 }
92
93 MessageArchiveService(final XmppConnectionService service) {
94 this.mXmppConnectionService = service;
95 }
96
97 private void catchup(final Account account) {
98 synchronized (this.queries) {
99 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
100 Query query = iterator.next();
101 if (query.getAccount() == account) {
102 iterator.remove();
103 }
104 }
105 }
106 MamReference mamReference =
107 MamReference.max(
108 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
109 mXmppConnectionService.databaseBackend.getLastClearDate(account));
110 mamReference =
111 MamReference.max(
112 mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
113 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114 final Query query;
115 if (mamReference.getTimestamp() == 0) {
116 return;
117 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119 List<Conversation> conversations = mXmppConnectionService.getConversations();
120 for (Conversation conversation : conversations) {
121 if (conversation.getMode() == Conversation.MODE_SINGLE
122 && conversation.getAccount() == account
123 && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
124 this.query(conversation, startCatchup, true);
125 }
126 }
127 query = new Query(account, new MamReference(startCatchup), 0);
128 } else {
129 query = new Query(account, mamReference, 0);
130 }
131 synchronized (this.queries) {
132 this.queries.add(query);
133 }
134 this.execute(query);
135 }
136
137 void catchupMUC(final Conversation conversation) {
138 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
139 && conversation.countMessages() == 0) {
140 query(conversation, new MamReference(0), 0, true);
141 } else {
142 query(conversation, conversation.getLastMessageTransmitted(), 0, true);
143 }
144 }
145
146 public Query query(final Conversation conversation) {
147 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
148 && conversation.countMessages() == 0) {
149 return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
150 } else {
151 return query(
152 conversation,
153 conversation.getLastMessageTransmitted(),
154 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
155 false);
156 }
157 }
158
159 public boolean isCatchingUp(Conversation conversation) {
160 final Account account = conversation.getAccount();
161 if (account.getXmppConnection().isWaitingForSmCatchup()) {
162 return true;
163 } else {
164 synchronized (this.queries) {
165 for (Query query : this.queries) {
166 if (query.getAccount() == account
167 && query.isCatchup()
168 && ((conversation.getMode() == Conversation.MODE_SINGLE
169 && query.getWith() == null)
170 || query.getConversation() == conversation)) {
171 return true;
172 }
173 }
174 }
175 return false;
176 }
177 }
178
179 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180 return this.query(
181 conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182 }
183
184 public Query query(
185 Conversation conversation, MamReference start, long end, boolean allowCatchup) {
186 synchronized (this.queries) {
187 final Query query;
188 final MamReference startActual =
189 MamReference.max(
190 start, mXmppConnectionService.getAutomaticMessageDeletionDate());
191 if (start.getTimestamp() == 0) {
192 query = new Query(conversation, startActual, end, false);
193 query.reference = conversation.getFirstMamReference();
194 } else {
195 if (allowCatchup) {
196 MamReference maxCatchup =
197 MamReference.max(
198 startActual,
199 System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
200 if (maxCatchup.greaterThan(startActual)) {
201 Query reverseCatchup =
202 new Query(
203 conversation,
204 startActual,
205 maxCatchup.getTimestamp(),
206 false);
207 this.queries.add(reverseCatchup);
208 this.execute(reverseCatchup);
209 }
210 query = new Query(conversation, maxCatchup, end, true);
211 } else {
212 query = new Query(conversation, startActual, end, false);
213 }
214 }
215 if (end != 0 && start.greaterThan(end)) {
216 return null;
217 }
218 this.queries.add(query);
219 this.execute(query);
220 return query;
221 }
222 }
223
224 public void executePendingQueries(final Account account) {
225 final List<Query> pending = new ArrayList<>();
226 synchronized (this.pendingQueries) {
227 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
228 Query query = iterator.next();
229 if (query.getAccount() == account) {
230 pending.add(query);
231 iterator.remove();
232 }
233 }
234 }
235 for (Query query : pending) {
236 this.execute(query);
237 }
238 }
239
240 private void execute(final Query query) {
241 final Account account = query.getAccount();
242 if (account.getStatus() == Account.State.ONLINE) {
243 final Conversation conversation = query.getConversation();
244 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
245 throw new IllegalStateException(
246 "Attempted to run MAM query for archived conversation");
247 }
248 Log.d(
249 Config.LOGTAG,
250 account.getJid().asBareJid().toString() + ": running mam query " + query);
251 final Iq packet =
252 this.mXmppConnectionService
253 .getIqGenerator()
254 .queryMessageArchiveManagement(query);
255 this.mXmppConnectionService.sendIqPacket(
256 account,
257 packet,
258 (p) -> {
259 final Element fin = p.findChild("fin", query.version.namespace);
260 if (p.getType() == Iq.Type.TIMEOUT) {
261 synchronized (this.queries) {
262 this.queries.remove(query);
263 if (query.hasCallback()) {
264 query.callback(false);
265 }
266 }
267 } else if (p.getType() == Iq.Type.RESULT && fin != null) {
268 final boolean running;
269 synchronized (this.queries) {
270 running = this.queries.contains(query);
271 }
272 if (running) {
273 processFin(query, fin);
274 } else {
275 Log.d(
276 Config.LOGTAG,
277 account.getJid().asBareJid()
278 + ": ignoring MAM iq result because query had been"
279 + " killed");
280 }
281 } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
282 // do nothing
283 } else {
284 Log.d(
285 Config.LOGTAG,
286 account.getJid().asBareJid().toString()
287 + ": error executing mam: "
288 + p);
289 try {
290 finalizeQuery(query, true);
291 } catch (final IllegalStateException e) {
292 // ignored
293 }
294 }
295 });
296 } else {
297 synchronized (this.pendingQueries) {
298 this.pendingQueries.add(query);
299 }
300 }
301 }
302
303 private void finalizeQuery(final Query query, boolean done) {
304 synchronized (this.queries) {
305 if (!this.queries.remove(query)) {
306 throw new IllegalStateException("Unable to remove query from queries");
307 }
308 }
309 final Conversation conversation = query.getConversation();
310 if (conversation != null) {
311 conversation.sort();
312 conversation.setHasMessagesLeftOnServer(!done);
313 final var displayState = conversation.getDisplayState();
314 if (displayState != null) {
315 mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
316 }
317 } else {
318 for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
319 if (tmp.getAccount() == query.getAccount()) {
320 tmp.sort();
321 final var displayState = tmp.getDisplayState();
322 if (displayState != null) {
323 mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
324 }
325 }
326 }
327 }
328 if (query.hasCallback()) {
329 query.callback(done);
330 } else {
331 this.mXmppConnectionService.updateConversationUi();
332 }
333 }
334
335 public boolean inCatchup(Account account) {
336 synchronized (this.queries) {
337 for (Query query : queries) {
338 if (query.account == account && query.isCatchup() && query.getWith() == null) {
339 return true;
340 }
341 }
342 }
343 return false;
344 }
345
346 public boolean isCatchupInProgress(Conversation conversation) {
347 synchronized (this.queries) {
348 for (Query query : queries) {
349 if (query.account == conversation.getAccount() && query.isCatchup()) {
350 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
351 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
352 || (conversation.getJid().asBareJid().equals(with))) {
353 return true;
354 }
355 }
356 }
357 }
358 return false;
359 }
360
361 boolean queryInProgress(
362 Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
363 synchronized (this.queries) {
364 for (Query query : queries) {
365 if (query.conversation == conversation) {
366 if (!query.hasCallback() && callback != null) {
367 query.setCallback(callback);
368 }
369 return true;
370 }
371 }
372 return false;
373 }
374 }
375
376 public boolean queryInProgress(Conversation conversation) {
377 return queryInProgress(conversation, null);
378 }
379
380 public void processFinLegacy(Element fin, Jid from) {
381 Query query = findQuery(fin.getAttribute("queryid"));
382 if (query != null && query.validFrom(from)) {
383 processFin(query, fin);
384 }
385 }
386
387 private void processFin(Query query, Element fin) {
388 boolean complete = fin.getAttributeAsBoolean("complete");
389 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
390 Element last = set == null ? null : set.findChild("last");
391 String count = set == null ? null : set.findChildContent("count");
392 Element first = set == null ? null : set.findChild("first");
393 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
394 boolean abort =
395 (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
396 || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
397 if (query.getConversation() != null) {
398 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
399 }
400 if (complete || relevant == null || abort) {
401 // TODO: FIX done logic to look at complete. using count is probably unreliable because
402 // it can be ommited and doesn’t work with paging.
403 boolean done;
404 if (query.isCatchup()) {
405 done = false;
406 } else {
407 if (count != null) {
408 try {
409 done = Integer.parseInt(count) <= query.getTotalCount();
410 } catch (NumberFormatException e) {
411 done = false;
412 }
413 } else {
414 done = query.getTotalCount() == 0;
415 }
416 }
417 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
418 this.finalizeQuery(query, done);
419
420 Log.d(
421 Config.LOGTAG,
422 query.getAccount().getJid().asBareJid()
423 + ": finished mam after "
424 + query.getTotalCount()
425 + "("
426 + query.getActualMessageCount()
427 + ") messages. messages left="
428 + !done
429 + " count="
430 + count);
431 if (query.isCatchup() && query.getActualMessageCount() > 0) {
432 mXmppConnectionService
433 .getNotificationService()
434 .finishBacklog(true, query.getAccount());
435 }
436 processPostponed(query);
437 } else {
438 final Query nextQuery;
439 if (query.getPagingOrder() == PagingOrder.NORMAL) {
440 nextQuery = query.next(last == null ? null : last.getContent());
441 } else {
442 nextQuery = query.prev(first == null ? null : first.getContent());
443 }
444 this.execute(nextQuery);
445 this.finalizeQuery(query, false);
446 synchronized (this.queries) {
447 this.queries.add(nextQuery);
448 }
449 }
450 }
451
452 void kill(final Conversation conversation) {
453 final ArrayList<Query> toBeKilled = new ArrayList<>();
454 synchronized (this.pendingQueries) {
455 for (final Iterator<Query> iterator = this.pendingQueries.iterator();
456 iterator.hasNext(); ) {
457 final Query query = iterator.next();
458 if (query.getConversation() == conversation) {
459 iterator.remove();
460 Log.d(
461 Config.LOGTAG,
462 conversation.getAccount().getJid().asBareJid()
463 + ": killed pending MAM query for archived conversation");
464 }
465 }
466 }
467 synchronized (this.queries) {
468 for (final Query q : queries) {
469 if (q.conversation == conversation) {
470 toBeKilled.add(q);
471 }
472 }
473 }
474 for (final Query q : toBeKilled) {
475 kill(q);
476 }
477 }
478
479 private void kill(Query query) {
480 Log.d(
481 Config.LOGTAG,
482 query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
483 query.callback = null;
484 this.finalizeQuery(query, false);
485 if (query.isCatchup() && query.getActualMessageCount() > 0) {
486 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
487 }
488 this.processPostponed(query);
489 }
490
491 private void processPostponed(Query query) {
492 query.account.getAxolotlService().processPostponed();
493 query.pendingReceiptRequests.removeAll(query.receiptRequests);
494 Log.d(
495 Config.LOGTAG,
496 query.getAccount().getJid().asBareJid()
497 + ": found "
498 + query.pendingReceiptRequests.size()
499 + " pending receipt requests");
500 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
501 while (iterator.hasNext()) {
502 ReceiptRequest rr = iterator.next();
503 mXmppConnectionService.sendMessagePacket(
504 query.account,
505 mXmppConnectionService
506 .getMessageGenerator()
507 .received(query.account, rr.getJid(), rr.getId()));
508 iterator.remove();
509 }
510 }
511
512 public Query findQuery(String id) {
513 if (id == null) {
514 return null;
515 }
516 synchronized (this.queries) {
517 for (Query query : this.queries) {
518 if (query.getQueryId().equals(id)) {
519 return query;
520 }
521 }
522 return null;
523 }
524 }
525
526 @Override
527 public void onAdvancedStreamFeaturesAvailable(Account account) {
528 if (account.getXmppConnection() != null
529 && account.getXmppConnection().getFeatures().mam()) {
530 this.catchup(account);
531 }
532 }
533
534 public enum PagingOrder {
535 NORMAL,
536 REVERSE
537 }
538
539 public class Query {
540 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
541 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
542 private int totalCount = 0;
543 private int actualCount = 0;
544 private int actualInThisQuery = 0;
545 private long start;
546 private final long end;
547 private final String queryId;
548 private String reference = null;
549 private final Account account;
550 private Conversation conversation;
551 private PagingOrder pagingOrder = PagingOrder.NORMAL;
552 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
553 private boolean catchup = true;
554 public final Version version;
555
556 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
557 this(
558 conversation.getAccount(),
559 Version.get(conversation.getAccount(), conversation),
560 catchup ? start : start.timeOnly(),
561 end);
562 this.conversation = conversation;
563 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
564 this.catchup = catchup;
565 }
566
567 Query(Account account, MamReference start, long end) {
568 this(account, Version.get(account), start, end);
569 }
570
571 Query(Account account, Version version, MamReference start, long end) {
572 this.account = account;
573 if (start.getReference() != null) {
574 this.reference = start.getReference();
575 } else {
576 this.start = start.getTimestamp();
577 }
578 this.end = end;
579 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
580 this.version = version;
581 }
582
583 private Query page(String reference) {
584 Query query =
585 new Query(
586 this.account,
587 this.version,
588 new MamReference(this.start, reference),
589 this.end);
590 query.conversation = conversation;
591 query.totalCount = totalCount;
592 query.actualCount = actualCount;
593 query.pendingReceiptRequests = pendingReceiptRequests;
594 query.receiptRequests = receiptRequests;
595 query.callback = callback;
596 query.catchup = catchup;
597 return query;
598 }
599
600 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
601 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
602 this.receiptRequests.add(receiptRequest);
603 }
604 }
605
606 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
607 this.pendingReceiptRequests.add(receiptRequest);
608 }
609
610 public boolean isLegacy() {
611 return version.legacy;
612 }
613
614 public boolean safeToExtractTrueCounterpart() {
615 return muc() && !isLegacy();
616 }
617
618 public Query next(String reference) {
619 Query query = page(reference);
620 query.pagingOrder = PagingOrder.NORMAL;
621 return query;
622 }
623
624 Query prev(String reference) {
625 Query query = page(reference);
626 query.pagingOrder = PagingOrder.REVERSE;
627 return query;
628 }
629
630 public String getReference() {
631 return reference;
632 }
633
634 public PagingOrder getPagingOrder() {
635 return this.pagingOrder;
636 }
637
638 public String getQueryId() {
639 return queryId;
640 }
641
642 public Jid getWith() {
643 return conversation == null ? null : conversation.getJid().asBareJid();
644 }
645
646 public boolean muc() {
647 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
648 }
649
650 public long getStart() {
651 return start;
652 }
653
654 public boolean isCatchup() {
655 return catchup;
656 }
657
658 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
659 this.callback = callback;
660 }
661
662 public void callback(boolean done) {
663 if (this.callback != null) {
664 this.callback.onMoreMessagesLoaded(actualCount, conversation);
665 if (done) {
666 this.callback.informUser(R.string.no_more_history_on_server);
667 }
668 }
669 }
670
671 public long getEnd() {
672 return end;
673 }
674
675 public Conversation getConversation() {
676 return conversation;
677 }
678
679 public Account getAccount() {
680 return this.account;
681 }
682
683 public void incrementMessageCount() {
684 this.totalCount++;
685 }
686
687 public void incrementActualMessageCount() {
688 this.actualInThisQuery++;
689 this.actualCount++;
690 }
691
692 int getTotalCount() {
693 return this.totalCount;
694 }
695
696 int getActualMessageCount() {
697 return this.actualCount;
698 }
699
700 public int getActualInThisQuery() {
701 return this.actualInThisQuery;
702 }
703
704 public boolean validFrom(Jid from) {
705 if (muc()) {
706 return getWith().equals(from);
707 } else {
708 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
709 }
710 }
711
712 @NonNull
713 @Override
714 public String toString() {
715 StringBuilder builder = new StringBuilder();
716 if (this.muc()) {
717 builder.append("to=");
718 builder.append(this.getWith().toString());
719 } else {
720 builder.append("with=");
721 if (this.getWith() == null) {
722 builder.append("*");
723 } else {
724 builder.append(getWith().toString());
725 }
726 }
727 if (this.start != 0) {
728 builder.append(", start=");
729 builder.append(AbstractGenerator.getTimestamp(this.start));
730 }
731 if (this.end != 0) {
732 builder.append(", end=");
733 builder.append(AbstractGenerator.getTimestamp(this.end));
734 }
735 builder.append(", order=").append(pagingOrder.toString());
736 if (this.reference != null) {
737 if (this.pagingOrder == PagingOrder.NORMAL) {
738 builder.append(", after=");
739 } else {
740 builder.append(", before=");
741 }
742 builder.append(this.reference);
743 }
744 builder.append(", catchup=").append(catchup);
745 builder.append(", ns=").append(version.namespace);
746 return builder.toString();
747 }
748
749 boolean hasCallback() {
750 return this.callback != null;
751 }
752 }
753}