1package eu.siacs.conversations.services;
2
3import android.util.Log;
4
5import java.math.BigInteger;
6import java.util.ArrayList;
7import java.util.HashSet;
8import java.util.Iterator;
9import java.util.List;
10
11import eu.siacs.conversations.Config;
12import eu.siacs.conversations.R;
13import eu.siacs.conversations.entities.Account;
14import eu.siacs.conversations.entities.Conversation;
15import eu.siacs.conversations.entities.Conversational;
16import eu.siacs.conversations.entities.ReceiptRequest;
17import eu.siacs.conversations.generator.AbstractGenerator;
18import eu.siacs.conversations.xml.Namespace;
19import eu.siacs.conversations.xml.Element;
20import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
21import eu.siacs.conversations.xmpp.mam.MamReference;
22import eu.siacs.conversations.xmpp.stanzas.IqPacket;
23import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
24import rocks.xmpp.addr.Jid;
25
26public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
27
28 private final XmppConnectionService mXmppConnectionService;
29
30 private final HashSet<Query> queries = new HashSet<>();
31 private final ArrayList<Query> pendingQueries = new ArrayList<>();
32
33 public enum Version {
34 MAM_0("urn:xmpp:mam:0", true),
35 MAM_1("urn:xmpp:mam:1", false),
36 MAM_2("urn:xmpp:mam:2", false);
37
38 public final boolean legacy;
39 public final String namespace;
40
41 Version(String namespace, boolean legacy) {
42 this.namespace = namespace;
43 this.legacy = legacy;
44 }
45
46 public static Version get(Account account) {
47 return get(account,null);
48 }
49
50 public static Version get(Account account, Conversation conversation) {
51 if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
52 return get(account.getXmppConnection().getFeatures().getAccountFeatures());
53 } else {
54 return get(conversation.getMucOptions().getFeatures());
55 }
56 }
57
58 private static Version get(List<String> features) {
59 final Version[] values = values();
60 for(int i = values.length -1; i >= 0; --i) {
61 for(String feature : features) {
62 if (values[i].namespace.equals(feature)) {
63 return values[i];
64 }
65 }
66 }
67 return MAM_0;
68 }
69
70 public static boolean has(List<String> features) {
71 for(String feature : features) {
72 for(Version version : values()) {
73 if (version.namespace.equals(feature)) {
74 return true;
75 }
76 }
77 }
78 return false;
79 }
80
81 public static Element findResult(MessagePacket packet) {
82 for(Version version : values()) {
83 Element result = packet.findChild("result", version.namespace);
84 if (result != null) {
85 return result;
86 }
87 }
88 return null;
89 }
90
91 };
92
93 MessageArchiveService(final XmppConnectionService service) {
94 this.mXmppConnectionService = service;
95 }
96
97 private void catchup(final Account account) {
98 synchronized (this.queries) {
99 for (Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext(); ) {
100 Query query = iterator.next();
101 if (query.getAccount() == account) {
102 iterator.remove();
103 }
104 }
105 }
106 MamReference mamReference = MamReference.max(
107 mXmppConnectionService.databaseBackend.getLastMessageReceived(account),
108 mXmppConnectionService.databaseBackend.getLastClearDate(account)
109 );
110 mamReference = MamReference.max(mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate());
111 long endCatchup = account.getXmppConnection().getLastSessionEstablished();
112 final Query query;
113 if (mamReference.getTimestamp() == 0) {
114 return;
115 } else if (endCatchup - mamReference.getTimestamp() >= Config.MAM_MAX_CATCHUP) {
116 long startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
117 List<Conversation> conversations = mXmppConnectionService.getConversations();
118 for (Conversation conversation : conversations) {
119 if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) {
120 this.query(conversation, startCatchup, true);
121 }
122 }
123 query = new Query(account, new MamReference(startCatchup), 0);
124 } else {
125 query = new Query(account, mamReference, 0);
126 }
127 synchronized (this.queries) {
128 this.queries.add(query);
129 }
130 this.execute(query);
131 }
132
133 void catchupMUC(final Conversation conversation) {
134 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
135 query(conversation,
136 new MamReference(0),
137 0,
138 true);
139 } else {
140 query(conversation,
141 conversation.getLastMessageTransmitted(),
142 0,
143 true);
144 }
145 }
146
147 public Query query(final Conversation conversation) {
148 if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) {
149 return query(conversation,
150 new MamReference(0),
151 System.currentTimeMillis(),
152 false);
153 } else {
154 return query(conversation,
155 conversation.getLastMessageTransmitted(),
156 conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
157 false);
158 }
159 }
160
161 public boolean isCatchingUp(Conversation conversation) {
162 final Account account = conversation.getAccount();
163 if (account.getXmppConnection().isWaitingForSmCatchup()) {
164 return true;
165 } else {
166 synchronized (this.queries) {
167 for (Query query : this.queries) {
168 if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) {
169 return true;
170 }
171 }
172 }
173 return false;
174 }
175 }
176
177 public Query query(final Conversation conversation, long end, boolean allowCatchup) {
178 return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup);
179 }
180
181 public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) {
182 synchronized (this.queries) {
183 final Query query;
184 final MamReference startActual = MamReference.max(start, mXmppConnectionService.getAutomaticMessageDeletionDate());
185 if (start.getTimestamp() == 0) {
186 query = new Query(conversation, startActual, end, false);
187 query.reference = conversation.getFirstMamReference();
188 } else {
189 if (allowCatchup) {
190 MamReference maxCatchup = MamReference.max(startActual, System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
191 if (maxCatchup.greaterThan(startActual)) {
192 Query reverseCatchup = new Query(conversation, startActual, maxCatchup.getTimestamp(), false);
193 this.queries.add(reverseCatchup);
194 this.execute(reverseCatchup);
195 }
196 query = new Query(conversation, maxCatchup, end, true);
197 } else {
198 query = new Query(conversation, startActual, end, false);
199 }
200 }
201 if (end != 0 && start.greaterThan(end)) {
202 return null;
203 }
204 this.queries.add(query);
205 this.execute(query);
206 return query;
207 }
208 }
209
210 void executePendingQueries(final Account account) {
211 List<Query> pending = new ArrayList<>();
212 synchronized (this.pendingQueries) {
213 for (Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) {
214 Query query = iterator.next();
215 if (query.getAccount() == account) {
216 pending.add(query);
217 iterator.remove();
218 }
219 }
220 }
221 for (Query query : pending) {
222 this.execute(query);
223 }
224 }
225
226 private void execute(final Query query) {
227 final Account account = query.getAccount();
228 if (account.getStatus() == Account.State.ONLINE) {
229 Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": running mam query " + query.toString());
230 IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
231 this.mXmppConnectionService.sendIqPacket(account, packet, (a, p) -> {
232 Element fin = p.findChild("fin", query.version.namespace);
233 if (p.getType() == IqPacket.TYPE.TIMEOUT) {
234 synchronized (MessageArchiveService.this.queries) {
235 MessageArchiveService.this.queries.remove(query);
236 if (query.hasCallback()) {
237 query.callback(false);
238 }
239 }
240 } else if (p.getType() == IqPacket.TYPE.RESULT && fin != null) {
241 processFin(query, fin);
242 } else if (p.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
243 //do nothing
244 } else {
245 Log.d(Config.LOGTAG, a.getJid().asBareJid().toString() + ": error executing mam: " + p.toString());
246 finalizeQuery(query, true);
247 }
248 });
249 } else {
250 synchronized (this.pendingQueries) {
251 this.pendingQueries.add(query);
252 }
253 }
254 }
255
256 private void finalizeQuery(Query query, boolean done) {
257 synchronized (this.queries) {
258 this.queries.remove(query);
259 }
260 final Conversation conversation = query.getConversation();
261 if (conversation != null) {
262 conversation.sort();
263 conversation.setHasMessagesLeftOnServer(!done);
264 } else {
265 for (Conversation tmp : this.mXmppConnectionService.getConversations()) {
266 if (tmp.getAccount() == query.getAccount()) {
267 tmp.sort();
268 }
269 }
270 }
271 if (query.hasCallback()) {
272 query.callback(done);
273 } else {
274 this.mXmppConnectionService.updateConversationUi();
275 }
276 }
277
278 boolean inCatchup(Account account) {
279 synchronized (this.queries) {
280 for (Query query : queries) {
281 if (query.account == account && query.isCatchup() && query.getWith() == null) {
282 return true;
283 }
284 }
285 }
286 return false;
287 }
288
289 public boolean isCatchupInProgress(Conversation conversation) {
290 synchronized (this.queries) {
291 for(Query query : queries) {
292 if (query.account == conversation.getAccount() && query.isCatchup()) {
293 final Jid with = query.getWith() == null ? null : query.getWith().asBareJid();
294 if ((conversation.getMode() == Conversational.MODE_SINGLE && with == null) || (conversation.getJid().asBareJid().equals(with))) {
295 return true;
296 }
297 }
298 }
299 }
300 return false;
301 }
302
303 boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
304 synchronized (this.queries) {
305 for (Query query : queries) {
306 if (query.conversation == conversation) {
307 if (!query.hasCallback() && callback != null) {
308 query.setCallback(callback);
309 }
310 return true;
311 }
312 }
313 return false;
314 }
315 }
316
317 public boolean queryInProgress(Conversation conversation) {
318 return queryInProgress(conversation, null);
319 }
320
321 public void processFinLegacy(Element fin, Jid from) {
322 Query query = findQuery(fin.getAttribute("queryid"));
323 if (query != null && query.validFrom(from)) {
324 processFin(query, fin);
325 }
326 }
327
328 private void processFin(Query query, Element fin) {
329 boolean complete = fin.getAttributeAsBoolean("complete");
330 Element set = fin.findChild("set", "http://jabber.org/protocol/rsm");
331 Element last = set == null ? null : set.findChild("last");
332 String count = set == null ? null : set.findChildContent("count");
333 Element first = set == null ? null : set.findChild("first");
334 Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
335 boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
336 if (query.getConversation() != null) {
337 query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
338 }
339 if (complete || relevant == null || abort) {
340 boolean done;
341 if (query.isCatchup()) {
342 done = false;
343 } else {
344 if (count != null) {
345 try {
346 done = Integer.parseInt(count) <= query.getTotalCount();
347 } catch (NumberFormatException e) {
348 done = false;
349 }
350 } else {
351 done = query.getTotalCount() == 0;
352 }
353 }
354 done = done || (query.getActualMessageCount() == 0 && !query.isCatchup());
355 this.finalizeQuery(query, done);
356
357 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + Boolean.toString(!done) + " count=" + count);
358 if (query.isCatchup() && query.getActualMessageCount() > 0) {
359 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
360 }
361 processPostponed(query);
362 } else {
363 final Query nextQuery;
364 if (query.getPagingOrder() == PagingOrder.NORMAL) {
365 nextQuery = query.next(last == null ? null : last.getContent());
366 } else {
367 nextQuery = query.prev(first == null ? null : first.getContent());
368 }
369 this.execute(nextQuery);
370 this.finalizeQuery(query, false);
371 synchronized (this.queries) {
372 this.queries.add(nextQuery);
373 }
374 }
375 }
376
377 void kill(Conversation conversation) {
378 final ArrayList<Query> toBeKilled = new ArrayList<>();
379 synchronized (this.queries) {
380 for (Query q : queries) {
381 if (q.conversation == conversation) {
382 toBeKilled.add(q);
383 }
384 }
385 }
386 for (Query q : toBeKilled) {
387 kill(q);
388 }
389 }
390
391 private void kill(Query query) {
392 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": killing mam query prematurely");
393 query.callback = null;
394 this.finalizeQuery(query, false);
395 if (query.isCatchup() && query.getActualMessageCount() > 0) {
396 mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount());
397 }
398 this.processPostponed(query);
399 }
400
401 private void processPostponed(Query query) {
402 query.account.getAxolotlService().processPostponed();
403 query.pendingReceiptRequests.removeAll(query.receiptRequests);
404 Log.d(Config.LOGTAG, query.getAccount().getJid().asBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests");
405 Iterator<ReceiptRequest> iterator = query.pendingReceiptRequests.iterator();
406 while (iterator.hasNext()) {
407 ReceiptRequest rr = iterator.next();
408 mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId()));
409 iterator.remove();
410 }
411 }
412
413 public Query findQuery(String id) {
414 if (id == null) {
415 return null;
416 }
417 synchronized (this.queries) {
418 for (Query query : this.queries) {
419 if (query.getQueryId().equals(id)) {
420 return query;
421 }
422 }
423 return null;
424 }
425 }
426
427 @Override
428 public void onAdvancedStreamFeaturesAvailable(Account account) {
429 if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
430 this.catchup(account);
431 }
432 }
433
434 public enum PagingOrder {
435 NORMAL,
436 REVERSE
437 }
438
439 public class Query {
440 private HashSet<ReceiptRequest> pendingReceiptRequests = new HashSet<>();
441 private HashSet<ReceiptRequest> receiptRequests = new HashSet<>();
442 private int totalCount = 0;
443 private int actualCount = 0;
444 private int actualInThisQuery = 0;
445 private long start;
446 private long end;
447 private String queryId;
448 private String reference = null;
449 private Account account;
450 private Conversation conversation;
451 private PagingOrder pagingOrder = PagingOrder.NORMAL;
452 private XmppConnectionService.OnMoreMessagesLoaded callback = null;
453 private boolean catchup = true;
454 public final Version version;
455
456
457 Query(Conversation conversation, MamReference start, long end, boolean catchup) {
458 this(conversation.getAccount(), Version.get(conversation.getAccount(), conversation), catchup ? start : start.timeOnly(), end);
459 this.conversation = conversation;
460 this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
461 this.catchup = catchup;
462 }
463
464 Query(Account account, MamReference start, long end) {
465 this(account, Version.get(account), start, end);
466 }
467
468 Query(Account account, Version version, MamReference start, long end) {
469 this.account = account;
470 if (start.getReference() != null) {
471 this.reference = start.getReference();
472 } else {
473 this.start = start.getTimestamp();
474 }
475 this.end = end;
476 this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
477 this.version = version;
478 }
479
480 private Query page(String reference) {
481 Query query = new Query(this.account, this.version, new MamReference(this.start, reference), this.end);
482 query.conversation = conversation;
483 query.totalCount = totalCount;
484 query.actualCount = actualCount;
485 query.pendingReceiptRequests = pendingReceiptRequests;
486 query.receiptRequests = receiptRequests;
487 query.callback = callback;
488 query.catchup = catchup;
489 return query;
490 }
491
492 public void removePendingReceiptRequest(ReceiptRequest receiptRequest) {
493 if (!this.pendingReceiptRequests.remove(receiptRequest)) {
494 this.receiptRequests.add(receiptRequest);
495 }
496 }
497
498 public void addPendingReceiptRequest(ReceiptRequest receiptRequest) {
499 this.pendingReceiptRequests.add(receiptRequest);
500 }
501
502 public boolean isLegacy() {
503 return version.legacy;
504 }
505
506 public boolean safeToExtractTrueCounterpart() {
507 return muc() && !isLegacy();
508 }
509
510 public Query next(String reference) {
511 Query query = page(reference);
512 query.pagingOrder = PagingOrder.NORMAL;
513 return query;
514 }
515
516 Query prev(String reference) {
517 Query query = page(reference);
518 query.pagingOrder = PagingOrder.REVERSE;
519 return query;
520 }
521
522 public String getReference() {
523 return reference;
524 }
525
526 public PagingOrder getPagingOrder() {
527 return this.pagingOrder;
528 }
529
530 public String getQueryId() {
531 return queryId;
532 }
533
534 public Jid getWith() {
535 return conversation == null ? null : conversation.getJid().asBareJid();
536 }
537
538 public boolean muc() {
539 return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
540 }
541
542 public long getStart() {
543 return start;
544 }
545
546 public boolean isCatchup() {
547 return catchup;
548 }
549
550 public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
551 this.callback = callback;
552 }
553
554 public void callback(boolean done) {
555 if (this.callback != null) {
556 this.callback.onMoreMessagesLoaded(actualCount, conversation);
557 if (done) {
558 this.callback.informUser(R.string.no_more_history_on_server);
559 }
560 }
561 }
562
563 public long getEnd() {
564 return end;
565 }
566
567 public Conversation getConversation() {
568 return conversation;
569 }
570
571 public Account getAccount() {
572 return this.account;
573 }
574
575 public void incrementMessageCount() {
576 this.totalCount++;
577 }
578
579 public void incrementActualMessageCount() {
580 this.actualInThisQuery++;
581 this.actualCount++;
582 }
583
584 int getTotalCount() {
585 return this.totalCount;
586 }
587
588 int getActualMessageCount() {
589 return this.actualCount;
590 }
591
592 public int getActualInThisQuery() {
593 return this.actualInThisQuery;
594 }
595
596 public boolean validFrom(Jid from) {
597 if (muc()) {
598 return getWith().equals(from);
599 } else {
600 return (from == null) || account.getJid().asBareJid().equals(from.asBareJid());
601 }
602 }
603
604 @Override
605 public String toString() {
606 StringBuilder builder = new StringBuilder();
607 if (this.muc()) {
608 builder.append("to=");
609 builder.append(this.getWith().toString());
610 } else {
611 builder.append("with=");
612 if (this.getWith() == null) {
613 builder.append("*");
614 } else {
615 builder.append(getWith().toString());
616 }
617 }
618 if (this.start != 0) {
619 builder.append(", start=");
620 builder.append(AbstractGenerator.getTimestamp(this.start));
621 }
622 if (this.end != 0) {
623 builder.append(", end=");
624 builder.append(AbstractGenerator.getTimestamp(this.end));
625 }
626 builder.append(", order=").append(pagingOrder.toString());
627 if (this.reference != null) {
628 if (this.pagingOrder == PagingOrder.NORMAL) {
629 builder.append(", after=");
630 } else {
631 builder.append(", before=");
632 }
633 builder.append(this.reference);
634 }
635 builder.append(", catchup=").append(Boolean.toString(catchup));
636 builder.append(", ns=").append(version.namespace);
637 return builder.toString();
638 }
639
640 boolean hasCallback() {
641 return this.callback != null;
642 }
643 }
644}