MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4import android.util.Pair;
  5
  6import java.math.BigInteger;
  7import java.util.ArrayList;
  8import java.util.HashSet;
  9import java.util.Iterator;
 10import java.util.List;
 11
 12import eu.siacs.conversations.Config;
 13import eu.siacs.conversations.R;
 14import eu.siacs.conversations.entities.Account;
 15import eu.siacs.conversations.entities.Conversation;
 16import eu.siacs.conversations.generator.AbstractGenerator;
 17import eu.siacs.conversations.xml.Element;
 18import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 19import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 20import eu.siacs.conversations.xmpp.jid.Jid;
 21import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 22
 23public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 24
 25	private final XmppConnectionService mXmppConnectionService;
 26
 27	private final HashSet<Query> queries = new HashSet<>();
 28	private final ArrayList<Query> pendingQueries = new ArrayList<>();
 29
 30	public enum PagingOrder {
 31		NORMAL,
 32		REVERSE
 33	}
 34
 35	public MessageArchiveService(final XmppConnectionService service) {
 36		this.mXmppConnectionService = service;
 37	}
 38
 39	private void catchup(final Account account) {
 40		synchronized (this.queries) {
 41			for(Iterator<Query> iterator = this.queries.iterator(); iterator.hasNext();) {
 42				Query query = iterator.next();
 43				if (query.getAccount() == account) {
 44					iterator.remove();
 45				}
 46			}
 47		}
 48		final Pair<Long,String> lastMessageReceived = mXmppConnectionService.databaseBackend.getLastMessageReceived(account);
 49		final Pair<Long,String> lastClearDate = mXmppConnectionService.databaseBackend.getLastClearDate(account);
 50		long startCatchup;
 51		final String reference;
 52		if (lastMessageReceived != null && lastMessageReceived.first >= lastClearDate.first) {
 53			startCatchup = lastMessageReceived.first;
 54			reference = lastMessageReceived.second;
 55		} else {
 56			startCatchup = lastClearDate.first;
 57			reference = null;
 58		}
 59		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 60		final Query query;
 61		if (startCatchup == 0) {
 62			return;
 63		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 64			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 65			List<Conversation> conversations = mXmppConnectionService.getConversations();
 66			for (Conversation conversation : conversations) {
 67				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 68					this.query(conversation,startCatchup);
 69				}
 70			}
 71			query = new Query(account, startCatchup, endCatchup);
 72		} else {
 73			query = new Query(account, startCatchup, endCatchup);
 74			query.reference = reference;
 75		}
 76		this.queries.add(query);
 77		this.execute(query);
 78	}
 79
 80	public void catchupMUC(final Conversation conversation) {
 81		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 82			query(conversation,
 83					0,
 84					System.currentTimeMillis());
 85		} else {
 86			query(conversation,
 87					conversation.getLastMessageTransmitted(),
 88					System.currentTimeMillis());
 89		}
 90	}
 91
 92	public Query query(final Conversation conversation) {
 93		if (conversation.getLastMessageTransmitted() < 0 && conversation.countMessages() == 0) {
 94			return query(conversation,
 95					0,
 96					System.currentTimeMillis());
 97		} else {
 98			return query(conversation,
 99					conversation.getLastMessageTransmitted(),
100					conversation.getAccount().getXmppConnection().getLastSessionEstablished());
101		}
102	}
103
104	public Query query(final Conversation conversation, long end) {
105		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
106	}
107
108	public Query query(Conversation conversation, long start, long end) {
109		synchronized (this.queries) {
110			if (start > end) {
111				return null;
112			}
113			final Query query = new Query(conversation, start, end,PagingOrder.REVERSE);
114			if (start==0) {
115				query.reference = conversation.getFirstMamReference();
116			}
117			this.queries.add(query);
118			this.execute(query);
119			return query;
120		}
121	}
122
123	public void executePendingQueries(final Account account) {
124		List<Query> pending = new ArrayList<>();
125		synchronized(this.pendingQueries) {
126			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
127				Query query = iterator.next();
128				if (query.getAccount() == account) {
129					pending.add(query);
130					iterator.remove();
131				}
132			}
133		}
134		for(Query query : pending) {
135			this.execute(query);
136		}
137	}
138
139	private void execute(final Query query) {
140		final Account account=  query.getAccount();
141		if (account.getStatus() == Account.State.ONLINE) {
142			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
143			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
144			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
145				@Override
146				public void onIqPacketReceived(Account account, IqPacket packet) {
147					if (packet.getType() == IqPacket.TYPE.TIMEOUT) {
148						synchronized (MessageArchiveService.this.queries) {
149							MessageArchiveService.this.queries.remove(query);
150							if (query.hasCallback()) {
151								query.callback(false);
152							}
153						}
154					} else if (packet.getType() != IqPacket.TYPE.RESULT) {
155						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
156						finalizeQuery(query, true);
157					}
158				}
159			});
160		} else {
161			synchronized (this.pendingQueries) {
162				this.pendingQueries.add(query);
163			}
164		}
165	}
166
167	private void finalizeQuery(Query query, boolean done) {
168		synchronized (this.queries) {
169			this.queries.remove(query);
170		}
171		final Conversation conversation = query.getConversation();
172		if (conversation != null) {
173			conversation.sort();
174			conversation.setHasMessagesLeftOnServer(!done);
175		} else {
176			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
177				if (tmp.getAccount() == query.getAccount()) {
178					tmp.sort();
179				}
180			}
181		}
182		if (query.hasCallback()) {
183			query.callback(done);
184		} else {
185			this.mXmppConnectionService.updateConversationUi();
186		}
187	}
188
189	public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) {
190		synchronized (this.queries) {
191			for(Query query : queries) {
192				if (query.conversation == conversation) {
193					if (!query.hasCallback() && callback != null) {
194						query.setCallback(callback);
195					}
196					return true;
197				}
198			}
199			return false;
200		}
201	}
202
203	public boolean queryInProgress(Conversation conversation) {
204		return queryInProgress(conversation, null);
205	}
206
207	public void processFin(Element fin, Jid from) {
208		if (fin == null) {
209			return;
210		}
211		Query query = findQuery(fin.getAttribute("queryid"));
212		if (query == null || !query.validFrom(from)) {
213			return;
214		}
215		boolean complete = fin.getAttributeAsBoolean("complete");
216		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
217		Element last = set == null ? null : set.findChild("last");
218		Element first = set == null ? null : set.findChild("first");
219		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
220		boolean abort = (query.getStart() == 0 && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
221		if (query.getConversation() != null) {
222			query.getConversation().setFirstMamReference(first == null ? null : first.getContent());
223		}
224		if (complete || relevant == null || abort) {
225			final boolean done = (complete || query.getMessageCount() == 0) && query.getStart() == 0;
226			this.finalizeQuery(query, done);
227			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+" messages. messages left="+Boolean.toString(!done));
228			if (query.getWith() == null && query.getMessageCount() > 0) {
229				mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount());
230			}
231		} else {
232			final Query nextQuery;
233			if (query.getPagingOrder() == PagingOrder.NORMAL) {
234				nextQuery = query.next(last == null ? null : last.getContent());
235			} else {
236				nextQuery = query.prev(first == null ? null : first.getContent());
237			}
238			this.execute(nextQuery);
239			this.finalizeQuery(query, false);
240			synchronized (this.queries) {
241				this.queries.add(nextQuery);
242			}
243		}
244	}
245
246	public Query findQuery(String id) {
247		if (id == null) {
248			return null;
249		}
250		synchronized (this.queries) {
251			for(Query query : this.queries) {
252				if (query.getQueryId().equals(id)) {
253					return query;
254				}
255			}
256			return null;
257		}
258	}
259
260	@Override
261	public void onAdvancedStreamFeaturesAvailable(Account account) {
262		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
263			this.catchup(account);
264		}
265	}
266
267	public class Query {
268		private int totalCount = 0;
269		private int messageCount = 0;
270		private long start;
271		private long end;
272		private String queryId;
273		private String reference = null;
274		private Account account;
275		private Conversation conversation;
276		private PagingOrder pagingOrder = PagingOrder.NORMAL;
277		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
278
279
280		public Query(Conversation conversation, long start, long end) {
281			this(conversation.getAccount(), start, end);
282			this.conversation = conversation;
283		}
284
285		public Query(Conversation conversation, long start, long end, PagingOrder order) {
286			this(conversation,start,end);
287			this.pagingOrder = order;
288		}
289
290		public Query(Account account, long start, long end) {
291			this.account = account;
292			this.start = start;
293			this.end = end;
294			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
295		}
296		
297		private Query page(String reference) {
298			Query query = new Query(this.account,this.start,this.end);
299			query.reference = reference;
300			query.conversation = conversation;
301			query.totalCount = totalCount;
302			query.callback = callback;
303			return query;
304		}
305
306		public Query next(String reference) {
307			Query query = page(reference);
308			query.pagingOrder = PagingOrder.NORMAL;
309			return query;
310		}
311
312		public Query prev(String reference) {
313			Query query = page(reference);
314			query.pagingOrder = PagingOrder.REVERSE;
315			return query;
316		}
317
318		public String getReference() {
319			return reference;
320		}
321
322		public PagingOrder getPagingOrder() {
323			return this.pagingOrder;
324		}
325
326		public String getQueryId() {
327			return queryId;
328		}
329
330		public Jid getWith() {
331			return conversation == null ? null : conversation.getJid().toBareJid();
332		}
333
334		public boolean muc() {
335			return conversation != null && conversation.getMode() == Conversation.MODE_MULTI;
336		}
337
338		public long getStart() {
339			return start;
340		}
341
342		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
343			this.callback = callback;
344		}
345
346		public void callback(boolean done) {
347			if (this.callback != null) {
348				this.callback.onMoreMessagesLoaded(messageCount,conversation);
349				if (done) {
350					this.callback.informUser(R.string.no_more_history_on_server);
351				}
352			}
353		}
354
355		public long getEnd() {
356			return end;
357		}
358
359		public Conversation getConversation() {
360			return conversation;
361		}
362
363		public Account getAccount() {
364			return this.account;
365		}
366
367		public void incrementMessageCount() {
368			this.messageCount++;
369			this.totalCount++;
370		}
371
372		public int getTotalCount() {
373			return this.totalCount;
374		}
375
376		public int getMessageCount() {
377			return this.messageCount;
378		}
379
380		public boolean validFrom(Jid from) {
381			if (muc()) {
382				return getWith().equals(from);
383			} else {
384				return (from == null) || account.getJid().toBareJid().equals(from.toBareJid());
385			}
386		}
387
388		@Override
389		public String toString() {
390			StringBuilder builder = new StringBuilder();
391			if (this.muc()) {
392				builder.append("to=");
393				builder.append(this.getWith().toString());
394			} else {
395				builder.append("with=");
396				if (this.getWith() == null) {
397					builder.append("*");
398				} else {
399					builder.append(getWith().toString());
400				}
401			}
402			builder.append(", start=");
403			builder.append(AbstractGenerator.getTimestamp(this.start));
404			builder.append(", end=");
405			builder.append(AbstractGenerator.getTimestamp(this.end));
406			if (this.reference!=null) {
407				if (this.pagingOrder == PagingOrder.NORMAL) {
408					builder.append(", after=");
409				} else {
410					builder.append(", before=");
411				}
412				builder.append(this.reference);
413			}
414			return builder.toString();
415		}
416
417		public boolean hasCallback() {
418			return this.callback != null;
419		}
420	}
421}