1package eu.siacs.conversations.xmpp.manager;
2
3import android.content.Context;
4import android.util.Log;
5import androidx.annotation.NonNull;
6import com.google.common.util.concurrent.AsyncFunction;
7import com.google.common.util.concurrent.Futures;
8import com.google.common.util.concurrent.ListenableFuture;
9import com.google.common.util.concurrent.MoreExecutors;
10import eu.siacs.conversations.Config;
11import eu.siacs.conversations.xml.Namespace;
12import eu.siacs.conversations.xmpp.Jid;
13import eu.siacs.conversations.xmpp.XmppConnection;
14import im.conversations.android.xmpp.ExtensionFactory;
15import im.conversations.android.xmpp.IqErrorException;
16import im.conversations.android.xmpp.NodeConfiguration;
17import im.conversations.android.xmpp.PreconditionNotMetException;
18import im.conversations.android.xmpp.PubSubErrorException;
19import im.conversations.android.xmpp.model.Extension;
20import im.conversations.android.xmpp.model.data.Data;
21import im.conversations.android.xmpp.model.pubsub.Items;
22import im.conversations.android.xmpp.model.pubsub.PubSub;
23import im.conversations.android.xmpp.model.pubsub.Publish;
24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
25import im.conversations.android.xmpp.model.pubsub.Retract;
26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
27import im.conversations.android.xmpp.model.pubsub.event.Delete;
28import im.conversations.android.xmpp.model.pubsub.event.Event;
29import im.conversations.android.xmpp.model.pubsub.event.Purge;
30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
32import im.conversations.android.xmpp.model.stanza.Iq;
33import im.conversations.android.xmpp.model.stanza.Message;
34import java.util.Map;
35
36public class PubSubManager extends AbstractManager {
37
38 private static final String SINGLETON_ITEM_ID = "current";
39
40 public PubSubManager(Context context, XmppConnection connection) {
41 super(context, connection);
42 }
43
44 public void handleEvent(final Message message) {
45 final var event = message.getExtension(Event.class);
46 final var action = event.getAction();
47 final var from = message.getFrom();
48
49 if (from instanceof Jid.Invalid) {
50 Log.d(
51 Config.LOGTAG,
52 getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
53 return;
54 }
55
56 if (action instanceof Purge purge) {
57 // purge is a deletion of all items in a node
58 handlePurge(message, purge);
59 } else if (action instanceof Items items) {
60 // the items wrapper contains, new and updated items as well as retractions which are
61 // deletions of individual items in a node
62 handleItems(message, items);
63 } else if (action instanceof Delete delete) {
64 // delete is the deletion of the node itself
65 handleDelete(message, delete);
66 }
67 }
68
69 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
70 final Jid address, final Class<T> clazz) {
71 final var id = ExtensionFactory.id(clazz);
72 if (id == null) {
73 return Futures.immediateFailedFuture(
74 new IllegalArgumentException(
75 String.format("%s is not a registered extension", clazz.getName())));
76 }
77 return fetchItems(address, id.namespace, clazz);
78 }
79
80 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
81 final Jid address, final String node, final Class<T> clazz) {
82 final Iq request = new Iq(Iq.Type.GET);
83 request.setTo(address);
84 final var pubSub = request.addExtension(new PubSub());
85 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
86 itemsWrapper.setNode(node);
87 return Futures.transform(
88 connection.sendIqPacket(request),
89 response -> {
90 final var pubSubResponse = response.getExtension(PubSub.class);
91 if (pubSubResponse == null) {
92 throw new IllegalStateException();
93 }
94 final var items = pubSubResponse.getItems();
95 if (items == null) {
96 throw new IllegalStateException();
97 }
98 return items.getItemMap(clazz);
99 },
100 MoreExecutors.directExecutor());
101 }
102
103 public <T extends Extension> ListenableFuture<T> fetchItem(
104 final Jid address, final String itemId, final Class<T> clazz) {
105 final var id = ExtensionFactory.id(clazz);
106 if (id == null) {
107 return Futures.immediateFailedFuture(
108 new IllegalArgumentException(
109 String.format("%s is not a registered extension", clazz.getName())));
110 }
111 return fetchItem(address, id.namespace, itemId, clazz);
112 }
113
114 public <T extends Extension> ListenableFuture<T> fetchItem(
115 final Jid address, final String node, final String itemId, final Class<T> clazz) {
116 final Iq request = new Iq(Iq.Type.GET);
117 request.setTo(address);
118 final var pubSub = request.addExtension(new PubSub());
119 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120 itemsWrapper.setNode(node);
121 final var item = itemsWrapper.addExtension(new PubSub.Item());
122 item.setId(itemId);
123 return Futures.transform(
124 connection.sendIqPacket(request),
125 response -> {
126 final var pubSubResponse = response.getExtension(PubSub.class);
127 if (pubSubResponse == null) {
128 throw new IllegalStateException();
129 }
130 final var items = pubSubResponse.getItems();
131 if (items == null) {
132 throw new IllegalStateException();
133 }
134 return items.getItemOrThrow(itemId, clazz);
135 },
136 MoreExecutors.directExecutor());
137 }
138
139 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140 final Jid address, final Class<T> clazz) {
141 final var id = ExtensionFactory.id(clazz);
142 if (id == null) {
143 return Futures.immediateFailedFuture(
144 new IllegalArgumentException(
145 String.format("%s is not a registered extension", clazz.getName())));
146 }
147 return fetchMostRecentItem(address, id.namespace, clazz);
148 }
149
150 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
151 final Jid address, final String node, final Class<T> clazz) {
152 final Iq request = new Iq(Iq.Type.GET);
153 request.setTo(address);
154 final var pubSub = request.addExtension(new PubSub());
155 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
156 itemsWrapper.setNode(node);
157 itemsWrapper.setMaxItems(1);
158 return Futures.transform(
159 connection.sendIqPacket(request),
160 response -> {
161 final var pubSubResponse = response.getExtension(PubSub.class);
162 if (pubSubResponse == null) {
163 throw new IllegalStateException();
164 }
165 final var items = pubSubResponse.getItems();
166 if (items == null) {
167 throw new IllegalStateException();
168 }
169 return items.getOnlyItem(clazz);
170 },
171 MoreExecutors.directExecutor());
172 }
173
174 private void handleItems(final Message message, final Items items) {
175 final var from = message.getFrom();
176 final var isFromBare = from == null || from.isBareJid();
177 final var node = items.getNode();
178 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
179 getManager(BookmarkManager.class).handleItems(items);
180 return;
181 }
182 if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
183 getManager(LegacyBookmarkManager.class).handleItems(items);
184 return;
185 }
186 if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
187 getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
188 return;
189 }
190 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
191 getManager(AvatarManager.class).handleItems(from, items);
192 return;
193 }
194 if (isFromBare && Namespace.NICK.equals(node)) {
195 getManager(NickManager.class).handleItems(from, items);
196 return;
197 }
198 if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
199 getManager(AxolotlManager.class).handleItems(from, items);
200 }
201 }
202
203 private void handlePurge(final Message message, final Purge purge) {
204 final var from = message.getFrom();
205 final var isFromBare = from == null || from.isBareJid();
206 final var node = purge.getNode();
207 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
208 getManager(BookmarkManager.class).handlePurge();
209 }
210 }
211
212 private void handleDelete(final Message message, final Delete delete) {
213 final var from = message.getFrom();
214 final var isFromBare = from == null || from.isBareJid();
215 final var node = delete.getNode();
216 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
217 getManager(BookmarkManager.class).handleDelete();
218 return;
219 }
220 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
221 getManager(AvatarManager.class).handleDelete(from);
222 return;
223 }
224 if (isFromBare && Namespace.NICK.equals(node)) {
225 getManager(NickManager.class).handleDelete(from);
226 }
227 }
228
229 public ListenableFuture<Void> publishSingleton(
230 Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
231 final var id = ExtensionFactory.id(item.getClass());
232 return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
233 }
234
235 public ListenableFuture<Void> publishSingleton(
236 Jid address,
237 Extension item,
238 final String node,
239 final NodeConfiguration nodeConfiguration) {
240 return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
241 }
242
243 public ListenableFuture<Void> publish(
244 Jid address,
245 Extension item,
246 final String itemId,
247 final NodeConfiguration nodeConfiguration) {
248 final var id = ExtensionFactory.id(item.getClass());
249 return publish(address, item, itemId, id.namespace, nodeConfiguration);
250 }
251
252 public ListenableFuture<Void> publish(
253 final Jid address,
254 final Extension itemPayload,
255 final String itemId,
256 final String node,
257 final NodeConfiguration nodeConfiguration) {
258 final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
259 return Futures.catchingAsync(
260 future,
261 PreconditionNotMetException.class,
262 ex -> {
263 Log.d(
264 Config.LOGTAG,
265 "Node " + node + " on " + address + " requires reconfiguration");
266 final var reconfigurationFuture =
267 reconfigureNode(address, node, nodeConfiguration);
268 return Futures.transformAsync(
269 reconfigurationFuture,
270 ignored ->
271 publishNoRetry(
272 address, itemPayload, itemId, node, nodeConfiguration),
273 MoreExecutors.directExecutor());
274 },
275 MoreExecutors.directExecutor());
276 }
277
278 private ListenableFuture<Void> publishNoRetry(
279 final Jid address,
280 final Extension itemPayload,
281 final String itemId,
282 final String node,
283 final NodeConfiguration nodeConfiguration) {
284 final var iq = new Iq(Iq.Type.SET);
285 iq.setTo(address);
286 final var pubSub = iq.addExtension(new PubSub());
287 final var publish = pubSub.addExtension(new Publish());
288 publish.setNode(node);
289 final var item = publish.addExtension(new PubSub.Item());
290 item.setId(itemId);
291 item.addExtension(itemPayload);
292 pubSub.addExtension(PublishOptions.of(nodeConfiguration));
293 final ListenableFuture<Void> iqFuture =
294 Futures.transform(
295 connection.sendIqPacket(iq),
296 result -> null,
297 MoreExecutors.directExecutor());
298 return Futures.catchingAsync(
299 iqFuture,
300 IqErrorException.class,
301 new PubSubExceptionTransformer<>(),
302 MoreExecutors.directExecutor());
303 }
304
305 private ListenableFuture<Void> reconfigureNode(
306 final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
307 final Iq iq = new Iq(Iq.Type.GET);
308 iq.setTo(address);
309 final var pubSub = iq.addExtension(new PubSubOwner());
310 final var configure = pubSub.addExtension(new Configure());
311 configure.setNode(node);
312 return Futures.transformAsync(
313 connection.sendIqPacket(iq),
314 result -> {
315 final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
316 final Configure configureResult =
317 pubSubOwnerResult == null
318 ? null
319 : pubSubOwnerResult.getExtension(Configure.class);
320 if (configureResult == null) {
321 throw new IllegalStateException(
322 "No configuration found in configuration request result");
323 }
324 final var data = configureResult.getData();
325 return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
326 },
327 MoreExecutors.directExecutor());
328 }
329
330 private ListenableFuture<Void> setNodeConfiguration(
331 final Jid address, final String node, final Data data) {
332 final Iq iq = new Iq(Iq.Type.SET);
333 iq.setTo(address);
334 final var pubSub = iq.addExtension(new PubSubOwner());
335 final var configure = pubSub.addExtension(new Configure());
336 configure.setNode(node);
337 configure.addExtension(data);
338 return Futures.transform(
339 connection.sendIqPacket(iq),
340 result -> {
341 Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
342 return null;
343 },
344 MoreExecutors.directExecutor());
345 }
346
347 public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
348 final var iq = new Iq(Iq.Type.SET);
349 iq.setTo(address);
350 final var pubSub = iq.addExtension(new PubSub());
351 final var retract = pubSub.addExtension(new Retract());
352 retract.setNode(node);
353 retract.setNotify(true);
354 final var item = retract.addExtension(new PubSub.Item());
355 item.setId(itemId);
356 return connection.sendIqPacket(iq);
357 }
358
359 public ListenableFuture<Iq> delete(final Jid address, final String node) {
360 final var iq = new Iq(Iq.Type.SET);
361 iq.setTo(address);
362 final var pubSub = iq.addExtension(new PubSubOwner());
363 final var delete =
364 pubSub.addExtension(new im.conversations.android.xmpp.model.pubsub.owner.Delete());
365 delete.setNode(node);
366 return connection.sendIqPacket(iq);
367 }
368
369 private static class PubSubExceptionTransformer<V>
370 implements AsyncFunction<IqErrorException, V> {
371
372 @Override
373 @NonNull
374 public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
375 final var error = ex.getError();
376 if (error == null) {
377 return Futures.immediateFailedFuture(ex);
378 }
379 final PubSubError pubSubError = error.getExtension(PubSubError.class);
380 if (pubSubError instanceof PubSubError.PreconditionNotMet) {
381 return Futures.immediateFailedFuture(
382 new PreconditionNotMetException(ex.getResponse()));
383 } else if (pubSubError != null) {
384 return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
385 } else {
386 return Futures.immediateFailedFuture(ex);
387 }
388 }
389 }
390}