1# frozen_string_literal: true
2
3require "blather/client/dsl"
4require "em_promise"
5require "timeout"
6
7module BlatherNotify
8 extend Blather::DSL
9
10 class PubSub
11 class Address
12 attr_reader :node, :server
13
14 def initialize(node:, server:)
15 @node = node
16 @server = server
17 end
18
19 def to_uri
20 "xmpp:#{@server}?;node=#{@node}"
21 end
22 end
23
24 def initialize(blather, addr)
25 @blather = blather
26 @addr = addr
27 end
28
29 def publish(xml)
30 @blather.write_with_promise(
31 Blather::Stanza::PubSub::Publish.new(
32 @addr.server,
33 @addr.node,
34 :set,
35 xml
36 )
37 )
38 end
39 end
40
41 @ready = Queue.new
42
43 when_ready { @ready << :ready }
44
45 def self.start(jid, password, default_pubsub_addr: nil)
46 # workqueue_count MUST be 0 or else Blather uses threads!
47 setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
48 set_default_pubsub(default_pubsub_addr)
49
50 EM.error_handler(&method(:panic))
51
52 @thread = Thread.new {
53 EM.run do
54 client.run
55 end
56 }
57
58 Timeout.timeout(30) { @ready.pop }
59 at_exit { wait_then_exit }
60 end
61
62 def self.panic(e)
63 warn e.message
64 warn e.backtrace
65 exit! 2
66 end
67
68 def self.wait_then_exit
69 disconnected { EM.stop }
70 EM.add_timer(30) { EM.stop }
71 shutdown
72 @thread.join
73 end
74
75 def self.timeout_promise(promise, timeout: 15)
76 timer = EM.add_timer(timeout) {
77 promise.reject(:timeout)
78 }
79
80 promise.then do
81 timer.cancel
82 end
83 end
84
85 def self.write_with_promise(stanza)
86 promise = EMPromise.new
87 timeout_promise(promise)
88
89 client.write_with_handler(stanza) do |s|
90 if s.error? || s.type == :error
91 promise.reject(s)
92 else
93 promise.fulfill(s)
94 end
95 end
96 promise
97 end
98
99 def self.command(node, sessionid=nil, action: :execute, form: nil)
100 Blather::Stanza::Iq::Command.new.tap do |cmd|
101 cmd.to = CONFIG[:sgx_jmp]
102 cmd.node = node
103 cmd.command[:sessionid] = sessionid if sessionid
104 cmd.action = action
105 cmd.command << form if form
106 end
107 end
108
109 def self.execute(command_node, form=nil)
110 write_with_promise(command(command_node)).then do |iq|
111 next iq unless form
112
113 write_with_promise(command(command_node, iq.sessionid, form: form))
114 end
115 end
116
117 def self.pubsub(addr)
118 PubSub.new(self, addr)
119 end
120
121 def self.set_default_pubsub(addr)
122 @default_pubsub = addr && pubsub(addr)
123 end
124
125 def self.publish(xml)
126 raise "No default pubsub set!" unless @default_pubsub
127
128 @default_pubsub.publish(xml)
129 end
130end