MessageArchiveService.java

  1package eu.siacs.conversations.services;
  2
  3import android.util.Log;
  4
  5import java.math.BigInteger;
  6import java.util.ArrayList;
  7import java.util.HashSet;
  8import java.util.Iterator;
  9import java.util.List;
 10
 11import eu.siacs.conversations.Config;
 12import eu.siacs.conversations.R;
 13import eu.siacs.conversations.entities.Account;
 14import eu.siacs.conversations.entities.Conversation;
 15import eu.siacs.conversations.generator.AbstractGenerator;
 16import eu.siacs.conversations.xml.Element;
 17import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded;
 18import eu.siacs.conversations.xmpp.OnIqPacketReceived;
 19import eu.siacs.conversations.xmpp.jid.Jid;
 20import eu.siacs.conversations.xmpp.stanzas.IqPacket;
 21
 22public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded {
 23
 24	private final XmppConnectionService mXmppConnectionService;
 25
 26	private final HashSet<Query> queries = new HashSet<Query>();
 27	private ArrayList<Query> pendingQueries = new ArrayList<Query>();
 28
 29	public enum PagingOrder {
 30		NORMAL,
 31		REVERSE
 32	};
 33
 34	public MessageArchiveService(final XmppConnectionService service) {
 35		this.mXmppConnectionService = service;
 36	}
 37
 38	public void catchup(final Account account) {
 39		long startCatchup = getLastMessageTransmitted(account);
 40		long endCatchup = account.getXmppConnection().getLastSessionEstablished();
 41		if (startCatchup == 0) {
 42			return;
 43		} else if (endCatchup - startCatchup >= Config.MAM_MAX_CATCHUP) {
 44			startCatchup = endCatchup - Config.MAM_MAX_CATCHUP;
 45			List<Conversation> conversations = mXmppConnectionService.getConversations();
 46			for (Conversation conversation : conversations) {
 47				if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted()) {
 48					this.query(conversation,startCatchup);
 49				}
 50			}
 51		}
 52		final Query query = new Query(account, startCatchup, endCatchup);
 53		this.queries.add(query);
 54		this.execute(query);
 55	}
 56
 57	private long getLastMessageTransmitted(final Account account) {
 58		long timestamp = 0;
 59		for(final Conversation conversation : mXmppConnectionService.getConversations()) {
 60			if (conversation.getAccount() == account) {
 61				long tmp = conversation.getLastMessageTransmitted();
 62				if (tmp > timestamp) {
 63					timestamp = tmp;
 64				}
 65			}
 66		}
 67		return timestamp;
 68	}
 69
 70	public Query query(final Conversation conversation) {
 71		return query(conversation,conversation.getAccount().getXmppConnection().getLastSessionEstablished());
 72	}
 73
 74	public Query query(final Conversation conversation, long end) {
 75		return this.query(conversation,conversation.getLastMessageTransmitted(),end);
 76	}
 77
 78	public Query query(Conversation conversation, long start, long end) {
 79		synchronized (this.queries) {
 80			if (start > end) {
 81				return null;
 82			}
 83			final Query query = new Query(conversation, start, end,PagingOrder.REVERSE);
 84			this.queries.add(query);
 85			this.execute(query);
 86			return query;
 87		}
 88	}
 89
 90	public void executePendingQueries(final Account account) {
 91		List<Query> pending = new ArrayList<>();
 92		synchronized(this.pendingQueries) {
 93			for(Iterator<Query> iterator = this.pendingQueries.iterator(); iterator.hasNext();) {
 94				Query query = iterator.next();
 95				if (query.getAccount() == account) {
 96					pending.add(query);
 97					iterator.remove();
 98				}
 99			}
100		}
101		for(Query query : pending) {
102			this.execute(query);
103		}
104	}
105
106	private void execute(final Query query) {
107		final Account account=  query.getAccount();
108		if (account.getStatus() == Account.State.ONLINE) {
109			Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString());
110			IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query);
111			this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() {
112				@Override
113				public void onIqPacketReceived(Account account, IqPacket packet) {
114					if (packet.getType() == IqPacket.TYPE_ERROR) {
115						Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString());
116						finalizeQuery(query);
117					}
118				}
119			});
120		} else {
121			synchronized (this.pendingQueries) {
122				this.pendingQueries.add(query);
123			}
124		}
125	}
126
127	private void finalizeQuery(Query query) {
128		synchronized (this.queries) {
129			this.queries.remove(query);
130		}
131		final Conversation conversation = query.getConversation();
132		if (conversation != null) {
133			conversation.sort();
134			if (conversation.setLastMessageTransmitted(query.getEnd())) {
135				this.mXmppConnectionService.databaseBackend.updateConversation(conversation);
136			}
137			if (query.hasCallback()) {
138				query.callback();
139			} else {
140				this.mXmppConnectionService.updateConversationUi();
141			}
142		} else {
143			for(Conversation tmp : this.mXmppConnectionService.getConversations()) {
144				if (tmp.getAccount() == query.getAccount()) {
145					tmp.sort();
146					if (tmp.setLastMessageTransmitted(query.getEnd())) {
147						this.mXmppConnectionService.databaseBackend.updateConversation(tmp);
148					}
149				}
150			}
151		}
152	}
153
154	public boolean queryInProgress(Conversation conversation) {
155		synchronized (this.queries) {
156			for(Query query : queries) {
157				if (query.conversation == conversation) {
158					return true;
159				}
160			}
161			return false;
162		}
163	}
164
165	public void processFin(Element fin) {
166		if (fin == null) {
167			return;
168		}
169		Query query = findQuery(fin.getAttribute("queryid"));
170		if (query == null) {
171			return;
172		}
173		boolean complete = fin.getAttributeAsBoolean("complete");
174		Element set = fin.findChild("set","http://jabber.org/protocol/rsm");
175		Element last = set == null ? null : set.findChild("last");
176		Element first = set == null ? null : set.findChild("first");
177		Element relevant = query.getPagingOrder() == PagingOrder.NORMAL ? last : first;
178		boolean abort = (query.getStart() == 0 && query.getTotalCount() >= Config.PAGE_SIZE) || query.getTotalCount() >= Config.MAM_MAX_MESSAGES;
179		if (complete || relevant == null || abort) {
180			this.finalizeQuery(query);
181			Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid().toString()+": finished mam after "+query.getTotalCount()+" messages");
182		} else {
183			final Query nextQuery;
184			if (query.getPagingOrder() == PagingOrder.NORMAL) {
185				nextQuery = query.next(last == null ? null : last.getContent());
186			} else {
187				nextQuery = query.prev(first == null ? null : first.getContent());
188			}
189			this.execute(nextQuery);
190			this.finalizeQuery(query);
191			synchronized (this.queries) {
192				this.queries.remove(query);
193				this.queries.add(nextQuery);
194			}
195		}
196	}
197
198	public Query findQuery(String id) {
199		if (id == null) {
200			return null;
201		}
202		synchronized (this.queries) {
203			for(Query query : this.queries) {
204				if (query.getQueryId().equals(id)) {
205					return query;
206				}
207			}
208			return null;
209		}
210	}
211
212	@Override
213	public void onAdvancedStreamFeaturesAvailable(Account account) {
214		if (account.getXmppConnection() != null && account.getXmppConnection().getFeatures().mam()) {
215			this.catchup(account);
216		}
217	}
218
219	public class Query {
220		private int totalCount = 0;
221		private int messageCount = 0;
222		private long start;
223		private long end;
224		private Jid with = null;
225		private String queryId;
226		private String reference = null;
227		private Account account;
228		private Conversation conversation;
229		private PagingOrder pagingOrder = PagingOrder.NORMAL;
230		private XmppConnectionService.OnMoreMessagesLoaded callback = null;
231
232
233		public Query(Conversation conversation, long start, long end) {
234			this(conversation.getAccount(), start, end);
235			this.conversation = conversation;
236			this.with = conversation.getContactJid().toBareJid();
237		}
238
239		public Query(Conversation conversation, long start, long end, PagingOrder order) {
240			this(conversation,start,end);
241			this.pagingOrder = order;
242		}
243
244		public Query(Account account, long start, long end) {
245			this.account = account;
246			this.start = start;
247			this.end = end;
248			this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32);
249		}
250
251		private Query page(String reference) {
252			Query query = new Query(this.account,this.start,this.end);
253			query.reference = reference;
254			query.conversation = conversation;
255			query.with = with;
256			query.totalCount = totalCount;
257			query.callback = callback;
258			return query;
259		}
260
261		public Query next(String reference) {
262			Query query = page(reference);
263			query.pagingOrder = PagingOrder.NORMAL;
264			return query;
265		}
266
267		public Query prev(String reference) {
268			Query query = page(reference);
269			query.pagingOrder = PagingOrder.REVERSE;
270			return query;
271		}
272
273		public String getReference() {
274			return reference;
275		}
276
277		public PagingOrder getPagingOrder() {
278			return this.pagingOrder;
279		}
280
281		public String getQueryId() {
282			return queryId;
283		}
284
285		public Jid getWith() {
286			return with;
287		}
288
289		public long getStart() {
290			return start;
291		}
292
293		public void setCallback(XmppConnectionService.OnMoreMessagesLoaded callback) {
294			this.callback = callback;
295		}
296
297		public void callback() {
298			if (this.callback != null) {
299				this.callback.onMoreMessagesLoaded(messageCount,conversation);
300				if (messageCount==0) {
301					this.callback.informUser(R.string.no_more_history_on_server);
302				}
303			}
304		}
305
306		public long getEnd() {
307			return end;
308		}
309
310		public Conversation getConversation() {
311			return conversation;
312		}
313
314		public Account getAccount() {
315			return this.account;
316		}
317
318		public void incrementTotalCount() {
319			this.totalCount++;
320		}
321
322		public void incrementMessageCount() {
323			this.messageCount++;
324		}
325
326		public int getTotalCount() {
327			return this.totalCount;
328		}
329
330		@Override
331		public String toString() {
332			StringBuilder builder = new StringBuilder();
333			builder.append("with=");
334			if (this.with==null) {
335				builder.append("*");
336			} else {
337				builder.append(with.toString());
338			}
339			builder.append(", start=");
340			builder.append(AbstractGenerator.getTimestamp(this.start));
341			builder.append(", end=");
342			builder.append(AbstractGenerator.getTimestamp(this.end));
343			if (this.reference!=null) {
344				if (this.pagingOrder == PagingOrder.NORMAL) {
345					builder.append(", after=");
346				} else {
347					builder.append(", before=");
348				}
349				builder.append(this.reference);
350			}
351			return builder.toString();
352		}
353
354		public boolean hasCallback() {
355			return this.callback != null;
356		}
357	}
358}