PubSubManager.java

  1package eu.siacs.conversations.xmpp.manager;
  2
  3import android.content.Context;
  4import android.util.Log;
  5import androidx.annotation.NonNull;
  6import com.google.common.util.concurrent.AsyncFunction;
  7import com.google.common.util.concurrent.Futures;
  8import com.google.common.util.concurrent.ListenableFuture;
  9import com.google.common.util.concurrent.MoreExecutors;
 10import eu.siacs.conversations.Config;
 11import eu.siacs.conversations.services.XmppConnectionService;
 12import eu.siacs.conversations.xml.Namespace;
 13import eu.siacs.conversations.xmpp.Jid;
 14import eu.siacs.conversations.xmpp.XmppConnection;
 15import im.conversations.android.xmpp.ExtensionFactory;
 16import im.conversations.android.xmpp.IqErrorException;
 17import im.conversations.android.xmpp.NodeConfiguration;
 18import im.conversations.android.xmpp.PreconditionNotMetException;
 19import im.conversations.android.xmpp.PubSubErrorException;
 20import im.conversations.android.xmpp.model.Extension;
 21import im.conversations.android.xmpp.model.data.Data;
 22import im.conversations.android.xmpp.model.pubsub.Items;
 23import im.conversations.android.xmpp.model.pubsub.PubSub;
 24import im.conversations.android.xmpp.model.pubsub.Publish;
 25import im.conversations.android.xmpp.model.pubsub.PublishOptions;
 26import im.conversations.android.xmpp.model.pubsub.Retract;
 27import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
 28import im.conversations.android.xmpp.model.pubsub.event.Delete;
 29import im.conversations.android.xmpp.model.pubsub.event.Event;
 30import im.conversations.android.xmpp.model.pubsub.event.Purge;
 31import im.conversations.android.xmpp.model.pubsub.owner.Configure;
 32import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
 33import im.conversations.android.xmpp.model.stanza.Iq;
 34import im.conversations.android.xmpp.model.stanza.Message;
 35import java.util.Map;
 36
 37public class PubSubManager extends AbstractManager {
 38
 39    private static final String SINGLETON_ITEM_ID = "current";
 40
 41    public PubSubManager(XmppConnectionService context, XmppConnection connection) {
 42        super(context, connection);
 43    }
 44
 45    public void handleEvent(final Message message) {
 46        final var event = message.getExtension(Event.class);
 47        final var action = event.getAction();
 48        final var from = message.getFrom();
 49
 50        if (from instanceof Jid.Invalid) {
 51            Log.d(
 52                    Config.LOGTAG,
 53                    getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
 54            return;
 55        }
 56
 57        if (action instanceof Purge purge) {
 58            // purge is a deletion of all items in a node
 59            handlePurge(message, purge);
 60        } else if (action instanceof Items items) {
 61            // the items wrapper contains, new and updated items as well as retractions which are
 62            // deletions of individual items in a node
 63            handleItems(message, items);
 64        } else if (action instanceof Delete delete) {
 65            // delete is the deletion of the node itself
 66            handleDelete(message, delete);
 67        }
 68    }
 69
 70    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 71            final Jid address, final Class<T> clazz) {
 72        final var id = ExtensionFactory.id(clazz);
 73        if (id == null) {
 74            return Futures.immediateFailedFuture(
 75                    new IllegalArgumentException(
 76                            String.format("%s is not a registered extension", clazz.getName())));
 77        }
 78        return fetchItems(address, id.namespace, clazz);
 79    }
 80
 81    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 82            final Jid address, final String node, final Class<T> clazz) {
 83        final Iq request = new Iq(Iq.Type.GET);
 84        request.setTo(address);
 85        final var pubSub = request.addExtension(new PubSub());
 86        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
 87        itemsWrapper.setNode(node);
 88        return Futures.transform(
 89                connection.sendIqPacket(request),
 90                response -> {
 91                    final var pubSubResponse = response.getExtension(PubSub.class);
 92                    if (pubSubResponse == null) {
 93                        throw new IllegalStateException();
 94                    }
 95                    final var items = pubSubResponse.getItems();
 96                    if (items == null) {
 97                        throw new IllegalStateException();
 98                    }
 99                    return items.getItemMap(clazz);
100                },
101                MoreExecutors.directExecutor());
102    }
103
104    public <T extends Extension> ListenableFuture<T> fetchItem(
105            final Jid address, final String itemId, final Class<T> clazz) {
106        final var id = ExtensionFactory.id(clazz);
107        if (id == null) {
108            return Futures.immediateFailedFuture(
109                    new IllegalArgumentException(
110                            String.format("%s is not a registered extension", clazz.getName())));
111        }
112        return fetchItem(address, id.namespace, itemId, clazz);
113    }
114
115    public <T extends Extension> ListenableFuture<T> fetchItem(
116            final Jid address, final String node, final String itemId, final Class<T> clazz) {
117        final Iq request = new Iq(Iq.Type.GET);
118        request.setTo(address);
119        final var pubSub = request.addExtension(new PubSub());
120        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
121        itemsWrapper.setNode(node);
122        final var item = itemsWrapper.addExtension(new PubSub.Item());
123        item.setId(itemId);
124        return Futures.transform(
125                connection.sendIqPacket(request),
126                response -> {
127                    final var pubSubResponse = response.getExtension(PubSub.class);
128                    if (pubSubResponse == null) {
129                        throw new IllegalStateException();
130                    }
131                    final var items = pubSubResponse.getItems();
132                    if (items == null) {
133                        throw new IllegalStateException();
134                    }
135                    return items.getItemOrThrow(itemId, clazz);
136                },
137                MoreExecutors.directExecutor());
138    }
139
140    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
141            final Jid address, final Class<T> clazz) {
142        final var id = ExtensionFactory.id(clazz);
143        if (id == null) {
144            return Futures.immediateFailedFuture(
145                    new IllegalArgumentException(
146                            String.format("%s is not a registered extension", clazz.getName())));
147        }
148        return fetchMostRecentItem(address, id.namespace, clazz);
149    }
150
151    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
152            final Jid address, final String node, final Class<T> clazz) {
153        final Iq request = new Iq(Iq.Type.GET);
154        request.setTo(address);
155        final var pubSub = request.addExtension(new PubSub());
156        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
157        itemsWrapper.setNode(node);
158        itemsWrapper.setMaxItems(1);
159        return Futures.transform(
160                connection.sendIqPacket(request),
161                response -> {
162                    final var pubSubResponse = response.getExtension(PubSub.class);
163                    if (pubSubResponse == null) {
164                        throw new IllegalStateException();
165                    }
166                    final var items = pubSubResponse.getItems();
167                    if (items == null) {
168                        throw new IllegalStateException();
169                    }
170                    return items.getOnlyItem(clazz);
171                },
172                MoreExecutors.directExecutor());
173    }
174
175    private void handleItems(final Message message, final Items items) {
176        final var from = message.getFrom();
177        final var isFromBare = from == null || from.isBareJid();
178        final var node = items.getNode();
179        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
180            getManager(NativeBookmarkManager.class).handleItems(items);
181            return;
182        }
183        if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
184            getManager(LegacyBookmarkManager.class).handleItems(items);
185            return;
186        }
187        if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
188            getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
189            return;
190        }
191        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
192            getManager(AvatarManager.class).handleItems(from, items);
193            return;
194        }
195        if (isFromBare && Namespace.NICK.equals(node)) {
196            getManager(NickManager.class).handleItems(from, items);
197            return;
198        }
199        if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
200            getManager(AxolotlManager.class).handleItems(from, items);
201        }
202    }
203
204    private void handlePurge(final Message message, final Purge purge) {
205        final var from = message.getFrom();
206        final var isFromBare = from == null || from.isBareJid();
207        final var node = purge.getNode();
208        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
209            getManager(NativeBookmarkManager.class).handlePurge();
210        }
211        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
212            // purge (delete all items in a node) is functionally equivalent to delete
213            getManager(AvatarManager.class).handleDelete(from);
214        }
215    }
216
217    private void handleDelete(final Message message, final Delete delete) {
218        final var from = message.getFrom();
219        final var isFromBare = from == null || from.isBareJid();
220        final var node = delete.getNode();
221        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
222            getManager(NativeBookmarkManager.class).handleDelete();
223            return;
224        }
225        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
226            getManager(AvatarManager.class).handleDelete(from);
227            return;
228        }
229        if (isFromBare && Namespace.NICK.equals(node)) {
230            getManager(NickManager.class).handleDelete(from);
231        }
232    }
233
234    public ListenableFuture<Void> publishSingleton(
235            Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
236        final var id = ExtensionFactory.id(item.getClass());
237        return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
238    }
239
240    public ListenableFuture<Void> publishSingleton(
241            Jid address,
242            Extension item,
243            final String node,
244            final NodeConfiguration nodeConfiguration) {
245        return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
246    }
247
248    public ListenableFuture<Void> publish(
249            Jid address,
250            Extension item,
251            final String itemId,
252            final NodeConfiguration nodeConfiguration) {
253        final var id = ExtensionFactory.id(item.getClass());
254        return publish(address, item, itemId, id.namespace, nodeConfiguration);
255    }
256
257    public ListenableFuture<Void> publish(
258            final Jid address,
259            final Extension itemPayload,
260            final String itemId,
261            final String node,
262            final NodeConfiguration nodeConfiguration) {
263        final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
264        return Futures.catchingAsync(
265                future,
266                PreconditionNotMetException.class,
267                ex -> {
268                    Log.d(
269                            Config.LOGTAG,
270                            "Node " + node + " on " + address + " requires reconfiguration");
271                    final var reconfigurationFuture =
272                            reconfigureNode(address, node, nodeConfiguration);
273                    return Futures.transformAsync(
274                            reconfigurationFuture,
275                            ignored ->
276                                    publishNoRetry(
277                                            address, itemPayload, itemId, node, nodeConfiguration),
278                            MoreExecutors.directExecutor());
279                },
280                MoreExecutors.directExecutor());
281    }
282
283    private ListenableFuture<Void> publishNoRetry(
284            final Jid address,
285            final Extension itemPayload,
286            final String itemId,
287            final String node,
288            final NodeConfiguration nodeConfiguration) {
289        final var iq = new Iq(Iq.Type.SET);
290        iq.setTo(address);
291        final var pubSub = iq.addExtension(new PubSub());
292        final var publish = pubSub.addExtension(new Publish());
293        publish.setNode(node);
294        final var item = publish.addExtension(new PubSub.Item());
295        item.setId(itemId);
296        item.addExtension(itemPayload);
297        pubSub.addExtension(PublishOptions.of(nodeConfiguration));
298        final ListenableFuture<Void> iqFuture =
299                Futures.transform(
300                        connection.sendIqPacket(iq),
301                        result -> null,
302                        MoreExecutors.directExecutor());
303        return Futures.catchingAsync(
304                iqFuture,
305                IqErrorException.class,
306                new PubSubExceptionTransformer<>(),
307                MoreExecutors.directExecutor());
308    }
309
310    private ListenableFuture<Void> reconfigureNode(
311            final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
312        return Futures.transformAsync(
313                getNodeConfiguration(address, node),
314                data -> setNodeConfiguration(address, node, data.submit(nodeConfiguration)),
315                MoreExecutors.directExecutor());
316    }
317
318    public ListenableFuture<Data> getNodeConfiguration(final Jid address, final String node) {
319        final Iq iq = new Iq(Iq.Type.GET);
320        iq.setTo(address);
321        final var pubSub = iq.addExtension(new PubSubOwner());
322        final var configure = pubSub.addExtension(new Configure());
323        configure.setNode(node);
324        return Futures.transform(
325                connection.sendIqPacket(iq),
326                result -> {
327                    final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
328                    final Configure configureResult =
329                            pubSubOwnerResult == null
330                                    ? null
331                                    : pubSubOwnerResult.getExtension(Configure.class);
332                    if (configureResult == null) {
333                        throw new IllegalStateException(
334                                "No configuration found in configuration request result");
335                    }
336                    return configureResult.getData();
337                },
338                MoreExecutors.directExecutor());
339    }
340
341    private ListenableFuture<Void> setNodeConfiguration(
342            final Jid address, final String node, final Data data) {
343        final Iq iq = new Iq(Iq.Type.SET);
344        iq.setTo(address);
345        final var pubSub = iq.addExtension(new PubSubOwner());
346        final var configure = pubSub.addExtension(new Configure());
347        configure.setNode(node);
348        configure.addExtension(data);
349        return Futures.transform(
350                connection.sendIqPacket(iq),
351                result -> {
352                    Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
353                    return null;
354                },
355                MoreExecutors.directExecutor());
356    }
357
358    public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
359        final var iq = new Iq(Iq.Type.SET);
360        iq.setTo(address);
361        final var pubSub = iq.addExtension(new PubSub());
362        final var retract = pubSub.addExtension(new Retract());
363        retract.setNode(node);
364        retract.setNotify(true);
365        final var item = retract.addExtension(new PubSub.Item());
366        item.setId(itemId);
367        return connection.sendIqPacket(iq);
368    }
369
370    public ListenableFuture<Iq> delete(final Jid address, final String node) {
371        final var iq = new Iq(Iq.Type.SET);
372        iq.setTo(address);
373        final var pubSub = iq.addExtension(new PubSubOwner());
374        final var delete =
375                pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
376        delete.setNode(node);
377        return connection.sendIqPacket(iq);
378    }
379
380    private static class PubSubExceptionTransformer<V>
381            implements AsyncFunction<IqErrorException, V> {
382
383        @Override
384        @NonNull
385        public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
386            final var error = ex.getError();
387            if (error == null) {
388                return Futures.immediateFailedFuture(ex);
389            }
390            final PubSubError pubSubError = error.getExtension(PubSubError.class);
391            if (pubSubError instanceof PubSubError.PreconditionNotMet) {
392                return Futures.immediateFailedFuture(
393                        new PreconditionNotMetException(ex.getResponse()));
394            } else if (pubSubError != null) {
395                return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
396            } else {
397                return Futures.immediateFailedFuture(ex);
398            }
399        }
400    }
401}