PubSubManager.java

  1package eu.siacs.conversations.xmpp.manager;
  2
  3import android.content.Context;
  4import android.util.Log;
  5import androidx.annotation.NonNull;
  6import com.google.common.util.concurrent.AsyncFunction;
  7import com.google.common.util.concurrent.Futures;
  8import com.google.common.util.concurrent.ListenableFuture;
  9import com.google.common.util.concurrent.MoreExecutors;
 10import eu.siacs.conversations.Config;
 11import eu.siacs.conversations.xml.Namespace;
 12import eu.siacs.conversations.xmpp.Jid;
 13import eu.siacs.conversations.xmpp.XmppConnection;
 14import im.conversations.android.xmpp.ExtensionFactory;
 15import im.conversations.android.xmpp.IqErrorException;
 16import im.conversations.android.xmpp.NodeConfiguration;
 17import im.conversations.android.xmpp.PreconditionNotMetException;
 18import im.conversations.android.xmpp.PubSubErrorException;
 19import im.conversations.android.xmpp.model.Extension;
 20import im.conversations.android.xmpp.model.data.Data;
 21import im.conversations.android.xmpp.model.pubsub.Items;
 22import im.conversations.android.xmpp.model.pubsub.PubSub;
 23import im.conversations.android.xmpp.model.pubsub.Publish;
 24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
 25import im.conversations.android.xmpp.model.pubsub.Retract;
 26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
 27import im.conversations.android.xmpp.model.pubsub.event.Delete;
 28import im.conversations.android.xmpp.model.pubsub.event.Event;
 29import im.conversations.android.xmpp.model.pubsub.event.Purge;
 30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
 31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
 32import im.conversations.android.xmpp.model.stanza.Iq;
 33import im.conversations.android.xmpp.model.stanza.Message;
 34import java.util.Map;
 35
 36public class PubSubManager extends AbstractManager {
 37
 38    private static final String SINGLETON_ITEM_ID = "current";
 39
 40    public PubSubManager(Context context, XmppConnection connection) {
 41        super(context, connection);
 42    }
 43
 44    public void handleEvent(final Message message) {
 45        final var event = message.getExtension(Event.class);
 46        final var action = event.getAction();
 47        final var from = message.getFrom();
 48
 49        if (from instanceof Jid.Invalid) {
 50            Log.d(
 51                    Config.LOGTAG,
 52                    getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
 53            return;
 54        }
 55
 56        if (action instanceof Purge purge) {
 57            // purge is a deletion of all items in a node
 58            handlePurge(message, purge);
 59        } else if (action instanceof Items items) {
 60            // the items wrapper contains, new and updated items as well as retractions which are
 61            // deletions of individual items in a node
 62            handleItems(message, items);
 63        } else if (action instanceof Delete delete) {
 64            // delete is the deletion of the node itself
 65            handleDelete(message, delete);
 66        }
 67    }
 68
 69    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 70            final Jid address, final Class<T> clazz) {
 71        final var id = ExtensionFactory.id(clazz);
 72        if (id == null) {
 73            return Futures.immediateFailedFuture(
 74                    new IllegalArgumentException(
 75                            String.format("%s is not a registered extension", clazz.getName())));
 76        }
 77        return fetchItems(address, id.namespace, clazz);
 78    }
 79
 80    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 81            final Jid address, final String node, final Class<T> clazz) {
 82        final Iq request = new Iq(Iq.Type.GET);
 83        request.setTo(address);
 84        final var pubSub = request.addExtension(new PubSub());
 85        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
 86        itemsWrapper.setNode(node);
 87        return Futures.transform(
 88                connection.sendIqPacket(request),
 89                response -> {
 90                    final var pubSubResponse = response.getExtension(PubSub.class);
 91                    if (pubSubResponse == null) {
 92                        throw new IllegalStateException();
 93                    }
 94                    final var items = pubSubResponse.getItems();
 95                    if (items == null) {
 96                        throw new IllegalStateException();
 97                    }
 98                    return items.getItemMap(clazz);
 99                },
100                MoreExecutors.directExecutor());
101    }
102
103    public <T extends Extension> ListenableFuture<T> fetchItem(
104            final Jid address, final String itemId, final Class<T> clazz) {
105        final var id = ExtensionFactory.id(clazz);
106        if (id == null) {
107            return Futures.immediateFailedFuture(
108                    new IllegalArgumentException(
109                            String.format("%s is not a registered extension", clazz.getName())));
110        }
111        return fetchItem(address, id.namespace, itemId, clazz);
112    }
113
114    public <T extends Extension> ListenableFuture<T> fetchItem(
115            final Jid address, final String node, final String itemId, final Class<T> clazz) {
116        final Iq request = new Iq(Iq.Type.GET);
117        request.setTo(address);
118        final var pubSub = request.addExtension(new PubSub());
119        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120        itemsWrapper.setNode(node);
121        final var item = itemsWrapper.addExtension(new PubSub.Item());
122        item.setId(itemId);
123        return Futures.transform(
124                connection.sendIqPacket(request),
125                response -> {
126                    final var pubSubResponse = response.getExtension(PubSub.class);
127                    if (pubSubResponse == null) {
128                        throw new IllegalStateException();
129                    }
130                    final var items = pubSubResponse.getItems();
131                    if (items == null) {
132                        throw new IllegalStateException();
133                    }
134                    return items.getItemOrThrow(itemId, clazz);
135                },
136                MoreExecutors.directExecutor());
137    }
138
139    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140            final Jid address, final String node, final Class<T> clazz) {
141        final Iq request = new Iq(Iq.Type.GET);
142        request.setTo(address);
143        final var pubSub = request.addExtension(new PubSub());
144        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
145        itemsWrapper.setNode(node);
146        itemsWrapper.setMaxItems(1);
147        return Futures.transform(
148                connection.sendIqPacket(request),
149                response -> {
150                    final var pubSubResponse = response.getExtension(PubSub.class);
151                    if (pubSubResponse == null) {
152                        throw new IllegalStateException();
153                    }
154                    final var items = pubSubResponse.getItems();
155                    if (items == null) {
156                        throw new IllegalStateException();
157                    }
158                    return items.getOnlyItem(clazz);
159                },
160                MoreExecutors.directExecutor());
161    }
162
163    private void handleItems(final Message message, final Items items) {
164        final var from = message.getFrom();
165        final var isFromBare = from == null || from.isBareJid();
166        final var node = items.getNode();
167        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
168            getManager(BookmarkManager.class).handleItems(items);
169            return;
170        }
171        if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
172            getManager(LegacyBookmarkManager.class).handleItems(items);
173            return;
174        }
175        if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
176            getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
177            return;
178        }
179        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
180            getManager(AvatarManager.class).handleItems(from, items);
181            return;
182        }
183        if (isFromBare && Namespace.NICK.equals(node)) {
184            getManager(NickManager.class).handleItems(from, items);
185            return;
186        }
187        if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
188            getManager(AxolotlManager.class).handleItems(from, items);
189        }
190    }
191
192    private void handlePurge(final Message message, final Purge purge) {
193        final var from = message.getFrom();
194        final var isFromBare = from == null || from.isBareJid();
195        final var node = purge.getNode();
196        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
197            getManager(BookmarkManager.class).handlePurge();
198        }
199    }
200
201    private void handleDelete(final Message message, final Delete delete) {
202        final var from = message.getFrom();
203        final var isFromBare = from == null || from.isBareJid();
204        final var node = delete.getNode();
205        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
206            getManager(BookmarkManager.class).handleDelete();
207            return;
208        }
209        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
210            getManager(AvatarManager.class).handleDelete(from);
211            return;
212        }
213        if (isFromBare && Namespace.NICK.equals(node)) {
214            getManager(NickManager.class).handleDelete(from);
215        }
216    }
217
218    public ListenableFuture<Void> publishSingleton(
219            Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
220        final var id = ExtensionFactory.id(item.getClass());
221        return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
222    }
223
224    public ListenableFuture<Void> publishSingleton(
225            Jid address,
226            Extension item,
227            final String node,
228            final NodeConfiguration nodeConfiguration) {
229        return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
230    }
231
232    public ListenableFuture<Void> publish(
233            Jid address,
234            Extension item,
235            final String itemId,
236            final NodeConfiguration nodeConfiguration) {
237        final var id = ExtensionFactory.id(item.getClass());
238        return publish(address, item, itemId, id.namespace, nodeConfiguration);
239    }
240
241    public ListenableFuture<Void> publish(
242            final Jid address,
243            final Extension itemPayload,
244            final String itemId,
245            final String node,
246            final NodeConfiguration nodeConfiguration) {
247        final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
248        return Futures.catchingAsync(
249                future,
250                PreconditionNotMetException.class,
251                ex -> {
252                    Log.d(
253                            Config.LOGTAG,
254                            "Node " + node + " on " + address + " requires reconfiguration");
255                    final var reconfigurationFuture =
256                            reconfigureNode(address, node, nodeConfiguration);
257                    return Futures.transformAsync(
258                            reconfigurationFuture,
259                            ignored ->
260                                    publishNoRetry(
261                                            address, itemPayload, itemId, node, nodeConfiguration),
262                            MoreExecutors.directExecutor());
263                },
264                MoreExecutors.directExecutor());
265    }
266
267    private ListenableFuture<Void> publishNoRetry(
268            final Jid address,
269            final Extension itemPayload,
270            final String itemId,
271            final String node,
272            final NodeConfiguration nodeConfiguration) {
273        final var iq = new Iq(Iq.Type.SET);
274        iq.setTo(address);
275        final var pubSub = iq.addExtension(new PubSub());
276        final var publish = pubSub.addExtension(new Publish());
277        publish.setNode(node);
278        final var item = publish.addExtension(new PubSub.Item());
279        item.setId(itemId);
280        item.addExtension(itemPayload);
281        pubSub.addExtension(PublishOptions.of(nodeConfiguration));
282        final ListenableFuture<Void> iqFuture =
283                Futures.transform(
284                        connection.sendIqPacket(iq),
285                        result -> null,
286                        MoreExecutors.directExecutor());
287        return Futures.catchingAsync(
288                iqFuture,
289                IqErrorException.class,
290                new PubSubExceptionTransformer<>(),
291                MoreExecutors.directExecutor());
292    }
293
294    private ListenableFuture<Void> reconfigureNode(
295            final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
296        final Iq iq = new Iq(Iq.Type.GET);
297        iq.setTo(address);
298        final var pubSub = iq.addExtension(new PubSubOwner());
299        final var configure = pubSub.addExtension(new Configure());
300        configure.setNode(node);
301        return Futures.transformAsync(
302                connection.sendIqPacket(iq),
303                result -> {
304                    final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
305                    final Configure configureResult =
306                            pubSubOwnerResult == null
307                                    ? null
308                                    : pubSubOwnerResult.getExtension(Configure.class);
309                    if (configureResult == null) {
310                        throw new IllegalStateException(
311                                "No configuration found in configuration request result");
312                    }
313                    final var data = configureResult.getData();
314                    return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
315                },
316                MoreExecutors.directExecutor());
317    }
318
319    private ListenableFuture<Void> setNodeConfiguration(
320            final Jid address, final String node, final Data data) {
321        final Iq iq = new Iq(Iq.Type.SET);
322        iq.setTo(address);
323        final var pubSub = iq.addExtension(new PubSubOwner());
324        final var configure = pubSub.addExtension(new Configure());
325        configure.setNode(node);
326        configure.addExtension(data);
327        return Futures.transform(
328                connection.sendIqPacket(iq),
329                result -> {
330                    Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
331                    return null;
332                },
333                MoreExecutors.directExecutor());
334    }
335
336    public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
337        final var iq = new Iq(Iq.Type.SET);
338        iq.setTo(address);
339        final var pubSub = iq.addExtension(new PubSub());
340        final var retract = pubSub.addExtension(new Retract());
341        retract.setNode(node);
342        retract.setNotify(true);
343        final var item = retract.addExtension(new PubSub.Item());
344        item.setId(itemId);
345        return connection.sendIqPacket(iq);
346    }
347
348    private static class PubSubExceptionTransformer<V>
349            implements AsyncFunction<IqErrorException, V> {
350
351        @Override
352        @NonNull
353        public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
354            final var error = ex.getError();
355            if (error == null) {
356                return Futures.immediateFailedFuture(ex);
357            }
358            final PubSubError pubSubError = error.getExtension(PubSubError.class);
359            if (pubSubError instanceof PubSubError.PreconditionNotMet) {
360                return Futures.immediateFailedFuture(
361                        new PreconditionNotMetException(ex.getResponse()));
362            } else if (pubSubError != null) {
363                return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
364            } else {
365                return Futures.immediateFailedFuture(ex);
366            }
367        }
368    }
369}