1package eu.siacs.conversations.xmpp.manager;
2
3import android.content.Context;
4import android.util.Log;
5import androidx.annotation.NonNull;
6import com.google.common.util.concurrent.AsyncFunction;
7import com.google.common.util.concurrent.Futures;
8import com.google.common.util.concurrent.ListenableFuture;
9import com.google.common.util.concurrent.MoreExecutors;
10import eu.siacs.conversations.Config;
11import eu.siacs.conversations.services.XmppConnectionService;
12import eu.siacs.conversations.xml.Namespace;
13import eu.siacs.conversations.xmpp.Jid;
14import eu.siacs.conversations.xmpp.XmppConnection;
15import im.conversations.android.xmpp.ExtensionFactory;
16import im.conversations.android.xmpp.IqErrorException;
17import im.conversations.android.xmpp.NodeConfiguration;
18import im.conversations.android.xmpp.PreconditionNotMetException;
19import im.conversations.android.xmpp.PubSubErrorException;
20import im.conversations.android.xmpp.model.Extension;
21import im.conversations.android.xmpp.model.data.Data;
22import im.conversations.android.xmpp.model.pubsub.Items;
23import im.conversations.android.xmpp.model.pubsub.PubSub;
24import im.conversations.android.xmpp.model.pubsub.Publish;
25import im.conversations.android.xmpp.model.pubsub.PublishOptions;
26import im.conversations.android.xmpp.model.pubsub.Retract;
27import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
28import im.conversations.android.xmpp.model.pubsub.event.Delete;
29import im.conversations.android.xmpp.model.pubsub.event.Event;
30import im.conversations.android.xmpp.model.pubsub.event.Purge;
31import im.conversations.android.xmpp.model.pubsub.owner.Configure;
32import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
33import im.conversations.android.xmpp.model.stanza.Iq;
34import im.conversations.android.xmpp.model.stanza.Message;
35import java.util.Map;
36
37public class PubSubManager extends AbstractManager {
38
39 private static final String SINGLETON_ITEM_ID = "current";
40
41 public PubSubManager(XmppConnectionService context, XmppConnection connection) {
42 super(context, connection);
43 }
44
45 public void handleEvent(final Message message) {
46 final var event = message.getExtension(Event.class);
47 final var action = event.getAction();
48 final var from = message.getFrom();
49
50 if (from instanceof Jid.Invalid) {
51 Log.d(
52 Config.LOGTAG,
53 getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
54 return;
55 }
56
57 if (action instanceof Purge purge) {
58 // purge is a deletion of all items in a node
59 handlePurge(message, purge);
60 } else if (action instanceof Items items) {
61 // the items wrapper contains, new and updated items as well as retractions which are
62 // deletions of individual items in a node
63 handleItems(message, items);
64 } else if (action instanceof Delete delete) {
65 // delete is the deletion of the node itself
66 handleDelete(message, delete);
67 }
68 }
69
70 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
71 final Jid address, final Class<T> clazz) {
72 final var id = ExtensionFactory.id(clazz);
73 if (id == null) {
74 return Futures.immediateFailedFuture(
75 new IllegalArgumentException(
76 String.format("%s is not a registered extension", clazz.getName())));
77 }
78 return fetchItems(address, id.namespace, clazz);
79 }
80
81 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
82 final Jid address, final String node, final Class<T> clazz) {
83 final Iq request = new Iq(Iq.Type.GET);
84 request.setTo(address);
85 final var pubSub = request.addExtension(new PubSub());
86 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
87 itemsWrapper.setNode(node);
88 return Futures.transform(
89 connection.sendIqPacket(request),
90 response -> {
91 final var pubSubResponse = response.getExtension(PubSub.class);
92 if (pubSubResponse == null) {
93 throw new IllegalStateException();
94 }
95 final var items = pubSubResponse.getItems();
96 if (items == null) {
97 throw new IllegalStateException();
98 }
99 return items.getItemMap(clazz);
100 },
101 MoreExecutors.directExecutor());
102 }
103
104 public <T extends Extension> ListenableFuture<T> fetchItem(
105 final Jid address, final String itemId, final Class<T> clazz) {
106 final var id = ExtensionFactory.id(clazz);
107 if (id == null) {
108 return Futures.immediateFailedFuture(
109 new IllegalArgumentException(
110 String.format("%s is not a registered extension", clazz.getName())));
111 }
112 return fetchItem(address, id.namespace, itemId, clazz);
113 }
114
115 public <T extends Extension> ListenableFuture<T> fetchItem(
116 final Jid address, final String node, final String itemId, final Class<T> clazz) {
117 final Iq request = new Iq(Iq.Type.GET);
118 request.setTo(address);
119 final var pubSub = request.addExtension(new PubSub());
120 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
121 itemsWrapper.setNode(node);
122 final var item = itemsWrapper.addExtension(new PubSub.Item());
123 item.setId(itemId);
124 return Futures.transform(
125 connection.sendIqPacket(request),
126 response -> {
127 final var pubSubResponse = response.getExtension(PubSub.class);
128 if (pubSubResponse == null) {
129 throw new IllegalStateException();
130 }
131 final var items = pubSubResponse.getItems();
132 if (items == null) {
133 throw new IllegalStateException();
134 }
135 return items.getItemOrThrow(itemId, clazz);
136 },
137 MoreExecutors.directExecutor());
138 }
139
140 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
141 final Jid address, final Class<T> clazz) {
142 final var id = ExtensionFactory.id(clazz);
143 if (id == null) {
144 return Futures.immediateFailedFuture(
145 new IllegalArgumentException(
146 String.format("%s is not a registered extension", clazz.getName())));
147 }
148 return fetchMostRecentItem(address, id.namespace, clazz);
149 }
150
151 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
152 final Jid address, final String node, final Class<T> clazz) {
153 final Iq request = new Iq(Iq.Type.GET);
154 request.setTo(address);
155 final var pubSub = request.addExtension(new PubSub());
156 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
157 itemsWrapper.setNode(node);
158 itemsWrapper.setMaxItems(1);
159 return Futures.transform(
160 connection.sendIqPacket(request),
161 response -> {
162 final var pubSubResponse = response.getExtension(PubSub.class);
163 if (pubSubResponse == null) {
164 throw new IllegalStateException();
165 }
166 final var items = pubSubResponse.getItems();
167 if (items == null) {
168 throw new IllegalStateException();
169 }
170 return items.getOnlyItem(clazz);
171 },
172 MoreExecutors.directExecutor());
173 }
174
175 private void handleItems(final Message message, final Items items) {
176 final var from = message.getFrom();
177 final var isFromBare = from == null || from.isBareJid();
178 final var node = items.getNode();
179 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
180 getManager(NativeBookmarkManager.class).handleItems(items);
181 return;
182 }
183 if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
184 getManager(LegacyBookmarkManager.class).handleItems(items);
185 return;
186 }
187 if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
188 getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
189 return;
190 }
191 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
192 getManager(AvatarManager.class).handleItems(from, items);
193 return;
194 }
195 if (isFromBare && Namespace.NICK.equals(node)) {
196 getManager(NickManager.class).handleItems(from, items);
197 return;
198 }
199 if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
200 getManager(AxolotlManager.class).handleItems(from, items);
201 }
202 }
203
204 private void handlePurge(final Message message, final Purge purge) {
205 final var from = message.getFrom();
206 final var isFromBare = from == null || from.isBareJid();
207 final var node = purge.getNode();
208 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
209 getManager(NativeBookmarkManager.class).handlePurge();
210 }
211 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
212 // purge (delete all items in a node) is functionally equivalent to delete
213 getManager(AvatarManager.class).handleDelete(from);
214 }
215 }
216
217 private void handleDelete(final Message message, final Delete delete) {
218 final var from = message.getFrom();
219 final var isFromBare = from == null || from.isBareJid();
220 final var node = delete.getNode();
221 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
222 getManager(NativeBookmarkManager.class).handleDelete();
223 return;
224 }
225 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
226 getManager(AvatarManager.class).handleDelete(from);
227 return;
228 }
229 if (isFromBare && Namespace.NICK.equals(node)) {
230 getManager(NickManager.class).handleDelete(from);
231 }
232 }
233
234 public ListenableFuture<Void> publishSingleton(
235 Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
236 final var id = ExtensionFactory.id(item.getClass());
237 return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
238 }
239
240 public ListenableFuture<Void> publishSingleton(
241 Jid address,
242 Extension item,
243 final String node,
244 final NodeConfiguration nodeConfiguration) {
245 return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
246 }
247
248 public ListenableFuture<Void> publish(
249 Jid address,
250 Extension item,
251 final String itemId,
252 final NodeConfiguration nodeConfiguration) {
253 final var id = ExtensionFactory.id(item.getClass());
254 return publish(address, item, itemId, id.namespace, nodeConfiguration);
255 }
256
257 public ListenableFuture<Void> publish(
258 final Jid address,
259 final Extension itemPayload,
260 final String itemId,
261 final String node,
262 final NodeConfiguration nodeConfiguration) {
263 final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
264 return Futures.catchingAsync(
265 future,
266 PreconditionNotMetException.class,
267 ex -> {
268 Log.d(
269 Config.LOGTAG,
270 "Node " + node + " on " + address + " requires reconfiguration");
271 final var reconfigurationFuture =
272 reconfigureNode(address, node, nodeConfiguration);
273 return Futures.transformAsync(
274 reconfigurationFuture,
275 ignored ->
276 publishNoRetry(
277 address, itemPayload, itemId, node, nodeConfiguration),
278 MoreExecutors.directExecutor());
279 },
280 MoreExecutors.directExecutor());
281 }
282
283 private ListenableFuture<Void> publishNoRetry(
284 final Jid address,
285 final Extension itemPayload,
286 final String itemId,
287 final String node,
288 final NodeConfiguration nodeConfiguration) {
289 final var iq = new Iq(Iq.Type.SET);
290 iq.setTo(address);
291 final var pubSub = iq.addExtension(new PubSub());
292 final var publish = pubSub.addExtension(new Publish());
293 publish.setNode(node);
294 final var item = publish.addExtension(new PubSub.Item());
295 item.setId(itemId);
296 item.addExtension(itemPayload);
297 pubSub.addExtension(PublishOptions.of(nodeConfiguration));
298 final ListenableFuture<Void> iqFuture =
299 Futures.transform(
300 connection.sendIqPacket(iq),
301 result -> null,
302 MoreExecutors.directExecutor());
303 return Futures.catchingAsync(
304 iqFuture,
305 IqErrorException.class,
306 new PubSubExceptionTransformer<>(),
307 MoreExecutors.directExecutor());
308 }
309
310 private ListenableFuture<Void> reconfigureNode(
311 final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
312 return Futures.transformAsync(
313 getNodeConfiguration(address, node),
314 data -> setNodeConfiguration(address, node, data.submit(nodeConfiguration)),
315 MoreExecutors.directExecutor());
316 }
317
318 public ListenableFuture<Data> getNodeConfiguration(final Jid address, final String node) {
319 final Iq iq = new Iq(Iq.Type.GET);
320 iq.setTo(address);
321 final var pubSub = iq.addExtension(new PubSubOwner());
322 final var configure = pubSub.addExtension(new Configure());
323 configure.setNode(node);
324 return Futures.transform(
325 connection.sendIqPacket(iq),
326 result -> {
327 final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
328 final Configure configureResult =
329 pubSubOwnerResult == null
330 ? null
331 : pubSubOwnerResult.getExtension(Configure.class);
332 if (configureResult == null) {
333 throw new IllegalStateException(
334 "No configuration found in configuration request result");
335 }
336 return configureResult.getData();
337 },
338 MoreExecutors.directExecutor());
339 }
340
341 private ListenableFuture<Void> setNodeConfiguration(
342 final Jid address, final String node, final Data data) {
343 final Iq iq = new Iq(Iq.Type.SET);
344 iq.setTo(address);
345 final var pubSub = iq.addExtension(new PubSubOwner());
346 final var configure = pubSub.addExtension(new Configure());
347 configure.setNode(node);
348 configure.addExtension(data);
349 return Futures.transform(
350 connection.sendIqPacket(iq),
351 result -> {
352 Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
353 return null;
354 },
355 MoreExecutors.directExecutor());
356 }
357
358 public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
359 final var iq = new Iq(Iq.Type.SET);
360 iq.setTo(address);
361 final var pubSub = iq.addExtension(new PubSub());
362 final var retract = pubSub.addExtension(new Retract());
363 retract.setNode(node);
364 retract.setNotify(true);
365 final var item = retract.addExtension(new PubSub.Item());
366 item.setId(itemId);
367 return connection.sendIqPacket(iq);
368 }
369
370 public ListenableFuture<Iq> delete(final Jid address, final String node) {
371 final var iq = new Iq(Iq.Type.SET);
372 iq.setTo(address);
373 final var pubSub = iq.addExtension(new PubSubOwner());
374 final var delete =
375 pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
376 delete.setNode(node);
377 return connection.sendIqPacket(iq);
378 }
379
380 private static class PubSubExceptionTransformer<V>
381 implements AsyncFunction<IqErrorException, V> {
382
383 @Override
384 @NonNull
385 public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
386 final var error = ex.getError();
387 if (error == null) {
388 return Futures.immediateFailedFuture(ex);
389 }
390 final PubSubError pubSubError = error.getExtension(PubSubError.class);
391 if (pubSubError instanceof PubSubError.PreconditionNotMet) {
392 return Futures.immediateFailedFuture(
393 new PreconditionNotMetException(ex.getResponse()));
394 } else if (pubSubError != null) {
395 return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
396 } else {
397 return Futures.immediateFailedFuture(ex);
398 }
399 }
400 }
401}