command.rb

  1# frozen_string_literal: true
  2
  3require "sentry-ruby"
  4require "statsd-instrument"
  5
  6require_relative "customer_repo"
  7require_relative "session_manager"
  8
  9class Command
 10	def self.execution
 11		Thread.current[:execution]
 12	end
 13
 14	def self.execution=(exe)
 15		Thread.current[:execution] = exe
 16	end
 17
 18	def self.reply(stanza=nil, &blk)
 19		execution.reply(stanza, &blk)
 20	end
 21
 22	def self.finish(*args, **kwargs, &blk)
 23		execution.finish(*args, **kwargs, &blk)
 24	end
 25
 26	def self.customer
 27		execution.customer
 28	end
 29
 30	def self.log
 31		execution.log
 32	end
 33
 34	class Execution
 35		class Timeout < SessionManager::Timeout; end
 36
 37		class FinalStanza
 38			attr_reader :stanza
 39
 40			def initialize(stanza)
 41				@stanza = stanza
 42			end
 43		end
 44
 45		attr_reader :customer_repo, :log, :iq
 46
 47		def initialize(customer_repo, blather, format_error, iq)
 48			@customer_repo = customer_repo
 49			@blather = blather
 50			@format_error = format_error
 51			@iq = iq
 52			@log = LOG.child(node: iq.node)
 53		end
 54
 55		def execute
 56			StatsD.increment("command", tags: ["node:#{iq.node}"])
 57			EMPromise.resolve(nil).then {
 58				Command.execution = self
 59				sentry_hub
 60				catch_after(EMPromise.resolve(yield self))
 61			}.catch(&method(:panic))
 62		end
 63
 64		def reply(stanza=nil)
 65			stanza ||= iq.reply.tap { |reply| reply.status = :executing }
 66			yield stanza if block_given?
 67			COMMAND_MANAGER.write(stanza).then { |new_iq|
 68				@iq = new_iq
 69			}.catch_only(Blather::Stanza::Iq) { |new_iq|
 70				@iq = new_iq if new_iq.set?
 71				EMPromise.reject(new_iq)
 72			}.catch_only(SessionManager::Timeout) do
 73				EMPromise.reject(Timeout.new)
 74			end
 75		end
 76
 77		def finish(text=nil, type: :info, status: :completed)
 78			reply = @iq.reply
 79			reply.status = status
 80			yield reply if block_given?
 81			if text
 82				reply.note_type = type
 83				reply.note_text = text
 84			end
 85			EMPromise.reject(FinalStanza.new(reply))
 86		end
 87
 88		def sentry_hub
 89			return @sentry_hub if @sentry_hub
 90
 91			# Stored on Fiber-local in 4.3.1 and earlier
 92			# https://github.com/getsentry/sentry-ruby/issues/1495
 93			@sentry_hub = Sentry.get_current_hub
 94			raise "Sentry.init has not been called" unless @sentry_hub
 95
 96			@sentry_hub.push_scope
 97			@sentry_hub.current_scope.clear_breadcrumbs
 98			@sentry_hub.current_scope.set_transaction_name(@iq.node)
 99			@sentry_hub.current_scope.set_user(jid: @iq.from.stripped.to_s)
100			@sentry_hub
101		end
102
103		def customer
104			@customer ||= @customer_repo.find_by_jid(@iq.from.stripped).then { |c|
105				sentry_hub.current_scope.set_user(
106					id: c.customer_id,
107					jid: @iq.from.stripped
108				)
109				c
110			}
111		end
112
113	protected
114
115		def catch_after(promise)
116			promise.catch_only(Blather::Stanza::Iq::Command) { |iq|
117				next EMPromise.reject(iq) unless iq.cancel?
118
119				finish(status: :canceled)
120			}.catch_only(Timeout) { nil }.catch_only(FinalStanza) { |e|
121				@blather << e.stanza
122			}.catch do |e|
123				log_error(e)
124				send_final_error(e)
125			end
126		end
127
128		def send_final_error(e)
129			finish(@format_error.call(e), type: :error).catch_only(FinalStanza) do |s|
130				@blather << s.stanza
131			end
132		end
133
134		def log_error(e)
135			@log.error("Error raised during #{iq.node}: #{e.class}", e)
136			if e.is_a?(::Exception)
137				sentry_hub.capture_exception(e)
138			else
139				sentry_hub.capture_message(e.to_s)
140			end
141		end
142	end
143
144	attr_reader :node, :name
145
146	def initialize(
147		node,
148		name,
149		customer_repo: CustomerRepo.new,
150		list_for: ->(tel:, **) { !!tel },
151		format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
152		&blk
153	)
154		@node = node
155		@name = name
156		@customer_repo = customer_repo
157		@list_for = list_for
158		@format_error = format_error
159		@blk = blk
160	end
161
162	def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
163		blather.command(*guards) do |iq|
164			Execution.new(@customer_repo, blather, @format_error, iq).execute(&@blk)
165		end
166		self
167	end
168
169	def list_for?(**kwargs)
170		@list_for.call(**kwargs)
171	end
172end