1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6
7import androidx.annotation.NonNull;
8
9import java.math.BigInteger;
10import java.util.ArrayList;
11import java.util.HashSet;
12import java.util.Iterator;
13import java.util.List;
14
15import eu.siacs.conversations.Config;
16import eu.siacs.conversations.R;
17import eu.siacs.conversations.entities.Account;
18import eu.siacs.conversations.entities.Conversation;
19import eu.siacs.conversations.entities.Conversational;
20import eu.siacs.conversations.entities.ReceiptRequest;
21import eu.siacs.conversations.generator.AbstractGenerator;
22import eu.siacs.conversations.xml.Element;
23import eu.siacs.conversations.xmpp.Jid;
24import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
25import eu.siacs.conversations.xmpp.mam.MamReference;
26import im.conversations.android.xmpp.model.stanza.Iq;
27import im.conversations.android.xmpp.model.stanza.Message;
28
29public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
30
31 private final XmppConnectionService mXmppConnectionService;
32
33 private final HashSet<Query> queries = new HashSet<>();
34 private final ArrayList<Query> pendingQueries = new ArrayList<>();
35
36 public enum Version {
37 MAM_0("urn:xmpp:mam:0", true),
38 MAM_1("urn:xmpp:mam:1", false),
39 MAM_2("urn:xmpp:mam:2", false);
40
41 public final boolean legacy;
42 public final String namespace;
43
44 Version(String namespace, boolean legacy) {
45 this.namespace = namespace;
46 this.legacy = legacy;
47 }
48
49 public static Version get(Account account) {
50 return get(account, null);
51 }
52
53 public static Version get(Account account, Conversation conversation) {
54 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
55 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
56 } else {
57 return get(conversation.getMucOptions().getFeatures());
58 }
59 }
60
61 private static Version get(List<String> features) {
62 final Version[] values = values();
63 for (int i = values.length - 1; i >= 0; --i) {
64 for (String feature : features) {
65 if (values[i].namespace.equals(feature)) {
66 return values[i];
67 }
68 }
69 }
70 return MAM_0;
71 }
72
73 public static boolean has(List<String> features) {
74 for (String feature : features) {
75 for (Version version : values()) {
76 if (version.namespace.equals(feature)) {
77 return true;
78 }
79 }
80 }
81 return false;
82 }
83
84 public static Element findResult(Message packet) {
85 for (Version version : values()) {
86 Element result = packet.findChild("result", version.namespace);
87 if (result != null) {
88 return result;
89 }
90 }
91 return null;
92 }
93
94 }
95
96 MessageArchiveService(final XmppConnectionService service) {
97 this.mXmppConnectionService = service;
98 }
99
100 private void catchup(final Account account) {
101 synchronized (this.queries) {
102 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
103 Query query = iterator.next();
104 if (query.getAccount() == account) {
105 iterator.remove();
106 }
107 }
108 }
109 MamReference mamReference = MamReference.max(
110 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
111 mXmppConnectionService.databaseBackend.getLastClearDate(account)
112 );
113 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114 final Query query;
115 if (mamReference.getTimestamp() == 0) {
116 return;
117 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119 List<Conversation> conversations = mXmppConnectionService.getConversations();
120 for (Conversation conversation : conversations) {
121 if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
122 this.query(conversation, startCatchup, true);
123 }
124 }
125 query = new Query(account, new MamReference(startCatchup), 0);
126 } else {
127 query = new Query(account, mamReference, 0);
128 }
129 synchronized (this.queries) {
130 this.queries.add(query);
131 }
132 this.execute(query);
133 }
134
135 void catchupMUC(final Conversation conversation) {
136 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
137 query(conversation,
138 new MamReference(0),
139 0,
140 true);
141 } else {
142 query(conversation,
143 conversation.getLastMessageTransmitted(),
144 0,
145 true);
146 }
147 }
148
149 public Query query(final Conversation conversation) {
150 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
151 return query(conversation,
152 new MamReference(0),
153 System.currentTimeMillis(),
154 false);
155 } else {
156 return query(conversation,
157 conversation.getLastMessageTransmitted(),
158 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
159 false);
160 }
161 }
162
163 public boolean isCatchingUp(Conversation conversation) {
164 final Account account = conversation.getAccount();
165 if (account.getXmppConnection().isWaitingForSmCatchup()) {
166 return true;
167 } else {
168 synchronized (this.queries) {
169 for (Query query : this.queries) {
170 if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) {
171 return true;
172 }
173 }
174 }
175 return false;
176 }
177 }
178
179 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180 return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
181 }
182
183 public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) {
184 synchronized (this.queries) {
185 final Query query;
186 final MamReference startActual = start;
187 if (start.getTimestamp() == 0) {
188 query = new Query(conversation, startActual, end, false);
189 query.reference = conversation.getFirstMamReference();
190 } else {
191 if (allowCatchup) {
192 MamReference maxCatchup = MamReference.max(startActual, System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
193 if (maxCatchup.greaterThan(startActual)) {
194 Query reverseCatchup = new Query(conversation, startActual, maxCatchup.getTimestamp(), false);
195 this.queries.add(reverseCatchup);
196 this.execute(reverseCatchup);
197 }
198 query = new Query(conversation, maxCatchup, end, true);
199 } else {
200 query = new Query(conversation, startActual, end, false);
201 }
202 }
203 if (end != 0 && start.greaterThan(end)) {
204 return null;
205 }
206 this.queries.add(query);
207 this.execute(query);
208 return query;
209 }
210 }
211
212 void executePendingQueries(final Account account) {
213 final List<Query> pending = new ArrayList<>();
214 synchronized (this.pendingQueries) {
215 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
216 Query query = iterator.next();
217 if (query.getAccount() == account) {
218 pending.add(query);
219 iterator.remove();
220 }
221 }
222 }
223 for (Query query : pending) {
224 this.execute(query);
225 }
226 }
227
228 private void execute(final Query query) {
229 final Account account = query.getAccount();
230 if (account.getStatus() == Account.State.ONLINE) {
231 final Conversation conversation = query.getConversation();
232 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
233 throw new IllegalStateException("Attempted to run MAM query for archived conversation");
234 }
235 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": running mam query " + query.toString());
236 final Iq packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
237 this.mXmppConnectionService.sendIqPacket(account, packet, (p) -> {
238 final Element fin = p.findChild("fin", query.version.namespace);
239 if (p.getType() == Iq.Type.TIMEOUT) {
240 synchronized (this.queries) {
241 this.queries.remove(query);
242 if (query.hasCallback()) {
243 query.callback(false);
244 }
245 }
246 } else if (p.getType() == Iq.Type.RESULT && fin != null) {
247 final boolean running;
248 synchronized (this.queries) {
249 running = this.queries.contains(query);
250 }
251 if (running) {
252 processFin(query, fin);
253 } else {
254 Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": ignoring MAM iq result because query had been killed");
255 }
256 } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
257 //do nothing
258 } else {
259 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": error executing mam: " + p.toString());
260 try {
261 finalizeQuery(query, true);
262 } catch (final IllegalStateException e) {
263 //ignored
264 }
265 }
266 });
267 } else {
268 synchronized (this.pendingQueries) {
269 this.pendingQueries.add(query);
270 }
271 }
272 }
273
274 private void finalizeQuery(final Query query, boolean done) {
275 synchronized (this.queries) {
276 if (!this.queries.remove(query)) {
277 throw new IllegalStateException("Unable to remove query from queries");
278 }
279 }
280 final Conversation conversation = query.getConversation();
281 if (conversation != null) {
282 conversation.sort();
283 conversation.setHasMessagesLeftOnServer(!done);
284 final var displayState = conversation.getDisplayState();
285 if (displayState != null) {
286 mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
287 }
288 } else {
289 for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
290 if (tmp.getAccount() == query.getAccount()) {
291 tmp.sort();
292 final var displayState = tmp.getDisplayState();
293 if (displayState != null) {
294 mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
295 }
296 }
297 }
298 }
299 if (query.hasCallback()) {
300 query.callback(done);
301 } else {
302 this.mXmppConnectionService.updateConversationUi();
303 }
304 }
305
306 public boolean inCatchup(Account account) {
307 synchronized (this.queries) {
308 for (Query query : queries) {
309 if (query.account == account && query.isCatchup() && query.getWith() == null) {
310 return true;
311 }
312 }
313 }
314 return false;
315 }
316
317 public boolean isCatchupInProgress(Conversation conversation) {
318 synchronized (this.queries) {
319 for (Query query : queries) {
320 if (query.account == conversation.getAccount() && query.isCatchup()) {
321 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
322 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null) || (conversation.getJid().asBareJid().equals(with))) {
323 return true;
324 }
325 }
326 }
327 }
328 return false;
329 }
330
331 boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
332 synchronized (this.queries) {
333 for (Query query : queries) {
334 if (query.conversation == conversation) {
335 if (!query.hasCallback() && callback != null) {
336 query.setCallback(callback);
337 }
338 return true;
339 }
340 }
341 return false;
342 }
343 }
344
345 public boolean queryInProgress(Conversation conversation) {
346 return queryInProgress(conversation, null);
347 }
348
349 public void processFinLegacy(Element fin, Jid from) {
350 Query query = findQuery(fin.getAttribute("queryid"));
351 if (query != null && query.validFrom(from)) {
352 processFin(query, fin);
353 }
354 }
355
356 private void processFin(Query query, Element fin) {
357 boolean complete = fin.getAttributeAsBoolean("complete");
358 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
359 Element last = set == null ? null : set.findChild("last");
360 String count = set == null ? null : set.findChildContent("count");
361 Element first = set == null ? null : set.findChild("first");
362 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
363 boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
364 if (query.getConversation() != null) {
365 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
366 }
367 if (complete || relevant == null || abort) {
368 //TODO: FIX done logic to look at complete. using count is probably unreliable because it can be ommited and doesn’t work with paging.
369 boolean done;
370 if (query.isCatchup()) {
371 done = false;
372 } else {
373 if (count != null) {
374 try {
375 done = Integer.parseInt(count) <= query.getTotalCount();
376 } catch (NumberFormatException e) {
377 done = false;
378 }
379 } else {
380 done = query.getTotalCount() == 0;
381 }
382 }
383 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
384 this.finalizeQuery(query, done);
385
386 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + !done + " count=" + count);
387 if (query.isCatchup() && query.getActualMessageCount() > 0) {
388 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
389 }
390 if (query.isCatchup() && query.getPagingOrder() == PagingOrder.NORMAL && !complete && query.getConversation() != null) {
391 // Going forward we stopped without completing due to limits
392 // So we don't have the most recent messages yet
393 synchronized (this.queries) {
394 final var q = new Query(query.getConversation(), new MamReference(System.currentTimeMillis() - Config.MAM_MIN_CATCHUP), 0, true, PagingOrder.REVERSE);
395 this.queries.add(q);
396 this.execute(q);
397 }
398 }
399 processPostponed(query);
400 } else {
401 final Query nextQuery;
402 if (query.getPagingOrder() == PagingOrder.NORMAL) {
403 nextQuery = query.next(last == null ? null : last.getContent());
404 } else {
405 nextQuery = query.prev(first == null ? null : first.getContent());
406 }
407 this.execute(nextQuery);
408 this.finalizeQuery(query, false);
409 synchronized (this.queries) {
410 this.queries.add(nextQuery);
411 }
412 }
413 }
414
415 void kill(final Conversation conversation) {
416 final ArrayList<Query> toBeKilled = new ArrayList<>();
417 synchronized (this.pendingQueries) {
418 for (final Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
419 final Query query = iterator.next();
420 if (query.getConversation() == conversation) {
421 iterator.remove();
422 Log.d(Config.LOGTAG, conversation.getAccount().getJid().asBareJid() + ": killed pending MAM query for archived conversation");
423 }
424 }
425 }
426 synchronized (this.queries) {
427 for (final Query q : queries) {
428 if (q.conversation == conversation) {
429 toBeKilled.add(q);
430 }
431 }
432 }
433 for (final Query q : toBeKilled) {
434 kill(q);
435 }
436 }
437
438 private void kill(Query query) {
439 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
440 query.callback = null;
441 this.finalizeQuery(query, false);
442 if (query.isCatchup() && query.getActualMessageCount() > 0) {
443 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
444 }
445 this.processPostponed(query);
446 }
447
448 private void processPostponed(Query query) {
449 query.account.getAxolotlService().processPostponed();
450 query.pendingReceiptRequests.removeAll(query.receiptRequests);
451 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests");
452 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
453 while (iterator.hasNext()) {
454 ReceiptRequest rr = iterator.next();
455 mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId()));
456 iterator.remove();
457 }
458 }
459
460 public Query findQuery(String id) {
461 if (id == null) {
462 return null;
463 }
464 synchronized (this.queries) {
465 for (Query query : this.queries) {
466 if (query.getQueryId().equals(id)) {
467 return query;
468 }
469 }
470 return null;
471 }
472 }
473
474 @Override
475 public void onAdvancedStreamFeaturesAvailable(Account account) {
476 if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
477 this.catchup(account);
478 }
479 }
480
481 public enum PagingOrder {
482 NORMAL,
483 REVERSE
484 }
485
486 public class Query {
487 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
488 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
489 private int totalCount = 0;
490 private int actualCount = 0;
491 private int actualInThisQuery = 0;
492 private long start;
493 private final long end;
494 private final String queryId;
495 private String reference = null;
496 private final Account account;
497 private Conversation conversation;
498 private PagingOrder pagingOrder = PagingOrder.NORMAL;
499 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
500 private boolean catchup = true;
501 public final Version version;
502
503
504 Query(Conversation conversation, MamReference start, long end, boolean catchup, PagingOrder order) {
505 this(conversation, start, end, catchup);
506 this.pagingOrder = order;
507 }
508
509 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
510 this(conversation.getAccount(), Version.get(conversation.getAccount(), conversation), catchup ? start : start.timeOnly(), end);
511 this.conversation = conversation;
512 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
513 this.catchup = catchup;
514 }
515
516 Query(Account account, MamReference start, long end) {
517 this(account, Version.get(account), start, end);
518 }
519
520 Query(Account account, Version version, MamReference start, long end) {
521 this.account = account;
522 if (start.getReference() != null) {
523 this.reference = start.getReference();
524 } else {
525 this.start = start.getTimestamp();
526 }
527 this.end = end;
528 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
529 this.version = version;
530 }
531
532 private Query page(String reference) {
533 Query query = new Query(this.account, this.version, new MamReference(this.start, reference), this.end);
534 query.conversation = conversation;
535 query.totalCount = totalCount;
536 query.actualCount = actualCount;
537 query.pendingReceiptRequests = pendingReceiptRequests;
538 query.receiptRequests = receiptRequests;
539 query.callback = callback;
540 query.catchup = catchup;
541 return query;
542 }
543
544 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
545 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
546 this.receiptRequests.add(receiptRequest);
547 }
548 }
549
550 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
551 this.pendingReceiptRequests.add(receiptRequest);
552 }
553
554 public boolean isLegacy() {
555 return version.legacy;
556 }
557
558 public boolean safeToExtractTrueCounterpart() {
559 return muc() && !isLegacy();
560 }
561
562 public Query next(String reference) {
563 Query query = page(reference);
564 query.pagingOrder = PagingOrder.NORMAL;
565 return query;
566 }
567
568 Query prev(String reference) {
569 Query query = page(reference);
570 query.pagingOrder = PagingOrder.REVERSE;
571 return query;
572 }
573
574 public String getReference() {
575 return reference;
576 }
577
578 public PagingOrder getPagingOrder() {
579 return this.pagingOrder;
580 }
581
582 public String getQueryId() {
583 return queryId;
584 }
585
586 public Jid getWith() {
587 return conversation == null ? null : conversation.getJid().asBareJid();
588 }
589
590 public boolean muc() {
591 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
592 }
593
594 public long getStart() {
595 return start;
596 }
597
598 public boolean isCatchup() {
599 return catchup;
600 }
601
602 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
603 this.callback = callback;
604 }
605
606 public void callback(boolean done) {
607 if (this.callback != null) {
608 this.callback.onMoreMessagesLoaded(actualCount, conversation);
609 if (done) {
610 this.callback.informUser(R.string.no_more_history_on_server);
611 }
612 }
613 }
614
615 public long getEnd() {
616 return end;
617 }
618
619 public Conversation getConversation() {
620 return conversation;
621 }
622
623 public Account getAccount() {
624 return this.account;
625 }
626
627 public void incrementMessageCount() {
628 this.totalCount++;
629 }
630
631 public void incrementActualMessageCount() {
632 this.actualInThisQuery++;
633 this.actualCount++;
634 }
635
636 int getTotalCount() {
637 return this.totalCount;
638 }
639
640 int getActualMessageCount() {
641 return this.actualCount;
642 }
643
644 public int getActualInThisQuery() {
645 return this.actualInThisQuery;
646 }
647
648 public boolean validFrom(Jid from) {
649 if (muc()) {
650 return getWith().equals(from);
651 } else {
652 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
653 }
654 }
655
656 @NonNull
657 @Override
658 public String toString() {
659 StringBuilder builder = new StringBuilder();
660 if (this.muc()) {
661 builder.append("to=");
662 builder.append(this.getWith().toString());
663 } else {
664 builder.append("with=");
665 if (this.getWith() == null) {
666 builder.append("*");
667 } else {
668 builder.append(getWith().toString());
669 }
670 }
671 if (this.start != 0) {
672 builder.append(", start=");
673 builder.append(AbstractGenerator.getTimestamp(this.start));
674 }
675 if (this.end != 0) {
676 builder.append(", end=");
677 builder.append(AbstractGenerator.getTimestamp(this.end));
678 }
679 builder.append(", order=").append(pagingOrder.toString());
680 if (this.reference != null) {
681 if (this.pagingOrder == PagingOrder.NORMAL) {
682 builder.append(", after=");
683 } else {
684 builder.append(", before=");
685 }
686 builder.append(this.reference);
687 }
688 builder.append(", catchup=").append(catchup);
689 builder.append(", ns=").append(version.namespace);
690 return builder.toString();
691 }
692
693 boolean hasCallback() {
694 return this.callback != null;
695 }
696 }
697}