1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6
7import org.jetbrains.annotations.NotNull;
8
9import java.math.BigInteger;
10import java.util.ArrayList;
11import java.util.HashSet;
12import java.util.Iterator;
13import java.util.List;
14
15import eu.siacs.conversations.Config;
16import eu.siacs.conversations.R;
17import eu.siacs.conversations.entities.Account;
18import eu.siacs.conversations.entities.Conversation;
19import eu.siacs.conversations.entities.Conversational;
20import eu.siacs.conversations.entities.ReceiptRequest;
21import eu.siacs.conversations.generator.AbstractGenerator;
22import eu.siacs.conversations.xml.Element;
23import eu.siacs.conversations.xmpp.Jid;
24import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
25import eu.siacs.conversations.xmpp.mam.MamReference;
26import eu.siacs.conversations.xmpp.stanzas.IqPacket;
27import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
28
29public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
30
31 private final XmppConnectionService mXmppConnectionService;
32
33 private final HashSet<Query> queries = new HashSet<>();
34 private final ArrayList<Query> pendingQueries = new ArrayList<>();
35
36 public enum Version {
37 MAM_0("urn:xmpp:mam:0", true),
38 MAM_1("urn:xmpp:mam:1", false),
39 MAM_2("urn:xmpp:mam:2", false);
40
41 public final boolean legacy;
42 public final String namespace;
43
44 Version(String namespace, boolean legacy) {
45 this.namespace = namespace;
46 this.legacy = legacy;
47 }
48
49 public static Version get(Account account) {
50 return get(account, null);
51 }
52
53 public static Version get(Account account, Conversation conversation) {
54 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
55 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
56 } else {
57 return get(conversation.getMucOptions().getFeatures());
58 }
59 }
60
61 private static Version get(List<String> features) {
62 final Version[] values = values();
63 for (int i = values.length - 1; i >= 0; --i) {
64 for (String feature : features) {
65 if (values[i].namespace.equals(feature)) {
66 return values[i];
67 }
68 }
69 }
70 return MAM_0;
71 }
72
73 public static boolean has(List<String> features) {
74 for (String feature : features) {
75 for (Version version : values()) {
76 if (version.namespace.equals(feature)) {
77 return true;
78 }
79 }
80 }
81 return false;
82 }
83
84 public static Element findResult(MessagePacket packet) {
85 for (Version version : values()) {
86 Element result = packet.findChild("result", version.namespace);
87 if (result != null) {
88 return result;
89 }
90 }
91 return null;
92 }
93
94 }
95
/**
 * Creates the archive service bound to the given connection service.
 *
 * @param service service used for database access, stanza generation and sending
 */
