MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4
  5import java.math.BigInteger;
  6import java.util.ArrayList;
  7import java.util.HashSet;
  8import java.util.Iterator;
  9import java.util.List;
 10
 11import eu.siacs.conversations.Config;
 12import eu.siacs.conversations.entities.Account;
 13import eu.siacs.conversations.entities.Conversation;
 14import eu.siacs.conversations.generator.AbstractGenerator;
 15import eu.siacs.conversations.xml.Element;
 16import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 17import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 18import eu.siacs.conversations.xmpp.jid.Jid;
 19import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 20
 21public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 22
 23	private final XmppConnectionService mXmppConnectionService;
 24
 25	private final HashSet<Query> queries = new HashSet<Query>();
 26	private ArrayList<Query> pendingQueries = new ArrayList<Query>();
 27
 28	public enum PagingOrder {
 29		NORMAL,
 30		REVERSE
 31	};
 32
 33	public MessageArchiveService(final XmppConnectionService service) {
 34		this.mXmppConnectionService = service;
 35	}
 36
 37	public void catchup(final Account account) {
 38		long startCatchup = getLastMessageTransmitted(account);
 39		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 40		if (startCatchup == 0) {
 41			return;
 42		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 43			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 44			List<Conversation> conversations = mXmppConnectionService.getConversations();
 45			for (Conversation conversation : conversations) {
 46				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 47					this.query(conversation,startCatchup);
 48				}
 49			}
 50		}
 51		final Query query = new Query(account, startCatchup, endCatchup);
 52		this.queries.add(query);
 53		this.execute(query);
 54	}
 55
 56	private long getLastMessageTransmitted(final Account account) {
 57		long timestamp = 0;
 58		for(final Conversation conversation : mXmppConnectionService.getConversations()) {
 59			if (conversation.getAccount() == account) {
 60				long tmp = conversation.getLastMessageTransmitted();
 61				if (tmp > timestamp) {
 62					timestamp = tmp;
 63				}
 64			}
 65		}
 66		return timestamp;
 67	}
 68
 69	public Query query(final Conversation conversation) {
 70		return query(conversation,conversation.getAccount().getXmppConnection().getLastSessionEstablished());
 71	}
 72
 73	public Query query(final Conversation conversation, long end) {
 74		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
 75	}
 76
 77	public Query query(Conversation conversation, long start, long end) {
 78		synchronized (this.queries) {
 79			if (start > end) {
 80				return null;
 81			}
 82			final Query query = new Query(conversation, start, end,PagingOrder.REVERSE);
 83			this.queries.add(query);
 84			this.execute(query);
 85			return query;
 86		}
 87	}
 88
 89	public void executePendingQueries(final Account account) {
 90		List<Query> pending = new ArrayList<>();
 91		synchronized(this.pendingQueries) {
 92			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
 93				Query query = iterator.next();
 94				if (query.getAccount() == account) {
 95					pending.add(query);
 96					iterator.remove();
 97				}
 98			}
 99		}
