MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Namespace;
 18import eu.siacs.conversations.xml.Element;
 19import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 20import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 21import eu.siacs.conversations.xmpp.jid.Jid;
 22import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 23
 24public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 25
 26	private final XmppConnectionService mXmppConnectionService;
 27
 28	private final HashSet<Query> queries = new HashSet<>();
 29	private final ArrayList<Query> pendingQueries = new ArrayList<>();
 30
 31	public enum PagingOrder {
 32		NORMAL,
 33		REVERSE
 34	}
 35
 36	public MessageArchiveService(final XmppConnectionService service) {
 37		this.mXmppConnectionService = service;
 38	}
 39
 40	private void catchup(final Account account) {
 41		synchronized (this.queries) {
 42			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 43				Query query = iterator.next();
 44				if (query.getAccount() == account) {
 45					iterator.remove();
 46				}
 47			}
 48		}
 49		final Pair<Long,String> lastMessageReceived = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 50		final Pair<Long,String> lastClearDate = mXmppConnectionService.databaseBackend.getLastClearDate(account);
 51		long startCatchup;
 52		final String reference;
 53		if (lastMessageReceived != null && lastMessageReceived.first >= lastClearDate.first) {
 54			startCatchup = lastMessageReceived.first;
 55			reference = lastMessageReceived.second;
 56		} else {
 57			startCatchup = lastClearDate.first;
 58			reference = null;
 59		}
 60		startCatchup = Math.max(startCatchup,mXmppConnectionService.getAutomaticMessageDeletionDate());
 61		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 62		final Query query;
 63		if (startCatchup == 0) {
 64			return;
 65		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 66			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 67			List<Conversation> conversations = mXmppConnectionService.getConversations();
 68			for (Conversation conversation : conversations) {
 69				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 70					this.query(conversation,startCatchup,true);
 71				}
 72			}
 73			query = new Query(account, startCatchup, endCatchup);
 74		} else {
 75			query = new Query(account, startCatchup, endCatchup);
 76			query.reference = reference;
 77		}
 78		this.queries.add(query);
 79		this.execute(query);
 80	}
 81
 82	public void catchupMUC(final Conversation conversation) {
 83		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 84			query(conversation,
 85					0,
 86					System.currentTimeMillis(),
 87					true);
 88		} else {
 89			query(conversation,
 90					conversation.getLastMessageTransmitted(),
 91					System.currentTimeMillis(),
 92					true);
 93		}
 94	}
 95
 96	public Query query(final Conversation conversation) {
 97		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 98			return query(conversation,
 99					0,
100					System.currentTimeMillis(),
101					false);
102		} else {
103			return query(conversation,
104					conversation.getLastMessageTransmitted(),
105					conversation.getAccount().getXmppConnection().getLastSessionEstablished(),
106					false);
107		}
108	}
109
110	public Query query(final Conversation conversation, long end, boolean allowCatchup) {
111		return this.query(conversation,conversation.getLastMessageTransmitted(),end, allowCatchup);
112	}
113
114	public Query query(Conversation conversation, long start, long end, boolean allowCatchup) {
115		synchronized (this.queries) {
116			final Query query;
117			final long startActual = Math.max(start,mXmppConnectionService.getAutomaticMessageDeletionDate());
118			if (start==0) {
119				query = new Query(conversation, startActual, end, false);
120				query.reference = conversation.getFirstMamReference();
121			} else {
122				long maxCatchup = Math.max(startActual,System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
123				if (maxCatchup > startActual) {
124					Query reverseCatchup = new Query(conversation,startActual,maxCatchup,false);
125					this.queries.add(reverseCatchup);
126					this.execute(reverseCatchup);
127				}
128				query = new Query(conversation, maxCatchup, end, allowCatchup);
129			}
130			if (start > end) {
131				return null;
132			}
133			this.queries.add(query);
134			this.execute(query);
135			return query;
136		}
137	}
138
139	public void executePendingQueries(final Account account) {
140		List<Query> pending = new ArrayList<>();
141		synchronized(this.pendingQueries) {
142			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
143				Query query = iterator.next();
144				if (query.getAccount() == account) {
145					pending.add(query);
146					iterator.remove();
147				}
148			}
149		}
150		for(Query query : pending) {
151			this.execute(query);
152		}
153	}
154
155	private void execute(final Query query) {
156		final Account account=  query.getAccount();
157		if (account.getStatus() == Account.State.ONLINE) {
158			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
159			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
160			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
161				@Override
162				public void onIqPacketReceived(Account account, IqPacket packet) {
163					Element fin = packet.findChild("fin", Namespace.MAM);
164					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
165						synchronized (MessageArchiveService.this.queries) {
166							MessageArchiveService.this.queries.remove(query);
167							if (query.hasCallback()) {
168								query.callback(false);
169							}
170						}
171					} else if (packet.getType() == IqPacket.TYPE.RESULT && fin != null ) {
172						processFin(fin);
173					} else if (packet.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
174						//do nothing
175					} else {
176						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
177						finalizeQuery(query, true);
178					}
179				}
180			});
181		} else {
182			synchronized (this.pendingQueries) {
183				this.pendingQueries.add(query);
184			}
185		}
186	}
187
188	private void finalizeQuery(Query query, boolean done) {
189		synchronized (this.queries) {
190			this.queries.remove(query);
191		}
192		final Conversation conversation = query.getConversation();
193		if (conversation != null) {
194			conversation.sort();
195			conversation.setHasMessagesLeftOnServer(!done);
196		} else {
197			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
198				if (tmp.getAccount() == query.getAccount()) {
199					tmp.sort();
200				}
201			}
202		}
203		if (query.hasCallback()) {
204			query.callback(done);
205		} else {
206			this.mXmppConnectionService.updateConversationUi();
207		}
208	}
209
210	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
211		synchronized (this.queries) {
212			for(Query query : queries) {
213				if (query.conversation == conversation) {
214					if (!query.hasCallback() && callback != null) {
215						query.setCallback(callback);
216					}
217					return true;
218				}
219			}
220			return false;
221		}
222	}
223
224	public boolean queryInProgress(Conversation conversation) {
225		return queryInProgress(conversation, null);
226	}
227
228	public void processFinLegacy(Element fin, Jid from) {
229		Query query = findQuery(fin.getAttribute("queryid"));
230		if (query != null && query.validFrom(from)) {
231			processFin(fin);
232		}
233	}
234
235	public void processFin(Element fin) {
236		Query query = findQuery(fin.getAttribute("queryid"));
237		if (query == null) {
238			return;
239		}
240		boolean complete = fin.getAttributeAsBoolean("complete");
241		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
242		Element last = set == null ? null : set.findChild("last");
243		Element first = set == null ? null : set.findChild("first");
244		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
245		boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
246		if (query.getConversation() != null) {
247			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
248		}
249		if (complete || relevant == null || abort) {
250			final boolean done = (complete || query.getActualMessageCount() == 0) && !query.isCatchup();
251			this.finalizeQuery(query, done);
252			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+"("+query.getActualMessageCount()+") messages. messages left="+Boolean.toString(!done));
253			if (query.isCatchup() && query.getActualMessageCount() > 0) {
254				mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount());
255			}
256		} else {
257			final Query nextQuery;
258			if (query.getPagingOrder() == PagingOrder.NORMAL) {
259				nextQuery = query.next(last == null ? null : last.getContent());
260			} else {
261				nextQuery = query.prev(first == null ? null : first.getContent());
262			}
263			this.execute(nextQuery);
264			this.finalizeQuery(query, false);
265			synchronized (this.queries) {
266				this.queries.add(nextQuery);
267			}
268		}
269	}
270
271	public Query findQuery(String id) {
272		if (id == null) {
273			return null;
274		}
275		synchronized (this.queries) {
276			for(Query query : this.queries) {
277				if (query.getQueryId().equals(id)) {
278					return query;
279				}
280			}
281			return null;
282		}
283	}
284
285	@Override
286	public void onAdvancedStreamFeaturesAvailable(Account account) {
287		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
288			this.catchup(account);
289		}
290	}
291
292	public class Query {
293		private int totalCount = 0;
294		private int actualCount = 0;
295		private long start;
296		private long end;
297		private String queryId;
298		private String reference = null;
299		private Account account;
300		private Conversation conversation;
301		private PagingOrder pagingOrder = PagingOrder.NORMAL;
302		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
303		private boolean catchup = true;
304
305
306		public Query(Conversation conversation, long start, long end) {
307			this(conversation.getAccount(), start, end);
308			this.conversation = conversation;
309		}
310
311		public Query(Conversation conversation, long start, long end, boolean catchup) {
312			this(conversation,start,end);
313			this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
314			this.catchup = catchup;
315		}
316
317		public Query(Account account, long start, long end) {
318			this.account = account;
319			this.start = start;
320			this.end = end;
321			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
322		}
323		
324		private Query page(String reference) {
325			Query query = new Query(this.account,this.start,this.end);
326			query.reference = reference;
327			query.conversation = conversation;
328			query.totalCount = totalCount;
329			query.actualCount = actualCount;
330			query.callback = callback;
331			query.catchup = catchup;
332			return query;
333		}
334
335		public boolean isLegacy() {
336			if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
337				return account.getXmppConnection().getFeatures().mamLegacy();
338			} else {
339				return conversation.getMucOptions().mamLegacy();
340			}
341		}
342
343		public Query next(String reference) {
344			Query query = page(reference);
345			query.pagingOrder = PagingOrder.NORMAL;
346			return query;
347		}
348
349		public Query prev(String reference) {
350			Query query = page(reference);
351			query.pagingOrder = PagingOrder.REVERSE;
352			return query;
353		}
354
355		public String getReference() {
356			return reference;
357		}
358
359		public PagingOrder getPagingOrder() {
360			return this.pagingOrder;
361		}
362
363		public String getQueryId() {
364			return queryId;
365		}
366
367		public Jid getWith() {
368			return conversation == null ? null : conversation.getJid().toBareJid();
369		}
370
371		public boolean muc() {
372			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
373		}
374
375		public long getStart() {
376			return start;
377		}
378
379		public boolean isCatchup() {
380			return catchup;
381		}
382
383		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
384			this.callback = callback;
385		}
386
387		public void callback(boolean done) {
388			if (this.callback != null) {
389				this.callback.onMoreMessagesLoaded(actualCount,conversation);
390				if (done) {
391					this.callback.informUser(R.string.no_more_history_on_server);
392				}
393			}
394		}
395
396		public long getEnd() {
397			return end;
398		}
399
400		public Conversation getConversation() {
401			return conversation;
402		}
403
404		public Account getAccount() {
405			return this.account;
406		}
407
408		public void incrementMessageCount() {
409			this.totalCount++;
410		}
411
412		public void incrementActualMessageCount() {
413			this.actualCount++;
414		}
415
416		public int getTotalCount() {
417			return this.totalCount;
418		}
419
420		public int getActualMessageCount() {
421			return this.actualCount;
422		}
423
424		public boolean validFrom(Jid from) {
425			if (muc()) {
426				return getWith().equals(from);
427			} else {
428				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
429			}
430		}
431
432		@Override
433		public String toString() {
434			StringBuilder builder = new StringBuilder();
435			if (this.muc()) {
436				builder.append("to=");
437				builder.append(this.getWith().toString());
438			} else {
439				builder.append("with=");
440				if (this.getWith() == null) {
441					builder.append("*");
442				} else {
443					builder.append(getWith().toString());
444				}
445			}
446			builder.append(", start=");
447			builder.append(AbstractGenerator.getTimestamp(this.start));
448			builder.append(", end=");
449			builder.append(AbstractGenerator.getTimestamp(this.end));
450			builder.append(", order="+pagingOrder.toString());
451			if (this.reference!=null) {
452				if (this.pagingOrder == PagingOrder.NORMAL) {
453					builder.append(", after=");
454				} else {
455					builder.append(", before=");
456				}
457				builder.append(this.reference);
458			}
459			builder.append(", catchup="+Boolean.toString(catchup));
460			return builder.toString();
461		}
462
463		public boolean hasCallback() {
464			return this.callback != null;
465		}
466	}
467}