MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Namespace;
 18import eu.siacs.conversations.xml.Element;
 19import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 20import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 21import eu.siacs.conversations.xmpp.jid.Jid;
 22import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 23
 24public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 25
	/**
	 * Direction in which the pages of an archive query are walked.
	 * {@code NORMAL} continues after the last item of a page, {@code REVERSE}
	 * continues before the first item of a page.
	 */
	public enum PagingOrder {
		NORMAL, REVERSE
	}

	private final XmppConnectionService mXmppConnectionService;

	// Queries that have been registered and not yet finalized.
	// Every access synchronizes on the set itself.
	private final HashSet<Query> queries = new HashSet<>();

	// Queries that could not be sent because their account was not online.
	// Every access synchronizes on the list itself.
	private final ArrayList<Query> pendingQueries = new ArrayList<>();

	/**
	 * @param service the connection service used for database access, IQ
	 *                generation/sending and UI notifications
	 */
	public MessageArchiveService(final XmppConnectionService service) {
		mXmppConnectionService = service;
	}
 39
 40	private void catchup(final Account account) {
 41		synchronized (this.queries) {
 42			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 43				Query query = iterator.next();
 44				if (query.getAccount() == account) {
 45					iterator.remove();
 46				}
 47			}
 48		}
 49		final Pair<Long,String> lastMessageReceived = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 50		final Pair<Long,String> lastClearDate = mXmppConnectionService.databaseBackend.getLastClearDate(account);
 51		long startCatchup;
 52		final String reference;
 53		if (lastMessageReceived != null && lastMessageReceived.first >= lastClearDate.first) {
 54			startCatchup = lastMessageReceived.first;
 55			reference = lastMessageReceived.second;
 56		} else {
 57			startCatchup = lastClearDate.first;
 58			reference = null;
 59		}
 60		startCatchup = Math.max(startCatchup,mXmppConnectionService.getAutomaticMessageDeletionDate());
 61		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 62		final Query query;
 63		if (startCatchup == 0) {
 64			return;
 65		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 66			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 67			List<Conversation> conversations = mXmppConnectionService.getConversations();
 68			for (Conversation conversation : conversations) {
 69				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 70					this.query(conversation,startCatchup,true);
 71				}
 72			}
 73			query = new Query(account, startCatchup, endCatchup);
 74		} else {
 75			query = new Query(account, startCatchup, endCatchup);
 76			query.reference = reference;
 77		}
 78		synchronized (this.queries) {
 79			this.queries.add(query);
 80		}
 81		this.execute(query);
 82	}
 83
 84	public void catchupMUC(final Conversation conversation) {
 85		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 86			query(conversation,
 87					0,
 88					System.currentTimeMillis(),
 89					true);
 90		} else {
 91			query(conversation,
 92					conversation.getLastMessageTransmitted(),
 93					System.currentTimeMillis(),
 94					true);
 95		}
 96	}
 97
 98	public Query query(final Conversation conversation) {
 99		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
100			return query(conversation,
101					0,
102					System.currentTimeMillis(),
103					false);
104		} else {
105			return query(conversation,
106					conversation.getLastMessageTransmitted(),
107					conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
108					false);
109		}
110	}
111
112	public Query query(final Conversation conversation, long end, boolean allowCatchup) {
113		return this.query(conversation,conversation.getLastMessageTransmitted(),end, allowCatchup);
114	}
115
116	public Query query(Conversation conversation, long start, long end, boolean allowCatchup) {
117		synchronized (this.queries) {
118			final Query query;
119			final long startActual = Math.max(start,mXmppConnectionService.getAutomaticMessageDeletionDate());
120			if (start==0) {
121				query = new Query(conversation, startActual, end, false);
122				query.reference = conversation.getFirstMamReference();
123			} else {
124				long maxCatchup = Math.max(startActual,System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
125				if (maxCatchup > startActual) {
126					Query reverseCatchup = new Query(conversation,startActual,maxCatchup,false);
127					this.queries.add(reverseCatchup);
128					this.execute(reverseCatchup);
129				}
130				query = new Query(conversation, maxCatchup, end, allowCatchup);
131			}
132			if (start > end) {
133				return null;
134			}
135			this.queries.add(query);
136			this.execute(query);
137			return query;
138		}
139	}
140
141	public void executePendingQueries(final Account account) {
142		List<Query> pending = new ArrayList<>();
143		synchronized(this.pendingQueries) {
144			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
145				Query query = iterator.next();
146				if (query.getAccount() == account) {
147					pending.add(query);
148					iterator.remove();
149				}
150			}
151		}
152		for(Query query : pending) {
153			this.execute(query);
154		}
155	}
156
157	private void execute(final Query query) {
158		final Account account=  query.getAccount();
159		if (account.getStatus() == Account.State.ONLINE) {
160			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
161			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
162			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
163				@Override
164				public void onIqPacketReceived(Account account, IqPacket packet) {
165					Element fin = packet.findChild("fin", Namespace.MAM);
166					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
167						synchronized (MessageArchiveService.this.queries) {
168							MessageArchiveService.this.queries.remove(query);
169							if (query.hasCallback()) {
170								query.callback(false);
171							}
172						}
173					} else if (packet.getType() == IqPacket.TYPE.RESULT && fin != null ) {
174						processFin(fin);
175					} else if (packet.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
176						//do nothing
177					} else {
178						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
179						finalizeQuery(query, true);
180					}
181				}
182			});
183		} else {
184			synchronized (this.pendingQueries) {
185				this.pendingQueries.add(query);
186			}
187		}
188	}
189
190	private void finalizeQuery(Query query, boolean done) {
191		synchronized (this.queries) {
192			this.queries.remove(query);
193		}
194		final Conversation conversation = query.getConversation();
195		if (conversation != null) {
196			conversation.sort();
197			conversation.setHasMessagesLeftOnServer(!done);
198		} else {
199			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
200				if (tmp.getAccount() == query.getAccount()) {
201					tmp.sort();
202				}
203			}
204		}
205		if (query.hasCallback()) {
206			query.callback(done);
207		} else {
208			this.mXmppConnectionService.updateConversationUi();
209		}
210	}
211
212	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
213		synchronized (this.queries) {
214			for(Query query : queries) {
215				if (query.conversation == conversation) {
216					if (!query.hasCallback() && callback != null) {
217						query.setCallback(callback);
218					}
219					return true;
220				}
221			}
222			return false;
223		}
224	}
225
226	public boolean queryInProgress(Conversation conversation) {
227		return queryInProgress(conversation, null);
228	}
229
230	public void processFinLegacy(Element fin, Jid from) {
231		Query query = findQuery(fin.getAttribute("queryid"));
232		if (query != null && query.validFrom(from)) {
233			processFin(fin);
234		}
235	}
236
237	public void processFin(Element fin) {
238		Query query = findQuery(fin.getAttribute("queryid"));
239		if (query == null) {
240			return;
241		}
242		boolean complete = fin.getAttributeAsBoolean("complete");
243		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
244		Element last = set == null ? null : set.findChild("last");
245		Element first = set == null ? null : set.findChild("first");
246		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
247		boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
248		if (query.getConversation() != null) {
249			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
250		}
251		if (complete || relevant == null || abort) {
252			final boolean done = (complete || query.getActualMessageCount() == 0) && !query.isCatchup();
253			this.finalizeQuery(query, done);
254			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+"("+query.getActualMessageCount()+") messages. messages left="+Boolean.toString(!done));
255			if (query.isCatchup() && query.getActualMessageCount() > 0) {
256				mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount());
257			}
258		} else {
259			final Query nextQuery;
260			if (query.getPagingOrder() == PagingOrder.NORMAL) {
261				nextQuery = query.next(last == null ? null : last.getContent());
262			} else {
263				nextQuery = query.prev(first == null ? null : first.getContent());
264			}
265			this.execute(nextQuery);
266			this.finalizeQuery(query, false);
267			synchronized (this.queries) {
268				this.queries.add(nextQuery);
269			}
270		}
271	}
272
273	public Query findQuery(String id) {
274		if (id == null) {
275			return null;
276		}
277		synchronized (this.queries) {
278			for(Query query : this.queries) {
279				if (query.getQueryId().equals(id)) {
280					return query;
281				}
282			}
283			return null;
284		}
285	}
286
287	@Override
288	public void onAdvancedStreamFeaturesAvailable(Account account) {
289		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
290			this.catchup(account);
291		}
292	}
293
294	public class Query {
295		private int totalCount = 0;
296		private int actualCount = 0;
297		private long start;
298		private long end;
299		private String queryId;
300		private String reference = null;
301		private Account account;
302		private Conversation conversation;
303		private PagingOrder pagingOrder = PagingOrder.NORMAL;
304		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
305		private boolean catchup = true;
306
307
308		public Query(Conversation conversation, long start, long end) {
309			this(conversation.getAccount(), start, end);
310			this.conversation = conversation;
311		}
312
313		public Query(Conversation conversation, long start, long end, boolean catchup) {
314			this(conversation,start,end);
315			this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
316			this.catchup = catchup;
317		}
318
319		public Query(Account account, long start, long end) {
320			this.account = account;
321			this.start = start;
322			this.end = end;
323			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
324		}
325		
326		private Query page(String reference) {
327			Query query = new Query(this.account,this.start,this.end);
328			query.reference = reference;
329			query.conversation = conversation;
330			query.totalCount = totalCount;
331			query.actualCount = actualCount;
332			query.callback = callback;
333			query.catchup = catchup;
334			return query;
335		}
336
337		public boolean isLegacy() {
338			if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
339				return account.getXmppConnection().getFeatures().mamLegacy();
340			} else {
341				return conversation.getMucOptions().mamLegacy();
342			}
343		}
344
345		public Query next(String reference) {
346			Query query = page(reference);
347			query.pagingOrder = PagingOrder.NORMAL;
348			return query;
349		}
350
351		public Query prev(String reference) {
352			Query query = page(reference);
353			query.pagingOrder = PagingOrder.REVERSE;
354			return query;
355		}
356
357		public String getReference() {
358			return reference;
359		}
360
361		public PagingOrder getPagingOrder() {
362			return this.pagingOrder;
363		}
364
365		public String getQueryId() {
366			return queryId;
367		}
368
369		public Jid getWith() {
370			return conversation == null ? null : conversation.getJid().toBareJid();
371		}
372
373		public boolean muc() {
374			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
375		}
376
377		public long getStart() {
378			return start;
379		}
380
381		public boolean isCatchup() {
382			return catchup;
383		}
384
385		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
386			this.callback = callback;
387		}
388
389		public void callback(boolean done) {
390			if (this.callback != null) {
391				this.callback.onMoreMessagesLoaded(actualCount,conversation);
392				if (done) {
393					this.callback.informUser(R.string.no_more_history_on_server);
394				}
395			}
396		}
397
398		public long getEnd() {
399			return end;
400		}
401
402		public Conversation getConversation() {
403			return conversation;
404		}
405
406		public Account getAccount() {
407			return this.account;
408		}
409
410		public void incrementMessageCount() {
411			this.totalCount++;
412		}
413
414		public void incrementActualMessageCount() {
415			this.actualCount++;
416		}
417
418		public int getTotalCount() {
419			return this.totalCount;
420		}
421
422		public int getActualMessageCount() {
423			return this.actualCount;
424		}
425
426		public boolean validFrom(Jid from) {
427			if (muc()) {
428				return getWith().equals(from);
429			} else {
430				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
431			}
432		}
433
434		@Override
435		public String toString() {
436			StringBuilder builder = new StringBuilder();
437			if (this.muc()) {
438				builder.append("to=");
439				builder.append(this.getWith().toString());
440			} else {
441				builder.append("with=");
442				if (this.getWith() == null) {
443					builder.append("*");
444				} else {
445					builder.append(getWith().toString());
446				}
447			}
448			builder.append(", start=");
449			builder.append(AbstractGenerator.getTimestamp(this.start));
450			builder.append(", end=");
451			builder.append(AbstractGenerator.getTimestamp(this.end));
452			builder.append(", order="+pagingOrder.toString());
453			if (this.reference!=null) {
454				if (this.pagingOrder == PagingOrder.NORMAL) {
455					builder.append(", after=");
456				} else {
457					builder.append(", before=");
458				}
459				builder.append(this.reference);
460			}
461			builder.append(", catchup="+Boolean.toString(catchup));
462			return builder.toString();
463		}
464
465		public boolean hasCallback() {
466			return this.callback != null;
467		}
468	}
469}