1# frozen_string_literal: true
2
3require "sentry-ruby"
4require "statsd-instrument"
5
6require_relative "customer_repo"
7require_relative "session_manager"
8
9class Command
# Looks up the execution registered for the code that is currently running.
#
# The value is read from Thread.current[:execution]; Ruby's Thread#[] storage
# is local to the current fiber.
#
# @return [Object, nil] whatever was last stored through Command.execution=,
#   or nil when nothing has been stored
def self.execution
  current = Thread.current
  current[:execution]
end
13
# Registers +exe+ as the execution for the code that is currently running.
#
# The value is written to Thread.current[:execution]; Ruby's Thread#[]=
# storage is local to the current fiber.
#
# @param exe [Object, nil] the execution to expose through Command.execution
# @return [Object, nil] the value that was stored
def self.execution=(exe)
  current = Thread.current
  current[:execution] = exe
end
17
# Forwards to #reply on the current execution.
#
# @param stanza [Object, nil] handed to the execution unchanged
# @yield forwarded to the execution's #reply
# @return [Object] whatever the execution's #reply returns
# @raise [NoMethodError] when no execution has been registered (execution
#   is nil)
def self.reply(stanza=nil, &blk)
  active = execution
  active.reply(stanza, &blk)
end
21
# Forwards to #finish on the current execution.
#
# All positional arguments, keyword arguments and the block are passed
# through untouched.
#
# @return [Object] whatever the execution's #finish returns
# @raise [NoMethodError] when no execution has been registered (execution
#   is nil)
def self.finish(*args, **kwargs, &blk)
  active = execution
  active.finish(*args, **kwargs, &blk)
end
25
# Forwards to #customer on the current execution.
#
# @return [Object] whatever the execution's #customer returns
# @raise [NoMethodError] when no execution has been registered (execution
#   is nil)
def self.customer
  active = execution
  active.customer
end
29
# Forwards to #log on the current execution.
#
# @return [Object] the logger exposed by the execution
# @raise [NoMethodError] when no execution has been registered (execution
#   is nil)
def self.log
  active = execution
  active.log
end
33
34 class Execution
# Rejection value used by #reply when COMMAND_MANAGER.write rejects with a
# SessionManager::Timeout; #catch_after swallows it and resolves to nil.
class Timeout < SessionManager::Timeout
end
36
# Wrapper around the stanza that ends a command. #finish rejects with one of
# these, and #catch_after / #send_final_error unwrap it and push the stanza
# to @blather.
class FinalStanza
  # @param stanza [Object] the stanza to carry
  def initialize(stanza)
    @stanza = stanza
  end

  # @return [Object] the wrapped stanza
  def stanza
    @stanza
  end
end
44
45 attr_reader :customer_repo, :log, :iq
46
# Builds the state for one run of a command.
#
# @param customer_repo [#find_by_jid] used by #customer to look up the sender
# @param blather [#<<] receives the final stanza of the command
# @param format_error [#call] turns a failure into the text of the error note
# @param iq [Object] the stanza that started the command; must respond to
#   #node here, and to #reply / #from in other methods
def initialize(customer_repo, blather, format_error, iq)
  @customer_repo, @blather = customer_repo, blather
  @format_error, @iq = format_error, iq
  # Logger obtained from LOG.child with the command node attached.
  @log = LOG.child(node: @iq.node)
end
54
# Runs the command body for this execution.
#
# Increments the "command" StatsD counter tagged with the node, then, inside
# a promise chain: registers this object as Command.execution, sets up the
# Sentry scope, yields self to the caller's block and routes the outcome
# through #catch_after.
#
# @yieldparam execution [Execution] this object
# @return [EMPromise] the promise chain
def execute
  node_tag = "node:#{iq.node}"
  StatsD.increment("command", tags: [node_tag])

  started = EMPromise.resolve(nil)
  guarded = started.then do
    Command.execution = self
    sentry_hub
    outcome = EMPromise.resolve(yield self)
    catch_after(outcome)
  end
  # NOTE(review): `panic` is not defined in this class; presumably a global
  # last-resort handler defined elsewhere -- confirm.
  guarded.catch(&method(:panic))
end
63
# Writes a stanza through COMMAND_MANAGER and tracks the stanza that comes
# back.
#
# When no stanza is given, a reply to the current iq is created with status
# :executing. The stanza is yielded for customisation before it is written.
#
# Outcomes of the write:
# * resolved: the resolved value becomes the new @iq
# * rejected with a Blather::Stanza::Iq: it becomes the new @iq only when
#   it is a "set", and the rejection is passed on
# * rejected with SessionManager::Timeout: re-rejected as Execution::Timeout
#
# @param stanza [Object, nil] stanza to send
# @yieldparam stanza [Object] the stanza about to be written
# @return [EMPromise]
def reply(stanza=nil)
  unless stanza
    stanza = iq.reply
    stanza.status = :executing
  end
  yield stanza if block_given?

  written = COMMAND_MANAGER.write(stanza).then { |next_iq| @iq = next_iq }
  tracked = written.catch_only(Blather::Stanza::Iq) { |next_iq|
    @iq = next_iq if next_iq.set?
    EMPromise.reject(next_iq)
  }
  tracked.catch_only(SessionManager::Timeout) { EMPromise.reject(Timeout.new) }
end
76
# Ends the command by rejecting with a FinalStanza built from a reply to the
# current iq.
#
# The reply's status is set first, then the reply is yielded, and only then
# is the note applied -- so a given +text+ overrides any note the block set.
#
# @param text [String, nil] note text; no note is set when nil/false
# @param type [Symbol] note type, used only together with +text+
# @param status [Symbol] status placed on the reply
# @yieldparam reply [Object] the reply being prepared
# @return [EMPromise] a promise rejected with the FinalStanza
def finish(text=nil, type: :info, status: :completed)
  final = @iq.reply.tap { |r| r.status = status }
  yield final if block_given?

  if text
    final.note_type = type
    final.note_text = text
  end

  EMPromise.reject(FinalStanza.new(final))
