1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6import androidx.annotation.NonNull;
7import eu.siacs.conversations.Config;
8import eu.siacs.conversations.R;
9import eu.siacs.conversations.entities.Account;
10import eu.siacs.conversations.entities.Conversation;
11import eu.siacs.conversations.entities.Conversational;
12import eu.siacs.conversations.entities.ReceiptRequest;
13import eu.siacs.conversations.generator.AbstractGenerator;
14import eu.siacs.conversations.xml.Element;
15import eu.siacs.conversations.xmpp.Jid;
16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
17import eu.siacs.conversations.xmpp.mam.MamReference;
18import im.conversations.android.xmpp.model.stanza.Iq;
19import im.conversations.android.xmpp.model.stanza.Message;
20import java.math.BigInteger;
21import java.util.ArrayList;
22import java.util.HashSet;
23import java.util.Iterator;
24import java.util.List;
25
26public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
27
28 private final XmppConnectionService mXmppConnectionService;
29
30 private final HashSet<Query> queries = new HashSet<>();
31 private final ArrayList<Query> pendingQueries = new ArrayList<>();
32
33 public enum Version {
34 MAM_0("urn:xmpp:mam:0", true),
35 MAM_1("urn:xmpp:mam:1", false),
36 MAM_2("urn:xmpp:mam:2", false);
37
38 public final boolean legacy;
39 public final String namespace;
40
41 Version(String namespace, boolean legacy) {
42 this.namespace = namespace;
43 this.legacy = legacy;
44 }
45
46 public static Version get(Account account) {
47 return get(account, null);
48 }
49
50 public static Version get(Account account, Conversation conversation) {
51 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
52 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
53 } else {
54 return get(conversation.getMucOptions().getFeatures());
55 }
56 }
57
58 private static Version get(List<String> features) {
59 final Version[] values = values();
60 for (int i = values.length - 1; i >= 0; --i) {
61 for (String feature : features) {
62 if (values[i].namespace.equals(feature)) {
63 return values[i];
64 }
65 }
66 }
67 return MAM_0;
68 }
69
70 public static boolean has(List<String> features) {
71 for (String feature : features) {
72 for (Version version : values()) {
73 if (version.namespace.equals(feature)) {
74 return true;
75 }
76 }
77 }
78 return false;
79 }
80
81 public static Element findResult(Message packet) {
82 for (Version version : values()) {
83 Element result = packet.findChild("result", version.namespace);
84 if (result != null) {
85 return result;
86 }
87 }
88 return null;
89 }
90 }
91
92 MessageArchiveService(final XmppConnectionService service) {
93 this.mXmppConnectionService = service;
94 }
95
96 private void catchup(final Account account) {
97 synchronized (this.queries) {
98 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
99 Query query = iterator.next();
100 if (query.getAccount() == account) {
101 iterator.remove();
102 }
103 }
104 }
105 MamReference mamReference =
106 MamReference.max(
107 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
108 mXmppConnectionService.databaseBackend.getLastClearDate(account));
109 mamReference =
110 MamReference.max(
111 mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
112 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
113 final Query query;
114 if (mamReference.getTimestamp() == 0) {
115 return;
116 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
117 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
118 List<Conversation> conversations = mXmppConnectionService.getConversations();
119 for (Conversation conversation : conversations) {
120 if (conversation.getMode() == Conversation.MODE_SINGLE
121 && conversation.getAccount() == account
122 && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
123 this.query(conversation, startCatchup, true);
124 }
125 }
126 query = new Query(account, new MamReference(startCatchup), 0);
127 } else {
128 query = new Query(account, mamReference, 0);
129 }
130 synchronized (this.queries) {
131 this.queries.add(query);
132 }
133 this.execute(query);
134 }
135
136 void catchupMUC(final Conversation conversation) {
137 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
138 && conversation.countMessages() == 0) {
139 query(conversation, new MamReference(0), 0, true);
140 } else {
141 query(conversation, conversation.getLastMessageTransmitted(), 0, true);
142 }
143 }
144
145 public Query query(final Conversation conversation) {
146 if (conversation.getLastMessageTransmitted().getTimestamp() < 0
147 && conversation.countMessages() == 0) {
148 return query(conversation, new MamReference(0), System.currentTimeMillis(), false);
149 } else {
150 return query(
151 conversation,
152 conversation.getLastMessageTransmitted(),
153 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
154 false);
155 }
156 }
157
158 public boolean isCatchingUp(Conversation conversation) {
159 final Account account = conversation.getAccount();
160 if (account.getXmppConnection().isWaitingForSmCatchup()) {
161 return true;
162 } else {
163 synchronized (this.queries) {
164 for (Query query : this.queries) {
165 if (query.getAccount() == account
166 && query.isCatchup()
167 && ((conversation.getMode() == Conversation.MODE_SINGLE
168 && query.getWith() == null)
169 || query.getConversation() == conversation)) {
170 return true;
171 }
172 }
173 }
174 return false;
175 }
176 }
177
178 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
179 return this.query(
180 conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
181 }
182
183 public Query query(
184 Conversation conversation, MamReference start, long end, boolean allowCatchup) {
185 synchronized (this.queries) {
186 final Query query;
187 final MamReference startActual =
188 MamReference.max(
189 start, mXmppConnectionService.getAutomaticMessageDeletionDate());
190 if (start.getTimestamp() == 0) {
191 query = new Query(conversation, startActual, end, false);
192 query.reference = conversation.getFirstMamReference();
193 } else {
194 if (allowCatchup) {
195 MamReference maxCatchup =
196 MamReference.max(
197 startActual,
198 System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
199 if (maxCatchup.greaterThan(startActual)) {
200 Query reverseCatchup =
201 new Query(
202 conversation,
203 startActual,
204 maxCatchup.getTimestamp(),
205 false);
206 this.queries.add(reverseCatchup);
207 this.execute(reverseCatchup);
208 }
209 query = new Query(conversation, maxCatchup, end, true);
210 } else {
211 query = new Query(conversation, startActual, end, false);
212 }
213 }
214 if (end != 0 && start.greaterThan(end)) {
215 return null;
216 }
217 this.queries.add(query);
218 this.execute(query);
219 return query;
220 }
221 }
222
223 void executePendingQueries(final Account account) {
224 final List<Query> pending = new ArrayList<>();
225 synchronized (this.pendingQueries) {
226 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
227 Query query = iterator.next();
228 if (query.getAccount() == account) {
229 pending.add(query);
230 iterator.remove();
231 }
232 }
233 }
234 for (Query query : pending) {
235 this.execute(query);
236 }
237 }
238
239 private void execute(final Query query) {
240 final Account account = query.getAccount();
241 if (account.getStatus() == Account.State.ONLINE) {
242 final Conversation conversation = query.getConversation();
243 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
244 throw new IllegalStateException(
245 "Attempted to run MAM query for archived conversation");
246 }
247 Log.d(
248 Config.LOGTAG,
249 account.getJid().asBareJid().toString() + ": running mam query " + query);
250 final Iq packet =
251 this.mXmppConnectionService
252 .getIqGenerator()
253 .queryMessageArchiveManagement(query);
254 this.mXmppConnectionService.sendIqPacket(
255 account,
256 packet,
257 (p) -> {
258 final Element fin = p.findChild("fin", query.version.namespace);
259 if (p.getType() == Iq.Type.TIMEOUT) {
260 synchronized (this.queries) {
261 this.queries.remove(query);
262 if (query.hasCallback()) {
263 query.callback(false);
264 }
265 }
266 } else if (p.getType() == Iq.Type.RESULT && fin != null) {
267 final boolean running;
268 synchronized (this.queries) {
269 running = this.queries.contains(query);
270 }
271 if (running) {
272 processFin(query, fin);
273 } else {
274 Log.d(
275 Config.LOGTAG,
276 account.getJid().asBareJid()
277 + ": ignoring MAM iq result because query had been"
278 + " killed");
279 }
280 } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
281 // do nothing
282 } else {
283 Log.d(
284 Config.LOGTAG,
285 account.getJid().asBareJid().toString()
286 + ": error executing mam: "
287 + p);
288 try {
289 finalizeQuery(query, true);
290 } catch (final IllegalStateException e) {
291 // ignored
292 }
293 }
294 });
295 } else {
296 synchronized (this.pendingQueries) {
297 this.pendingQueries.add(query);
298 }
299 }
300 }
301
302 private void finalizeQuery(final Query query, boolean done) {
303 synchronized (this.queries) {
304 if (!this.queries.remove(query)) {
305 throw new IllegalStateException("Unable to remove query from queries");
306 }
307 }
308 final Conversation conversation = query.getConversation();
309 if (conversation != null) {
310 conversation.sort();
311 conversation.setHasMessagesLeftOnServer(!done);
312 final var displayState = conversation.getDisplayState();
313 if (displayState != null) {
314 mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
315 }
316 } else {
317 for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
318 if (tmp.getAccount() == query.getAccount()) {
319 tmp.sort();
320 final var displayState = tmp.getDisplayState();
321 if (displayState != null) {
322 mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
323 }
324 }
325 }
326 }
327 if (query.hasCallback()) {
328 query.callback(done);
329 } else {
330 this.mXmppConnectionService.updateConversationUi();
331 }
332 }
333
334 public boolean inCatchup(Account account) {
335 synchronized (this.queries) {
336 for (Query query : queries) {
337 if (query.account == account && query.isCatchup() && query.getWith() == null) {
338 return true;
339 }
340 }
341 }
342 return false;
343 }
344
345 public boolean isCatchupInProgress(Conversation conversation) {
346 synchronized (this.queries) {
347 for (Query query : queries) {
348 if (query.account == conversation.getAccount() && query.isCatchup()) {
349 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
350 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null)
351 || (conversation.getJid().asBareJid().equals(with))) {
352 return true;
353 }
354 }
355 }
356 }
357 return false;
358 }
359
360 boolean queryInProgress(
361 Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
362 synchronized (this.queries) {
363 for (Query query : queries) {
364 if (query.conversation == conversation) {
365 if (!query.hasCallback() && callback != null) {
366 query.setCallback(callback);
367 }
368 return true;
369 }
370 }
371 return false;
372 }
373 }
374
375 public boolean queryInProgress(Conversation conversation) {
376 return queryInProgress(conversation, null);
377 }
378
379 public void processFinLegacy(Element fin, Jid from) {
380 Query query = findQuery(fin.getAttribute("queryid"));
381 if (query != null && query.validFrom(from)) {
382 processFin(query, fin);
383 }
384 }
385
386 private void processFin(Query query, Element fin) {
387 boolean complete = fin.getAttributeAsBoolean("complete");
388 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
389 Element last = set == null ? null : set.findChild("last");
390 String count = set == null ? null : set.findChildContent("count");
391 Element first = set == null ? null : set.findChild("first");
392 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
393 boolean abort =
394 (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE)
395 || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
396 if (query.getConversation() != null) {
397 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
398 }
399 if (complete || relevant == null || abort) {
400 // TODO: FIX done logic to look at complete. using count is probably unreliable because
401 // it can be ommited and doesn’t work with paging.
402 boolean done;
403 if (query.isCatchup()) {
404 done = false;
405 } else {
406 if (count != null) {
407 try {
408 done = Integer.parseInt(count) <= query.getTotalCount();
409 } catch (NumberFormatException e) {
410 done = false;
411 }
412 } else {
413 done = query.getTotalCount() == 0;
414 }
415 }
416 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
417 this.finalizeQuery(query, done);
418
419 Log.d(
420 Config.LOGTAG,
421 query.getAccount().getJid().asBareJid()
422 + ": finished mam after "
423 + query.getTotalCount()
424 + "("
425 + query.getActualMessageCount()
426 + ") messages. messages left="
427 + !done
428 + " count="
429 + count);
430 if (query.isCatchup() && query.getActualMessageCount() > 0) {
431 mXmppConnectionService
432 .getNotificationService()
433 .finishBacklog(true, query.getAccount());
434 }
435 processPostponed(query);
436 } else {
437 final Query nextQuery;
438 if (query.getPagingOrder() == PagingOrder.NORMAL) {
439 nextQuery = query.next(last == null ? null : last.getContent());
440 } else {
441 nextQuery = query.prev(first == null ? null : first.getContent());
442 }
443 this.execute(nextQuery);
444 this.finalizeQuery(query, false);
445 synchronized (this.queries) {
446 this.queries.add(nextQuery);
447 }
448 }
449 }
450
451 void kill(final Conversation conversation) {
452 final ArrayList<Query> toBeKilled = new ArrayList<>();
453 synchronized (this.pendingQueries) {
454 for (final Iterator<Query> iterator = this.pendingQueries.iterator();
455 iterator.hasNext(); ) {
456 final Query query = iterator.next();
457 if (query.getConversation() == conversation) {
458 iterator.remove();
459 Log.d(
460 Config.LOGTAG,
461 conversation.getAccount().getJid().asBareJid()
462 + ": killed pending MAM query for archived conversation");
463 }
464 }
465 }
466 synchronized (this.queries) {
467 for (final Query q : queries) {
468 if (q.conversation == conversation) {
469 toBeKilled.add(q);
470 }
471 }
472 }
473 for (final Query q : toBeKilled) {
474 kill(q);
475 }
476 }
477
478 private void kill(Query query) {
479 Log.d(
480 Config.LOGTAG,
481 query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
482 query.callback = null;
483 this.finalizeQuery(query, false);
484 if (query.isCatchup() && query.getActualMessageCount() > 0) {
485 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
486 }
487 this.processPostponed(query);
488 }
489
490 private void processPostponed(Query query) {
491 query.account.getAxolotlService().processPostponed();
492 query.pendingReceiptRequests.removeAll(query.receiptRequests);
493 Log.d(
494 Config.LOGTAG,
495 query.getAccount().getJid().asBareJid()
496 + ": found "
497 + query.pendingReceiptRequests.size()
498 + " pending receipt requests");
499 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
500 while (iterator.hasNext()) {
501 ReceiptRequest rr = iterator.next();
502 mXmppConnectionService.sendMessagePacket(
503 query.account,
504 mXmppConnectionService
505 .getMessageGenerator()
506 .received(query.account, rr.getJid(), rr.getId()));
507 iterator.remove();
508 }
509 }
510
511 public Query findQuery(String id) {
512 if (id == null) {
513 return null;
514 }
515 synchronized (this.queries) {
516 for (Query query : this.queries) {
517 if (query.getQueryId().equals(id)) {
518 return query;
519 }
520 }
521 return null;
522 }
523 }
524
525 @Override
526 public void onAdvancedStreamFeaturesAvailable(Account account) {
527 if (account.getXmppConnection() != null
528 && account.getXmppConnection().getFeatures().mam()) {
529 this.catchup(account);
530 }
531 }
532
533 public enum PagingOrder {
534 NORMAL,
535 REVERSE
536 }
537
538 public class Query {
539 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
540 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
541 private int totalCount = 0;
542 private int actualCount = 0;
543 private int actualInThisQuery = 0;
544 private long start;
545 private final long end;
546 private final String queryId;
547 private String reference = null;
548 private final Account account;
549 private Conversation conversation;
550 private PagingOrder pagingOrder = PagingOrder.NORMAL;
551 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
552 private boolean catchup = true;
553 public final Version version;
554
555 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
556 this(
557 conversation.getAccount(),
558 Version.get(conversation.getAccount(), conversation),
559 catchup ? start : start.timeOnly(),
560 end);
561 this.conversation = conversation;
562 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
563 this.catchup = catchup;
564 }
565
566 Query(Account account, MamReference start, long end) {
567 this(account, Version.get(account), start, end);
568 }
569
570 Query(Account account, Version version, MamReference start, long end) {
571 this.account = account;
572 if (start.getReference() != null) {
573 this.reference = start.getReference();
574 } else {
575 this.start = start.getTimestamp();
576 }
577 this.end = end;
578 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
579 this.version = version;
580 }
581
582 private Query page(String reference) {
583 Query query =
584 new Query(
585 this.account,
586 this.version,
587 new MamReference(this.start, reference),
588 this.end);
589 query.conversation = conversation;
590 query.totalCount = totalCount;
591 query.actualCount = actualCount;
592 query.pendingReceiptRequests = pendingReceiptRequests;
593 query.receiptRequests = receiptRequests;
594 query.callback = callback;
595 query.catchup = catchup;
596 return query;
597 }
598
599 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
600 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
601 this.receiptRequests.add(receiptRequest);
602 }
603 }
604
605 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
606 this.pendingReceiptRequests.add(receiptRequest);
607 }
608
609 public boolean isLegacy() {
610 return version.legacy;
611 }
612
613 public boolean safeToExtractTrueCounterpart() {
614 return muc() && !isLegacy();
615 }
616
617 public Query next(String reference) {
618 Query query = page(reference);
619 query.pagingOrder = PagingOrder.NORMAL;
620 return query;
621 }
622
623 Query prev(String reference) {
624 Query query = page(reference);
625 query.pagingOrder = PagingOrder.REVERSE;
626 return query;
627 }
628
629 public String getReference() {
630 return reference;
631 }
632
633 public PagingOrder getPagingOrder() {
634 return this.pagingOrder;
635 }
636
637 public String getQueryId() {
638 return queryId;
639 }
640
641 public Jid getWith() {
642 return conversation == null ? null : conversation.getJid().asBareJid();
643 }
644
645 public boolean muc() {
646 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
647 }
648
649 public long getStart() {
650 return start;
651 }
652
653 public boolean isCatchup() {
654 return catchup;
655 }
656
657 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
658 this.callback = callback;
659 }
660
661 public void callback(boolean done) {
662 if (this.callback != null) {
663 this.callback.onMoreMessagesLoaded(actualCount, conversation);
664 if (done) {
665 this.callback.informUser(R.string.no_more_history_on_server);
666 }
667 }
668 }
669
670 public long getEnd() {
671 return end;
672 }
673
674 public Conversation getConversation() {
675 return conversation;
676 }
677
678 public Account getAccount() {
679 return this.account;
680 }
681
682 public void incrementMessageCount() {
683 this.totalCount++;
684 }
685
686 public void incrementActualMessageCount() {
687 this.actualInThisQuery++;
688 this.actualCount++;
689 }
690
691 int getTotalCount() {
692 return this.totalCount;
693 }
694
695 int getActualMessageCount() {
696 return this.actualCount;
697 }
698
699 public int getActualInThisQuery() {
700 return this.actualInThisQuery;
701 }
702
703 public boolean validFrom(Jid from) {
704 if (muc()) {
705 return getWith().equals(from);
706 } else {
707 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
708 }
709 }
710
711 @NonNull
712 @Override
713 public String toString() {
714 StringBuilder builder = new StringBuilder();
715 if (this.muc()) {
716 builder.append("to=");
717 builder.append(this.getWith().toString());
718 } else {
719 builder.append("with=");
720 if (this.getWith() == null) {
721 builder.append("*");
722 } else {
723 builder.append(getWith().toString());
724 }
725 }
726 if (this.start != 0) {
727 builder.append(", start=");
728 builder.append(AbstractGenerator.getTimestamp(this.start));
729 }
730 if (this.end != 0) {
731 builder.append(", end=");
732 builder.append(AbstractGenerator.getTimestamp(this.end));
733 }
734 builder.append(", order=").append(pagingOrder.toString());
735 if (this.reference != null) {
736 if (this.pagingOrder == PagingOrder.NORMAL) {
737 builder.append(", after=");
738 } else {
739 builder.append(", before=");
740 }
741 builder.append(this.reference);
742 }
743 builder.append(", catchup=").append(catchup);
744 builder.append(", ns=").append(version.namespace);
745 return builder.toString();
746 }
747
748 boolean hasCallback() {
749 return this.callback != null;
750 }
751 }
752}