1# frozen_string_literal: true
2
3require "sentry-ruby"
4require "statsd-instrument"
5
6require_relative "customer_repo"
7require_relative "session_manager"
8
# Describes a single command (a +node+ plus a display +name+) and wires it to
# a Blather client through #register.
#
# The class-level helpers below delegate to the Execution that #execute stored
# in Thread.current[:execution], so the command body can call e.g.
# Command.reply without holding a reference to the Execution itself. They
# raise NoMethodError on nil when no execution has been stored.
class Command
  # Returns the Execution stored in Thread.current[:execution], or nil when
  # none has been stored.
  def self.execution
    Thread.current[:execution]
  end

  # Delegates to Execution#reply on the current execution.
  def self.reply(stanza=nil, &blk)
    execution.reply(stanza, &blk)
  end

  # Delegates to Execution#finish on the current execution.
  def self.finish(*args, **kwargs, &blk)
    execution.finish(*args, **kwargs, &blk)
  end

  # Delegates to Execution#customer on the current execution.
  def self.customer
    execution.customer
  end

  # Delegates to Execution#log on the current execution.
  def self.log
    execution.log
  end

  # One run of a command for one incoming iq. Holds the latest iq of the
  # exchange, a child logger tagged with the node, and a lazily configured
  # Sentry hub.
  class Execution
    # Rejection value used by #reply when COMMAND_MANAGER.write rejects with
    # SessionManager::Timeout; #catch_after swallows it.
    class Timeout < SessionManager::Timeout; end

    # Wrapper used as a promise rejection value by #finish to carry the last
    # stanza of the exchange up to #catch_after / #send_final_error.
    class FinalStanza
      attr_reader :stanza

      def initialize(stanza)
        @stanza = stanza
      end
    end

    attr_reader :customer_repo, :log, :iq

    # customer_repo:: object responding to find_by_jid (used by #customer)
    # blather::       object responding to << (final stanzas are pushed to it)
    # format_error::  callable turning a rejection value into note text
    # iq::            the iq that started this execution; must respond to
    #                 node, from and reply
    def initialize(customer_repo, blather, format_error, iq)
      @customer_repo = customer_repo
      @blather = blather
      @format_error = format_error
      @iq = iq
      @log = LOG.child(node: iq.node)
    end

    # Runs the given block with this execution as its argument.
    #
    # Increments the "command" StatsD counter tagged with the node, then,
    # inside a promise chain, publishes self as Thread.current[:execution],
    # sets up the Sentry scope, yields self and routes the block's result
    # through #catch_after. Anything still rejected after that is handed to
    # +panic+, which is not defined in this class (expected to be provided
    # elsewhere in the application).
    def execute
      StatsD.increment("command", tags: ["node:#{iq.node}"])
      EMPromise.resolve(nil).then {
        Thread.current[:execution] = self
        # Called for its side effect: configures the Sentry scope up front.
        sentry_hub
        catch_after(EMPromise.resolve(yield self))
      }.catch(&method(:panic))
    end

    # Sends an intermediate stanza and waits for the next iq.
    #
    # When +stanza+ is nil a reply to the current iq with status :executing is
    # built. The stanza is yielded to the optional block for customisation and
    # then written via COMMAND_MANAGER.write; the iq that resolves from the
    # write replaces @iq. A SessionManager::Timeout rejection is converted
    # into a rejection with Execution::Timeout.
    #
    # Returns the promise chain built from COMMAND_MANAGER.write.
    def reply(stanza=nil)
      stanza ||= iq.reply.tap { |reply|
        reply.status = :executing
      }
      yield stanza if block_given?
      COMMAND_MANAGER.write(stanza).then { |new_iq|
        @iq = new_iq
      }.catch_only(SessionManager::Timeout) do
        EMPromise.reject(Timeout.new)
      end
    end

    # Ends the command.
    #
    # Builds a reply to the current iq with the given +status+, yields it to
    # the optional block, and, when +text+ is given, sets the note type and
    # note text (after the block, so these override anything the block set).
    #
    # Always returns a promise REJECTED with a FinalStanza: the rejection is
    # how the remaining steps of the caller's promise chain are skipped. The
    # stanza itself is sent by #catch_after or #send_final_error.
    def finish(text=nil, type: :info, status: :completed)
      reply = @iq.reply
      reply.status = status
      yield reply if block_given?
      if text
        reply.note_type = type
        reply.note_text = text
      end
      EMPromise.reject(FinalStanza.new(reply))
    end

    # Returns the Sentry hub for this execution, configuring it on first use:
    # a new scope is pushed, its breadcrumbs are cleared, and the transaction
    # name and user jid are set from the iq. The result is memoized.
    #
    # Raises RuntimeError when Sentry.get_current_hub returns nil.
    def sentry_hub
      return @sentry_hub if @sentry_hub

      # Stored on Fiber-local in 4.3.1 and earlier
      # https://github.com/getsentry/sentry-ruby/issues/1495
      @sentry_hub = Sentry.get_current_hub
      raise "Sentry.init has not been called" unless @sentry_hub

      # NOTE(review): the pushed scope is never popped in this class; confirm
      # whether that is intentional for the hub's lifetime.
      @sentry_hub.push_scope
      @sentry_hub.current_scope.clear_breadcrumbs
      @sentry_hub.current_scope.set_transaction_name(@iq.node)
      @sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
      @sentry_hub
    end

    # Looks up the customer for the sender's bare jid through the customer
    # repo and, once found, adds the customer id to the Sentry user.
    #
    # The value returned by find_by_jid(...).then is memoized, so the lookup
    # happens at most once per execution (a failed lookup is cached too).
    def customer
      @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
        sentry_hub.current_scope.set_user(
          id: c.customer_id,
          # Stringified so the jid has the same type as in #sentry_hub.
          jid: @iq.from.stripped.to_s
        )
        c
      }
    end

    protected

    # Attaches the terminal handlers to the command's promise, in order:
    #
    # 1. A rejected Blather::Stanza::Iq::Command that is a cancel becomes
    #    finish(status: :canceled); any other such iq is re-rejected and falls
    #    through to the generic handler.
    # 2. Execution::Timeout is swallowed (resolves to nil).
    # 3. FinalStanza (from #finish) has its stanza pushed to @blather.
    # 4. Anything else is logged/reported and answered with an error note.
    def catch_after(promise)
      promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
        next EMPromise.reject(iq) unless iq.cancel?

        finish(status: :canceled)
      }.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
        @blather << e.stanza
      }.catch do |e|
        log_error(e)
        send_final_error(e)
      end
    end

    # Finishes the command with an error note whose text comes from the
    # format_error callable, and pushes the resulting stanza to @blather.
    def send_final_error(e)
      finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
        @blather << s.stanza
      end
    end

    # Logs the failure and reports it to Sentry. Rejection values are not
    # necessarily exceptions (e.g. a re-rejected iq from #catch_after), so
    # non-exceptions are reported as a message built from #to_s.
    def log_error(e)
      @log.error("Error raised during #{iq.node}: #{e.class}", e)
      if e.is_a?(::Exception)
        sentry_hub.capture_exception(e)
      else
        sentry_hub.capture_message(e.to_s)
      end
    end
  end

  attr_reader :node, :name

  # node::          command node; used in the default guards of #register
  # name::          command name, exposed through #name
  # customer_repo:: passed to each Execution (defaults to CustomerRepo.new)
  # list_for::      callable taking keyword arguments, used by #list_for?;
  #                 the default is truthy exactly when +tel+ is truthy
  # format_error::  callable turning a rejection value into note text; the
  #                 default uses #message when available, otherwise #to_s
  # blk::           the command body, called with the Execution
  def initialize(
    node,
    name,
    customer_repo: CustomerRepo.new,
    list_for: ->(tel:, **) { !!tel },
    format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
    &blk
  )
    @node = node
    @name = name
    @customer_repo = customer_repo
    @list_for = list_for
    @format_error = format_error
    @blk = blk
  end

  # Registers a command handler on +blather+ that starts a new Execution of
  # this command's block for every iq matching +guards+. By default the guards
  # are :execute? plus a match on this node with a nil sessionid.
  #
  # Returns self so calls can be chained.
  def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
    blather.command(*guards) do |iq|
      Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
    end
    self
  end

  # Calls the list_for callable with the given keyword arguments and returns
  # its result.
  def list_for?(**kwargs)
    @list_for.call(**kwargs)
  end
end