end
87
# Returns the Sentry hub for this execution, configuring it on first use.
#
# On the first call the current hub is fetched and memoized, a new scope is
# pushed, its breadcrumbs are cleared, the transaction name is set to the
# command node and the user is set to the sender's bare JID (as a String).
# Later calls return the memoized hub without touching the scope again.
#
# @return [Object] the memoized hub
# @raise [RuntimeError] when Sentry.get_current_hub returns nil
def sentry_hub
  @sentry_hub || begin
    # Stored on Fiber-local in 4.3.1 and earlier
    # https://github.com/getsentry/sentry-ruby/issues/1495
    @sentry_hub = Sentry.get_current_hub
    raise "Sentry.init has not been called" unless @sentry_hub

    @sentry_hub.push_scope
    scope = @sentry_hub.current_scope
    scope.clear_breadcrumbs
    scope.set_transaction_name(@iq.node)
    scope.set_user(jid: @iq.from.stripped.to_s)
    @sentry_hub
  end
end
102
# Looks up the customer for the sender of the current iq, memoizing the
# promise so the repository is queried at most once per execution.
#
# Once the customer is found, the Sentry user on this execution's scope is
# replaced with the customer id plus the sender's bare JID. The JID is
# stringified, matching what #sentry_hub stores, so the scope only ever
# carries plain serialisable values.
#
# @return [EMPromise] resolves to the customer returned by
#   @customer_repo.find_by_jid
def customer
  @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
    sentry_hub.current_scope.set_user(
      id: c.customer_id,
      jid: @iq.from.stripped.to_s
    )
    c
  }
end
112
113 protected
114
# Attaches the standard rejection handling to the promise produced by a
# command body. Handlers are tried in this order:
#
# 1. Blather::Stanza::Iq::Command: a cancel turns into
#    finish(status: :canceled); any other command stanza is re-rejected.
# 2. Timeout (Execution::Timeout): swallowed, resolving to nil.
# 3. FinalStanza: its stanza is pushed to @blather.
# 4. Anything else: logged via #log_error and reported to the user via
#    #send_final_error.
#
# @param promise [EMPromise] outcome of the command body
# @return [EMPromise] the promise with all handlers attached
def catch_after(promise)
  after_cancel = promise.catch_only(Blather::Stanza::Iq::Command) { |stanza|
    stanza.cancel? ? finish(status: :canceled) : EMPromise.reject(stanza)
  }
  after_timeout = after_cancel.catch_only(Timeout) { nil }
  delivered = after_timeout.catch_only(FinalStanza) { |final|
    @blather << final.stanza
  }
  delivered.catch { |err|
    log_error(err)
    send_final_error(err)
  }
end
127
# Ends the command with an error note and delivers it immediately.
#
# The note text comes from @format_error.call(e). #finish rejects with a
# FinalStanza, which is caught here and its stanza pushed to @blather.
#
# @param e [Object] the failure to report
# @return [Object] result of the catch_only chain on #finish's promise
def send_final_error(e)
  message = @format_error.call(e)
  rejection = finish(message, type: :error)
  rejection.catch_only(FinalStanza) { |final| @blather << final.stanza }
end
133
# Records a failure in the execution's logger and in Sentry.
#
# Exceptions go to Sentry via capture_exception; any other rejection value
# is stringified and sent via capture_message.
#
# @param e [Object] the failure (an Exception or any other rejection value)
# @return [Object] result of the Sentry capture call
def log_error(e)
  @log.error("Error raised during #{iq.node}: #{e.class}", e)

  case e
  when ::Exception then sentry_hub.capture_exception(e)
  else sentry_hub.capture_message(e.to_s)
  end
end
142 end
143
144 attr_reader :node, :name
145
# Describes a command that can later be attached to a Blather client with
# #register.
#
# @param node [Object] command node; used in the default #register guards
# @param name [Object] command name, exposed through the reader
# @param customer_repo [Object] handed to every Execution; a new
#   CustomerRepo by default
# @param list_for [#call] answers #list_for?; by default true exactly when a
#   truthy tel: keyword is given
# @param format_error [#call] builds the error note text; by default the
#   failure's #message when it has one, otherwise its #to_s
# @yield the command body, run once per execution via Execution#execute
def initialize(
  node,
  name,
  customer_repo: CustomerRepo.new,
  list_for: ->(tel:, **) { !!tel },
  format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
  &blk
)
  @node, @name = node, name
  @customer_repo = customer_repo
  @list_for, @format_error = list_for, format_error
  @blk = blk
end
161
# Installs this command as a handler on a Blather client.
#
# For every iq that passes +guards+, a fresh Execution is created and the
# command body is run through Execution#execute.
#
# @param blather [#command] client to register the handler on; also passed
#   to each Execution
# @param guards [Array] arguments for blather.command; defaults to matching
#   :execute? on this command's node with a nil sessionid
# @return [Command] self
def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
  tap do
    blather.command(*guards) { |iq|
      run = Execution.new(@customer_repo, blather, @format_error, iq)
      run.execute(&@blk)
    }
  end
end
168
# Asks the configured list_for callable whether this command applies.
#
# @param kwargs [Hash] passed to the callable as keyword arguments
# @return [Object] whatever the callable returns
def list_for?(**kwargs)
  @list_for.(**kwargs)
end
172end