1# frozen_string_literal: true
2
3require "sentry-ruby"
4require "statsd-instrument"
5
6require_relative "customer_repo"
7require_relative "session_manager"
8
9class Command
10 def self.execution
11 Thread.current[:execution]
12 end
13
14 def self.reply(stanza=nil, &blk)
15 execution.reply(stanza, &blk)
16 end
17
18 def self.finish(*args, **kwargs, &blk)
19 execution.finish(*args, **kwargs, &blk)
20 end
21
22 def self.customer
23 execution.customer
24 end
25
26 def self.log
27 execution.log
28 end
29
30 class Execution
31 class Timeout < SessionManager::Timeout; end
32
33 class FinalStanza
34 attr_reader :stanza
35
36 def initialize(stanza)
37 @stanza = stanza
38 end
39 end
40
41 attr_reader :customer_repo, :log, :iq
42
43 def initialize(customer_repo, blather, format_error, iq)
44 @customer_repo = customer_repo
45 @blather = blather
46 @format_error = format_error
47 @iq = iq
48 @log = LOG.child(node: iq.node)
49 end
50
51 def execute
52 StatsD.increment("command", tags: ["node:#{iq.node}"])
53 EMPromise.resolve(nil).then {
54 Thread.current[:execution] = self
55 sentry_hub
56 catch_after(EMPromise.resolve(yield self))
57 }.catch(&method(:panic))
58 end
59
60 def reply(stanza=nil)
61 stanza ||= iq.reply.tap { |reply| reply.status = :executing }
62 yield stanza if block_given?
63 COMMAND_MANAGER.write(stanza).then { |new_iq|
64 @iq = new_iq
65 }.catch_only(Blather::Stanza::Iq) { |new_iq|
66 @iq = new_iq if new_iq.set?
67 EMPromise.reject(new_iq)
68 }.catch_only(SessionManager::Timeout) do
69 EMPromise.reject(Timeout.new)
70 end
71 end
72
73 def finish(text=nil, type: :info, status: :completed)
74 reply = @iq.reply
75 reply.status = status
76 yield reply if block_given?
77 if text
78 reply.note_type = type
79 reply.note_text = text
80 end
81 EMPromise.reject(FinalStanza.new(reply))
82 end
83
84 def sentry_hub
85 return @sentry_hub if @sentry_hub
86
87 # Stored on Fiber-local in 4.3.1 and earlier
88 # https://github.com/getsentry/sentry-ruby/issues/1495
89 @sentry_hub = Sentry.get_current_hub
90 raise "Sentry.init has not been called" unless @sentry_hub
91
92 @sentry_hub.push_scope
93 @sentry_hub.current_scope.clear_breadcrumbs
94 @sentry_hub.current_scope.set_transaction_name(@iq.node)
95 @sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
96 @sentry_hub
97 end
98
99 def customer
100 @customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
101 sentry_hub.current_scope.set_user(
102 id: c.customer_id,
103 jid: @iq.from.stripped
104 )
105 c
106 }
107 end
108
109 protected
110
111 def catch_after(promise)
112 promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
113 next EMPromise.reject(iq) unless iq.cancel?
114
115 finish(status: :canceled)
116 }.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
117 @blather << e.stanza
118 }.catch do |e|
119 log_error(e)
120 send_final_error(e)
121 end
122 end
123
124 def send_final_error(e)
125 finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
126 @blather << s.stanza
127 end
128 end
129
130 def log_error(e)
131 @log.error("Error raised during #{iq.node}: #{e.class}", e)
132 if e.is_a?(::Exception)
133 sentry_hub.capture_exception(e)
134 else
135 sentry_hub.capture_message(e.to_s)
136 end
137 end
138 end
139
140 attr_reader :node, :name
141
142 def initialize(
143 node,
144 name,
145 customer_repo: CustomerRepo.new,
146 list_for: ->(tel:, **) { !!tel },
147 format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
148 &blk
149 )
150 @node = node
151 @name = name
152 @customer_repo = customer_repo
153 @list_for = list_for
154 @format_error = format_error
155 @blk = blk
156 end
157
158 def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
159 blather.command(*guards) do |iq|
160 Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
161 end
162 self
163 end
164
165 def list_for?(**kwargs)
166 @list_for.call(**kwargs)
167 end
168end