1package eu.siacs.conversations.xmpp.manager;
2
3import android.content.Context;
4import android.util.Log;
5import androidx.annotation.NonNull;
6import com.google.common.util.concurrent.AsyncFunction;
7import com.google.common.util.concurrent.Futures;
8import com.google.common.util.concurrent.ListenableFuture;
9import com.google.common.util.concurrent.MoreExecutors;
10import eu.siacs.conversations.Config;
11import eu.siacs.conversations.xml.Namespace;
12import eu.siacs.conversations.xmpp.Jid;
13import eu.siacs.conversations.xmpp.XmppConnection;
14import im.conversations.android.xmpp.ExtensionFactory;
15import im.conversations.android.xmpp.IqErrorException;
16import im.conversations.android.xmpp.NodeConfiguration;
17import im.conversations.android.xmpp.PreconditionNotMetException;
18import im.conversations.android.xmpp.PubSubErrorException;
19import im.conversations.android.xmpp.model.Extension;
20import im.conversations.android.xmpp.model.data.Data;
21import im.conversations.android.xmpp.model.pubsub.Items;
22import im.conversations.android.xmpp.model.pubsub.PubSub;
23import im.conversations.android.xmpp.model.pubsub.Publish;
24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
25import im.conversations.android.xmpp.model.pubsub.Retract;
26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
27import im.conversations.android.xmpp.model.pubsub.event.Delete;
28import im.conversations.android.xmpp.model.pubsub.event.Event;
29import im.conversations.android.xmpp.model.pubsub.event.Purge;
30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
32import im.conversations.android.xmpp.model.stanza.Iq;
33import im.conversations.android.xmpp.model.stanza.Message;
34import java.util.Map;
35
36public class PubSubManager extends AbstractManager {
37
38 private static final String SINGLETON_ITEM_ID = "current";
39
40 public PubSubManager(Context context, XmppConnection connection) {
41 super(context, connection);
42 }
43
44 public void handleEvent(final Message message) {
45 final var event = message.getExtension(Event.class);
46 final var action = event.getAction();
47 final var from = message.getFrom();
48
49 if (from instanceof Jid.Invalid) {
50 Log.d(
51 Config.LOGTAG,
52 getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
53 return;
54 }
55
56 if (action instanceof Purge purge) {
57 // purge is a deletion of all items in a node
58 handlePurge(message, purge);
59 } else if (action instanceof Items items) {
60 // the items wrapper contains, new and updated items as well as retractions which are
61 // deletions of individual items in a node
62 handleItems(message, items);
63 } else if (action instanceof Delete delete) {
64 // delete is the deletion of the node itself
65 handleDelete(message, delete);
66 }
67 }
68
69 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
70 final Jid address, final Class<T> clazz) {
71 final var id = ExtensionFactory.id(clazz);
72 if (id == null) {
73 return Futures.immediateFailedFuture(
74 new IllegalArgumentException(
75 String.format("%s is not a registered extension", clazz.getName())));
76 }
77 return fetchItems(address, id.namespace, clazz);
78 }
79
80 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
81 final Jid address, final String node, final Class<T> clazz) {
82 final Iq request = new Iq(Iq.Type.GET);
83 request.setTo(address);
84 final var pubSub = request.addExtension(new PubSub());
85 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
86 itemsWrapper.setNode(node);
87 return Futures.transform(
88 connection.sendIqPacket(request),
89 response -> {
90 final var pubSubResponse = response.getExtension(PubSub.class);
91 if (pubSubResponse == null) {
92 throw new IllegalStateException();
93 }
94 final var items = pubSubResponse.getItems();
95 if (items == null) {
96 throw new IllegalStateException();
97 }
98 return items.getItemMap(clazz);
99 },
100 MoreExecutors.directExecutor());
101 }
102
103 public <T extends Extension> ListenableFuture<T> fetchItem(
104 final Jid address, final String itemId, final Class<T> clazz) {
105 final var id = ExtensionFactory.id(clazz);
106 if (id == null) {
107 return Futures.immediateFailedFuture(
108 new IllegalArgumentException(
109 String.format("%s is not a registered extension", clazz.getName())));
110 }
111 return fetchItem(address, id.namespace, itemId, clazz);
112 }
113
114 public <T extends Extension> ListenableFuture<T> fetchItem(
115 final Jid address, final String node, final String itemId, final Class<T> clazz) {
116 final Iq request = new Iq(Iq.Type.GET);
117 request.setTo(address);
118 final var pubSub = request.addExtension(new PubSub());
119 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120 itemsWrapper.setNode(node);
121 final var item = itemsWrapper.addExtension(new PubSub.Item());
122 item.setId(itemId);
123 return Futures.transform(
124 connection.sendIqPacket(request),
125 response -> {
126 final var pubSubResponse = response.getExtension(PubSub.class);
127 if (pubSubResponse == null) {
128 throw new IllegalStateException();
129 }
130 final var items = pubSubResponse.getItems();
131 if (items == null) {
132 throw new IllegalStateException();
133 }
134 return items.getItemOrThrow(itemId, clazz);
135 },
136 MoreExecutors.directExecutor());
137 }
138
139 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140 final Jid address, final Class<T> clazz) {
141 final var id = ExtensionFactory.id(clazz);
142 if (id == null) {
143 return Futures.immediateFailedFuture(
144 new IllegalArgumentException(
145 String.format("%s is not a registered extension", clazz.getName())));
146 }
147 return fetchMostRecentItem(address, id.namespace, clazz);
148 }
149
150 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
151 final Jid address, final String node, final Class<T> clazz) {
152 final Iq request = new Iq(Iq.Type.GET);
153 request.setTo(address);
154 final var pubSub = request.addExtension(new PubSub());
155 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
156 itemsWrapper.setNode(node);
157 itemsWrapper.setMaxItems(1);
158 return Futures.transform(
159 connection.sendIqPacket(request),
160 response -> {
161 final var pubSubResponse = response.getExtension(PubSub.class);
162 if (pubSubResponse == null) {
163 throw new IllegalStateException();
164 }
165 final var items = pubSubResponse.getItems();
166 if (items == null) {
167 throw new IllegalStateException();
168 }
169 return items.getOnlyItem(clazz);
170 },
171 MoreExecutors.directExecutor());
172 }
173
174 private void handleItems(final Message message, final Items items) {
175 final var from = message.getFrom();
176 final var isFromBare = from == null || from.isBareJid();
177 final var node = items.getNode();
178 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
179 getManager(NativeBookmarkManager.class).handleItems(items);
180 return;
181 }
182 if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
183 getManager(LegacyBookmarkManager.class).handleItems(items);
184 return;
185 }
186 if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
187 getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
188 return;
189 }
190 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
191 getManager(AvatarManager.class).handleItems(from, items);
192 return;
193 }
194 if (isFromBare && Namespace.NICK.equals(node)) {
195 getManager(NickManager.class).handleItems(from, items);
196 return;
197 }
198 if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
199 getManager(AxolotlManager.class).handleItems(from, items);
200 }
201 }
202
203 private void handlePurge(final Message message, final Purge purge) {
204 final var from = message.getFrom();
205 final var isFromBare = from == null || from.isBareJid();
206 final var node = purge.getNode();
207 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
208 getManager(NativeBookmarkManager.class).handlePurge();
209 }
210 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
211 // purge (delete all items in a node) is functionally equivalent to delete
212 getManager(AvatarManager.class).handleDelete(from);
213 }
214 }
215
216 private void handleDelete(final Message message, final Delete delete) {
217 final var from = message.getFrom();
218 final var isFromBare = from == null || from.isBareJid();
219 final var node = delete.getNode();
220 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
221 getManager(NativeBookmarkManager.class).handleDelete();
222 return;
223 }
224 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
225 getManager(AvatarManager.class).handleDelete(from);
226 return;
227 }
228 if (isFromBare && Namespace.NICK.equals(node)) {
229 getManager(NickManager.class).handleDelete(from);
230 }
231 }
232
233 public ListenableFuture<Void> publishSingleton(
234 Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
235 final var id = ExtensionFactory.id(item.getClass());
236 return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
237 }
238
239 public ListenableFuture<Void> publishSingleton(
240 Jid address,
241 Extension item,
242 final String node,
243 final NodeConfiguration nodeConfiguration) {
244 return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
245 }
246
247 public ListenableFuture<Void> publish(
248 Jid address,
249 Extension item,
250 final String itemId,
251 final NodeConfiguration nodeConfiguration) {
252 final var id = ExtensionFactory.id(item.getClass());
253 return publish(address, item, itemId, id.namespace, nodeConfiguration);
254 }
255
256 public ListenableFuture<Void> publish(
257 final Jid address,
258 final Extension itemPayload,
259 final String itemId,
260 final String node,
261 final NodeConfiguration nodeConfiguration) {
262 final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
263 return Futures.catchingAsync(
264 future,
265 PreconditionNotMetException.class,
266 ex -> {
267 Log.d(
268 Config.LOGTAG,
269 "Node " + node + " on " + address + " requires reconfiguration");
270 final var reconfigurationFuture =
271 reconfigureNode(address, node, nodeConfiguration);
272 return Futures.transformAsync(
273 reconfigurationFuture,
274 ignored ->
275 publishNoRetry(
276 address, itemPayload, itemId, node, nodeConfiguration),
277 MoreExecutors.directExecutor());
278 },
279 MoreExecutors.directExecutor());
280 }
281
282 private ListenableFuture<Void> publishNoRetry(
283 final Jid address,
284 final Extension itemPayload,
285 final String itemId,
286 final String node,
287 final NodeConfiguration nodeConfiguration) {
288 final var iq = new Iq(Iq.Type.SET);
289 iq.setTo(address);
290 final var pubSub = iq.addExtension(new PubSub());
291 final var publish = pubSub.addExtension(new Publish());
292 publish.setNode(node);
293 final var item = publish.addExtension(new PubSub.Item());
294 item.setId(itemId);
295 item.addExtension(itemPayload);
296 pubSub.addExtension(PublishOptions.of(nodeConfiguration));
297 final ListenableFuture<Void> iqFuture =
298 Futures.transform(
299 connection.sendIqPacket(iq),
300 result -> null,
301 MoreExecutors.directExecutor());
302 return Futures.catchingAsync(
303 iqFuture,
304 IqErrorException.class,
305 new PubSubExceptionTransformer<>(),
306 MoreExecutors.directExecutor());
307 }
308
309 private ListenableFuture<Void> reconfigureNode(
310 final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
311 return Futures.transformAsync(
312 getNodeConfiguration(address, node),
313 data -> setNodeConfiguration(address, node, data.submit(nodeConfiguration)),
314 MoreExecutors.directExecutor());
315 }
316
317 public ListenableFuture<Data> getNodeConfiguration(final Jid address, final String node) {
318 final Iq iq = new Iq(Iq.Type.GET);
319 iq.setTo(address);
320 final var pubSub = iq.addExtension(new PubSubOwner());
321 final var configure = pubSub.addExtension(new Configure());
322 configure.setNode(node);
323 return Futures.transform(
324 connection.sendIqPacket(iq),
325 result -> {
326 final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
327 final Configure configureResult =
328 pubSubOwnerResult == null
329 ? null
330 : pubSubOwnerResult.getExtension(Configure.class);
331 if (configureResult == null) {
332 throw new IllegalStateException(
333 "No configuration found in configuration request result");
334 }
335 return configureResult.getData();
336 },
337 MoreExecutors.directExecutor());
338 }
339
340 private ListenableFuture<Void> setNodeConfiguration(
341 final Jid address, final String node, final Data data) {
342 final Iq iq = new Iq(Iq.Type.SET);
343 iq.setTo(address);
344 final var pubSub = iq.addExtension(new PubSubOwner());
345 final var configure = pubSub.addExtension(new Configure());
346 configure.setNode(node);
347 configure.addExtension(data);
348 return Futures.transform(
349 connection.sendIqPacket(iq),
350 result -> {
351 Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
352 return null;
353 },
354 MoreExecutors.directExecutor());
355 }
356
357 public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
358 final var iq = new Iq(Iq.Type.SET);
359 iq.setTo(address);
360 final var pubSub = iq.addExtension(new PubSub());
361 final var retract = pubSub.addExtension(new Retract());
362 retract.setNode(node);
363 retract.setNotify(true);
364 final var item = retract.addExtension(new PubSub.Item());
365 item.setId(itemId);
366 return connection.sendIqPacket(iq);
367 }
368
369 public ListenableFuture<Iq> delete(final Jid address, final String node) {
370 final var iq = new Iq(Iq.Type.SET);
371 iq.setTo(address);
372 final var pubSub = iq.addExtension(new PubSubOwner());
373 final var delete =
374 pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
375 delete.setNode(node);
376 return connection.sendIqPacket(iq);
377 }
378
379 private static class PubSubExceptionTransformer<V>
380 implements AsyncFunction<IqErrorException, V> {
381
382 @Override
383 @NonNull
384 public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
385 final var error = ex.getError();
386 if (error == null) {
387 return Futures.immediateFailedFuture(ex);
388 }
389 final PubSubError pubSubError = error.getExtension(PubSubError.class);
390 if (pubSubError instanceof PubSubError.PreconditionNotMet) {
391 return Futures.immediateFailedFuture(
392 new PreconditionNotMetException(ex.getResponse()));
393 } else if (pubSubError != null) {
394 return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
395 } else {
396 return Futures.immediateFailedFuture(ex);
397 }
398 }
399 }
400}