command.rb

  1# frozen_string_literal: true
  2
  3require "sentry-ruby"
  4require "statsd-instrument"
  5
  6require_relative "customer_repo"
  7require_relative "session_manager"
  8
  9class Command
 10	def self.execution
 11		Thread.current[:execution]
 12	end
 13
 14	def self.reply(stanza=nil, &blk)
 15		execution.reply(stanza, &blk)
 16	end
 17
 18	def self.finish(*args, **kwargs, &blk)
 19		execution.finish(*args, **kwargs, &blk)
 20	end
 21
 22	def self.customer
 23		execution.customer
 24	end
 25
 26	def self.log
 27		execution.log
 28	end
 29
 30	class Execution
 31		class Timeout < SessionManager::Timeout; end
 32
 33		class FinalStanza
 34			attr_reader :stanza
 35
 36			def initialize(stanza)
 37				@stanza = stanza
 38			end
 39		end
 40
 41		attr_reader :customer_repo, :log, :iq
 42
 43		def initialize(customer_repo, blather, format_error, iq)
 44			@customer_repo = customer_repo
 45			@blather = blather
 46			@format_error = format_error
 47			@iq = iq
 48			@log = LOG.child(node: iq.node)
 49		end
 50
 51		def execute
 52			StatsD.increment("command", tags: ["node:#{iq.node}"])
 53			EMPromise.resolve(nil).then {
 54				Thread.current[:execution] = self
 55				sentry_hub
 56				catch_after(EMPromise.resolve(yield self))
 57			}.catch(&method(:panic))
 58		end
 59
 60		def reply(stanza=nil)
 61			stanza ||= iq.reply.tap { |reply| reply.status = :executing }
 62			yield stanza if block_given?
 63			COMMAND_MANAGER.write(stanza).then { |new_iq|
 64				@iq = new_iq
 65			}.catch_only(Blather::Stanza::Iq) { |new_iq|
 66				@iq = new_iq if new_iq.set?
 67				EMPromise.reject(new_iq)
 68			}.catch_only(SessionManager::Timeout) do
 69				EMPromise.reject(Timeout.new)
 70			end
 71		end
 72
 73		def finish(text=nil, type: :info, status: :completed)
 74			reply = @iq.reply
 75			reply.status = status
 76			yield reply if block_given?
 77			if text
 78				reply.note_type = type
 79				reply.note_text = text
 80			end
 81			EMPromise.reject(FinalStanza.new(reply))
 82		end
 83
 84		def sentry_hub
 85			return @sentry_hub if @sentry_hub
 86
 87			# Stored on Fiber-local in 4.3.1 and earlier
 88			# https://github.com/getsentry/sentry-ruby/issues/1495
 89			@sentry_hub = Sentry.get_current_hub
 90			raise "Sentry.init has not been called" unless @sentry_hub
 91
 92			@sentry_hub.push_scope
 93			@sentry_hub.current_scope.clear_breadcrumbs
 94			@sentry_hub.current_scope.set_transaction_name(@iq.node)
 95			@sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
 96			@sentry_hub
 97		end
 98
 99		def customer
100			@customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
101				sentry_hub.current_scope.set_user(
102					id: c.customer_id,
103					jid: @iq.from.stripped
104				)
105				c
106			}
107		end
108
109	protected
110
111		def catch_after(promise)
112			promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
113				next EMPromise.reject(iq) unless iq.cancel?
114
115				finish(status: :canceled)
116			}.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
117				@blather << e.stanza
118			}.catch do |e|
119				log_error(e)
120				send_final_error(e)
121			end
122		end
123
124		def send_final_error(e)
125			finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
126				@blather << s.stanza
127			end
128		end
129
130		def log_error(e)
131			@log.error("Error raised during #{iq.node}: #{e.class}", e)
132			if e.is_a?(::Exception)
133				sentry_hub.capture_exception(e)
134			else
135				sentry_hub.capture_message(e.to_s)
136			end
137		end
138	end
139
140	attr_reader :node, :name
141
142	def initialize(
143		node,
144		name,
145		customer_repo: CustomerRepo.new,
146		list_for: ->(tel:, **) { !!tel },
147		format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
148		&blk
149	)
150		@node = node
151		@name = name
152		@customer_repo = customer_repo
153		@list_for = list_for
154		@format_error = format_error
155		@blk = blk
156	end
157
158	def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
159		blather.command(*guards) do |iq|
160			Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
161		end
162		self
163	end
164
165	def list_for?(**kwargs)
166		@list_for.call(**kwargs)
167	end
168end