100		for(Query query : pending) {
101			this.execute(query);
102		}
103	}
104
105	private void execute(final Query query) {
106		final Account account=  query.getAccount();
107		if (account.getStatus() == Account.State.ONLINE) {
108			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
109			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
110			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
111				@Override
112				public void onIqPacketReceived(Account account, IqPacket packet) {
113					if (packet.getType() == IqPacket.TYPE_ERROR) {
114						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
115						finalizeQuery(query);
116					}
117				}
118			});
119		} else {
120			synchronized (this.pendingQueries) {
121				this.pendingQueries.add(query);
122			}
123		}
124	}
125
126	private void finalizeQuery(Query query) {
127		synchronized (this.queries) {
128			this.queries.remove(query);
129		}
130		final Conversation conversation = query.getConversation();
131		if (conversation != null) {
132			conversation.sort();
133			if (conversation.setLastMessageTransmitted(query.getEnd())) {
134				this.mXmppConnectionService.databaseBackend.updateConversation(conversation);
135			}
136			if (query.hasCallback()) {
137				query.callback();
138			} else {
139				this.mXmppConnectionService.updateConversationUi();
140			}
141		} else {
142			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
143				if (tmp.getAccount() == query.getAccount()) {
144					tmp.sort();
145					if (tmp.setLastMessageTransmitted(query.getEnd())) {
146						this.mXmppConnectionService.databaseBackend.updateConversation(tmp);
147					}
148				}
149			}
150		}
151	}
152
153	public boolean queryInProgress(Conversation conversation) {
154		synchronized (this.queries) {
155			for(Query query : queries) {
156				if (query.conversation == conversation) {
157					return true;
158				}
159			}
160			return false;
161		}
162	}
163
164	public void processFin(Element fin) {
165		if (fin == null) {
166			return;
167		}
168		Query query = findQuery(fin.getAttribute("queryid"));
169		if (query == null) {
170			return;
171		}
172		boolean complete = fin.getAttributeAsBoolean("complete");
173		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
174		Element last = set == null ? null : set.findChild("last");
175		Element first = set == null ? null : set.findChild("first");
176		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
177		boolean abort = (query.getStart() == 0 && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
178		if (complete || relevant == null || abort) {
179			this.finalizeQuery(query);
180			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid().toString()+": finished mam after "+query.getTotalCount()+" messages");
181		} else {
182			final Query nextQuery;
183			if (query.getPagingOrder() == PagingOrder.NORMAL) {
184				nextQuery = query.next(last == null ? null : last.getContent());
185			} else {
186				nextQuery = query.prev(first == null ? null : first.getContent());
187			}
188			this.execute(nextQuery);
189			this.finalizeQuery(query);
190			synchronized (this.queries) {
191				this.queries.remove(query);
192				this.queries.add(nextQuery);
193			}
194		}
195	}
196
197	public Query findQuery(String id) {
198		if (id == null) {
199			return null;
200		}
201		synchronized (this.queries) {
202			for(Query query : this.queries) {
203				if (query.getQueryId().equals(id)) {
204					return query;
205				}
206			}
207			return null;
208		}
209	}
210
211	@Override
212	public void onAdvancedStreamFeaturesAvailable(Account account) {
213		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
214			this.catchup(account);
215		}
216	}
217
218	public class Query {
219		private int totalCount = 0;
220		private int count = 0;
221		private long start;
222		private long end;
223		private Jid with = null;
224		private String queryId;
225		private String reference = null;
226		private Account account;
227		private Conversation conversation;
228		private PagingOrder pagingOrder = PagingOrder.NORMAL;
229		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
230
231
232		public Query(Conversation conversation, long start, long end) {
233			this(conversation.getAccount(), start, end);
234			this.conversation = conversation;
235			this.with = conversation.getContactJid().toBareJid();
236		}
237
238		public Query(Conversation conversation, long start, long end, PagingOrder order) {
239			this(conversation,start,end);
240			this.pagingOrder = order;
241		}
242
243		public Query(Account account, long start, long end) {
244			this.account = account;
245			this.start = start;
246			this.end = end;
247			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
248		}
249
250		private Query page(String reference) {
251			Query query = new Query(this.account,this.start,this.end);
252			query.reference = reference;
253			query.conversation = conversation;
254			query.with = with;
255			query.totalCount = totalCount;
256			query.callback = callback;
257			return query;
258		}
259
260		public Query next(String reference) {
261			Query query = page(reference);
262			query.pagingOrder = PagingOrder.NORMAL;
263			return query;
264		}
265
266		public Query prev(String reference) {
267			Query query = page(reference);
268			query.pagingOrder = PagingOrder.REVERSE;
269			return query;
270		}
271
272		public String getReference() {
273			return reference;
274		}
275
276		public PagingOrder getPagingOrder() {
277			return this.pagingOrder;
278		}
279
280		public String getQueryId() {
281			return queryId;
282		}
283
284		public Jid getWith() {
285			return with;
286		}
287
288		public long getStart() {
289			return start;
290		}
291
292		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
293			this.callback = callback;
294		}
295
296		public void callback() {
297			if (this.callback != null) {
298				this.callback.onMoreMessagesLoaded(count,conversation);
299			}
300		}
301
302		public long getEnd() {
303			return end;
304		}
305
306		public Conversation getConversation() {
307			return conversation;
308		}
309
310		public Account getAccount() {
311			return this.account;
312		}
313
314		public void incrementTotalCount() {
315			this.totalCount++;
316		}
317
318		public void incrementMessageCount() {
319			this.count++;
320		}
321
322		public int getTotalCount() {
323			return this.totalCount;
324		}
325
326		@Override
327		public String toString() {
328			StringBuilder builder = new StringBuilder();
329			builder.append("with=");
330			if (this.with==null) {
331				builder.append("*");
332			} else {
333				builder.append(with.toString());
334			}
335			builder.append(", start=");
336			builder.append(AbstractGenerator.getTimestamp(this.start));
337			builder.append(", end=");
338			builder.append(AbstractGenerator.getTimestamp(this.end));
339			if (this.reference!=null) {
340				if (this.pagingOrder == PagingOrder.NORMAL) {
341					builder.append(", after=");
342				} else {
343					builder.append(", before=");
344				}
345				builder.append(this.reference);
346			}
347			return builder.toString();
348		}
349
350		public boolean hasCallback() {
351			return this.callback != null;
352		}
353	}
354}