PubSubManager.java

  1package eu.siacs.conversations.xmpp.manager;
  2
  3import android.content.Context;
  4import android.util.Log;
  5import androidx.annotation.NonNull;
  6import com.google.common.util.concurrent.AsyncFunction;
  7import com.google.common.util.concurrent.Futures;
  8import com.google.common.util.concurrent.ListenableFuture;
  9import com.google.common.util.concurrent.MoreExecutors;
 10import eu.siacs.conversations.Config;
 11import eu.siacs.conversations.xml.Namespace;
 12import eu.siacs.conversations.xmpp.Jid;
 13import eu.siacs.conversations.xmpp.XmppConnection;
 14import im.conversations.android.xmpp.ExtensionFactory;
 15import im.conversations.android.xmpp.IqErrorException;
 16import im.conversations.android.xmpp.NodeConfiguration;
 17import im.conversations.android.xmpp.PreconditionNotMetException;
 18import im.conversations.android.xmpp.PubSubErrorException;
 19import im.conversations.android.xmpp.model.Extension;
 20import im.conversations.android.xmpp.model.data.Data;
 21import im.conversations.android.xmpp.model.pubsub.Items;
 22import im.conversations.android.xmpp.model.pubsub.PubSub;
 23import im.conversations.android.xmpp.model.pubsub.Publish;
 24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
 25import im.conversations.android.xmpp.model.pubsub.Retract;
 26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
 27import im.conversations.android.xmpp.model.pubsub.event.Delete;
 28import im.conversations.android.xmpp.model.pubsub.event.Event;
 29import im.conversations.android.xmpp.model.pubsub.event.Purge;
 30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
 31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
 32import im.conversations.android.xmpp.model.stanza.Iq;
 33import im.conversations.android.xmpp.model.stanza.Message;
 34import java.util.Map;
 35
 36public class PubSubManager extends AbstractManager {
 37
 38    private static final String SINGLETON_ITEM_ID = "current";
 39
 40    public PubSubManager(Context context, XmppConnection connection) {
 41        super(context, connection);
 42    }
 43
 44    public void handleEvent(final Message message) {
 45        final var event = message.getExtension(Event.class);
 46        final var action = event.getAction();
 47        final var from = message.getFrom();
 48
 49        if (from instanceof Jid.Invalid) {
 50            Log.d(
 51                    Config.LOGTAG,
 52                    getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
 53            return;
 54        }
 55
 56        if (action instanceof Purge purge) {
 57            // purge is a deletion of all items in a node
 58            handlePurge(message, purge);
 59        } else if (action instanceof Items items) {
 60            // the items wrapper contains, new and updated items as well as retractions which are
 61            // deletions of individual items in a node
 62            handleItems(message, items);
 63        } else if (action instanceof Delete delete) {
 64            // delete is the deletion of the node itself
 65            handleDelete(message, delete);
 66        }
 67    }
 68
 69    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 70            final Jid address, final Class<T> clazz) {
 71        final var id = ExtensionFactory.id(clazz);
 72        if (id == null) {
 73            return Futures.immediateFailedFuture(
 74                    new IllegalArgumentException(
 75                            String.format("%s is not a registered extension", clazz.getName())));
 76        }
 77        return fetchItems(address, id.namespace, clazz);
 78    }
 79
 80    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 81            final Jid address, final String node, final Class<T> clazz) {
 82        final Iq request = new Iq(Iq.Type.GET);
 83        request.setTo(address);
 84        final var pubSub = request.addExtension(new PubSub());
 85        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
 86        itemsWrapper.setNode(node);
 87        return Futures.transform(
 88                connection.sendIqPacket(request),
 89                response -> {
 90                    final var pubSubResponse = response.getExtension(PubSub.class);
 91                    if (pubSubResponse == null) {
 92                        throw new IllegalStateException();
 93                    }
 94                    final var items = pubSubResponse.getItems();
 95                    if (items == null) {
 96                        throw new IllegalStateException();
 97                    }
 98                    return items.getItemMap(clazz);
 99                },
100                MoreExecutors.directExecutor());
101    }
102
103    public <T extends Extension> ListenableFuture<T> fetchItem(
104            final Jid address, final String itemId, final Class<T> clazz) {
105        final var id = ExtensionFactory.id(clazz);
106        if (id == null) {
107            return Futures.immediateFailedFuture(
108                    new IllegalArgumentException(
109                            String.format("%s is not a registered extension", clazz.getName())));
110        }
111        return fetchItem(address, id.namespace, itemId, clazz);
112    }
113
114    public <T extends Extension> ListenableFuture<T> fetchItem(
115            final Jid address, final String node, final String itemId, final Class<T> clazz) {
116        final Iq request = new Iq(Iq.Type.GET);
117        request.setTo(address);
118        final var pubSub = request.addExtension(new PubSub());
119        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120        itemsWrapper.setNode(node);
121        final var item = itemsWrapper.addExtension(new PubSub.Item());
122        item.setId(itemId);
123        return Futures.transform(
124                connection.sendIqPacket(request),
125                response -> {
126                    final var pubSubResponse = response.getExtension(PubSub.class);
127                    if (pubSubResponse == null) {
128                        throw new IllegalStateException();
129                    }
130                    final var items = pubSubResponse.getItems();
131                    if (items == null) {
132                        throw new IllegalStateException();
133                    }
134                    return items.getItemOrThrow(itemId, clazz);
135                },
136                MoreExecutors.directExecutor());
137    }
138
139    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140            final Jid address, final Class<T> clazz) {
141        final var id = ExtensionFactory.id(clazz);
142        if (id == null) {
143            return Futures.immediateFailedFuture(
144                    new IllegalArgumentException(
145                            String.format("%s is not a registered extension", clazz.getName())));
146        }
147        return fetchMostRecentItem(address, id.namespace, clazz);
148    }
149
150    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
151            final Jid address, final String node, final Class<T> clazz) {
152        final Iq request = new Iq(Iq.Type.GET);
153        request.setTo(address);
154        final var pubSub = request.addExtension(new PubSub());
155        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
156        itemsWrapper.setNode(node);
157        itemsWrapper.setMaxItems(1);
158        return Futures.transform(
159                connection.sendIqPacket(request),
160                response -> {
161                    final var pubSubResponse = response.getExtension(PubSub.class);
162                    if (pubSubResponse == null) {
163                        throw new IllegalStateException();
164                    }
165                    final var items = pubSubResponse.getItems();
166                    if (items == null) {
167                        throw new IllegalStateException();
168                    }
169                    return items.getOnlyItem(clazz);
170                },
171                MoreExecutors.directExecutor());
172    }
173
174    private void handleItems(final Message message, final Items items) {
175        final var from = message.getFrom();
176        final var isFromBare = from == null || from.isBareJid();
177        final var node = items.getNode();
178        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
179            getManager(BookmarkManager.class).handleItems(items);
180            return;
181        }
182        if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
183            getManager(LegacyBookmarkManager.class).handleItems(items);
184            return;
185        }
186        if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
187            getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
188            return;
189        }
190        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
191            getManager(AvatarManager.class).handleItems(from, items);
192            return;
193        }
194        if (isFromBare && Namespace.NICK.equals(node)) {
195            getManager(NickManager.class).handleItems(from, items);
196            return;
197        }
198        if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
199            getManager(AxolotlManager.class).handleItems(from, items);
200        }
201    }
202
203    private void handlePurge(final Message message, final Purge purge) {
204        final var from = message.getFrom();
205        final var isFromBare = from == null || from.isBareJid();
206        final var node = purge.getNode();
207        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
208            getManager(BookmarkManager.class).handlePurge();
209        }
210    }
211
212    private void handleDelete(final Message message, final Delete delete) {
213        final var from = message.getFrom();
214        final var isFromBare = from == null || from.isBareJid();
215        final var node = delete.getNode();
216        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
217            getManager(BookmarkManager.class).handleDelete();
218            return;
219        }
220        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
221            getManager(AvatarManager.class).handleDelete(from);
222            return;
223        }
224        if (isFromBare && Namespace.NICK.equals(node)) {
225            getManager(NickManager.class).handleDelete(from);
226        }
227    }
228
229    public ListenableFuture<Void> publishSingleton(
230            Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
231        final var id = ExtensionFactory.id(item.getClass());
232        return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
233    }
234
235    public ListenableFuture<Void> publishSingleton(
236            Jid address,
237            Extension item,
238            final String node,
239            final NodeConfiguration nodeConfiguration) {
240        return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
241    }
242
243    public ListenableFuture<Void> publish(
244            Jid address,
245            Extension item,
246            final String itemId,
247            final NodeConfiguration nodeConfiguration) {
248        final var id = ExtensionFactory.id(item.getClass());
249        return publish(address, item, itemId, id.namespace, nodeConfiguration);
250    }
251
252    public ListenableFuture<Void> publish(
253            final Jid address,
254            final Extension itemPayload,
255            final String itemId,
256            final String node,
257            final NodeConfiguration nodeConfiguration) {
258        final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
259        return Futures.catchingAsync(
260                future,
261                PreconditionNotMetException.class,
262                ex -> {
263                    Log.d(
264                            Config.LOGTAG,
265                            "Node " + node + " on " + address + " requires reconfiguration");
266                    final var reconfigurationFuture =
267                            reconfigureNode(address, node, nodeConfiguration);
268                    return Futures.transformAsync(
269                            reconfigurationFuture,
270                            ignored ->
271                                    publishNoRetry(
272                                            address, itemPayload, itemId, node, nodeConfiguration),
273                            MoreExecutors.directExecutor());
274                },
275                MoreExecutors.directExecutor());
276    }
277
278    private ListenableFuture<Void> publishNoRetry(
279            final Jid address,
280            final Extension itemPayload,
281            final String itemId,
282            final String node,
283            final NodeConfiguration nodeConfiguration) {
284        final var iq = new Iq(Iq.Type.SET);
285        iq.setTo(address);
286        final var pubSub = iq.addExtension(new PubSub());
287        final var publish = pubSub.addExtension(new Publish());
288        publish.setNode(node);
289        final var item = publish.addExtension(new PubSub.Item());
290        item.setId(itemId);
291        item.addExtension(itemPayload);
292        pubSub.addExtension(PublishOptions.of(nodeConfiguration));
293        final ListenableFuture<Void> iqFuture =
294                Futures.transform(
295                        connection.sendIqPacket(iq),
296                        result -> null,
297                        MoreExecutors.directExecutor());
298        return Futures.catchingAsync(
299                iqFuture,
300                IqErrorException.class,
301                new PubSubExceptionTransformer<>(),
302                MoreExecutors.directExecutor());
303    }
304
305    private ListenableFuture<Void> reconfigureNode(
306            final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
307        final Iq iq = new Iq(Iq.Type.GET);
308        iq.setTo(address);
309        final var pubSub = iq.addExtension(new PubSubOwner());
310        final var configure = pubSub.addExtension(new Configure());
311        configure.setNode(node);
312        return Futures.transformAsync(
313                connection.sendIqPacket(iq),
314                result -> {
315                    final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
316                    final Configure configureResult =
317                            pubSubOwnerResult == null
318                                    ? null
319                                    : pubSubOwnerResult.getExtension(Configure.class);
320                    if (configureResult == null) {
321                        throw new IllegalStateException(
322                                "No configuration found in configuration request result");
323                    }
324                    final var data = configureResult.getData();
325                    return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
326                },
327                MoreExecutors.directExecutor());
328    }
329
330    private ListenableFuture<Void> setNodeConfiguration(
331            final Jid address, final String node, final Data data) {
332        final Iq iq = new Iq(Iq.Type.SET);
333        iq.setTo(address);
334        final var pubSub = iq.addExtension(new PubSubOwner());
335        final var configure = pubSub.addExtension(new Configure());
336        configure.setNode(node);
337        configure.addExtension(data);
338        return Futures.transform(
339                connection.sendIqPacket(iq),
340                result -> {
341                    Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
342                    return null;
343                },
344                MoreExecutors.directExecutor());
345    }
346
347    public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
348        final var iq = new Iq(Iq.Type.SET);
349        iq.setTo(address);
350        final var pubSub = iq.addExtension(new PubSub());
351        final var retract = pubSub.addExtension(new Retract());
352        retract.setNode(node);
353        retract.setNotify(true);
354        final var item = retract.addExtension(new PubSub.Item());
355        item.setId(itemId);
356        return connection.sendIqPacket(iq);
357    }
358
359    public ListenableFuture<Iq> delete(final Jid address, final String node) {
360        final var iq = new Iq(Iq.Type.SET);
361        iq.setTo(address);
362        final var pubSub = iq.addExtension(new PubSubOwner());
363        final var delete =
364                pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
365        delete.setNode(node);
366        return connection.sendIqPacket(iq);
367    }
368
369    private static class PubSubExceptionTransformer<V>
370            implements AsyncFunction<IqErrorException, V> {
371
372        @Override
373        @NonNull
374        public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
375            final var error = ex.getError();
376            if (error == null) {
377                return Futures.immediateFailedFuture(ex);
378            }
379            final PubSubError pubSubError = error.getExtension(PubSubError.class);
380            if (pubSubError instanceof PubSubError.PreconditionNotMet) {
381                return Futures.immediateFailedFuture(
382                        new PreconditionNotMetException(ex.getResponse()));
383            } else if (pubSubError != null) {
384                return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
385            } else {
386                return Futures.immediateFailedFuture(ex);
387            }
388        }
389    }
390}