1package eu.siacs.conversations.xmpp.manager;
2
3import android.content.Context;
4import android.util.Log;
5import androidx.annotation.NonNull;
6import com.google.common.util.concurrent.AsyncFunction;
7import com.google.common.util.concurrent.Futures;
8import com.google.common.util.concurrent.ListenableFuture;
9import com.google.common.util.concurrent.MoreExecutors;
10import eu.siacs.conversations.Config;
11import eu.siacs.conversations.xml.Namespace;
12import eu.siacs.conversations.xmpp.Jid;
13import eu.siacs.conversations.xmpp.XmppConnection;
14import im.conversations.android.xmpp.ExtensionFactory;
15import im.conversations.android.xmpp.IqErrorException;
16import im.conversations.android.xmpp.NodeConfiguration;
17import im.conversations.android.xmpp.PreconditionNotMetException;
18import im.conversations.android.xmpp.PubSubErrorException;
19import im.conversations.android.xmpp.model.Extension;
20import im.conversations.android.xmpp.model.data.Data;
21import im.conversations.android.xmpp.model.pubsub.Items;
22import im.conversations.android.xmpp.model.pubsub.PubSub;
23import im.conversations.android.xmpp.model.pubsub.Publish;
24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
25import im.conversations.android.xmpp.model.pubsub.Retract;
26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
27import im.conversations.android.xmpp.model.pubsub.event.Delete;
28import im.conversations.android.xmpp.model.pubsub.event.Event;
29import im.conversations.android.xmpp.model.pubsub.event.Purge;
30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
32import im.conversations.android.xmpp.model.stanza.Iq;
33import im.conversations.android.xmpp.model.stanza.Message;
34import java.util.Map;
35
36public class PubSubManager extends AbstractManager {
37
38 private static final String SINGLETON_ITEM_ID = "current";
39
40 public PubSubManager(Context context, XmppConnection connection) {
41 super(context, connection);
42 }
43
44 public void handleEvent(final Message message) {
45 final var event = message.getExtension(Event.class);
46 final var action = event.getAction();
47 final var from = message.getFrom();
48
49 if (from instanceof Jid.Invalid) {
50 Log.d(
51 Config.LOGTAG,
52 getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
53 return;
54 }
55
56 if (action instanceof Purge purge) {
57 // purge is a deletion of all items in a node
58 handlePurge(message, purge);
59 } else if (action instanceof Items items) {
60 // the items wrapper contains, new and updated items as well as retractions which are
61 // deletions of individual items in a node
62 handleItems(message, items);
63 } else if (action instanceof Delete delete) {
64 // delete is the deletion of the node itself
65 handleDelete(message, delete);
66 }
67 }
68
69 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
70 final Jid address, final Class<T> clazz) {
71 final var id = ExtensionFactory.id(clazz);
72 if (id == null) {
73 return Futures.immediateFailedFuture(
74 new IllegalArgumentException(
75 String.format("%s is not a registered extension", clazz.getName())));
76 }
77 return fetchItems(address, id.namespace, clazz);
78 }
79
80 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
81 final Jid address, final String node, final Class<T> clazz) {
82 final Iq request = new Iq(Iq.Type.GET);
83 request.setTo(address);
84 final var pubSub = request.addExtension(new PubSub());
85 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
86 itemsWrapper.setNode(node);
87 return Futures.transform(
88 connection.sendIqPacket(request),
89 response -> {
90 final var pubSubResponse = response.getExtension(PubSub.class);
91 if (pubSubResponse == null) {
92 throw new IllegalStateException();
93 }
94 final var items = pubSubResponse.getItems();
95 if (items == null) {
96 throw new IllegalStateException();
97 }
98 return items.getItemMap(clazz);
99 },
100 MoreExecutors.directExecutor());
101 }
102
103 public <T extends Extension> ListenableFuture<T> fetchItem(
104 final Jid address, final String itemId, final Class<T> clazz) {
105 final var id = ExtensionFactory.id(clazz);
106 if (id == null) {
107 return Futures.immediateFailedFuture(
108 new IllegalArgumentException(
109 String.format("%s is not a registered extension", clazz.getName())));
110 }
111 return fetchItem(address, id.namespace, itemId, clazz);
112 }
113
114 public <T extends Extension> ListenableFuture<T> fetchItem(
115 final Jid address, final String node, final String itemId, final Class<T> clazz) {
116 final Iq request = new Iq(Iq.Type.GET);
117 request.setTo(address);
118 final var pubSub = request.addExtension(new PubSub());
119 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120 itemsWrapper.setNode(node);
121 final var item = itemsWrapper.addExtension(new PubSub.Item());
122 item.setId(itemId);
123 return Futures.transform(
124 connection.sendIqPacket(request),
125 response -> {
126 final var pubSubResponse = response.getExtension(PubSub.class);
127 if (pubSubResponse == null) {
128 throw new IllegalStateException();
129 }
130 final var items = pubSubResponse.getItems();
131 if (items == null) {
132 throw new IllegalStateException();
133 }
134 return items.getItemOrThrow(itemId, clazz);
135 },
136 MoreExecutors.directExecutor());
137 }
138
139 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140 final Jid address, final String node, final Class<T> clazz) {
141 final Iq request = new Iq(Iq.Type.GET);
142 request.setTo(address);
143 final var pubSub = request.addExtension(new PubSub());
144 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
145 itemsWrapper.setNode(node);
146 itemsWrapper.setMaxItems(1);
147 return Futures.transform(
148 connection.sendIqPacket(request),
149 response -> {
150 final var pubSubResponse = response.getExtension(PubSub.class);
151 if (pubSubResponse == null) {
152 throw new IllegalStateException();
153 }
154 final var items = pubSubResponse.getItems();
155 if (items == null) {
156 throw new IllegalStateException();
157 }
158 return items.getOnlyItem(clazz);
159 },
160 MoreExecutors.directExecutor());
161 }
162
163 private void handleItems(final Message message, final Items items) {
164 final var from = message.getFrom();
165 final var isFromBare = from == null || from.isBareJid();
166 final var node = items.getNode();
167 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
168 getManager(BookmarkManager.class).handleItems(items);
169 return;
170 }
171 if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
172 getManager(LegacyBookmarkManager.class).handleItems(items);
173 return;
174 }
175 if (connection.fromAccount(message) && Namespace.MDS_DISPLAYED.equals(node)) {
176 getManager(MessageDisplayedSynchronizationManager.class).handleItems(items);
177 return;
178 }
179 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
180 getManager(AvatarManager.class).handleItems(from, items);
181 return;
182 }
183 if (isFromBare && Namespace.NICK.equals(node)) {
184 getManager(NickManager.class).handleItems(from, items);
185 return;
186 }
187 if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
188 getManager(AxolotlManager.class).handleItems(from, items);
189 }
190 }
191
192 private void handlePurge(final Message message, final Purge purge) {
193 final var from = message.getFrom();
194 final var isFromBare = from == null || from.isBareJid();
195 final var node = purge.getNode();
196 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
197 getManager(BookmarkManager.class).handlePurge();
198 }
199 }
200
201 private void handleDelete(final Message message, final Delete delete) {
202 final var from = message.getFrom();
203 final var isFromBare = from == null || from.isBareJid();
204 final var node = delete.getNode();
205 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
206 getManager(BookmarkManager.class).handleDelete();
207 return;
208 }
209 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
210 getManager(AvatarManager.class).handleDelete(from);
211 return;
212 }
213 if (isFromBare && Namespace.NICK.equals(node)) {
214 getManager(NickManager.class).handleDelete(from);
215 }
216 }
217
218 public ListenableFuture<Void> publishSingleton(
219 Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
220 final var id = ExtensionFactory.id(item.getClass());
221 return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
222 }
223
224 public ListenableFuture<Void> publishSingleton(
225 Jid address,
226 Extension item,
227 final String node,
228 final NodeConfiguration nodeConfiguration) {
229 return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
230 }
231
232 public ListenableFuture<Void> publish(
233 Jid address,
234 Extension item,
235 final String itemId,
236 final NodeConfiguration nodeConfiguration) {
237 final var id = ExtensionFactory.id(item.getClass());
238 return publish(address, item, itemId, id.namespace, nodeConfiguration);
239 }
240
241 public ListenableFuture<Void> publish(
242 final Jid address,
243 final Extension itemPayload,
244 final String itemId,
245 final String node,
246 final NodeConfiguration nodeConfiguration) {
247 final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
248 return Futures.catchingAsync(
249 future,
250 PreconditionNotMetException.class,
251 ex -> {
252 Log.d(
253 Config.LOGTAG,
254 "Node " + node + " on " + address + " requires reconfiguration");
255 final var reconfigurationFuture =
256 reconfigureNode(address, node, nodeConfiguration);
257 return Futures.transformAsync(
258 reconfigurationFuture,
259 ignored ->
260 publishNoRetry(
261 address, itemPayload, itemId, node, nodeConfiguration),
262 MoreExecutors.directExecutor());
263 },
264 MoreExecutors.directExecutor());
265 }
266
267 private ListenableFuture<Void> publishNoRetry(
268 final Jid address,
269 final Extension itemPayload,
270 final String itemId,
271 final String node,
272 final NodeConfiguration nodeConfiguration) {
273 final var iq = new Iq(Iq.Type.SET);
274 iq.setTo(address);
275 final var pubSub = iq.addExtension(new PubSub());
276 final var publish = pubSub.addExtension(new Publish());
277 publish.setNode(node);
278 final var item = publish.addExtension(new PubSub.Item());
279 item.setId(itemId);
280 item.addExtension(itemPayload);
281 pubSub.addExtension(PublishOptions.of(nodeConfiguration));
282 final ListenableFuture<Void> iqFuture =
283 Futures.transform(
284 connection.sendIqPacket(iq),
285 result -> null,
286 MoreExecutors.directExecutor());
287 return Futures.catchingAsync(
288 iqFuture,
289 IqErrorException.class,
290 new PubSubExceptionTransformer<>(),
291 MoreExecutors.directExecutor());
292 }
293
294 private ListenableFuture<Void> reconfigureNode(
295 final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
296 final Iq iq = new Iq(Iq.Type.GET);
297 iq.setTo(address);
298 final var pubSub = iq.addExtension(new PubSubOwner());
299 final var configure = pubSub.addExtension(new Configure());
300 configure.setNode(node);
301 return Futures.transformAsync(
302 connection.sendIqPacket(iq),
303 result -> {
304 final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
305 final Configure configureResult =
306 pubSubOwnerResult == null
307 ? null
308 : pubSubOwnerResult.getExtension(Configure.class);
309 if (configureResult == null) {
310 throw new IllegalStateException(
311 "No configuration found in configuration request result");
312 }
313 final var data = configureResult.getData();
314 return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
315 },
316 MoreExecutors.directExecutor());
317 }
318
319 private ListenableFuture<Void> setNodeConfiguration(
320 final Jid address, final String node, final Data data) {
321 final Iq iq = new Iq(Iq.Type.SET);
322 iq.setTo(address);
323 final var pubSub = iq.addExtension(new PubSubOwner());
324 final var configure = pubSub.addExtension(new Configure());
325 configure.setNode(node);
326 configure.addExtension(data);
327 return Futures.transform(
328 connection.sendIqPacket(iq),
329 result -> {
330 Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
331 return null;
332 },
333 MoreExecutors.directExecutor());
334 }
335
336 public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
337 final var iq = new Iq(Iq.Type.SET);
338 iq.setTo(address);
339 final var pubSub = iq.addExtension(new PubSub());
340 final var retract = pubSub.addExtension(new Retract());
341 retract.setNode(node);
342 retract.setNotify(true);
343 final var item = retract.addExtension(new PubSub.Item());
344 item.setId(itemId);
345 return connection.sendIqPacket(iq);
346 }
347
348 private static class PubSubExceptionTransformer<V>
349 implements AsyncFunction<IqErrorException, V> {
350
351 @Override
352 @NonNull
353 public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
354 final var error = ex.getError();
355 if (error == null) {
356 return Futures.immediateFailedFuture(ex);
357 }
358 final PubSubError pubSubError = error.getExtension(PubSubError.class);
359 if (pubSubError instanceof PubSubError.PreconditionNotMet) {
360 return Futures.immediateFailedFuture(
361 new PreconditionNotMetException(ex.getResponse()));
362 } else if (pubSubError != null) {
363 return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
364 } else {
365 return Futures.immediateFailedFuture(ex);
366 }
367 }
368 }
369}