1package eu.siacs.conversations.services;
2
3import static eu.siacs.conversations.utils.Random.SECURE_RANDOM;
4
5import android.util.Log;
6
7import org.jetbrains.annotations.NotNull;
8
9import java.math.BigInteger;
10import java.util.ArrayList;
11import java.util.HashSet;
12import java.util.Iterator;
13import java.util.List;
14
15import eu.siacs.conversations.Config;
16import eu.siacs.conversations.R;
17import eu.siacs.conversations.entities.Account;
18import eu.siacs.conversations.entities.Conversation;
19import eu.siacs.conversations.entities.Conversational;
20import eu.siacs.conversations.entities.ReceiptRequest;
21import eu.siacs.conversations.generator.AbstractGenerator;
22import eu.siacs.conversations.xml.Element;
23import eu.siacs.conversations.xmpp.Jid;
24import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
25import eu.siacs.conversations.xmpp.mam.MamReference;
26import eu.siacs.conversations.xmpp.stanzas.IqPacket;
27import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
28
29public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
30
31 private final XmppConnectionService mXmppConnectionService;
32
33 private final HashSet<Query> queries = new HashSet<>();
34 private final ArrayList<Query> pendingQueries = new ArrayList<>();
35
36 public enum Version {
37 MAM_0("urn:xmpp:mam:0", true),
38 MAM_1("urn:xmpp:mam:1", false),
39 MAM_2("urn:xmpp:mam:2", false);
40
41 public final boolean legacy;
42 public final String namespace;
43
44 Version(String namespace, boolean legacy) {
45 this.namespace = namespace;
46 this.legacy = legacy;
47 }
48
49 public static Version get(Account account) {
50 return get(account, null);
51 }
52
53 public static Version get(Account account, Conversation conversation) {
54 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
55 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
56 } else {
57 return get(conversation.getMucOptions().getFeatures());
58 }
59 }
60
61 private static Version get(List<String> features) {
62 final Version[] values = values();
63 for (int i = values.length - 1; i >= 0; --i) {
64 for (String feature : features) {
65 if (values[i].namespace.equals(feature)) {
66 return values[i];
67 }
68 }
69 }
70 return MAM_0;
71 }
72
73 public static boolean has(List<String> features) {
74 for (String feature : features) {
75 for (Version version : values()) {
76 if (version.namespace.equals(feature)) {
77 return true;
78 }
79 }
80 }
81 return false;
82 }
83
84 public static Element findResult(MessagePacket packet) {
85 for (Version version : values()) {
86 Element result = packet.findChild("result", version.namespace);
87 if (result != null) {
88 return result;
89 }
90 }
91 return null;
92 }
93
94 }
95
96 MessageArchiveService(final XmppConnectionService service) {
97 this.mXmppConnectionService = service;
98 }
99
100 private void catchup(final Account account) {
101 synchronized (this.queries) {
102 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
103 Query query = iterator.next();
104 if (query.getAccount() == account) {
105 iterator.remove();
106 }
107 }
108 }
109 MamReference mamReference = MamReference.max(
110 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
111 mXmppConnectionService.databaseBackend.getLastClearDate(account)
112 );
113 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
114 final Query query;
115 if (mamReference.getTimestamp() == 0) {
116 return;
117 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
118 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
119 List<Conversation> conversations = mXmppConnectionService.getConversations();
120 for (Conversation conversation : conversations) {
121 if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
122 this.query(conversation, startCatchup, true);
123 }
124 }
125 query = new Query(account, new MamReference(startCatchup), 0);
126 } else {
127 query = new Query(account, mamReference, 0);
128 }
129 synchronized (this.queries) {
130 this.queries.add(query);
131 }
132 this.execute(query);
133 }
134
135 void catchupMUC(final Conversation conversation) {
136 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
137 query(conversation,
138 new MamReference(0),
139 0,
140 true);
141 } else {
142 query(conversation,
143 conversation.getLastMessageTransmitted(),
144 0,
145 true);
146 }
147 }
148
149 public Query query(final Conversation conversation) {
150 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
151 return query(conversation,
152 new MamReference(0),
153 System.currentTimeMillis(),
154 false);
155 } else {
156 return query(conversation,
157 conversation.getLastMessageTransmitted(),
158 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
159 false);
160 }
161 }
162
163 public boolean isCatchingUp(Conversation conversation) {
164 final Account account = conversation.getAccount();
165 if (account.getXmppConnection().isWaitingForSmCatchup()) {
166 return true;
167 } else {
168 synchronized (this.queries) {
169 for (Query query : this.queries) {
170 if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) {
171 return true;
172 }
173 }
174 }
175 return false;
176 }
177 }
178
179 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
180 return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
181 }
182
183 public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) {
184 synchronized (this.queries) {
185 final Query query;
186 final MamReference startActual = start;
187 if (start.getTimestamp() == 0) {
188 query = new Query(conversation, startActual, end, false);
189 query.reference = conversation.getFirstMamReference();
190 } else {
191 if (allowCatchup) {
192 MamReference maxCatchup = MamReference.max(startActual, System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
193 if (maxCatchup.greaterThan(startActual)) {
194 Query reverseCatchup = new Query(conversation, startActual, maxCatchup.getTimestamp(), false);
195 this.queries.add(reverseCatchup);
196 this.execute(reverseCatchup);
197 }
198 query = new Query(conversation, maxCatchup, end, true);
199 } else {
200 query = new Query(conversation, startActual, end, false);
201 }
202 }
203 if (end != 0 && start.greaterThan(end)) {
204 return null;
205 }
206 this.queries.add(query);
207 this.execute(query);
208 return query;
209 }
210 }
211
212 void executePendingQueries(final Account account) {
213 final List<Query> pending = new ArrayList<>();
214 synchronized (this.pendingQueries) {
215 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
216 Query query = iterator.next();
217 if (query.getAccount() == account) {
218 pending.add(query);
219 iterator.remove();
220 }
221 }
222 }
223 for (Query query : pending) {
224 this.execute(query);
225 }
226 }
227
228 private void execute(final Query query) {
229 final Account account = query.getAccount();
230 if (account.getStatus() == Account.State.ONLINE) {
231 final Conversation conversation = query.getConversation();
232 if (conversation != null && conversation.getStatus() == Conversation.STATUS_ARCHIVED) {
233 throw new IllegalStateException("Attempted to run MAM query for archived conversation");
234 }
235 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": running mam query " + query.toString());
236 final IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
237 this.mXmppConnectionService.sendIqPacket(account, packet, (a, p) -> {
238 final Element fin = p.findChild("fin", query.version.namespace);
239 if (p.getType() == IqPacket.TYPE.TIMEOUT) {
240 synchronized (this.queries) {
241 this.queries.remove(query);
242 if (query.hasCallback()) {
243 query.callback(false);
244 }
245 }
246 } else if (p.getType() == IqPacket.TYPE.RESULT && fin != null) {
247 final boolean running;
248 synchronized (this.queries) {
249 running = this.queries.contains(query);
250 }
251 if (running) {
252 processFin(query, fin);
253 } else {
254 Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": ignoring MAM iq result because query had been killed");
255 }
256 } else if (p.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
257 //do nothing
258 } else {
259 Log.d(Config.LOGTAG, a.getJid().asBareJid().toString() + ": error executing mam: " + p.toString());
260 try {
261 finalizeQuery(query, true);
262 } catch (final IllegalStateException e) {
263 //ignored
264 }
265 }
266 });
267 } else {
268 synchronized (this.pendingQueries) {
269 this.pendingQueries.add(query);
270 }
271 }
272 }
273
274 private void finalizeQuery(final Query query, boolean done) {
275 synchronized (this.queries) {
276 if (!this.queries.remove(query)) {
277 throw new IllegalStateException("Unable to remove query from queries");
278 }
279 }
280 final Conversation conversation = query.getConversation();
281 if (conversation != null) {
282 conversation.sort();
283 conversation.setHasMessagesLeftOnServer(!done);
284 } else {
285 for (Conversation tmp : this.mXmppConnectionService.getConversations()) {
286 if (tmp.getAccount() == query.getAccount()) {
287 tmp.sort();
288 }
289 }
290 }
291 if (query.hasCallback()) {
292 query.callback(done);
293 } else {
294 this.mXmppConnectionService.updateConversationUi();
295 }
296 }
297
298 boolean inCatchup(Account account) {
299 synchronized (this.queries) {
300 for (Query query : queries) {
301 if (query.account == account && query.isCatchup() && query.getWith() == null) {
302 return true;
303 }
304 }
305 }
306 return false;
307 }
308
309 public boolean isCatchupInProgress(Conversation conversation) {
310 synchronized (this.queries) {
311 for (Query query : queries) {
312 if (query.account == conversation.getAccount() && query.isCatchup()) {
313 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
314 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null) || (conversation.getJid().asBareJid().equals(with))) {
315 return true;
316 }
317 }
318 }
319 }
320 return false;
321 }
322
323 boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
324 synchronized (this.queries) {
325 for (Query query : queries) {
326 if (query.conversation == conversation) {
327 if (!query.hasCallback() && callback != null) {
328 query.setCallback(callback);
329 }
330 return true;
331 }
332 }
333 return false;
334 }
335 }
336
337 public boolean queryInProgress(Conversation conversation) {
338 return queryInProgress(conversation, null);
339 }
340
341 public void processFinLegacy(Element fin, Jid from) {
342 Query query = findQuery(fin.getAttribute("queryid"));
343 if (query != null && query.validFrom(from)) {
344 processFin(query, fin);
345 }
346 }
347
348 private void processFin(Query query, Element fin) {
349 boolean complete = fin.getAttributeAsBoolean("complete");
350 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
351 Element last = set == null ? null : set.findChild("last");
352 String count = set == null ? null : set.findChildContent("count");
353 Element first = set == null ? null : set.findChild("first");
354 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
355 boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
356 if (query.getConversation() != null) {
357 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
358 }
359 if (complete || relevant == null || abort) {
360 //TODO: FIX done logic to look at complete. using count is probably unreliable because it can be ommited and doesn’t work with paging.
361 boolean done;
362 if (query.isCatchup()) {
363 done = false;
364 } else {
365 if (count != null) {
366 try {
367 done = Integer.parseInt(count) <= query.getTotalCount();
368 } catch (NumberFormatException e) {
369 done = false;
370 }
371 } else {
372 done = query.getTotalCount() == 0;
373 }
374 }
375 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
376 this.finalizeQuery(query, done);
377
378 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + !done + " count=" + count);
379 if (query.isCatchup() && query.getActualMessageCount() > 0) {
380 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
381 }
382 processPostponed(query);
383 } else {
384 final Query nextQuery;
385 if (query.getPagingOrder() == PagingOrder.NORMAL) {
386 nextQuery = query.next(last == null ? null : last.getContent());
387 } else {
388 nextQuery = query.prev(first == null ? null : first.getContent());
389 }
390 this.execute(nextQuery);
391 this.finalizeQuery(query, false);
392 synchronized (this.queries) {
393 this.queries.add(nextQuery);
394 }
395 }
396 }
397
398 void kill(final Conversation conversation) {
399 final ArrayList<Query> toBeKilled = new ArrayList<>();
400 synchronized (this.pendingQueries) {
401 for (final Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
402 final Query query = iterator.next();
403 if (query.getConversation() == conversation) {
404 iterator.remove();
405 Log.d(Config.LOGTAG, conversation.getAccount().getJid().asBareJid() + ": killed pending MAM query for archived conversation");
406 }
407 }
408 }
409 synchronized (this.queries) {
410 for (final Query q : queries) {
411 if (q.conversation == conversation) {
412 toBeKilled.add(q);
413 }
414 }
415 }
416 for (final Query q : toBeKilled) {
417 kill(q);
418 }
419 }
420
421 private void kill(Query query) {
422 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
423 query.callback = null;
424 this.finalizeQuery(query, false);
425 if (query.isCatchup() && query.getActualMessageCount() > 0) {
426 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
427 }
428 this.processPostponed(query);
429 }
430
431 private void processPostponed(Query query) {
432 query.account.getAxolotlService().processPostponed();
433 query.pendingReceiptRequests.removeAll(query.receiptRequests);
434 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests");
435 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
436 while (iterator.hasNext()) {
437 ReceiptRequest rr = iterator.next();
438 mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId()));
439 iterator.remove();
440 }
441 }
442
443 public Query findQuery(String id) {
444 if (id == null) {
445 return null;
446 }
447 synchronized (this.queries) {
448 for (Query query : this.queries) {
449 if (query.getQueryId().equals(id)) {
450 return query;
451 }
452 }
453 return null;
454 }
455 }
456
457 @Override
458 public void onAdvancedStreamFeaturesAvailable(Account account) {
459 if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
460 this.catchup(account);
461 }
462 }
463
464 public enum PagingOrder {
465 NORMAL,
466 REVERSE
467 }
468
469 public class Query {
470 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
471 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
472 private int totalCount = 0;
473 private int actualCount = 0;
474 private int actualInThisQuery = 0;
475 private long start;
476 private final long end;
477 private final String queryId;
478 private String reference = null;
479 private final Account account;
480 private Conversation conversation;
481 private PagingOrder pagingOrder = PagingOrder.NORMAL;
482 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
483 private boolean catchup = true;
484 public final Version version;
485
486
487 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
488 this(conversation.getAccount(), Version.get(conversation.getAccount(), conversation), catchup ? start : start.timeOnly(), end);
489 this.conversation = conversation;
490 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
491 this.catchup = catchup;
492 }
493
494 Query(Account account, MamReference start, long end) {
495 this(account, Version.get(account), start, end);
496 }
497
498 Query(Account account, Version version, MamReference start, long end) {
499 this.account = account;
500 if (start.getReference() != null) {
501 this.reference = start.getReference();
502 } else {
503 this.start = start.getTimestamp();
504 }
505 this.end = end;
506 this.queryId = new BigInteger(50, SECURE_RANDOM).toString(32);
507 this.version = version;
508 }
509
510 private Query page(String reference) {
511 Query query = new Query(this.account, this.version, new MamReference(this.start, reference), this.end);
512 query.conversation = conversation;
513 query.totalCount = totalCount;
514 query.actualCount = actualCount;
515 query.pendingReceiptRequests = pendingReceiptRequests;
516 query.receiptRequests = receiptRequests;
517 query.callback = callback;
518 query.catchup = catchup;
519 return query;
520 }
521
522 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
523 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
524 this.receiptRequests.add(receiptRequest);
525 }
526 }
527
528 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
529 this.pendingReceiptRequests.add(receiptRequest);
530 }
531
532 public boolean isLegacy() {
533 return version.legacy;
534 }
535
536 public boolean safeToExtractTrueCounterpart() {
537 return muc() && !isLegacy();
538 }
539
540 public Query next(String reference) {
541 Query query = page(reference);
542 query.pagingOrder = PagingOrder.NORMAL;
543 return query;
544 }
545
546 Query prev(String reference) {
547 Query query = page(reference);
548 query.pagingOrder = PagingOrder.REVERSE;
549 return query;
550 }
551
552 public String getReference() {
553 return reference;
554 }
555
556 public PagingOrder getPagingOrder() {
557 return this.pagingOrder;
558 }
559
560 public String getQueryId() {
561 return queryId;
562 }
563
564 public Jid getWith() {
565 return conversation == null ? null : conversation.getJid().asBareJid();
566 }
567
568 public boolean muc() {
569 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
570 }
571
572 public long getStart() {
573 return start;
574 }
575
576 public boolean isCatchup() {
577 return catchup;
578 }
579
580 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
581 this.callback = callback;
582 }
583
584 public void callback(boolean done) {
585 if (this.callback != null) {
586 this.callback.onMoreMessagesLoaded(actualCount, conversation);
587 if (done) {
588 this.callback.informUser(R.string.no_more_history_on_server);
589 }
590 }
591 }
592
593 public long getEnd() {
594 return end;
595 }
596
597 public Conversation getConversation() {
598 return conversation;
599 }
600
601 public Account getAccount() {
602 return this.account;
603 }
604
605 public void incrementMessageCount() {
606 this.totalCount++;
607 }
608
609 public void incrementActualMessageCount() {
610 this.actualInThisQuery++;
611 this.actualCount++;
612 }
613
614 int getTotalCount() {
615 return this.totalCount;
616 }
617
618 int getActualMessageCount() {
619 return this.actualCount;
620 }
621
622 public int getActualInThisQuery() {
623 return this.actualInThisQuery;
624 }
625
626 public boolean validFrom(Jid from) {
627 if (muc()) {
628 return getWith().equals(from);
629 } else {
630 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
631 }
632 }
633
634 @NotNull
635 @Override
636 public String toString() {
637 StringBuilder builder = new StringBuilder();
638 if (this.muc()) {
639 builder.append("to=");
640 builder.append(this.getWith().toString());
641 } else {
642 builder.append("with=");
643 if (this.getWith() == null) {
644 builder.append("*");
645 } else {
646 builder.append(getWith().toString());
647 }
648 }
649 if (this.start != 0) {
650 builder.append(", start=");
651 builder.append(AbstractGenerator.getTimestamp(this.start));
652 }
653 if (this.end != 0) {
654 builder.append(", end=");
655 builder.append(AbstractGenerator.getTimestamp(this.end));
656 }
657 builder.append(", order=").append(pagingOrder.toString());
658 if (this.reference != null) {
659 if (this.pagingOrder == PagingOrder.NORMAL) {
660 builder.append(", after=");
661 } else {
662 builder.append(", before=");
663 }
664 builder.append(this.reference);
665 }
666 builder.append(", catchup=").append(catchup);
667 builder.append(", ns=").append(version.namespace);
668 return builder.toString();
669 }
670
671 boolean hasCallback() {
672 return this.callback != null;
673 }
674 }
675}