1package eu.siacs.conversations.xmpp.manager;
2
3import android.content.Context;
4import android.util.Log;
5import androidx.annotation.NonNull;
6import com.google.common.util.concurrent.AsyncFunction;
7import com.google.common.util.concurrent.Futures;
8import com.google.common.util.concurrent.ListenableFuture;
9import com.google.common.util.concurrent.MoreExecutors;
10import eu.siacs.conversations.Config;
11import eu.siacs.conversations.xml.Namespace;
12import eu.siacs.conversations.xmpp.Jid;
13import eu.siacs.conversations.xmpp.XmppConnection;
14import im.conversations.android.xmpp.ExtensionFactory;
15import im.conversations.android.xmpp.IqErrorException;
16import im.conversations.android.xmpp.NodeConfiguration;
17import im.conversations.android.xmpp.PreconditionNotMetException;
18import im.conversations.android.xmpp.PubSubErrorException;
19import im.conversations.android.xmpp.model.Extension;
20import im.conversations.android.xmpp.model.data.Data;
21import im.conversations.android.xmpp.model.pubsub.Items;
22import im.conversations.android.xmpp.model.pubsub.PubSub;
23import im.conversations.android.xmpp.model.pubsub.Publish;
24import im.conversations.android.xmpp.model.pubsub.PublishOptions;
25import im.conversations.android.xmpp.model.pubsub.Retract;
26import im.conversations.android.xmpp.model.pubsub.error.PubSubError;
27import im.conversations.android.xmpp.model.pubsub.event.Delete;
28import im.conversations.android.xmpp.model.pubsub.event.Event;
29import im.conversations.android.xmpp.model.pubsub.event.Purge;
30import im.conversations.android.xmpp.model.pubsub.owner.Configure;
31import im.conversations.android.xmpp.model.pubsub.owner.PubSubOwner;
32import im.conversations.android.xmpp.model.stanza.Iq;
33import im.conversations.android.xmpp.model.stanza.Message;
34import java.util.Map;
35
36public class PubSubManager extends AbstractManager {
37
38 private static final String SINGLETON_ITEM_ID = "current";
39
40 public PubSubManager(Context context, XmppConnection connection) {
41 super(context, connection);
42 }
43
44 public void handleEvent(final Message message) {
45 final var event = message.getExtension(Event.class);
46 final var action = event.getAction();
47 final var from = message.getFrom();
48
49 if (from instanceof Jid.Invalid) {
50 Log.d(
51 Config.LOGTAG,
52 getAccount().getJid().asBareJid() + ": ignoring event from invalid jid");
53 return;
54 }
55
56 if (action instanceof Purge purge) {
57 // purge is a deletion of all items in a node
58 handlePurge(message, purge);
59 } else if (action instanceof Items items) {
60 // the items wrapper contains, new and updated items as well as retractions which are
61 // deletions of individual items in a node
62 handleItems(message, items);
63 } else if (action instanceof Delete delete) {
64 // delete is the deletion of the node itself
65 handleDelete(message, delete);
66 }
67 }
68
69 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
70 final Jid address, final Class<T> clazz) {
71 final var id = ExtensionFactory.id(clazz);
72 if (id == null) {
73 return Futures.immediateFailedFuture(
74 new IllegalArgumentException(
75 String.format("%s is not a registered extension", clazz.getName())));
76 }
77 return fetchItems(address, id.namespace, clazz);
78 }
79
80 public <T extends Extension> ListenableFuture<Map<String, T>> fetchItems(
81 final Jid address, final String node, final Class<T> clazz) {
82 final Iq request = new Iq(Iq.Type.GET);
83 request.setTo(address);
84 final var pubSub = request.addExtension(new PubSub());
85 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
86 itemsWrapper.setNode(node);
87 return Futures.transform(
88 connection.sendIqPacket(request),
89 response -> {
90 final var pubSubResponse = response.getExtension(PubSub.class);
91 if (pubSubResponse == null) {
92 throw new IllegalStateException();
93 }
94 final var items = pubSubResponse.getItems();
95 if (items == null) {
96 throw new IllegalStateException();
97 }
98 return items.getItemMap(clazz);
99 },
100 MoreExecutors.directExecutor());
101 }
102
103 public <T extends Extension> ListenableFuture<T> fetchItem(
104 final Jid address, final String itemId, final Class<T> clazz) {
105 final var id = ExtensionFactory.id(clazz);
106 if (id == null) {
107 return Futures.immediateFailedFuture(
108 new IllegalArgumentException(
109 String.format("%s is not a registered extension", clazz.getName())));
110 }
111 return fetchItem(address, id.namespace, itemId, clazz);
112 }
113
114 public <T extends Extension> ListenableFuture<T> fetchItem(
115 final Jid address, final String node, final String itemId, final Class<T> clazz) {
116 final Iq request = new Iq(Iq.Type.GET);
117 request.setTo(address);
118 final var pubSub = request.addExtension(new PubSub());
119 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
120 itemsWrapper.setNode(node);
121 final var item = itemsWrapper.addExtension(new PubSub.Item());
122 item.setId(itemId);
123 return Futures.transform(
124 connection.sendIqPacket(request),
125 response -> {
126 final var pubSubResponse = response.getExtension(PubSub.class);
127 if (pubSubResponse == null) {
128 throw new IllegalStateException();
129 }
130 final var items = pubSubResponse.getItems();
131 if (items == null) {
132 throw new IllegalStateException();
133 }
134 return items.getItemOrThrow(itemId, clazz);
135 },
136 MoreExecutors.directExecutor());
137 }
138
139 public <T extends Extension> ListenableFuture<T> fetchMostRecentItem(
140 final Jid address, final String node, final Class<T> clazz) {
141 final Iq request = new Iq(Iq.Type.GET);
142 request.setTo(address);
143 final var pubSub = request.addExtension(new PubSub());
144 final var itemsWrapper = pubSub.addExtension(new PubSub.ItemsWrapper());
145 itemsWrapper.setNode(node);
146 itemsWrapper.setMaxItems(1);
147 return Futures.transform(
148 connection.sendIqPacket(request),
149 response -> {
150 final var pubSubResponse = response.getExtension(PubSub.class);
151 if (pubSubResponse == null) {
152 throw new IllegalStateException();
153 }
154 final var items = pubSubResponse.getItems();
155 if (items == null) {
156 throw new IllegalStateException();
157 }
158 return items.getOnlyItem(clazz);
159 },
160 MoreExecutors.directExecutor());
161 }
162
163 private void handleItems(final Message message, final Items items) {
164 final var from = message.getFrom();
165 final var isFromBare = from == null || from.isBareJid();
166 final var node = items.getNode();
167 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
168 getManager(BookmarkManager.class).handleItems(items);
169 return;
170 }
171 if (connection.fromAccount(message) && Namespace.BOOKMARKS.equals(node)) {
172 getManager(LegacyBookmarkManager.class).handleItems(items);
173 return;
174 }
175 if (isFromBare && Namespace.AVATAR_METADATA.equals(node)) {
176 getManager(AvatarManager.class).handleItems(from, items);
177 return;
178 }
179 if (isFromBare && Namespace.NICK.equals(node)) {
180 getManager(NickManager.class).handleItems(from, items);
181 return;
182 }
183 if (isFromBare && Namespace.AXOLOTL_DEVICE_LIST.equals(node)) {
184 getManager(AxolotlManager.class).handleItems(from, items);
185 }
186 }
187
188 private void handlePurge(final Message message, final Purge purge) {
189 final var from = message.getFrom();
190 final var node = purge.getNode();
191 if (connection.fromAccount(message) && Namespace.BOOKMARKS2.equals(node)) {
192 getManager(BookmarkManager.class).deleteAllItems();
193 }
194 }
195
196 private void handleDelete(final Message message, final Delete delete) {}
197
198 public ListenableFuture<Void> publishSingleton(
199 Jid address, Extension item, final NodeConfiguration nodeConfiguration) {
200 final var id = ExtensionFactory.id(item.getClass());
201 return publish(address, item, SINGLETON_ITEM_ID, id.namespace, nodeConfiguration);
202 }
203
204 public ListenableFuture<Void> publishSingleton(
205 Jid address,
206 Extension item,
207 final String node,
208 final NodeConfiguration nodeConfiguration) {
209 return publish(address, item, SINGLETON_ITEM_ID, node, nodeConfiguration);
210 }
211
212 public ListenableFuture<Void> publish(
213 Jid address,
214 Extension item,
215 final String itemId,
216 final NodeConfiguration nodeConfiguration) {
217 final var id = ExtensionFactory.id(item.getClass());
218 return publish(address, item, itemId, id.namespace, nodeConfiguration);
219 }
220
221 public ListenableFuture<Void> publish(
222 final Jid address,
223 final Extension itemPayload,
224 final String itemId,
225 final String node,
226 final NodeConfiguration nodeConfiguration) {
227 final var future = publishNoRetry(address, itemPayload, itemId, node, nodeConfiguration);
228 return Futures.catchingAsync(
229 future,
230 PreconditionNotMetException.class,
231 ex -> {
232 Log.d(
233 Config.LOGTAG,
234 "Node " + node + " on " + address + " requires reconfiguration");
235 final var reconfigurationFuture =
236 reconfigureNode(address, node, nodeConfiguration);
237 return Futures.transformAsync(
238 reconfigurationFuture,
239 ignored ->
240 publishNoRetry(
241 address, itemPayload, itemId, node, nodeConfiguration),
242 MoreExecutors.directExecutor());
243 },
244 MoreExecutors.directExecutor());
245 }
246
247 private ListenableFuture<Void> publishNoRetry(
248 final Jid address,
249 final Extension itemPayload,
250 final String itemId,
251 final String node,
252 final NodeConfiguration nodeConfiguration) {
253 final var iq = new Iq(Iq.Type.SET);
254 iq.setTo(address);
255 final var pubSub = iq.addExtension(new PubSub());
256 final var publish = pubSub.addExtension(new Publish());
257 publish.setNode(node);
258 final var item = publish.addExtension(new PubSub.Item());
259 item.setId(itemId);
260 item.addExtension(itemPayload);
261 pubSub.addExtension(PublishOptions.of(nodeConfiguration));
262 final ListenableFuture<Void> iqFuture =
263 Futures.transform(
264 connection.sendIqPacket(iq),
265 result -> null,
266 MoreExecutors.directExecutor());
267 return Futures.catchingAsync(
268 iqFuture,
269 IqErrorException.class,
270 new PubSubExceptionTransformer<>(),
271 MoreExecutors.directExecutor());
272 }
273
274 private ListenableFuture<Void> reconfigureNode(
275 final Jid address, final String node, final NodeConfiguration nodeConfiguration) {
276 final Iq iq = new Iq(Iq.Type.GET);
277 iq.setTo(address);
278 final var pubSub = iq.addExtension(new PubSubOwner());
279 final var configure = pubSub.addExtension(new Configure());
280 configure.setNode(node);
281 return Futures.transformAsync(
282 connection.sendIqPacket(iq),
283 result -> {
284 final var pubSubOwnerResult = result.getExtension(PubSubOwner.class);
285 final Configure configureResult =
286 pubSubOwnerResult == null
287 ? null
288 : pubSubOwnerResult.getExtension(Configure.class);
289 if (configureResult == null) {
290 throw new IllegalStateException(
291 "No configuration found in configuration request result");
292 }
293 final var data = configureResult.getData();
294 return setNodeConfiguration(address, node, data.submit(nodeConfiguration));
295 },
296 MoreExecutors.directExecutor());
297 }
298
299 private ListenableFuture<Void> setNodeConfiguration(
300 final Jid address, final String node, final Data data) {
301 final Iq iq = new Iq(Iq.Type.SET);
302 iq.setTo(address);
303 final var pubSub = iq.addExtension(new PubSubOwner());
304 final var configure = pubSub.addExtension(new Configure());
305 configure.setNode(node);
306 configure.addExtension(data);
307 return Futures.transform(
308 connection.sendIqPacket(iq),
309 result -> {
310 Log.d(Config.LOGTAG, "Modified node configuration " + node + " on " + address);
311 return null;
312 },
313 MoreExecutors.directExecutor());
314 }
315
316 public ListenableFuture<Iq> retract(final Jid address, final String itemId, final String node) {
317 final var iq = new Iq(Iq.Type.SET);
318 iq.setTo(address);
319 final var pubSub = iq.addExtension(new PubSub());
320 final var retract = pubSub.addExtension(new Retract());
321 retract.setNode(node);
322 retract.setNotify(true);
323 final var item = retract.addExtension(new PubSub.Item());
324 item.setId(itemId);
325 return connection.sendIqPacket(iq);
326 }
327
328 private static class PubSubExceptionTransformer<V>
329 implements AsyncFunction<IqErrorException, V> {
330
331 @Override
332 @NonNull
333 public ListenableFuture<V> apply(@NonNull IqErrorException ex) {
334 final var error = ex.getError();
335 if (error == null) {
336 return Futures.immediateFailedFuture(ex);
337 }
338 final PubSubError pubSubError = error.getExtension(PubSubError.class);
339 if (pubSubError instanceof PubSubError.PreconditionNotMet) {
340 return Futures.immediateFailedFuture(
341 new PreconditionNotMetException(ex.getResponse()));
342 } else if (pubSubError != null) {
343 return Futures.immediateFailedFuture(new PubSubErrorException(ex.getResponse()));
344 } else {
345 return Futures.immediateFailedFuture(ex);
346 }
347 }
348 }
349}