1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6
7import androidx.annotation.NonNull;
8
9import java.math.BigInteger;
10import java.util.ArrayList;
11import java.util.HashSet;
12import java.util.Iterator;
13import java.util.List;
14
15import eu.siacs.conversations.Config;
16import eu.siacs.conversations.R;
17import eu.siacs.conversations.entities.Account;
18import eu.siacs.conversations.entities.Conversation;
19import eu.siacs.conversations.entities.Conversational;
20import eu.siacs.conversations.entities.ReceiptRequest;
21import eu.siacs.conversations.generator.AbstractGenerator;
22import eu.siacs.conversations.xml.Element;
23import eu.siacs.conversations.xmpp.Jid;
24import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
25import eu.siacs.conversations.xmpp.mam.MamReference;
26import im.conversations.android.xmpp.model.stanza.Iq;
27import im.conversations.android.xmpp.model.stanza.Message;
28
29public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
30
31 private final XmppConnectionService mXmppConnectionService;
32
33 private final HashSet<Query> queries = new HashSet<>();
34 private final ArrayList<Query> pendingQueries = new ArrayList<>();
35
36 public enum Version {
37 MAM_0("urn:xmpp:mam:0", true),
38 MAM_1("urn:xmpp:mam:1", false),
39 MAM_2("urn:xmpp:mam:2", false);
40
41 public final boolean legacy;
42 public final String namespace;
43
44 Version(String namespace, boolean legacy) {
45 this.namespace = namespace;
46 this.legacy = legacy;
47 }
48
49 public static Version get(Account account) {
50 return get(account, null);
51 }
52
53 public static Version get(Account account, Conversation conversation) {
54 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
55 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
56 } else {
57 return get(conversation.getMucOptions().getFeatures());
58 }
59 }
60
61 private static Version get(List<String> features) {
62 final Version[] values = values();
63 for (int i = values.length - 1; i >= 0; --i) {
64 for (String feature : features) {
65 if (values[i].namespace.equals(feature)) {
66 return values[i];
67 }
68 }
69 }
70 return MAM_0;
71 }
72
73 public static boolean has(List<String> features) {
74 for (String feature : features) {
75 for (Version version : values()) {
76 if (version.namespace.equals(feature)) {
77 return true;
78 }
79 }
80 }
81 return false;
82 }
83
84 public static Element findResult(Message packet) {
85 for (Version version : values()) {
86 Element result = packet.findChild("result", version.namespace);
87 if (result != null) {
88 return result;
89 }
90 }
91 return null;
92 }
93
94 }
95
96 MessageArchiveService(final XmppConnectionService service) {
97 this.mXmppConnectionService = service;
98 }
99
100 private void catchup(final Account account) {
101 synchronized (this.queries) {
102 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
103 Query query = iterator.next();
104 if (query.getAccount() == account) {
105 iterator.remove();
106 }
107 }
108 }
109 MamReference mamReference = MamReference.max(
110 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
111 mXmppConnectionService.databaseBackend.getLastClearDate(account)
112 );
113 mamReference = MamReference.max(mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
114 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
115 final Query query;
116 if (mamReference.getTimestamp() == 0) {
117 return;
118 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
119 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
120 List<Conversation> conversations = mXmppConnectionService.getConversations();
121 for (Conversation conversation : conversations) {
122 if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
123 this.query(conversation, startCatchup, true);
124 }
125 }
126 query = new Query(account, new MamReference(startCatchup), 0);
127 } else {
128 query = new Query(account, mamReference, 0);
129 }
130 synchronized (this.queries) {
131 this.queries.add(query);
132 }
133 this.execute(query);
134 }
135
136 void catchupMUC(final Conversation conversation) {
137 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
138 query(conversation,
139 new MamReference(0),
140 0,
141 true);
142 } else {
143 query(conversation,
144 conversation.getLastMessageTransmitted(),
145 0,
146 true);
147 }
148 }
149
150 public Query query(final Conversation conversation) {
151 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
152 return query(conversation,
153 new MamReference(0),
154 System.currentTimeMillis(),
155 false);
156 } else {
157 return query(conversation,
158 conversation.getLastMessageTransmitted(),
159 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
160 false);
161 }
162 }
163
164 public boolean isCatchingUp(Conversation conversation) {
165 final Account account = conversation.getAccount();
166 if (account.getXmppConnection().isWaitingForSmCatchup()) {
167 return true;
168 } else {
169 synchronized (this.queries) {
170 for (Query query : this.queries) {
171 if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) {
172 return true;
173 }
174 }
175 }
176 return false;
177 }
178 }
179
180 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
181 return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182 }
183
184 public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) {
185 synchronized (this.queries) {
186 final Query query;
187 final MamReference startActual = MamReference.max(start, mXmppConnectionService.getAutomaticMessageDeletionDate());
188 if (start.getTimestamp() == 0) {
189 query = new Query(conversation, startActual, end, false);
190 query.reference = conversation.getFirstMamReference();
191 } else {
192 if (allowCatchup) {
193 MamReference maxCatchup = MamReference.max(startActual, System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
194 if (maxCatchup.greaterThan(startActual)) {
195 Query reverseCatchup = new Query(conversation, startActual, maxCatchup.getTimestamp(), false);
196 this.queries.add(reverseCatchup);
197 this.execute(reverseCatchup);
198 }
199 query = new Query(conversation, maxCatchup, end, true);
200 } else {
201 query = new Query(conversation, startActual, end, false);
202 }
203 }
204 if (end != 0 && start.greaterThan(end)) {
205 return null;
206 }
207 this.queries.add(query);
208 this.execute(query);
209 return query;
210 }
211 }
212
213 void executePendingQueries(final Account account) {
214 final List<Query> pending = new ArrayList<>();
215 synchronized (this.pendingQueries) {
216 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
217 Query query = iterator.next();
218 if (query.getAccount() == account) {
219 pending.add(query);
220 iterator.remove();
221 }
222 }
223 }
224 for (Query query : pending) {
225 this.execute(query);
226 }
227 }
228
229 private void execute(final Query query) {
230 final Account account = query.getAccount();
231 if (account.getStatus() == Account.State.ONLINE) {
232 final Conversation conversation = query.getConversation();
233 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
234 throw new IllegalStateException("Attempted to run MAM query for archived conversation");
235 }
236 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": running mam query " + query.toString());
237 final Iq packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
238 this.mXmppConnectionService.sendIqPacket(account, packet, (p) -> {
239 final Element fin = p.findChild("fin", query.version.namespace);
240 if (p.getType() == Iq.Type.TIMEOUT) {
241 synchronized (this.queries) {
242 this.queries.remove(query);
243 if (query.hasCallback()) {
244 query.callback(false);
245 }
246 }
247 } else if (p.getType() == Iq.Type.RESULT && fin != null) {
248 final boolean running;
249 synchronized (this.queries) {
250 running = this.queries.contains(query);
251 }
252 if (running) {
253 processFin(query, fin);
254 } else {
255 Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": ignoring MAM iq result because query had been killed");
256 }
257 } else if (p.getType() == Iq.Type.RESULT && query.isLegacy()) {
258 //do nothing
259 } else {
260 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": error executing mam: " + p.toString());
261 try {
262 finalizeQuery(query, true);
263 } catch (final IllegalStateException e) {
264 //ignored
265 }
266 }
267 });
268 } else {
269 synchronized (this.pendingQueries) {
270 this.pendingQueries.add(query);
271 }
272 }
273 }
274
275 private void finalizeQuery(final Query query, boolean done) {
276 synchronized (this.queries) {
277 if (!this.queries.remove(query)) {
278 throw new IllegalStateException("Unable to remove query from queries");
279 }
280 }
281 final Conversation conversation = query.getConversation();
282 if (conversation != null) {
283 conversation.sort();
284 conversation.setHasMessagesLeftOnServer(!done);
285 final var displayState = conversation.getDisplayState();
286 if (displayState != null) {
287 mXmppConnectionService.markReadUpToStanzaId(conversation, displayState);
288 }
289 } else {
290 for (final Conversation tmp : this.mXmppConnectionService.getConversations()) {
291 if (tmp.getAccount() == query.getAccount()) {
292 tmp.sort();
293 final var displayState = tmp.getDisplayState();
294 if (displayState != null) {
295 mXmppConnectionService.markReadUpToStanzaId(tmp, displayState);
296 }
297 }
298 }
299 }
300 if (query.hasCallback()) {
301 query.callback(done);
302 } else {
303 this.mXmppConnectionService.updateConversationUi();
304 }
305 }
306
307 public boolean inCatchup(Account account) {
308 synchronized (this.queries) {
309 for (Query query : queries) {
310 if (query.account == account && query.isCatchup() && query.getWith() == null) {
311 return true;
312 }
313 }
314 }
315 return false;
316 }
317
318 public boolean isCatchupInProgress(Conversation conversation) {
319 synchronized (this.queries) {
320 for (Query query : queries) {
321 if (query.account == conversation.getAccount() && query.isCatchup()) {
322 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
323 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null) || (conversation.getJid().asBareJid().equals(with))) {
324 return true;
325 }
326 }
327 }
328 }
329 return false;
330 }
331
332 boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
333 synchronized (this.queries) {
334 for (Query query : queries) {
335 if (query.conversation == conversation) {
336 if (!query.hasCallback() && callback != null) {
337 query.setCallback(callback);
338 }
339 return true;
340 }
341 }
342 return false;
343 }
344 }
345
346 public boolean queryInProgress(Conversation conversation) {
347 return queryInProgress(conversation, null);
348 }
349
350 public void processFinLegacy(Element fin, Jid from) {
351 Query query = findQuery(fin.getAttribute("queryid"));
352 if (query != null && query.validFrom(from)) {
353 processFin(query, fin);
354 }
355 }
356
357 private void processFin(Query query, Element fin) {
358 boolean complete = fin.getAttributeAsBoolean("complete");
359 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
360 Element last = set == null ? null : set.findChild("last");
361 String count = set == null ? null : set.findChildContent("count");
362 Element first = set == null ? null : set.findChild("first");
363 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
364 boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
365 if (query.getConversation() != null) {
366 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
367 }
368 if (complete || relevant == null || abort) {
369 //TODO: FIX done logic to look at complete. using count is probably unreliable because it can be ommited and doesn’t work with paging.
370 boolean done;
371 if (query.isCatchup()) {
372 done = false;
373 } else {
374 if (count != null) {
375 try {
376 done = Integer.parseInt(count) <= query.getTotalCount();
377 } catch (NumberFormatException e) {
378 done = false;
379 }
380 } else {
381 done = query.getTotalCount() == 0;
382 }
383 }
384 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
385 this.finalizeQuery(query, done);
386
387 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + !done + " count=" + count);
388 if (query.isCatchup() && query.getActualMessageCount() > 0) {
389 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
390 }
391 processPostponed(query);
392 } else {
393 final Query nextQuery;
394 if (query.getPagingOrder() == PagingOrder.NORMAL) {
395 nextQuery = query.next(last == null ? null : last.getContent());
396 } else {
397 nextQuery = query.prev(first == null ? null : first.getContent());
398 }
399 this.execute(nextQuery);
400 this.finalizeQuery(query, false);
401 synchronized (this.queries) {
402 this.queries.add(nextQuery);
403 }
404 }
405 }
406
407 void kill(final Conversation conversation) {
408 final ArrayList<Query> toBeKilled = new ArrayList<>();
409 synchronized (this.pendingQueries) {
410 for (final Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
411 final Query query = iterator.next();
412 if (query.getConversation() == conversation) {
413 iterator.remove();
414 Log.d(Config.LOGTAG, conversation.getAccount().getJid().asBareJid() + ": killed pending MAM query for archived conversation");
415 }
416 }
417 }
418 synchronized (this.queries) {
419 for (final Query q : queries) {
420 if (q.conversation == conversation) {
421 toBeKilled.add(q);
422 }
423 }
424 }
425 for (final Query q : toBeKilled) {
426 kill(q);
427 }
428 }
429
430 private void kill(Query query) {
431 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
432 query.callback = null;
433 this.finalizeQuery(query, false);
434 if (query.isCatchup() && query.getActualMessageCount() > 0) {
435 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
436 }
437 this.processPostponed(query);
438 }
439
440 private void processPostponed(Query query) {
441 query.account.getAxolotlService().processPostponed();
442 query.pendingReceiptRequests.removeAll(query.receiptRequests);
443 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests");
444 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
445 while (iterator.hasNext()) {
446 ReceiptRequest rr = iterator.next();
447 mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId()));
448 iterator.remove();
449 }
450 }
451
452 public Query findQuery(String id) {
453 if (id == null) {
454 return null;
455 }
456 synchronized (this.queries) {
457 for (Query query : this.queries) {
458 if (query.getQueryId().equals(id)) {
459 return query;
460 }
461 }
462 return null;
463 }
464 }
465
466 @Override
467 public void onAdvancedStreamFeaturesAvailable(Account account) {
468 if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
469 this.catchup(account);
470 }
471 }
472
473 public enum PagingOrder {
474 NORMAL,
475 REVERSE
476 }
477
478 public class Query {
479 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
480 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
481 private int totalCount = 0;
482 private int actualCount = 0;
483 private int actualInThisQuery = 0;
484 private long start;
485 private final long end;
486 private final String queryId;
487 private String reference = null;
488 private final Account account;
489 private Conversation conversation;
490 private PagingOrder pagingOrder = PagingOrder.NORMAL;
491 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
492 private boolean catchup = true;
493 public final Version version;
494
495
496 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
497 this(conversation.getAccount(), Version.get(conversation.getAccount(), conversation), catchup ? start : start.timeOnly(), end);
498 this.conversation = conversation;
499 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
500 this.catchup = catchup;
501 }
502
503 Query(Account account, MamReference start, long end) {
504 this(account, Version.get(account), start, end);
505 }
506
507 Query(Account account, Version version, MamReference start, long end) {
508 this.account = account;
509 if (start.getReference() != null) {
510 this.reference = start.getReference();
511 } else {
512 this.start = start.getTimestamp();
513 }
514 this.end = end;
515 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
516 this.version = version;
517 }
518
519 private Query page(String reference) {
520 Query query = new Query(this.account, this.version, new MamReference(this.start, reference), this.end);
521 query.conversation = conversation;
522 query.totalCount = totalCount;
523 query.actualCount = actualCount;
524 query.pendingReceiptRequests = pendingReceiptRequests;
525 query.receiptRequests = receiptRequests;
526 query.callback = callback;
527 query.catchup = catchup;
528 return query;
529 }
530
531 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
532 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
533 this.receiptRequests.add(receiptRequest);
534 }
535 }
536
537 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
538 this.pendingReceiptRequests.add(receiptRequest);
539 }
540
541 public boolean isLegacy() {
542 return version.legacy;
543 }
544
545 public boolean safeToExtractTrueCounterpart() {
546 return muc() && !isLegacy();
547 }
548
549 public Query next(String reference) {
550 Query query = page(reference);
551 query.pagingOrder = PagingOrder.NORMAL;
552 return query;
553 }
554
555 Query prev(String reference) {
556 Query query = page(reference);
557 query.pagingOrder = PagingOrder.REVERSE;
558 return query;
559 }
560
561 public String getReference() {
562 return reference;
563 }
564
565 public PagingOrder getPagingOrder() {
566 return this.pagingOrder;
567 }
568
569 public String getQueryId() {
570 return queryId;
571 }
572
573 public Jid getWith() {
574 return conversation == null ? null : conversation.getJid().asBareJid();
575 }
576
577 public boolean muc() {
578 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
579 }
580
581 public long getStart() {
582 return start;
583 }
584
585 public boolean isCatchup() {
586 return catchup;
587 }
588
589 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
590 this.callback = callback;
591 }
592
593 public void callback(boolean done) {
594 if (this.callback != null) {
595 this.callback.onMoreMessagesLoaded(actualCount, conversation);
596 if (done) {
597 this.callback.informUser(R.string.no_more_history_on_server);
598 }
599 }
600 }
601
602 public long getEnd() {
603 return end;
604 }
605
606 public Conversation getConversation() {
607 return conversation;
608 }
609
610 public Account getAccount() {
611 return this.account;
612 }
613
614 public void incrementMessageCount() {
615 this.totalCount++;
616 }
617
618 public void incrementActualMessageCount() {
619 this.actualInThisQuery++;
620 this.actualCount++;
621 }
622
623 int getTotalCount() {
624 return this.totalCount;
625 }
626
627 int getActualMessageCount() {
628 return this.actualCount;
629 }
630
631 public int getActualInThisQuery() {
632 return this.actualInThisQuery;
633 }
634
635 public boolean validFrom(Jid from) {
636 if (muc()) {
637 return getWith().equals(from);
638 } else {
639 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
640 }
641 }
642
643 @NonNull
644 @Override
645 public String toString() {
646 StringBuilder builder = new StringBuilder();
647 if (this.muc()) {
648 builder.append("to=");
649 builder.append(this.getWith().toString());
650 } else {
651 builder.append("with=");
652 if (this.getWith() == null) {
653 builder.append("*");
654 } else {
655 builder.append(getWith().toString());
656 }
657 }
658 if (this.start != 0) {
659 builder.append(", start=");
660 builder.append(AbstractGenerator.getTimestamp(this.start));
661 }
662 if (this.end != 0) {
663 builder.append(", end=");
664 builder.append(AbstractGenerator.getTimestamp(this.end));
665 }
666 builder.append(", order=").append(pagingOrder.toString());
667 if (this.reference != null) {
668 if (this.pagingOrder == PagingOrder.NORMAL) {
669 builder.append(", after=");
670 } else {
671 builder.append(", before=");
672 }
673 builder.append(this.reference);
674 }
675 builder.append(", catchup=").append(catchup);
676 builder.append(", ns=").append(version.namespace);
677 return builder.toString();
678 }
679
680 boolean hasCallback() {
681 return this.callback != null;
682 }
683 }
684}