MessageArchiveService(final XmppConnectionService service) {
    mXmppConnectionService = service;
}
99
100 private void catchup(final Account account) {
101 synchronized (this.queries) {
102 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
103 Query query = iterator.next();
104 if (query.getAccount() == account) {
105 iterator.remove();
106 }
107 }
108 }
109 MamReference mamReference = MamReference.max(
110 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
111 mXmppConnectionService.databaseBackend.getLastClearDate(account)
112 );
113 mamReference = MamReference.max(mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
114 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
115 final Query query;
116 if (mamReference.getTimestamp() == 0) {
117 return;
118 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
119 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
120 List<Conversation> conversations = mXmppConnectionService.getConversations();
121 for (Conversation conversation : conversations) {
122 if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
123 this.query(conversation, startCatchup, true);
124 }
125 }
126 query = new Query(account, new MamReference(startCatchup), 0);
127 } else {
128 query = new Query(account, mamReference, 0);
129 }
130 synchronized (this.queries) {
131 this.queries.add(query);
132 }
133 this.execute(query);
134 }
135
136 void catchupMUC(final Conversation conversation) {
137 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
138 query(conversation,
139 new MamReference(0),
140 0,
141 true);
142 } else {
143 query(conversation,
144 conversation.getLastMessageTransmitted(),
145 0,
146 true);
147 }
148 }
149
150 public Query query(final Conversation conversation) {
151 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
152 return query(conversation,
153 new MamReference(0),
154 System.currentTimeMillis(),
155 false);
156 } else {
157 return query(conversation,
158 conversation.getLastMessageTransmitted(),
159 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
160 false);
161 }
162 }
163
164 public boolean isCatchingUp(Conversation conversation) {
165 final Account account = conversation.getAccount();
166 if (account.getXmppConnection().isWaitingForSmCatchup()) {
167 return true;
168 } else {
169 synchronized (this.queries) {
170 for (Query query : this.queries) {
171 if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) {
172 return true;
173 }
174 }
175 }
176 return false;
177 }
178 }
179
180 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
181 return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
182 }
183
184 public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) {
185 synchronized (this.queries) {
186 final Query query;
187 final MamReference startActual = MamReference.max(start, mXmppConnectionService.getAutomaticMessageDeletionDate());
188 if (start.getTimestamp() == 0) {
189 query = new Query(conversation, startActual, end, false);
190 query.reference = conversation.getFirstMamReference();
191 } else {
192 if (allowCatchup) {
193 MamReference maxCatchup = MamReference.max(startActual, System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
194 if (maxCatchup.greaterThan(startActual)) {
195 Query reverseCatchup = new Query(conversation, startActual, maxCatchup.getTimestamp(), false);
196 this.queries.add(reverseCatchup);
197 this.execute(reverseCatchup);
198 }
199 query = new Query(conversation, maxCatchup, end, true);
200 } else {
201 query = new Query(conversation, startActual, end, false);
202 }
203 }
204 if (end != 0 && start.greaterThan(end)) {
205 return null;
206 }
207 this.queries.add(query);
208 this.execute(query);
209 return query;
210 }
211 }
212
213 void executePendingQueries(final Account account) {
214 final List<Query> pending = new ArrayList<>();
215 synchronized (this.pendingQueries) {
216 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
217 Query query = iterator.next();
218 if (query.getAccount() == account) {
219 pending.add(query);
220 iterator.remove();
221 }
222 }
223 }
224 for (Query query : pending) {
225 this.execute(query);
226 }
227 }
228
229 private void execute(final Query query) {
230 final Account account = query.getAccount();
231 if (account.getStatus() == Account.State.ONLINE) {
232 final Conversation conversation = query.getConversation();
233 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
234 throw new IllegalStateException("Attempted to run MAM query for archived conversation");
235 }
236 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": running mam query " + query.toString());
237 final IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
238 this.mXmppConnectionService.sendIqPacket(account, packet, (a, p) -> {
239 final Element fin = p.findChild("fin", query.version.namespace);
240 if (p.getType() == IqPacket.TYPE.TIMEOUT) {
241 synchronized (this.queries) {
242 this.queries.remove(query);
243 if (query.hasCallback()) {
244 query.callback(false);
245 }
246 }
247 } else if (p.getType() == IqPacket.TYPE.RESULT && fin != null) {
248 final boolean running;
249 synchronized (this.queries) {
250 running = this.queries.contains(query);
251 }
252 if (running) {
253 processFin(query, fin);
254 } else {
255 Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": ignoring MAM iq result because query had been killed");
256 }
257 } else if (p.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
258 //do nothing
259 } else {
260 Log.d(Config.LOGTAG, a.getJid().asBareJid().toString() + ": error executing mam: " + p.toString());
261 try {
262 finalizeQuery(query, true);
263 } catch (final IllegalStateException e) {
264 //ignored
265 }
266 }
267 });
268 } else {
269 synchronized (this.pendingQueries) {
270 this.pendingQueries.add(query);
271 }
272 }
273 }
274
275 private void finalizeQuery(final Query query, boolean done) {
276 synchronized (this.queries) {
277 if (!this.queries.remove(query)) {
278 throw new IllegalStateException("Unable to remove query from queries");
279 }
280 }
281 final Conversation conversation = query.getConversation();
282 if (conversation != null) {
283 conversation.sort();
284 conversation.setHasMessagesLeftOnServer(!done);
285 } else {
286 for (Conversation tmp : this.mXmppConnectionService.getConversations()) {
287 if (tmp.getAccount() == query.getAccount()) {
288 tmp.sort();
289 }
290 }
291 }
292 if (query.hasCallback()) {
293 query.callback(done);
294 } else {
295 this.mXmppConnectionService.updateConversationUi();
296 }
297 }
298
299 boolean inCatchup(Account account) {
300 synchronized (this.queries) {
301 for (Query query : queries) {
302 if (query.account == account && query.isCatchup() && query.getWith() == null) {
303 return true;
304 }
305 }
306 }
307 return false;
308 }
309
310 public boolean isCatchupInProgress(Conversation conversation) {
311 synchronized (this.queries) {
312 for (Query query : queries) {
313 if (query.account == conversation.getAccount() && query.isCatchup()) {
314 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
315 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null) || (conversation.getJid().asBareJid().equals(with))) {
316 return true;
317 }
318 }
319 }
320 }
321 return false;
322 }
323
324 boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
325 synchronized (this.queries) {
326 for (Query query : queries) {
327 if (query.conversation == conversation) {
328 if (!query.hasCallback() && callback != null) {
329 query.setCallback(callback);
330 }
331 return true;
332 }
333 }
334 return false;
335 }
336 }
337
338 public boolean queryInProgress(Conversation conversation) {
339 return queryInProgress(conversation, null);
340 }
341
342 public void processFinLegacy(Element fin, Jid from) {
343 Query query = findQuery(fin.getAttribute("queryid"));
344 if (query != null && query.validFrom(from)) {
345 processFin(query, fin);
346 }
347 }
348
349 private void processFin(Query query, Element fin) {
350 boolean complete = fin.getAttributeAsBoolean("complete");
351 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
352 Element last = set == null ? null : set.findChild("last");
353 String count = set == null ? null : set.findChildContent("count");
354 Element first = set == null ? null : set.findChild("first");
355 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
356 boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
357 if (query.getConversation() != null) {
358 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
359 }
360 if (complete || relevant == null || abort) {
361 //TODO: FIX done logic to look at complete. using count is probably unreliable because it can be ommited and doesn’t work with paging.
362 boolean done;
363 if (query.isCatchup()) {
364 done = false;
365 } else {
366 if (count != null) {
367 try {
368 done = Integer.parseInt(count) <= query.getTotalCount();
369 } catch (NumberFormatException e) {
370 done = false;
371 }
372 } else {
373 done = query.getTotalCount() == 0;
374 }
375 }
376 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
377 this.finalizeQuery(query, done);
378
379 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + !done + " count=" + count);
380 if (query.isCatchup() && query.getActualMessageCount() > 0) {
381 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
382 }
383 processPostponed(query);
384 } else {
385 final Query nextQuery;
386 if (query.getPagingOrder() == PagingOrder.NORMAL) {
387 nextQuery = query.next(last == null ? null : last.getContent());
388 } else {
389 nextQuery = query.prev(first == null ? null : first.getContent());
390 }
391 this.execute(nextQuery);
392 this.finalizeQuery(query, false);
393 synchronized (this.queries) {
394 this.queries.add(nextQuery);
395 }
396 }
397 }
398
399 void kill(final Conversation conversation) {
400 final ArrayList<Query> toBeKilled = new ArrayList<>();
401 synchronized (this.pendingQueries) {
402 for (final Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
403 final Query query = iterator.next();
404 if (query.getConversation() == conversation) {
405 iterator.remove();
406 Log.d(Config.LOGTAG, conversation.getAccount().getJid().asBareJid() + ": killed pending MAM query for archived conversation");
407 }
408 }
409 }
410 synchronized (this.queries) {
411 for (final Query q : queries) {
412 if (q.conversation == conversation) {
413 toBeKilled.add(q);
414 }
415 }
416 }
417 for (final Query q : toBeKilled) {
418 kill(q);
419 }
420 }
421
422 private void kill(Query query) {
423 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
424 query.callback = null;
425 this.finalizeQuery(query, false);
426 if (query.isCatchup() && query.getActualMessageCount() > 0) {
427 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
428 }
429 this.processPostponed(query);
430 }
431
432 private void processPostponed(Query query) {
433 query.account.getAxolotlService().processPostponed();
434 query.pendingReceiptRequests.removeAll(query.receiptRequests);
435 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests");
436 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
437 while (iterator.hasNext()) {
438 ReceiptRequest rr = iterator.next();
439 mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId()));
440 iterator.remove();
441 }
442 }
443
444 public Query findQuery(String id) {
445 if (id == null) {
446 return null;
447 }
448 synchronized (this.queries) {
449 for (Query query : this.queries) {
450 if (query.getQueryId().equals(id)) {
451 return query;
452 }
453 }
454 return null;
455 }
456 }
457
458 @Override
459 public void onAdvancedStreamFeaturesAvailable(Account account) {
460 if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
461 this.catchup(account);
462 }
463 }
464
/** Direction in which the pages of a query are requested. */
public enum PagingOrder {
    /** Continue with the page after the last received item. */
    NORMAL,
    /** Continue with the page before the first received item. */
    REVERSE
}
469
470 public class Query {
471 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
472 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
473 private int totalCount = 0;
474 private int actualCount = 0;
475 private int actualInThisQuery = 0;
476 private long start;
477 private final long end;
478 private final String queryId;
479 private String reference = null;
480 private final Account account;
481 private Conversation conversation;
482 private PagingOrder pagingOrder = PagingOrder.NORMAL;
483 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
484 private boolean catchup = true;
485 public final Version version;
486
487
488 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
489 this(conversation.getAccount(), Version.get(conversation.getAccount(), conversation), catchup ? start : start.timeOnly(), end);
490 this.conversation = conversation;
491 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
492 this.catchup = catchup;
493 }
494
495 Query(Account account, MamReference start, long end) {
496 this(account, Version.get(account), start, end);
497 }
498
499 Query(Account account, Version version, MamReference start, long end) {
500 this.account = account;
501 if (start.getReference() != null) {
502 this.reference = start.getReference();
503 } else {
504 this.start = start.getTimestamp();
505 }
506 this.end = end;
507 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
508 this.version = version;
509 }
510
511 private Query page(String reference) {
512 Query query = new Query(this.account, this.version, new MamReference(this.start, reference), this.end);
513 query.conversation = conversation;
514 query.totalCount = totalCount;
515 query.actualCount = actualCount;
516 query.pendingReceiptRequests = pendingReceiptRequests;
517 query.receiptRequests = receiptRequests;
518 query.callback = callback;
519 query.catchup = catchup;
520 return query;
521 }
522
523 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
524 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
525 this.receiptRequests.add(receiptRequest);
526 }
527 }
528
529 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
530 this.pendingReceiptRequests.add(receiptRequest);
531 }
532
533 public boolean isLegacy() {
534 return version.legacy;
535 }
536
537 public boolean safeToExtractTrueCounterpart() {
538 return muc() && !isLegacy();
539 }
540
541 public Query next(String reference) {
542 Query query = page(reference);
543 query.pagingOrder = PagingOrder.NORMAL;
544 return query;
545 }
546
547 Query prev(String reference) {
548 Query query = page(reference);
549 query.pagingOrder = PagingOrder.REVERSE;
550 return query;
551 }
552
553 public String getReference() {
554 return reference;
555 }
556
557 public PagingOrder getPagingOrder() {
558 return this.pagingOrder;
559 }
560
561 public String getQueryId() {
562 return queryId;
563 }
564
565 public Jid getWith() {
566 return conversation == null ? null : conversation.getJid().asBareJid();
567 }
568
569 public boolean muc() {
570 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
571 }
572
573 public long getStart() {
574 return start;
575 }
576
577 public boolean isCatchup() {
578 return catchup;
579 }
580
581 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
582 this.callback = callback;
583 }
584
585 public void callback(boolean done) {
586 if (this.callback != null) {
587 this.callback.onMoreMessagesLoaded(actualCount, conversation);
588 if (done) {
589 this.callback.informUser(R.string.no_more_history_on_server);
590 }
591 }
592 }
593
594 public long getEnd() {
595 return end;
596 }
597
598 public Conversation getConversation() {
599 return conversation;
600 }
601
602 public Account getAccount() {
603 return this.account;
604 }
605
606 public void incrementMessageCount() {
607 this.totalCount++;
608 }
609
610 public void incrementActualMessageCount() {
611 this.actualInThisQuery++;
612 this.actualCount++;
613 }
614
615 int getTotalCount() {
616 return this.totalCount;
617 }
618
619 int getActualMessageCount() {
620 return this.actualCount;
621 }
622
623 public int getActualInThisQuery() {
624 return this.actualInThisQuery;
625 }
626
627 public boolean validFrom(Jid from) {
628 if (muc()) {
629 return getWith().equals(from);
630 } else {
631 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
632 }
633 }
634
635 @NotNull
636 @Override
637 public String toString() {
638 StringBuilder builder = new StringBuilder();
639 if (this.muc()) {
640 builder.append("to=");
641 builder.append(this.getWith().toString());
642 } else {
643 builder.append("with=");
644 if (this.getWith() == null) {
645 builder.append("*");
646 } else {
647 builder.append(getWith().toString());
648 }
649 }
650 if (this.start != 0) {
651 builder.append(", start=");
652 builder.append(AbstractGenerator.getTimestamp(this.start));
653 }
654 if (this.end != 0) {
655 builder.append(", end=");
656 builder.append(AbstractGenerator.getTimestamp(this.end));
657 }
658 builder.append(", order=").append(pagingOrder.toString());
659 if (this.reference != null) {
660 if (this.pagingOrder == PagingOrder.NORMAL) {
661 builder.append(", after=");
662 } else {
663 builder.append(", before=");
664 }
665 builder.append(this.reference);
666 }
667 builder.append(", catchup=").append(catchup);
668 builder.append(", ns=").append(version.namespace);
669 return builder.toString();
670 }
671
672 boolean hasCallback() {
673 return this.callback != null;
674 }
675 }
676}