PubSubManager.java

  1package eu.siacs.conversations.xmpp.manager;
  2
  3import android.content.Context;
  4import android.util.Log;
  5import androidx.annotation.NonNull;
  6import com.google.common.util.concurrent.AsyncFunction;
  7import com.google.common.util.concurrent.Futures;
  8import com.google.common.util.concurrent.ListenableFuture;
  9import com.google.common.util.concurrent.MoreExecutors;
 10import eu.siacs.conversations.Config;
 11import eu.siacs.conversations.xml.Namespace;
 12import eu.siacs.conversations.xmpp.Jid;
 13import eu.siacs.conversations.xmpp.XmppConnection;
 14import im.conversations.android.xmpp.ExtensionFactory;
 15import im.conversations.android.xmpp.IqErrorException;
 16import im.conversations.android.xmpp.NodeConfiguration;
 17import im.conversations.android.xmpp.PreconditionNotMetException;
 18import im.conversations.android.xmpp.PubSubErrorException;
 19import im.conversations.android.xmpp.model.Extension;
 20import im.conversations.android.xmpp.model.data.Data;
 21import im.conversations.android.xmpp.model.pubsub.Items;
 22import im.conversations.android.xmpp.model.pubsub.PubSub;
 23import im.conversations.android.xmpp.model.pubsub.Publish;
 24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
 25import im.conversations.android.xmpp.model.pubsub.Retract;
 26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
 27import im.conversations.android.xmpp.model.pubsub.event.Delete;
 28import im.conversations.android.xmpp.model.pubsub.event.Event;
 29import im.conversations.android.xmpp.model.pubsub.event.Purge;
 30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
 31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
 32import im.conversations.android.xmpp.model.stanza.Iq;
 33import im.conversations.android.xmpp.model.stanza.Message;
 34import java.util.Map;
 35
 36public class PubSubManager extends AbstractManager {
 37
 38    private static final String SINGLETON_ITEM_ID = "current";
 39
 40    public PubSubManager(Context context, XmppConnection connection) {
 41        super(context, connection);
 42    }
 43
 44    public void handleEvent(final Message message) {
 45        final var event = message.getExtension(Event.class);
 46        final var action = event.getAction();
 47        final var from = message.getFrom();
 48
 49        if (from instanceof Jid.Invalid) {
 50            Log.d(
 51                    Config.LOGTAG,
 52                    getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
 53            return;
 54        }
 55
 56        if (action instanceof Purge purge) {
 57            // purge is a deletion of all items in a node
 58            handlePurge(message, purge);
 59        } else if (action instanceof Items items) {
 60            // the items wrapper contains, new and updated items as well as retractions which are
 61            // deletions of individual items in a node
 62            handleItems(message, items);
 63        } else if (action instanceof Delete delete) {
 64            // delete is the deletion of the node itself
 65            handleDelete(message, delete);
 66        }
 67    }
 68
 69    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 70            final Jid address, final Class<T> clazz) {
 71        final var id = ExtensionFactory.id(clazz);
 72        if (id == null) {
 73            return Futures.immediateFailedFuture(
 74                    new IllegalArgumentException(
 75                            String.format("%s is not a registered extension", clazz.getName())));
 76        }
 77        return fetchItems(address, id.namespace, clazz);
 78    }
 79
 80    public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
 81            final Jid address, final String node, final Class<T> clazz) {
 82        final Iq request = new Iq(Iq.Type.GET);
 83        request.setTo(address);
 84        final var pubSub = request.addExtension(new PubSub());
 85        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
 86        itemsWrapper.setNode(node);
 87        return Futures.transform(
 88                connection.sendIqPacket(request),
 89                response -> {
 90                    final var pubSubResponse = response.getExtension(PubSub.class);
 91                    if (pubSubResponse == null) {
 92                        throw new IllegalStateException();
 93                    }
 94                    final var items = pubSubResponse.getItems();
 95                    if (items == null) {
 96                        throw new IllegalStateException();
 97                    }
 98                    return items.getItemMap(clazz);
 99                },
100                MoreExecutors.directExecutor());
101    }
102
103    public <T extends Extension> ListenableFuture<T> fetchItem(
104            final Jid address, final String itemId, final Class<T> clazz) {
105        final var id = ExtensionFactory.id(clazz);
106        if (id == null) {
107            return Futures.immediateFailedFuture(
108                    new IllegalArgumentException(
109                            String.format("%s is not a registered extension", clazz.getName())));
110        }
111        return fetchItem(address, id.namespace, itemId, clazz);
112    }
113
114    public <T extends Extension> ListenableFuture<T> fetchItem(
115            final Jid address, final String node, final String itemId, final Class<T> clazz) {
116        final Iq request = new Iq(Iq.Type.GET);
117        request.setTo(address);
118        final var pubSub = request.addExtension(new PubSub());
119        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120        itemsWrapper.setNode(node);
121        final var item = itemsWrapper.addExtension(new PubSub.Item());
122        item.setId(itemId);
123        return Futures.transform(
124                connection.sendIqPacket(request),
125                response -> {
126                    final var pubSubResponse = response.getExtension(PubSub.class);
127                    if (pubSubResponse == null) {
128                        throw new IllegalStateException();
129                    }
130                    final var items = pubSubResponse.getItems();
131                    if (items == null) {
132                        throw new IllegalStateException();
133                    }
134                    return items.getItemOrThrow(itemId, clazz);
135                },
136                MoreExecutors.directExecutor());
137    }
138
139    public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140            final Jid address, final String node, final Class<T> clazz) {
141        final Iq request = new Iq(Iq.Type.GET);
142        request.setTo(address);
143        final var pubSub = request.addExtension(new PubSub());
144        final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
145        itemsWrapper.setNode(node);
146        itemsWrapper.setMaxItems(1);
147        return Futures.transform(
148                connection.sendIqPacket(request),
149                response -> {
150                    final var pubSubResponse = response.getExtension(PubSub.class);
151                    if (pubSubResponse == null) {
152                        throw new IllegalStateException();
153                    }
154                    final var items = pubSubResponse.getItems();
155                    if (items == null) {
156                        throw new IllegalStateException();
157                    }
158                    return items.getOnlyItem(clazz);
159                },
160                MoreExecutors.directExecutor());
161    }
162
163    private void handleItems(final Message message, final Items items) {
164        final var from = message.getFrom();
165        final var isFromBare = from == null || from.isBareJid();
166        final var node = items.getNode();
167        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
168            getManager(BookmarkManager.class).handleItems(items);
169            return;
170        }
171        if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
172            getManager(LegacyBookmarkManager.class).handleItems(items);
173            return;
174        }
175        if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
176            getManager(AvatarManager.class).handleItems(from, items);
177            return;
178        }
179        if (isFromBare && Namespace.NICK.equals(node)) {
180            getManager(NickManager.class).handleItems(from, items);
181            return;
182        }
183        if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
184            getManager(AxolotlManager.class).handleItems(from, items);
185        }
186    }
187
188    private void handlePurge(final Message message, final Purge purge) {
189        final var from = message.getFrom();
190        final var node = purge.getNode();
191        if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
192            getManager(BookmarkManager.class).deleteAllItems();
193        }
194    }
195
196    private void handleDelete(final Message message, final Delete delete) {}
197
198    public ListenableFuture<Void> publishSingleton(
199            Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
200        final var id = ExtensionFactory.id(item.getClass());
201        return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
202    }
203
204    public ListenableFuture<Void> publishSingleton(
205            Jid address,
206            Extension item,
207            final String node,
208            final NodeConfiguration nodeConfiguration) {
209        return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
210    }
211
212    public ListenableFuture<Void> publish(
213            Jid address,
214            Extension item,
215            final String itemId,
216            final NodeConfiguration nodeConfiguration) {
217        final var id = ExtensionFactory.id(item.getClass());
218        return publish(address, item, itemId, id.namespace, nodeConfiguration);
219    }
220
221    public ListenableFuture<Void> publish(
222            final Jid address,
223            final Extension itemPayload,
224            final String itemId,
225            final String node,
226            final NodeConfiguration nodeConfiguration) {
227        final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
228        return Futures.catchingAsync(
229                future,
230                PreconditionNotMetException.class,
231                ex -> {
232                    Log.d(
233                            Config.LOGTAG,
234                            "Node " + node + " on " + address + " requires reconfiguration");
235                    final var reconfigurationFuture =
236                            reconfigureNode(address, node, nodeConfiguration);
237                    return Futures.transformAsync(
238                            reconfigurationFuture,
239                            ignored ->
240                                    publishNoRetry(
241                                            address, itemPayload, itemId, node, nodeConfiguration),
242                            MoreExecutors.directExecutor());
243                },
244                MoreExecutors.directExecutor());
245    }
246
247    private ListenableFuture<Void> publishNoRetry(
248            final Jid address,
249            final Extension itemPayload,
250            final String itemId,
251            final String node,
252            final NodeConfiguration nodeConfiguration) {
253        final var iq = new Iq(Iq.Type.SET);
254        iq.setTo(address);
255        final var pubSub = iq.addExtension(new PubSub());
256        final var publish = pubSub.addExtension(new Publish());
257        publish.setNode(node);
258        final var item = publish.addExtension(new PubSub.Item());
259        item.setId(itemId);
260        item.addExtension(itemPayload);
261        pubSub.addExtension(PublishOptions.of(nodeConfiguration));
262        final ListenableFuture<Void> iqFuture =
263                Futures.transform(
264                        connection.sendIqPacket(iq),
265                        result -> null,
266                        MoreExecutors.directExecutor());
267        return Futures.catchingAsync(
268                iqFuture,
269                IqErrorException.class,
270                new PubSubExceptionTransformer<>(),
271                MoreExecutors.directExecutor());
272    }
273
274    private ListenableFuture<Void> reconfigureNode(
275            final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
276        final Iq iq = new Iq(Iq.Type.GET);
277        iq.setTo(address);
278        final var pubSub = iq.addExtension(new PubSubOwner());
279        final var configure = pubSub.addExtension(new Configure());
280        configure.setNode(node);
281        return Futures.transformAsync(
282                connection.sendIqPacket(iq),
283                result -> {
284                    final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
285                    final Configure configureResult =
286                            pubSubOwnerResult == null
287                                    ? null
288                                    : pubSubOwnerResult.getExtension(Configure.class);
289                    if (configureResult == null) {
290                        throw new IllegalStateException(
291                                "No configuration found in configuration request result");
292                    }
293                    final var data = configureResult.getData();
294                    return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
295                },
296                MoreExecutors.directExecutor());
297    }
298
299    private ListenableFuture<Void> setNodeConfiguration(
300            final Jid address, final String node, final Data data) {
301        final Iq iq = new Iq(Iq.Type.SET);
302        iq.setTo(address);
303        final var pubSub = iq.addExtension(new PubSubOwner());
304        final var configure = pubSub.addExtension(new Configure());
305        configure.setNode(node);
306        configure.addExtension(data);
307        return Futures.transform(
308                connection.sendIqPacket(iq),
309                result -> {
310                    Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
311                    return null;
312                },
313                MoreExecutors.directExecutor());
314    }
315
316    public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
317        final var iq = new Iq(Iq.Type.SET);
318        iq.setTo(address);
319        final var pubSub = iq.addExtension(new PubSub());
320        final var retract = pubSub.addExtension(new Retract());
321        retract.setNode(node);
322        retract.setNotify(true);
323        final var item = retract.addExtension(new PubSub.Item());
324        item.setId(itemId);
325        return connection.sendIqPacket(iq);
326    }
327
328    private static class PubSubExceptionTransformer<V>
329            implements AsyncFunction<IqErrorException, V> {
330
331        @Override
332        @NonNull
333        public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
334            final var error = ex.getError();
335            if (error == null) {
336                return Futures.immediateFailedFuture(ex);
337            }
338            final PubSubError pubSubError = error.getExtension(PubSubError.class);
339            if (pubSubError instanceof PubSubError.PreconditionNotMet) {
340                return Futures.immediateFailedFuture(
341                        new PreconditionNotMetException(ex.getResponse()));
342            } else if (pubSubError != null) {
343                return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
344            } else {
345                return Futures.immediateFailedFuture(ex);
346            }
347        }
348    }
349}