MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Element;
 18import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 19import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 20import eu.siacs.conversations.xmpp.jid.Jid;
 21import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 22
 23public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 24
 25	private final XmppConnectionService mXmppConnectionService;
 26
 27	private final HashSet<Query> queries = new HashSet<>();
 28	private final ArrayList<Query> pendingQueries = new ArrayList<>();
 29
	/**
	 * Direction in which a query pages through the archive. When a page finishes, processFin
	 * continues a NORMAL query from the "last" element of the result set and a REVERSE query
	 * from the "first" element.
	 */
	public enum PagingOrder {
		NORMAL, REVERSE
	}
 34
	/**
	 * Creates the archive service.
	 *
	 * @param service connection service used for sending packets, database access and UI updates
	 */
	public MessageArchiveService(final XmppConnectionService service) {
		mXmppConnectionService = service;
	}
 38
 39	private void catchup(final Account account) {
 40		synchronized (this.queries) {
 41			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 42				Query query = iterator.next();
 43				if (query.getAccount() == account) {
 44					iterator.remove();
 45				}
 46			}
 47		}
 48		long startCatchup = getLastMessageTransmitted(account);
 49		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 50		if (startCatchup == 0) {
 51			return;
 52		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 53			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 54			List<Conversation> conversations = mXmppConnectionService.getConversations();
 55			for (Conversation conversation : conversations) {
 56				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 57					this.query(conversation,startCatchup);
 58				}
 59			}
 60		}
 61		final Query query = new Query(account, startCatchup, endCatchup);
 62		this.queries.add(query);
 63		this.execute(query);
 64	}
 65
 66	public void catchupMUC(final Conversation conversation) {
 67		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 68			query(conversation,
 69					0,
 70					System.currentTimeMillis());
 71		} else {
 72			query(conversation,
 73					conversation.getLastMessageTransmitted(),
 74					System.currentTimeMillis());
 75		}
 76	}
 77
 78	private long getLastMessageTransmitted(final Account account) {
 79		Pair<Long,String> pair = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 80		return pair == null ? 0 : pair.first;
 81	}
 82
 83	public Query query(final Conversation conversation) {
 84		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 85			return query(conversation,
 86					0,
 87					System.currentTimeMillis());
 88		} else {
 89			return query(conversation,
 90					conversation.getLastMessageTransmitted(),
 91					conversation.getAccount().getXmppConnection().getLastSessionEstablished());
 92		}
 93	}
 94
 95	public Query query(final Conversation conversation, long end) {
 96		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
 97	}
 98
 99	public Query query(Conversation conversation, long start, long end) {
100		synchronized (this.queries) {
101			if (start > end) {
102				return null;
103			}
104			final Query query = new Query(conversation, start, end,PagingOrder.REVERSE);
105			query.reference = conversation.getFirstMamReference();
106			this.queries.add(query);
107			this.execute(query);
108			return query;
109		}
110	}
111
112	public void executePendingQueries(final Account account) {
113		List<Query> pending = new ArrayList<>();
114		synchronized(this.pendingQueries) {
115			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
116				Query query = iterator.next();
117				if (query.getAccount() == account) {
118					pending.add(query);
119					iterator.remove();
120				}
121			}
122		}
123		for(Query query : pending) {
124			this.execute(query);
125		}
126	}
127
128	private void execute(final Query query) {
129		final Account account=  query.getAccount();
130		if (account.getStatus() == Account.State.ONLINE) {
131			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
132			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
133			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
134				@Override
135				public void onIqPacketReceived(Account account, IqPacket packet) {
136					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
137						synchronized (MessageArchiveService.this.queries) {
138							MessageArchiveService.this.queries.remove(query);
139							if (query.hasCallback()) {
140								query.callback(false);
141							}
142						}
143					} else if (packet.getType() != IqPacket.TYPE.RESULT) {
144						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
145						finalizeQuery(query, true);
146					}
147				}
148			});
149		} else {
150			synchronized (this.pendingQueries) {
151				this.pendingQueries.add(query);
152			}
153		}
154	}
155
156	private void finalizeQuery(Query query, boolean done) {
157		synchronized (this.queries) {
158			this.queries.remove(query);
159		}
160		final Conversation conversation = query.getConversation();
161		if (conversation != null) {
162			conversation.sort();
163			conversation.setHasMessagesLeftOnServer(!done);
164		} else {
165			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
166				if (tmp.getAccount() == query.getAccount()) {
167					tmp.sort();
168				}
169			}
170		}
171		if (query.hasCallback()) {
172			query.callback(done);
173		} else {
174			this.mXmppConnectionService.updateConversationUi();
175		}
176	}
177
178	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
179		synchronized (this.queries) {
180			for(Query query : queries) {
181				if (query.conversation == conversation) {
182					if (!query.hasCallback() && callback != null) {
183						query.setCallback(callback);
184					}
185					return true;
186				}
187			}
188			return false;
189		}
190	}
191
192	public void processFin(Element fin, Jid from) {
193		if (fin == null) {
194			return;
195		}
196		Query query = findQuery(fin.getAttribute("queryid"));
197		if (query == null || !query.validFrom(from)) {
198			return;
199		}
200		boolean complete = fin.getAttributeAsBoolean("complete");
201		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
202		Element last = set == null ? null : set.findChild("last");
203		Element first = set == null ? null : set.findChild("first");
204		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
205		boolean abort = (query.getStart() == 0 && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
206		if (query.getConversation() != null) {
207			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
208		}
209		if (complete || relevant == null || abort) {
210			final boolean done = (complete || query.getMessageCount() == 0) && query.getStart() == 0;
211			this.finalizeQuery(query, done);
212			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+" messages. messages left="+Boolean.toString(!done));
213			if (query.getWith() == null && query.getMessageCount() > 0) {
214				mXmppConnectionService.getNotificationService().finishBacklog(true);
215			}
216		} else {
217			final Query nextQuery;
218			if (query.getPagingOrder() == PagingOrder.NORMAL) {
219				nextQuery = query.next(last == null ? null : last.getContent());
220			} else {
221				nextQuery = query.prev(first == null ? null : first.getContent());
222			}
223			this.execute(nextQuery);
224			this.finalizeQuery(query, false);
225			synchronized (this.queries) {
226				this.queries.remove(query);
227				this.queries.add(nextQuery);
228			}
229		}
230	}
231
232	public Query findQuery(String id) {
233		if (id == null) {
234			return null;
235		}
236		synchronized (this.queries) {
237			for(Query query : this.queries) {
238				if (query.getQueryId().equals(id)) {
239					return query;
240				}
241			}
242			return null;
243		}
244	}
245
246	@Override
247	public void onAdvancedStreamFeaturesAvailable(Account account) {
248		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
249			this.catchup(account);
250		}
251	}
252
253	public class Query {
254		private int totalCount = 0;
255		private int messageCount = 0;
256		private long start;
257		private long end;
258		private String queryId;
259		private String reference = null;
260		private Account account;
261		private Conversation conversation;
262		private PagingOrder pagingOrder = PagingOrder.NORMAL;
263		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
264
265
266		public Query(Conversation conversation, long start, long end) {
267			this(conversation.getAccount(), start, end);
268			this.conversation = conversation;
269		}
270
271		public Query(Conversation conversation, long start, long end, PagingOrder order) {
272			this(conversation,start,end);
273			this.pagingOrder = order;
274		}
275
276		public Query(Account account, long start, long end) {
277			this.account = account;
278			this.start = start;
279			this.end = end;
280			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
281		}
282
283		private Query page(String reference) {
284			Query query = new Query(this.account,this.start,this.end);
285			query.reference = reference;
286			query.conversation = conversation;
287			query.totalCount = totalCount;
288			query.callback = callback;
289			return query;
290		}
291
292		public Query next(String reference) {
293			Query query = page(reference);
294			query.pagingOrder = PagingOrder.NORMAL;
295			return query;
296		}
297
298		public Query prev(String reference) {
299			Query query = page(reference);
300			query.pagingOrder = PagingOrder.REVERSE;
301			return query;
302		}
303
304		public String getReference() {
305			return reference;
306		}
307
308		public PagingOrder getPagingOrder() {
309			return this.pagingOrder;
310		}
311
312		public String getQueryId() {
313			return queryId;
314		}
315
316		public Jid getWith() {
317			return conversation == null ? null : conversation.getJid().toBareJid();
318		}
319
320		public boolean muc() {
321			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
322		}
323
324		public long getStart() {
325			return start;
326		}
327
328		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
329			this.callback = callback;
330		}
331
332		public void callback(boolean done) {
333			if (this.callback != null) {
334				this.callback.onMoreMessagesLoaded(messageCount,conversation);
335				if (done) {
336					this.callback.informUser(R.string.no_more_history_on_server);
337				}
338			}
339		}
340
341		public long getEnd() {
342			return end;
343		}
344
345		public Conversation getConversation() {
346			return conversation;
347		}
348
349		public Account getAccount() {
350			return this.account;
351		}
352
353		public void incrementMessageCount() {
354			this.messageCount++;
355			this.totalCount++;
356		}
357
358		public int getTotalCount() {
359			return this.totalCount;
360		}
361
362		public int getMessageCount() {
363			return this.messageCount;
364		}
365
366		public boolean validFrom(Jid from) {
367			if (muc()) {
368				return getWith().equals(from);
369			} else {
370				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
371			}
372		}
373
374		@Override
375		public String toString() {
376			StringBuilder builder = new StringBuilder();
377			if (this.muc()) {
378				builder.append("to=");
379				builder.append(this.getWith().toString());
380			} else {
381				builder.append("with=");
382				if (this.getWith() == null) {
383					builder.append("*");
384				} else {
385					builder.append(getWith().toString());
386				}
387			}
388			builder.append(", start=");
389			builder.append(AbstractGenerator.getTimestamp(this.start));
390			builder.append(", end=");
391			builder.append(AbstractGenerator.getTimestamp(this.end));
392			if (this.reference!=null) {
393				if (this.pagingOrder == PagingOrder.NORMAL) {
394					builder.append(", after=");
395				} else {
396					builder.append(", before=");
397				}
398				builder.append(this.reference);
399			}
400			return builder.toString();
401		}
402
403		public boolean hasCallback() {
404			return this.callback != null;
405		}
406	}
407}