MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Element;
 18import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 19import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 20import eu.siacs.conversations.xmpp.jid.Jid;
 21import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 22
 23public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 24
 25	private final XmppConnectionService mXmppConnectionService;
 26
 27	private final HashSet<Query> queries = new HashSet<>();
 28	private final ArrayList<Query> pendingQueries = new ArrayList<>();
 29
	/** Direction in which the pages of an archive query are walked. */
	public enum PagingOrder {
		/** Forward paging: the follow-up page is requested relative to the {@code last} item of a page. */
		NORMAL,
		/** Backward paging: the follow-up page is requested relative to the {@code first} item of a page. */
		REVERSE
	}
 34
	/**
	 * Creates the archive service on top of the given connection service.
	 *
	 * @param service the connection service this instance delegates to
	 */
	public MessageArchiveService(final XmppConnectionService service) {
		mXmppConnectionService = service;
	}
 38
 39	private void catchup(final Account account) {
 40		synchronized (this.queries) {
 41			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 42				Query query = iterator.next();
 43				if (query.getAccount() == account) {
 44					iterator.remove();
 45				}
 46			}
 47		}
 48		Pair<Long,String> pair = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 49		long startCatchup = pair == null ? 0 : pair.first;
 50		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 51		final Query query;
 52		if (startCatchup == 0) {
 53			return;
 54		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 55			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 56			List<Conversation> conversations = mXmppConnectionService.getConversations();
 57			for (Conversation conversation : conversations) {
 58				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 59					this.query(conversation,startCatchup);
 60				}
 61			}
 62			query = new Query(account, startCatchup, endCatchup);
 63		} else {
 64			if (pair.second == null) {
 65				query = new Query(account, startCatchup, endCatchup);
 66			} else {
 67				query = new Query(account, pair.second, endCatchup);
 68			}
 69		}
 70		this.queries.add(query);
 71		this.execute(query);
 72	}
 73
 74	public void catchupMUC(final Conversation conversation) {
 75		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 76			query(conversation,
 77					0,
 78					System.currentTimeMillis());
 79		} else {
 80			query(conversation,
 81					conversation.getLastMessageTransmitted(),
 82					System.currentTimeMillis());
 83		}
 84	}
 85
 86	public Query query(final Conversation conversation) {
 87		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 88			return query(conversation,
 89					0,
 90					System.currentTimeMillis());
 91		} else {
 92			return query(conversation,
 93					conversation.getLastMessageTransmitted(),
 94					conversation.getAccount().getXmppConnection().getLastSessionEstablished());
 95		}
 96	}
 97
 98	public Query query(final Conversation conversation, long end) {
 99		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
100	}
101
102	public Query query(Conversation conversation, long start, long end) {
103		synchronized (this.queries) {
104			if (start > end) {
105				return null;
106			}
107			final Query query = new Query(conversation, start, end,PagingOrder.REVERSE);
108			query.reference = conversation.getFirstMamReference();
109			this.queries.add(query);
110			this.execute(query);
111			return query;
112		}
113	}
114
115	public void executePendingQueries(final Account account) {
116		List<Query> pending = new ArrayList<>();
117		synchronized(this.pendingQueries) {
118			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
119				Query query = iterator.next();
120				if (query.getAccount() == account) {
121					pending.add(query);
122					iterator.remove();
123				}
124			}
125		}
126		for(Query query : pending) {
127			this.execute(query);
128		}
129	}
130
131	private void execute(final Query query) {
132		final Account account=  query.getAccount();
133		if (account.getStatus() == Account.State.ONLINE) {
134			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
135			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
136			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
137				@Override
138				public void onIqPacketReceived(Account account, IqPacket packet) {
139					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
140						synchronized (MessageArchiveService.this.queries) {
141							MessageArchiveService.this.queries.remove(query);
142							if (query.hasCallback()) {
143								query.callback(false);
144							}
145						}
146					} else if (packet.getType() != IqPacket.TYPE.RESULT) {
147						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
148						finalizeQuery(query, true);
149					}
150				}
151			});
152		} else {
153			synchronized (this.pendingQueries) {
154				this.pendingQueries.add(query);
155			}
156		}
157	}
158
159	private void finalizeQuery(Query query, boolean done) {
160		synchronized (this.queries) {
161			this.queries.remove(query);
162		}
163		final Conversation conversation = query.getConversation();
164		if (conversation != null) {
165			conversation.sort();
166			conversation.setHasMessagesLeftOnServer(!done);
167		} else {
168			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
169				if (tmp.getAccount() == query.getAccount()) {
170					tmp.sort();
171				}
172			}
173		}
174		if (query.hasCallback()) {
175			query.callback(done);
176		} else {
177			this.mXmppConnectionService.updateConversationUi();
178		}
179	}
180
181	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
182		synchronized (this.queries) {
183			for(Query query : queries) {
184				if (query.conversation == conversation) {
185					if (!query.hasCallback() && callback != null) {
186						query.setCallback(callback);
187					}
188					return true;
189				}
190			}
191			return false;
192		}
193	}
194
195	public boolean queryInProgress(Conversation conversation) {
196		return queryInProgress(conversation, null);
197	}
198
199	public void processFin(Element fin, Jid from) {
200		if (fin == null) {
201			return;
202		}
203		Query query = findQuery(fin.getAttribute("queryid"));
204		if (query == null || !query.validFrom(from)) {
205			return;
206		}
207		boolean complete = fin.getAttributeAsBoolean("complete");
208		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
209		Element last = set == null ? null : set.findChild("last");
210		Element first = set == null ? null : set.findChild("first");
211		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
212		boolean abort = (query.getStart() == 0 && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
213		if (query.getConversation() != null) {
214			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
215		}
216		if (complete || relevant == null || abort) {
217			final boolean done = (complete || query.getMessageCount() == 0) && query.getStart() == 0;
218			this.finalizeQuery(query, done);
219			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+" messages. messages left="+Boolean.toString(!done));
220			if (query.getWith() == null && query.getMessageCount() > 0) {
221				mXmppConnectionService.getNotificationService().finishBacklog(true);
222			}
223		} else {
224			final Query nextQuery;
225			if (query.getPagingOrder() == PagingOrder.NORMAL) {
226				nextQuery = query.next(last == null ? null : last.getContent());
227			} else {
228				nextQuery = query.prev(first == null ? null : first.getContent());
229			}
230			this.execute(nextQuery);
231			this.finalizeQuery(query, false);
232			synchronized (this.queries) {
233				this.queries.add(nextQuery);
234			}
235		}
236	}
237
238	public Query findQuery(String id) {
239		if (id == null) {
240			return null;
241		}
242		synchronized (this.queries) {
243			for(Query query : this.queries) {
244				if (query.getQueryId().equals(id)) {
245					return query;
246				}
247			}
248			return null;
249		}
250	}
251
252	@Override
253	public void onAdvancedStreamFeaturesAvailable(Account account) {
254		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
255			this.catchup(account);
256		}
257	}
258
259	public class Query {
260		private int totalCount = 0;
261		private int messageCount = 0;
262		private long start;
263		private long end;
264		private String queryId;
265		private String reference = null;
266		private Account account;
267		private Conversation conversation;
268		private PagingOrder pagingOrder = PagingOrder.NORMAL;
269		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
270
271
272		public Query(Conversation conversation, long start, long end) {
273			this(conversation.getAccount(), start, end);
274			this.conversation = conversation;
275		}
276
277		public Query(Conversation conversation, long start, long end, PagingOrder order) {
278			this(conversation,start,end);
279			this.pagingOrder = order;
280		}
281
282		public Query(Account account, long start, long end) {
283			this.account = account;
284			this.start = start;
285			this.end = end;
286			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
287		}
288
289		public Query(Account account, String reference, long end) {
290			this.account = account;
291			this.reference = reference;
292			this.end = end;
293			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
294		}
295
296		private Query page(String reference) {
297			Query query = new Query(this.account,this.start,this.end);
298			query.reference = reference;
299			query.conversation = conversation;
300			query.totalCount = totalCount;
301			query.callback = callback;
302			return query;
303		}
304
305		public Query next(String reference) {
306			Query query = page(reference);
307			query.pagingOrder = PagingOrder.NORMAL;
308			return query;
309		}
310
311		public Query prev(String reference) {
312			Query query = page(reference);
313			query.pagingOrder = PagingOrder.REVERSE;
314			return query;
315		}
316
317		public String getReference() {
318			return reference;
319		}
320
321		public PagingOrder getPagingOrder() {
322			return this.pagingOrder;
323		}
324
325		public String getQueryId() {
326			return queryId;
327		}
328
329		public Jid getWith() {
330			return conversation == null ? null : conversation.getJid().toBareJid();
331		}
332
333		public boolean muc() {
334			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
335		}
336
337		public long getStart() {
338			return start;
339		}
340
341		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
342			this.callback = callback;
343		}
344
345		public void callback(boolean done) {
346			if (this.callback != null) {
347				this.callback.onMoreMessagesLoaded(messageCount,conversation);
348				if (done) {
349					this.callback.informUser(R.string.no_more_history_on_server);
350				}
351			}
352		}
353
354		public long getEnd() {
355			return end;
356		}
357
358		public Conversation getConversation() {
359			return conversation;
360		}
361
362		public Account getAccount() {
363			return this.account;
364		}
365
366		public void incrementMessageCount() {
367			this.messageCount++;
368			this.totalCount++;
369		}
370
371		public int getTotalCount() {
372			return this.totalCount;
373		}
374
375		public int getMessageCount() {
376			return this.messageCount;
377		}
378
379		public boolean validFrom(Jid from) {
380			if (muc()) {
381				return getWith().equals(from);
382			} else {
383				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
384			}
385		}
386
387		@Override
388		public String toString() {
389			StringBuilder builder = new StringBuilder();
390			if (this.muc()) {
391				builder.append("to=");
392				builder.append(this.getWith().toString());
393			} else {
394				builder.append("with=");
395				if (this.getWith() == null) {
396					builder.append("*");
397				} else {
398					builder.append(getWith().toString());
399				}
400			}
401			builder.append(", start=");
402			builder.append(AbstractGenerator.getTimestamp(this.start));
403			builder.append(", end=");
404			builder.append(AbstractGenerator.getTimestamp(this.end));
405			if (this.reference!=null) {
406				if (this.pagingOrder == PagingOrder.NORMAL) {
407					builder.append(", after=");
408				} else {
409					builder.append(", before=");
410				}
411				builder.append(this.reference);
412			}
413			return builder.toString();
414		}
415
416		public boolean hasCallback() {
417			return this.callback != null;
418		}
419	}
420}