PubSubManager.java

  1package eu.siacs.conversations.xmpp.manager;
  2
  3import android.content.Context;
  4import android.util.Log;
  5import androidx.annotation.NonNull;
  6import com.google.common.util.concurrent.AsyncFunction;
  7import com.google.common.util.concurrent.Futures;
  8import com.google.common.util.concurrent.ListenableFuture;
  9import com.google.common.util.concurrent.MoreExecutors;
 10import eu.siacs.conversations.Config;
 11import eu.siacs.conversations.xml.Namespace;
 12import eu.siacs.conversations.xmpp.Jid;
 13import eu.siacs.conversations.xmpp.XmppConnection;
 14import im.conversations.android.xmpp.ExtensionFactory;
 15import im.conversations.android.xmpp.IqErrorException;
 16import im.conversations.android.xmpp.NodeConfiguration;
 17import im.conversations.android.xmpp.PreconditionNotMetException;
 18import im.conversations.android.xmpp.PubSubErrorException;
 19import im.conversations.android.xmpp.model.Extension;
 20import im.conversations.android.xmpp.model.data.Data;
 21import im.conversations.android.xmpp.model.pubsub.Items;
 22import im.conversations.android.xmpp.model.pubsub.PubSub;
 23import im.conversations.android.xmpp.model.pubsub.Publish;
 24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
 25import im.conversations.android.xmpp.model.pubsub.Retract;
 26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
 27import im.conversations.android.xmpp.model.pubsub.event.Delete;
 28import im.conversations.android.xmpp.model.pubsub.event.Event;
 29import im.conversations.android.xmpp.model.pubsub.event.Purge;
 30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
 31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
 32import im.conversations.android.xmpp.model.stanza.Iq;
 33import im.conversations.android.xmpp.model.stanza.Message;
 34import java.util.Map;
 35
 36public class PubSubManager extends AbstractManager {
 37
 38    private static final String SINGLETON_ITEM_ID = "current";
 39
 40    public PubSubManager(Context context, XmppConnection connection) {
 41        super(context, connection);
 42    }
 43
 44    public void handleEvent(final Message message) {
 45        final var event = message.getExtension(Event.class);
 46        final var action = event.getAction();
 47        final var from = message.getFrom();
 48
 49        if (from instanceof Jid.Invalid) {
 50            Log.d(
 51                    Config.LOGTAG,
 52                    getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
 53            return;
 54        }
 55
 56        if (action instanceof Purge purge) {
 57            // purge is a deletion of all items in a node
 58            handlePurge(message, purge);
 59        } else if (action instanceof Items items) {
 60            // the items wrapper contains, new and updated items as well as retractions which are
 61            // deletions of individual items in a node
 62            handleItems(message, items);
 63        } else if (action instanceof Delete delete) {
 64            // delete is the deletion of the node itself
 65            handleDelete(message, delete);
 66        }
 67    }
 68
 69    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 70            final Jid address, final Class<T> clazz) {
 71        final var id = ExtensionFactory.id(clazz);
 72        if (id == null) {
 73            return Futures.immediateFailedFuture(
 74                    new IllegalArgumentException(
 75                            String.format("%s is not a registered extension", clazz.getName())));
 76        }
 77        return fetchItems(address, id.namespace, clazz);
 78    }
 79
 80    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 81            final Jid address, final String node, final Class<T> clazz) {
 82        final Iq request = new Iq(Iq.Type.GET);
 83        request.setTo(address);
 84        final var pubSub = request.addExtension(new PubSub());
 85        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
 86        itemsWrapper.setNode(node);
 87        return Futures.transform(
 88                connection.sendIqPacket(request),
 89                response -> {
 90                    final var pubSubResponse = response.getExtension(PubSub.class);
 91                    if (pubSubResponse == null) {
 92                        throw new IllegalStateException();
 93                    }
 94                    final var items = pubSubResponse.getItems();
 95                    if (items == null) {
 96                        throw new IllegalStateException();
 97                    }
 98                    return items.getItemMap(clazz);
 99                },
100                MoreExecutors.directExecutor());
101    }
102
103    public <T extends Extension> ListenableFuture<T> fetchItem(
104            final Jid address, final String itemId, final Class<T> clazz) {
105        final var id = ExtensionFactory.id(clazz);
106        if (id == null) {
107            return Futures.immediateFailedFuture(
108                    new IllegalArgumentException(
109                            String.format("%s is not a registered extension", clazz.getName())));
110        }
111        return fetchItem(address, id.namespace, itemId, clazz);
112    }
113
114    public <T extends Extension> ListenableFuture<T> fetchItem(
115            final Jid address, final String node, final String itemId, final Class<T> clazz) {
116        final Iq request = new Iq(Iq.Type.GET);
117        request.setTo(address);
118        final var pubSub = request.addExtension(new PubSub());
119        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120        itemsWrapper.setNode(node);
121        final var item = itemsWrapper.addExtension(new PubSub.Item());
122        item.setId(itemId);
123        return Futures.transform(
124                connection.sendIqPacket(request),
125                response -> {
126                    final var pubSubResponse = response.getExtension(PubSub.class);
127                    if (pubSubResponse == null) {
128                        throw new IllegalStateException();
129                    }
130                    final var items = pubSubResponse.getItems();
131                    if (items == null) {
132                        throw new IllegalStateException();
133                    }
134                    return items.getItemOrThrow(itemId, clazz);
135                },
136                MoreExecutors.directExecutor());
137    }
138
139    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140            final Jid address, final Class<T> clazz) {
141        final var id = ExtensionFactory.id(clazz);
142        if (id == null) {
143            return Futures.immediateFailedFuture(
144                    new IllegalArgumentException(
145                            String.format("%s is not a registered extension", clazz.getName())));
146        }
147        return fetchMostRecentItem(address, id.namespace, clazz);
148    }
149
150    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
151            final Jid address, final String node, final Class<T> clazz) {
152        final Iq request = new Iq(Iq.Type.GET);
153        request.setTo(address);
154        final var pubSub = request.addExtension(new PubSub());
155        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
156        itemsWrapper.setNode(node);
157        itemsWrapper.setMaxItems(1);
158        return Futures.transform(
159                connection.sendIqPacket(request),
160                response -> {
161                    final var pubSubResponse = response.getExtension(PubSub.class);
162                    if (pubSubResponse == null) {
163                        throw new IllegalStateException();
164                    }
165                    final var items = pubSubResponse.getItems();
166                    if (items == null) {
167                        throw new IllegalStateException();
168                    }
169                    return items.getOnlyItem(clazz);
170                },
171                MoreExecutors.directExecutor());
172    }
173
174    private void handleItems(final Message message, final Items items) {
175        final var from = message.getFrom();
176        final var isFromBare = from == null || from.isBareJid();
177        final var node = items.getNode();
178        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
179            getManager(NativeBookmarkManager.class).handleItems(items);
180            return;
181        }
182        if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
183            getManager(LegacyBookmarkManager.class).handleItems(items);
184            return;
185        }
186        if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
187            getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
188            return;
189        }
190        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
191            getManager(AvatarManager.class).handleItems(from, items);
192            return;
193        }
194        if (isFromBare && Namespace.NICK.equals(node)) {
195            getManager(NickManager.class).handleItems(from, items);
196            return;
197        }
198        if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
199            getManager(AxolotlManager.class).handleItems(from, items);
200        }
201    }
202
203    private void handlePurge(final Message message, final Purge purge) {
204        final var from = message.getFrom();
205        final var isFromBare = from == null || from.isBareJid();
206        final var node = purge.getNode();
207        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
208            getManager(NativeBookmarkManager.class).handlePurge();
209        }
210        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
211            // purge (delete all items in a node) is functionally equivalent to delete
212            getManager(AvatarManager.class).handleDelete(from);
213        }
214    }
215
216    private void handleDelete(final Message message, final Delete delete) {
217        final var from = message.getFrom();
218        final var isFromBare = from == null || from.isBareJid();
219        final var node = delete.getNode();
220        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
221            getManager(NativeBookmarkManager.class).handleDelete();
222            return;
223        }
224        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
225            getManager(AvatarManager.class).handleDelete(from);
226            return;
227        }
228        if (isFromBare && Namespace.NICK.equals(node)) {
229            getManager(NickManager.class).handleDelete(from);
230        }
231    }
232
233    public ListenableFuture<Void> publishSingleton(
234            Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
235        final var id = ExtensionFactory.id(item.getClass());
236        return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
237    }
238
239    public ListenableFuture<Void> publishSingleton(
240            Jid address,
241            Extension item,
242            final String node,
243            final NodeConfiguration nodeConfiguration) {
244        return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
245    }
246
247    public ListenableFuture<Void> publish(
248            Jid address,
249            Extension item,
250            final String itemId,
251            final NodeConfiguration nodeConfiguration) {
252        final var id = ExtensionFactory.id(item.getClass());
253        return publish(address, item, itemId, id.namespace, nodeConfiguration);
254    }
255
256    public ListenableFuture<Void> publish(
257            final Jid address,
258            final Extension itemPayload,
259            final String itemId,
260            final String node,
261            final NodeConfiguration nodeConfiguration) {
262        final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
263        return Futures.catchingAsync(
264                future,
265                PreconditionNotMetException.class,
266                ex -> {
267                    Log.d(
268                            Config.LOGTAG,
269                            "Node " + node + " on " + address + " requires reconfiguration");
270                    final var reconfigurationFuture =
271                            reconfigureNode(address, node, nodeConfiguration);
272                    return Futures.transformAsync(
273                            reconfigurationFuture,
274                            ignored ->
275                                    publishNoRetry(
276                                            address, itemPayload, itemId, node, nodeConfiguration),
277                            MoreExecutors.directExecutor());
278                },
279                MoreExecutors.directExecutor());
280    }
281
282    private ListenableFuture<Void> publishNoRetry(
283            final Jid address,
284            final Extension itemPayload,
285            final String itemId,
286            final String node,
287            final NodeConfiguration nodeConfiguration) {
288        final var iq = new Iq(Iq.Type.SET);
289        iq.setTo(address);
290        final var pubSub = iq.addExtension(new PubSub());
291        final var publish = pubSub.addExtension(new Publish());
292        publish.setNode(node);
293        final var item = publish.addExtension(new PubSub.Item());
294        item.setId(itemId);
295        item.addExtension(itemPayload);
296        pubSub.addExtension(PublishOptions.of(nodeConfiguration));
297        final ListenableFuture<Void> iqFuture =
298                Futures.transform(
299                        connection.sendIqPacket(iq),
300                        result -> null,
301                        MoreExecutors.directExecutor());
302        return Futures.catchingAsync(
303                iqFuture,
304                IqErrorException.class,
305                new PubSubExceptionTransformer<>(),
306                MoreExecutors.directExecutor());
307    }
308
309    private ListenableFuture<Void> reconfigureNode(
310            final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
311        return Futures.transformAsync(
312                getNodeConfiguration(address, node),
313                data -> setNodeConfiguration(address, node, data.submit(nodeConfiguration)),
314                MoreExecutors.directExecutor());
315    }
316
317    public ListenableFuture<Data> getNodeConfiguration(final Jid address, final String node) {
318        final Iq iq = new Iq(Iq.Type.GET);
319        iq.setTo(address);
320        final var pubSub = iq.addExtension(new PubSubOwner());
321        final var configure = pubSub.addExtension(new Configure());
322        configure.setNode(node);
323        return Futures.transform(
324                connection.sendIqPacket(iq),
325                result -> {
326                    final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
327                    final Configure configureResult =
328                            pubSubOwnerResult == null
329                                    ? null
330                                    : pubSubOwnerResult.getExtension(Configure.class);
331                    if (configureResult == null) {
332                        throw new IllegalStateException(
333                                "No configuration found in configuration request result");
334                    }
335                    return configureResult.getData();
336                },
337                MoreExecutors.directExecutor());
338    }
339
340    private ListenableFuture<Void> setNodeConfiguration(
341            final Jid address, final String node, final Data data) {
342        final Iq iq = new Iq(Iq.Type.SET);
343        iq.setTo(address);
344        final var pubSub = iq.addExtension(new PubSubOwner());
345        final var configure = pubSub.addExtension(new Configure());
346        configure.setNode(node);
347        configure.addExtension(data);
348        return Futures.transform(
349                connection.sendIqPacket(iq),
350                result -> {
351                    Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
352                    return null;
353                },
354                MoreExecutors.directExecutor());
355    }
356
357    public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
358        final var iq = new Iq(Iq.Type.SET);
359        iq.setTo(address);
360        final var pubSub = iq.addExtension(new PubSub());
361        final var retract = pubSub.addExtension(new Retract());
362        retract.setNode(node);
363        retract.setNotify(true);
364        final var item = retract.addExtension(new PubSub.Item());
365        item.setId(itemId);
366        return connection.sendIqPacket(iq);
367    }
368
369    public ListenableFuture<Iq> delete(final Jid address, final String node) {
370        final var iq = new Iq(Iq.Type.SET);
371        iq.setTo(address);
372        final var pubSub = iq.addExtension(new PubSubOwner());
373        final var delete =
374                pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
375        delete.setNode(node);
376        return connection.sendIqPacket(iq);
377    }
378
379    private static class PubSubExceptionTransformer<V>
380            implements AsyncFunction<IqErrorException, V> {
381
382        @Override
383        @NonNull
384        public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
385            final var error = ex.getError();
386            if (error == null) {
387                return Futures.immediateFailedFuture(ex);
388            }
389            final PubSubError pubSubError = error.getExtension(PubSubError.class);
390            if (pubSubError instanceof PubSubError.PreconditionNotMet) {
391                return Futures.immediateFailedFuture(
392                        new PreconditionNotMetException(ex.getResponse()));
393            } else if (pubSubError != null) {
394                return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
395            } else {
396                return Futures.immediateFailedFuture(ex);
397            }
398        }
399    }
400}