1# frozen_string_literal: true
2
3require "sentry-ruby"
4require "statsd-instrument"
5
6require_relative "customer_repo"
7require_relative "session_manager"
8
# Describes a single ad-hoc command (a node plus a human-readable name) and
# the block that runs when it is invoked. Each incoming request is handled
# by a fresh Command::Execution; the class-level helpers below forward to
# whichever Execution is currently stored in Thread.current.
class Command
  # Field name / namespace string for ad-hoc command actions. Not referenced
  # inside this class; presumably consumed by callers building forms.
  ACTIONS_FIELD = "http://jabber.org/protocol/commands#actions"

  # @return [Execution, nil] the execution stored for the current context,
  #   or nil when none has been set. Note that Ruby's Thread#[] storage is
  #   fiber-local, not shared across fibers of the same thread.
  def self.execution
    Thread.current[:execution]
  end

  # Stores +exe+ as the current execution (see .execution).
  def self.execution=(exe)
    Thread.current[:execution] = exe
  end

  # Forwards to Execution#reply on the current execution.
  # Raises NoMethodError if no execution has been set.
  def self.reply(stanza=nil, &blk)
    execution.reply(stanza, &blk)
  end

  # Forwards to Execution#finish on the current execution.
  def self.finish(*args, **kwargs, &blk)
    execution.finish(*args, **kwargs, &blk)
  end

  # Forwards to Execution#customer on the current execution.
  def self.customer
    execution.customer
  end

  # Forwards to Execution#log on the current execution.
  def self.log
    execution.log
  end

  # State for one invocation of a command: the most recent IQ received from
  # the requester, the customer lookup, and the helpers that send replies.
  class Execution
    # Rejection value used by #reply when writing the stanza timed out.
    # #catch_after swallows it without sending anything further.
    class Timeout < SessionManager::Timeout; end

    # Wrapper around the last stanza of a command. #finish rejects with one
    # of these so the promise chain unwinds to #catch_after, which writes
    # the wrapped stanza out.
    class FinalStanza
      attr_reader :stanza

      def initialize(stanza)
        @stanza = stanza
      end
    end

    attr_reader :customer_repo, :log, :iq

    # @param customer_repo object responding to +find_by_jid+
    # @param blather object responding to +<<+; final stanzas are written to it
    # @param format_error callable turning a rejection value into note text
    # @param iq the IQ stanza that started this execution
    def initialize(customer_repo, blather, format_error, iq)
      @customer_repo = customer_repo
      @blather = blather
      @format_error = format_error
      @iq = iq
      # Prefer a logger stored in Thread.current, else the global LOG
      # constant (defined outside this file).
      @log = Thread.current[:log] || LOG
    end

    # Counts the invocation in StatsD (tagged with the IQ's node), makes this
    # execution the current one, yields +self+ to the given block and routes
    # the block's (possibly promised) result through #catch_after.
    #
    # @yieldparam execution [Execution] self
    # @return the promise produced by the chain
    def execute
      StatsD.increment("command", tags: ["node:#{iq.node}"])
      EMPromise.resolve(nil).then {
        Command.execution = self
        catch_after(EMPromise.resolve(yield self))
      }
    end

    # Sends an intermediate stanza through COMMAND_MANAGER and waits for the
    # requester's next IQ.
    #
    # @param stanza stanza to send; defaults to a reply to the current IQ
    #   with status :executing
    # @yieldparam stanza the stanza, for customisation before it is written
    # @return a promise resolving to the next IQ (which also becomes #iq).
    #   Rejects with the IQ when the write rejects with a
    #   Blather::Stanza::Iq, and with Execution::Timeout when it rejects
    #   with SessionManager::Timeout.
    def reply(stanza=nil)
      stanza ||= iq.reply.tap { |reply| reply.status = :executing }
      yield stanza if block_given?
      COMMAND_MANAGER.write(stanza).then { |new_iq|
        @iq = new_iq
      }.catch_only(Blather::Stanza::Iq) { |new_iq|
        # Only a "set" IQ replaces the stored one; either way the rejection
        # is passed on to the caller.
        @iq = new_iq if new_iq.set?
        EMPromise.reject(new_iq)
      }.catch_only(SessionManager::Timeout) do
        EMPromise.reject(Timeout.new)
      end
    end

    # Builds the final reply to the current IQ and rejects with it wrapped
    # in a FinalStanza. Nothing is written here; #catch_after (or
    # #send_final_error) writes the stanza when it catches the FinalStanza.
    #
    # @param text [String, nil] optional note text to attach
    # @param type note type used when +text+ is given
    # @param status command status to set on the reply
    # @yieldparam reply the reply stanza; the block's result is awaited via
    #   EMPromise.resolve before the note is applied
    # @return a promise that rejects with a FinalStanza
    def finish(text=nil, type: :info, status: :completed)
      reply = @iq.reply
      reply.status = status
      EMPromise.resolve(block_given? ? yield(reply) : nil).then {
        if text
          reply.note_type = type
          reply.note_text = text
        end
      }.then do
        EMPromise.reject(FinalStanza.new(reply))
      end
    end

    # Looks up the customer for the bare JID of the current IQ's sender and
    # records it as the Sentry user. The promise is memoised, so the lookup
    # is only started once per execution.
    #
    # @return a promise resolving to the customer
    def customer
      @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
        Sentry.set_user(
          id: c.customer_id,
          jid: @iq.from.stripped
        )
        c
      }
    end

  protected

    # Attaches the terminal handlers to +promise+:
    # * a rejected command IQ that is a cancel becomes a :canceled finish;
    #   any other command IQ is re-rejected and reaches the generic handler
    # * Execution::Timeout is swallowed
    # * FinalStanza has its stanza written to @blather
    # * anything else is reported to the requester via #send_final_error
    #   and then re-rejected (Strings are wrapped in RuntimeError first)
    def catch_after(promise)
      promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
        next EMPromise.reject(iq) unless iq.cancel?

        finish(status: :canceled)
      }.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
        @blather << e.stanza
      }.catch do |e|
        e = RuntimeError.new(e) if e.is_a?(String)
        send_final_error(e)
        EMPromise.reject(e)
      end
    end

    # Sends a final error note built from +e+ by @format_error.
    #
    # +e+ is tagged with a singleton +replied?+ method returning true,
    # presumably so later handlers can tell a reply has already gone out.
    # The tag is skipped for frozen values: defining a singleton method on
    # a Symbol/Integer/Float raises TypeError and on any other frozen object
    # raises FrozenError, which would abort before the error stanza is sent;
    # on nil/true/false it would silently add +replied?+ to the whole class.
    def send_final_error(e)
      unless e.frozen?
        def e.replied?
          true
        end
      end

      finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
        @blather << s.stanza
      end
    end
  end

  attr_reader :node, :name

  # @param node command node identifier
  # @param name human-readable command name
  # @param customer_repo repository handed to each Execution
  # @param list_for callable receiving keyword arguments and returning
  #   whether the command should be listed; by default true when +tel:+ is
  #   truthy
  # @param format_error callable turning a rejection value into note text;
  #   by default the value's +message+ when it has one, else +to_s+
  # @param blk block run for every execution; it is yielded the Execution
  def initialize(
    node,
    name,
    customer_repo: CustomerRepo.new,
    list_for: ->(tel:, **) { !!tel },
    format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
    &blk
  )
    @node = node
    @name = name
    @customer_repo = customer_repo
    @list_for = list_for
    @format_error = format_error
    @blk = blk
  end

  # Registers a handler on +blather+ that starts a new Execution running
  # this command's block for every IQ matching +guards+.
  #
  # @param blather object responding to +command+ (and +<<+ for replies)
  # @param guards guard list passed to +blather.command+; the default
  #   matches execute requests for this node that carry no sessionid
  # @return [Command] self, to allow chaining
  def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
    blather.command(*guards) do |iq|
      Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
    end
    self
  end

  # @return the result of calling the +list_for+ callable with +kwargs+
  def list_for?(**kwargs)
    @list_for.call(**kwargs)
  end
end