MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Namespace;
 18import eu.siacs.conversations.xml.Element;
 19import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 20import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 21import eu.siacs.conversations.xmpp.jid.Jid;
 22import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 23
 24public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 25
 26	private final XmppConnectionService mXmppConnectionService;
 27
 28	private final HashSet<Query> queries = new HashSet<>();
 29	private final ArrayList<Query> pendingQueries = new ArrayList<>();
 30
	/**
	 * Direction in which the pages of an archive query are walked.
	 */
	public enum PagingOrder {
		/** Follow-up pages are requested after the last item of the previous page. */
		NORMAL,

		/** Follow-up pages are requested before the first item of the previous page. */
		REVERSE
	}
 35
	/**
	 * Creates the archive service on top of the given connection service.
	 *
	 * @param service the connection service used to reach the database, the
	 *                IQ generator and the UI/notification hooks
	 */
	public MessageArchiveService(final XmppConnectionService service) {
		mXmppConnectionService = service;
	}
 39
 40	private void catchup(final Account account) {
 41		synchronized (this.queries) {
 42			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 43				Query query = iterator.next();
 44				if (query.getAccount() == account) {
 45					iterator.remove();
 46				}
 47			}
 48		}
 49		final Pair<Long,String> lastMessageReceived = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 50		final Pair<Long,String> lastClearDate = mXmppConnectionService.databaseBackend.getLastClearDate(account);
 51		long startCatchup;
 52		final String reference;
 53		if (lastMessageReceived != null && lastMessageReceived.first >= lastClearDate.first) {
 54			startCatchup = lastMessageReceived.first;
 55			reference = lastMessageReceived.second;
 56		} else {
 57			startCatchup = lastClearDate.first;
 58			reference = null;
 59		}
 60		startCatchup = Math.max(startCatchup,mXmppConnectionService.getAutomaticMessageDeletionDate());
 61		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 62		final Query query;
 63		if (startCatchup == 0) {
 64			return;
 65		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 66			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 67			List<Conversation> conversations = mXmppConnectionService.getConversations();
 68			for (Conversation conversation : conversations) {
 69				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 70					this.query(conversation,startCatchup);
 71				}
 72			}
 73			query = new Query(account, startCatchup, endCatchup);
 74		} else {
 75			query = new Query(account, startCatchup, endCatchup);
 76			query.reference = reference;
 77		}
 78		this.queries.add(query);
 79		this.execute(query);
 80	}
 81
 82	public void catchupMUC(final Conversation conversation) {
 83		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 84			query(conversation,
 85					0,
 86					System.currentTimeMillis());
 87		} else {
 88			query(conversation,
 89					conversation.getLastMessageTransmitted(),
 90					System.currentTimeMillis());
 91		}
 92	}
 93
 94	public Query query(final Conversation conversation) {
 95		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 96			return query(conversation,
 97					0,
 98					System.currentTimeMillis());
 99		} else {
100			return query(conversation,
101					conversation.getLastMessageTransmitted(),
102					conversation.getAccount().getXmppConnection().getLastSessionEstablished());
103		}
104	}
105
106	public Query query(final Conversation conversation, long end) {
107		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
108	}
109
110	public Query query(Conversation conversation, long start, long end) {
111		synchronized (this.queries) {
112			final Query query;
113			final long startActual = Math.max(start,mXmppConnectionService.getAutomaticMessageDeletionDate());
114			if (start==0) {
115				query = new Query(conversation, startActual, end, false);
116				query.reference = conversation.getFirstMamReference();
117			} else {
118				long maxCatchup = Math.max(startActual,System.currentTimeMillis() - Config.MAM_MAX_CATCHUP);
119				if (maxCatchup > startActual) {
120					Query reverseCatchup = new Query(conversation,startActual,maxCatchup,false);
121					this.queries.add(reverseCatchup);
122					this.execute(reverseCatchup);
123				}
124				query = new Query(conversation, maxCatchup, end);
125			}
126			if (start > end) {
127				return null;
128			}
129			this.queries.add(query);
130			this.execute(query);
131			return query;
132		}
133	}
134
135	public void executePendingQueries(final Account account) {
136		List<Query> pending = new ArrayList<>();
137		synchronized(this.pendingQueries) {
138			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
139				Query query = iterator.next();
140				if (query.getAccount() == account) {
141					pending.add(query);
142					iterator.remove();
143				}
144			}
145		}
146		for(Query query : pending) {
147			this.execute(query);
148		}
149	}
150
151	private void execute(final Query query) {
152		final Account account=  query.getAccount();
153		if (account.getStatus() == Account.State.ONLINE) {
154			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
155			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
156			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
157				@Override
158				public void onIqPacketReceived(Account account, IqPacket packet) {
159					Element fin = packet.findChild("fin", Namespace.MAM);
160					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
161						synchronized (MessageArchiveService.this.queries) {
162							MessageArchiveService.this.queries.remove(query);
163							if (query.hasCallback()) {
164								query.callback(false);
165							}
166						}
167					} else if (packet.getType() == IqPacket.TYPE.RESULT && fin != null ) {
168						processFin(fin);
169					} else if (packet.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) {
170						//do nothing
171					} else {
172						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
173						finalizeQuery(query, true);
174					}
175				}
176			});
177		} else {
178			synchronized (this.pendingQueries) {
179				this.pendingQueries.add(query);
180			}
181		}
182	}
183
184	private void finalizeQuery(Query query, boolean done) {
185		synchronized (this.queries) {
186			this.queries.remove(query);
187		}
188		final Conversation conversation = query.getConversation();
189		if (conversation != null) {
190			conversation.sort();
191			conversation.setHasMessagesLeftOnServer(!done);
192		} else {
193			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
194				if (tmp.getAccount() == query.getAccount()) {
195					tmp.sort();
196				}
197			}
198		}
199		if (query.hasCallback()) {
200			query.callback(done);
201		} else {
202			this.mXmppConnectionService.updateConversationUi();
203		}
204	}
205
206	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
207		synchronized (this.queries) {
208			for(Query query : queries) {
209				if (query.conversation == conversation) {
210					if (!query.hasCallback() && callback != null) {
211						query.setCallback(callback);
212					}
213					return true;
214				}
215			}
216			return false;
217		}
218	}
219
220	public boolean queryInProgress(Conversation conversation) {
221		return queryInProgress(conversation, null);
222	}
223
224	public void processFinLegacy(Element fin, Jid from) {
225		Query query = findQuery(fin.getAttribute("queryid"));
226		if (query != null && query.validFrom(from)) {
227			processFin(fin);
228		}
229	}
230
231	public void processFin(Element fin) {
232		Query query = findQuery(fin.getAttribute("queryid"));
233		if (query == null) {
234			return;
235		}
236		boolean complete = fin.getAttributeAsBoolean("complete");
237		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
238		Element last = set == null ? null : set.findChild("last");
239		Element first = set == null ? null : set.findChild("first");
240		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
241		boolean abort = (!query.isCatchup() && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
242		if (query.getConversation() != null) {
243			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
244		}
245		if (complete || relevant == null || abort) {
246			final boolean done = (complete || query.getActualMessageCount() == 0) && !query.isCatchup();
247			this.finalizeQuery(query, done);
248			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+"("+query.getActualMessageCount()+") messages. messages left="+Boolean.toString(!done));
249			if (query.isCatchup() && query.getActualMessageCount() > 0) {
250				mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount());
251			}
252		} else {
253			final Query nextQuery;
254			if (query.getPagingOrder() == PagingOrder.NORMAL) {
255				nextQuery = query.next(last == null ? null : last.getContent());
256			} else {
257				nextQuery = query.prev(first == null ? null : first.getContent());
258			}
259			this.execute(nextQuery);
260			this.finalizeQuery(query, false);
261			synchronized (this.queries) {
262				this.queries.add(nextQuery);
263			}
264		}
265	}
266
267	public Query findQuery(String id) {
268		if (id == null) {
269			return null;
270		}
271		synchronized (this.queries) {
272			for(Query query : this.queries) {
273				if (query.getQueryId().equals(id)) {
274					return query;
275				}
276			}
277			return null;
278		}
279	}
280
281	@Override
282	public void onAdvancedStreamFeaturesAvailable(Account account) {
283		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
284			this.catchup(account);
285		}
286	}
287
288	public class Query {
289		private int totalCount = 0;
290		private int actualCount = 0;
291		private long start;
292		private long end;
293		private String queryId;
294		private String reference = null;
295		private Account account;
296		private Conversation conversation;
297		private PagingOrder pagingOrder = PagingOrder.NORMAL;
298		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
299		private boolean catchup = true;
300
301
302		public Query(Conversation conversation, long start, long end) {
303			this(conversation.getAccount(), start, end);
304			this.conversation = conversation;
305		}
306
307		public Query(Conversation conversation, long start, long end, boolean catchup) {
308			this(conversation,start,end);
309			this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE;
310			this.catchup = catchup;
311		}
312
313		public Query(Account account, long start, long end) {
314			this.account = account;
315			this.start = start;
316			this.end = end;
317			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
318		}
319		
320		private Query page(String reference) {
321			Query query = new Query(this.account,this.start,this.end);
322			query.reference = reference;
323			query.conversation = conversation;
324			query.totalCount = totalCount;
325			query.actualCount = actualCount;
326			query.callback = callback;
327			query.catchup = catchup;
328			return query;
329		}
330
331		public boolean isLegacy() {
332			if (conversation == null || conversation.getMode() == Conversation.MODE_SINGLE) {
333				return account.getXmppConnection().getFeatures().mamLegacy();
334			} else {
335				return conversation.getMucOptions().mamLegacy();
336			}
337		}
338
339		public Query next(String reference) {
340			Query query = page(reference);
341			query.pagingOrder = PagingOrder.NORMAL;
342			return query;
343		}
344
345		public Query prev(String reference) {
346			Query query = page(reference);
347			query.pagingOrder = PagingOrder.REVERSE;
348			return query;
349		}
350
351		public String getReference() {
352			return reference;
353		}
354
355		public PagingOrder getPagingOrder() {
356			return this.pagingOrder;
357		}
358
359		public String getQueryId() {
360			return queryId;
361		}
362
363		public Jid getWith() {
364			return conversation == null ? null : conversation.getJid().toBareJid();
365		}
366
367		public boolean muc() {
368			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
369		}
370
371		public long getStart() {
372			return start;
373		}
374
375		public boolean isCatchup() {
376			return catchup;
377		}
378
379		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
380			this.callback = callback;
381		}
382
383		public void callback(boolean done) {
384			if (this.callback != null) {
385				this.callback.onMoreMessagesLoaded(actualCount,conversation);
386				if (done) {
387					this.callback.informUser(R.string.no_more_history_on_server);
388				}
389			}
390		}
391
392		public long getEnd() {
393			return end;
394		}
395
396		public Conversation getConversation() {
397			return conversation;
398		}
399
400		public Account getAccount() {
401			return this.account;
402		}
403
404		public void incrementMessageCount() {
405			this.totalCount++;
406		}
407
408		public void incrementActualMessageCount() {
409			this.actualCount++;
410		}
411
412		public int getTotalCount() {
413			return this.totalCount;
414		}
415
416		public int getActualMessageCount() {
417			return this.actualCount;
418		}
419
420		public boolean validFrom(Jid from) {
421			if (muc()) {
422				return getWith().equals(from);
423			} else {
424				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
425			}
426		}
427
428		@Override
429		public String toString() {
430			StringBuilder builder = new StringBuilder();
431			if (this.muc()) {
432				builder.append("to=");
433				builder.append(this.getWith().toString());
434			} else {
435				builder.append("with=");
436				if (this.getWith() == null) {
437					builder.append("*");
438				} else {
439					builder.append(getWith().toString());
440				}
441			}
442			builder.append(", start=");
443			builder.append(AbstractGenerator.getTimestamp(this.start));
444			builder.append(", end=");
445			builder.append(AbstractGenerator.getTimestamp(this.end));
446			builder.append(", order="+pagingOrder.toString());
447			if (this.reference!=null) {
448				if (this.pagingOrder == PagingOrder.NORMAL) {
449					builder.append(", after=");
450				} else {
451					builder.append(", before=");
452				}
453				builder.append(this.reference);
454			}
455			builder.append(", catchup="+Boolean.toString(catchup));
456			return builder.toString();
457		}
458
459		public boolean hasCallback() {
460			return this.callback != null;
461		}
462	}
463}