1# frozen_string_literal: true
2
3require "blather/client/dsl"
4require "em_promise"
5require "timeout"
6
7module BlatherNotify
8 extend Blather::DSL
9
10 class PubSub
11 class Address
12 attr_reader :node, :server
13
14 def initialize(node:, server:)
15 @node = node
16 @server = server
17 end
18
19 def to_uri
20 "xmpp:#{@server}?;node=#{@node}"
21 end
22 end
23
24 def initialize(blather, addr)
25 @blather = blather
26 @addr = addr
27 end
28
29 def publish(xml)
30 @blather.write_with_promise(
31 Blather::Stanza::PubSub::Publish.new(
32 @addr.server,
33 @addr.node,
34 :set,
35 xml
36 )
37 )
38 end
39 end
40
41 @ready = Queue.new
42
43 when_ready { @ready << :ready }
44
45 def self.start(jid, password, default_pubsub_addr: nil)
46 # workqueue_count MUST be 0 or else Blather uses threads!
47 setup(jid, password, nil, nil, nil, nil, workqueue_count: 0)
48 set_default_pubsub(default_pubsub_addr)
49
50 EM.error_handler(&method(:panic))
51
52 EM.next_tick { client.run }
53
54 block_until_ready
55 end
56
57 def self.block_until_ready
58 if EM.reactor_running?
59 promise = EMPromise.new
60 disconnected { true.tap { EM.next_tick { EM.stop } } }
61 Thread.new { promise.fulfill(@ready.pop) }
62 timeout_promise(promise, timeout: 30)
63 else
64 @thread = Thread.new { EM.run }
65 Timeout.timeout(30) { @ready.pop }
66 at_exit { wait_then_exit }
67 end
68 end
69
70 def self.panic(e)
71 warn e.message
72 warn e.backtrace
73 exit! 2
74 end
75
76 def self.wait_then_exit
77 disconnected { EM.stop }
78 EM.add_timer(30) { EM.stop }
79 shutdown
80 @thread&.join
81 end
82
83 def self.timeout_promise(promise, timeout: 15)
84 timer = EventMachine::Timer.new(timeout) {
85 promise.reject(:timeout)
86 }
87
88 promise.then do
89 timer.cancel
90 end
91 end
92
93 def self.write_with_promise(stanza)
94 promise = EMPromise.new
95 timeout_promise(promise)
96
97 client.write_with_handler(stanza) do |s|
98 if s.error? || s.type == :error
99 promise.reject(s)
100 else
101 promise.fulfill(s)
102 end
103 end
104 promise
105 end
106
107 def self.command(node, sessionid=nil, action: :execute, form: nil)
108 Blather::Stanza::Iq::Command.new.tap do |cmd|
109 cmd.to = CONFIG[:sgx_jmp]
110 cmd.node = node
111 cmd.command[:sessionid] = sessionid if sessionid
112 cmd.action = action
113 cmd.command << form if form
114 end
115 end
116
117 def self.execute(command_node, form=nil)
118 write_with_promise(command(command_node)).then do |iq|
119 next iq unless form
120
121 write_with_promise(command(command_node, iq.sessionid, form: form))
122 end
123 end
124
125 def self.pubsub(addr)
126 PubSub.new(self, addr)
127 end
128
129 def self.set_default_pubsub(addr)
130 @default_pubsub = addr && pubsub(addr)
131 end
132
133 def self.publish(xml)
134 raise "No default pubsub set!" unless @default_pubsub
135
136 @default_pubsub.publish(xml)
137 end
138end