MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.utils.Xmlns;
 18import eu.siacs.conversations.xml.Element;
 19import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 20import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 21import eu.siacs.conversations.xmpp.jid.Jid;
 22import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 23
 24public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 25
	/** Direction in which a multi-page archive query is walked. */
	public enum PagingOrder {
		NORMAL, REVERSE
	}

	/** Owning service; used here for database access, IQ generation/sending and UI refreshes. */
	private final XmppConnectionService mXmppConnectionService;

	/** Queries that have been started and not yet finalized. */
	private final HashSet<Query> queries = new HashSet<>();

	/** Queries that could not be sent because their account was not online at the time. */
	private final ArrayList<Query> pendingQueries = new ArrayList<>();

	/**
	 * Creates the archive service.
	 *
	 * @param service the connection service this instance delegates to
	 */
	public MessageArchiveService(final XmppConnectionService service) {
		mXmppConnectionService = service;
	}
 39
 40	private void catchup(final Account account) {
 41		synchronized (this.queries) {
 42			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 43				Query query = iterator.next();
 44				if (query.getAccount() == account) {
 45					iterator.remove();
 46				}
 47			}
 48		}
 49		final Pair<Long,String> lastMessageReceived = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 50		final Pair<Long,String> lastClearDate = mXmppConnectionService.databaseBackend.getLastClearDate(account);
 51		long startCatchup;
 52		final String reference;
 53		if (lastMessageReceived != null && lastMessageReceived.first >= lastClearDate.first) {
 54			startCatchup = lastMessageReceived.first;
 55			reference = lastMessageReceived.second;
 56		} else {
 57			startCatchup = lastClearDate.first;
 58			reference = null;
 59		}
 60		startCatchup = Math.max(startCatchup,mXmppConnectionService.getAutomaticMessageDeletionDate());
 61		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 62		final Query query;
 63		if (startCatchup == 0) {
 64			return;
 65		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 66			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 67			List<Conversation> conversations = mXmppConnectionService.getConversations();
 68			for (Conversation conversation : conversations) {
 69				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 70					this.query(conversation,startCatchup);
 71				}
 72			}
 73			query = new Query(account, startCatchup, endCatchup);
 74		} else {
 75			query = new Query(account, startCatchup, endCatchup);
 76			query.reference = reference;
 77		}
 78		this.queries.add(query);
 79		this.execute(query);
 80	}
 81
 82	public void catchupMUC(final Conversation conversation) {
 83		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 84			query(conversation,
 85					0,
 86					System.currentTimeMillis());
 87		} else {
 88			query(conversation,
 89					conversation.getLastMessageTransmitted(),
 90					System.currentTimeMillis());
 91		}
 92	}
 93
 94	public Query query(final Conversation conversation) {
 95		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 96			return query(conversation,
 97					0,
 98					System.currentTimeMillis());
 99		} else {
100			return query(conversation,
101					conversation.getLastMessageTransmitted(),
102					conversation.getAccount().getXmppConnection().getLastSessionEstablished());
103		}
104	}
105
106	public Query query(final Conversation conversation, long end) {
107		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
108	}
109
110	public Query query(Conversation conversation, long start, long end) {
111		synchronized (this.queries) {
112			final Query query;
113			final long startActual = Math.max(start,mXmppConnectionService.getAutomaticMessageDeletionDate());
114			if (start==0) {
115				query = new Query(conversation, startActual, end, false);
116				query.reference = conversation.getFirstMamReference();
117			} else {
118				long maxCatchup = Math.max(startActual,System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
119				if (maxCatchup > startActual) {
120					Query reverseCatchup = new Query(conversation,startActual,maxCatchup,false);
121					this.queries.add(reverseCatchup);
122					this.execute(reverseCatchup);
123				}
124				query = new Query(conversation, maxCatchup, end);
125			}
126			if (start > end) {
127				return null;
128			}
129			this.queries.add(query);
130			this.execute(query);
131			return query;
132		}
133	}
134
135	public void executePendingQueries(final Account account) {
136		List<Query> pending = new ArrayList<>();
137		synchronized(this.pendingQueries) {
138			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
139				Query query = iterator.next();
140				if (query.getAccount() == account) {
141					pending.add(query);
142					iterator.remove();
143				}
144			}
145		}
146		for(Query query : pending) {
147			this.execute(query);
148		}
149	}
150
151	private void execute(final Query query) {
152		final Account account=  query.getAccount();
153		if (account.getStatus() == Account.State.ONLINE) {
154			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
155			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
156			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
157				@Override
158				public void onIqPacketReceived(Account account, IqPacket packet) {
159					Element fin = packet.findChild("fin", Xmlns.MAM);
160					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
161						synchronized (MessageArchiveService.this.queries) {
162							MessageArchiveService.this.queries.remove(query);
163							if (query.hasCallback()) {
164								query.callback(false);
165							}
166						}
167					} else if (packet.getType() == IqPacket.TYPE.RESULT && fin != null) {
168						processFin(fin);
169					} else {
170						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
171						finalizeQuery(query, true);
172					}
173				}
174			});
175		} else {
176			synchronized (this.pendingQueries) {
177				this.pendingQueries.add(query);
178			}
179		}
180	}
181
182	private void finalizeQuery(Query query, boolean done) {
183		synchronized (this.queries) {
184			this.queries.remove(query);
185		}
186		final Conversation conversation = query.getConversation();
187		if (conversation != null) {
188			conversation.sort();
189			conversation.setHasMessagesLeftOnServer(!done);
190		} else {
191			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
192				if (tmp.getAccount() == query.getAccount()) {
193					tmp.sort();
194				}
195			}
196		}
197		if (query.hasCallback()) {
198			query.callback(done);
199		} else {
200			this.mXmppConnectionService.updateConversationUi();
201		}
202	}
203
204	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
205		synchronized (this.queries) {
206			for(Query query : queries) {
207				if (query.conversation == conversation) {
208					if (!query.hasCallback() && callback != null) {
209						query.setCallback(callback);
210					}
211					return true;
212				}
213			}
214			return false;
215		}
216	}
217
218	public boolean queryInProgress(Conversation conversation) {
219		return queryInProgress(conversation, null);
220	}
221
222	public void processFin(Element fin) {
223		Query query = findQuery(fin.getAttribute("queryid"));
224		if (query == null) {
225			return;
226		}
227		boolean complete = fin.getAttributeAsBoolean("complete");
228		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
229		Element last = set == null ? null : set.findChild("last");
230		Element first = set == null ? null : set.findChild("first");
231		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
232		boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
233		if (query.getConversation() != null) {
234			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
235		}
236		if (complete || relevant == null || abort) {
237			final boolean done = (complete || query.getActualMessageCount() == 0) && !query.isCatchup();
238			this.finalizeQuery(query, done);
239			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+"("+query.getActualMessageCount()+") messages. messages left="+Boolean.toString(!done));
240			if (query.getWith() == null && query.getActualMessageCount() > 0) {
241				mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount());
242			}
243		} else {
244			final Query nextQuery;
245			if (query.getPagingOrder() == PagingOrder.NORMAL) {
246				nextQuery = query.next(last == null ? null : last.getContent());
247			} else {
248				nextQuery = query.prev(first == null ? null : first.getContent());
249			}
250			this.execute(nextQuery);
251			this.finalizeQuery(query, false);
252			synchronized (this.queries) {
253				this.queries.add(nextQuery);
254			}
255		}
256	}
257
258	public Query findQuery(String id) {
259		if (id == null) {
260			return null;
261		}
262		synchronized (this.queries) {
263			for(Query query : this.queries) {
264				if (query.getQueryId().equals(id)) {
265					return query;
266				}
267			}
268			return null;
269		}
270	}
271
272	@Override
273	public void onAdvancedStreamFeaturesAvailable(Account account) {
274		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
275			this.catchup(account);
276		}
277	}
278
279	public class Query {
280		private int totalCount = 0;
281		private int actualCount = 0;
282		private long start;
283		private long end;
284		private String queryId;
285		private String reference = null;
286		private Account account;
287		private Conversation conversation;
288		private PagingOrder pagingOrder = PagingOrder.NORMAL;
289		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
290		private boolean catchup = true;
291
292
293		public Query(Conversation conversation, long start, long end) {
294			this(conversation.getAccount(), start, end);
295			this.conversation = conversation;
296		}
297
298		public Query(Conversation conversation, long start, long end, boolean catchup) {
299			this(conversation,start,end);
300			this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
301			this.catchup = catchup;
302		}
303
304		public Query(Account account, long start, long end) {
305			this.account = account;
306			this.start = start;
307			this.end = end;
308			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
309		}
310		
311		private Query page(String reference) {
312			Query query = new Query(this.account,this.start,this.end);
313			query.reference = reference;
314			query.conversation = conversation;
315			query.totalCount = totalCount;
316			query.actualCount = actualCount;
317			query.callback = callback;
318			query.catchup = catchup;
319			return query;
320		}
321
322		public Query next(String reference) {
323			Query query = page(reference);
324			query.pagingOrder = PagingOrder.NORMAL;
325			return query;
326		}
327
328		public Query prev(String reference) {
329			Query query = page(reference);
330			query.pagingOrder = PagingOrder.REVERSE;
331			return query;
332		}
333
334		public String getReference() {
335			return reference;
336		}
337
338		public PagingOrder getPagingOrder() {
339			return this.pagingOrder;
340		}
341
342		public String getQueryId() {
343			return queryId;
344		}
345
346		public Jid getWith() {
347			return conversation == null ? null : conversation.getJid().toBareJid();
348		}
349
350		public boolean muc() {
351			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
352		}
353
354		public long getStart() {
355			return start;
356		}
357
358		public boolean isCatchup() {
359			return catchup;
360		}
361
362		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
363			this.callback = callback;
364		}
365
366		public void callback(boolean done) {
367			if (this.callback != null) {
368				this.callback.onMoreMessagesLoaded(actualCount,conversation);
369				if (done) {
370					this.callback.informUser(R.string.no_more_history_on_server);
371				}
372			}
373		}
374
375		public long getEnd() {
376			return end;
377		}
378
379		public Conversation getConversation() {
380			return conversation;
381		}
382
383		public Account getAccount() {
384			return this.account;
385		}
386
387		public void incrementMessageCount() {
388			this.totalCount++;
389		}
390
391		public void incrementActualMessageCount() {
392			this.actualCount++;
393		}
394
395		public int getTotalCount() {
396			return this.totalCount;
397		}
398
399		public int getActualMessageCount() {
400			return this.actualCount;
401		}
402
403		public boolean validFrom(Jid from) {
404			if (muc()) {
405				return getWith().equals(from);
406			} else {
407				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
408			}
409		}
410
411		@Override
412		public String toString() {
413			StringBuilder builder = new StringBuilder();
414			if (this.muc()) {
415				builder.append("to=");
416				builder.append(this.getWith().toString());
417			} else {
418				builder.append("with=");
419				if (this.getWith() == null) {
420					builder.append("*");
421				} else {
422					builder.append(getWith().toString());
423				}
424			}
425			builder.append(", start=");
426			builder.append(AbstractGenerator.getTimestamp(this.start));
427			builder.append(", end=");
428			builder.append(AbstractGenerator.getTimestamp(this.end));
429			builder.append(", order="+pagingOrder.toString());
430			if (this.reference!=null) {
431				if (this.pagingOrder == PagingOrder.NORMAL) {
432					builder.append(", after=");
433				} else {
434					builder.append(", before=");
435				}
436				builder.append(this.reference);
437			}
438			builder.append(", catchup="+Boolean.toString(catchup));
439			return builder.toString();
440		}
441
442		public boolean hasCallback() {
443			return this.callback != null;
444		}
445	}
446}