command.rb

  1# frozen_string_literal: true
  2
  3require "sentry-ruby"
  4require "statsd-instrument"
  5
  6require_relative "customer_repo"
  7require_relative "session_manager"
  8
  # A command that can be registered on a Blather client under a node.
  # Holds the node, a display name, a customer repo, a listing predicate,
  # an error formatter and the handler block run for each incoming iq.
  class Command
    # Identifier string for the commands "actions" field.
    ACTIONS_FIELD = "http://jabber.org/protocol/commands#actions"

    class << self
      # Returns whatever is stored in Thread.current[:execution]
      # (nil when nothing has been stored).
      def execution
        Thread.current[:execution]
      end

      # Stores +exe+ in Thread.current[:execution].
      def execution=(exe)
        Thread.current[:execution] = exe
      end

      # Delegates to #reply on the current execution.
      def reply(stanza=nil, &blk)
        execution.reply(stanza, &blk)
      end

      # Delegates to #finish on the current execution, forwarding everything.
      def finish(*args, **kwargs, &blk)
        execution.finish(*args, **kwargs, &blk)
      end

      # Delegates to #customer on the current execution.
      def customer
        execution.customer
      end

      # Delegates to #log on the current execution.
      def log
        execution.log
      end
    end

    # State for one run of a command, started from a single incoming iq.
    # The iq is replaced as further iqs arrive in response to #reply.
    class Execution
      # Rejection value produced by #reply when the write fails with
      # SessionManager::Timeout.
      class Timeout < SessionManager::Timeout
      end

      # Rejection value produced by #finish; wraps the last stanza to send.
      class FinalStanza
        attr_reader :stanza

        def initialize(stanza)
          @stanza = stanza
        end
      end

      attr_reader :customer_repo, :log, :iq

      # customer_repo:: object responding to find_by_jid (see #customer)
      # blather::       object stanzas are pushed to with <<
      # format_error::  callable turning an error into note text
      # iq::            the iq that started this execution
      def initialize(customer_repo, blather, format_error, iq)
        @iq = iq
        @blather = blather
        @customer_repo = customer_repo
        @format_error = format_error
        # Falls back to the LOG constant when Thread.current[:log] is unset.
        @log = Thread.current[:log] || LOG
      end

      # Increments the "command" StatsD counter tagged with the iq's node,
      # then, inside a promise callback, makes this the current
      # Command.execution, yields self to the given block and routes the
      # block's result through #catch_after. Returns the resulting promise.
      def execute
        StatsD.increment("command", tags: ["node:#{iq.node}"])
        EMPromise.resolve(nil).then do
          Command.execution = self
          outcome = yield self
          catch_after(EMPromise.resolve(outcome))
        end
      end

      # Writes +stanza+ through COMMAND_MANAGER. When no stanza is given, a
      # reply to the current iq with status :executing is built. The stanza
      # is yielded to the block (if any) before being written.
      #
      # On success the written promise's value becomes the new current iq.
      # A rejection with a Blather::Stanza::Iq is re-rejected unchanged
      # (after being stored as the current iq when it is a "set"); a
      # SessionManager::Timeout rejection is replaced by Execution::Timeout.
      def reply(stanza=nil)
        unless stanza
          stanza = iq.reply
          stanza.status = :executing
        end
        yield stanza if block_given?

        written = COMMAND_MANAGER.write(stanza).then { |next_iq| @iq = next_iq }
        iq_handled = written.catch_only(Blather::Stanza::Iq) do |next_iq|
          @iq = next_iq if next_iq.set?
          EMPromise.reject(next_iq)
        end
        iq_handled.catch_only(SessionManager::Timeout) do
          EMPromise.reject(Timeout.new)
        end
      end

      # Builds a reply to the current iq with the given +status+, yields it
      # to the block (if any), attaches a note of +type+ when +text+ is
      # given, and finally rejects with a FinalStanza wrapping that reply.
      # The returned promise therefore never resolves normally from here.
      def finish(text=nil, type: :info, status: :completed)
        final = @iq.reply
        final.status = status
        block_result = block_given? ? yield(final) : nil

        noted = EMPromise.resolve(block_result).then do
          next unless text

          final.note_type = type
          final.note_text = text
        end
        noted.then { EMPromise.reject(FinalStanza.new(final)) }
      end

      # Looks the customer up by the stripped sender JID of the current iq
      # and memoizes the resulting promise. Once the lookup yields a value,
      # the Sentry user is set from it and the value is passed along.
      def customer
        return @customer if @customer

        @customer = @customer_repo.find_by_jid(@iq.from.stripped).then do |found|
          Sentry.set_user(id: found.customer_id, jid: @iq.from.stripped)
          found
        end
      end

    protected

      # Attaches the terminal handlers to +promise+, in this order:
      # 1. a rejected command iq finishes as :canceled when it is a cancel,
      #    otherwise it is rejected again unchanged;
      # 2. an Execution::Timeout is swallowed (handler yields nil);
      # 3. a FinalStanza has its stanza pushed to the blather client;
      # 4. anything else is reported via #send_final_error and re-rejected
      #    (a String is wrapped in RuntimeError first).
      def catch_after(promise)
        cancel_handled =
          promise.catch_only(Blather::Stanza::Iq::Command) do |cmd|
            if cmd.cancel?
              finish(status: :canceled)
            else
              EMPromise.reject(cmd)
            end
          end
        timeout_dropped = cancel_handled.catch_only(Timeout) { nil }
        final_sent = timeout_dropped.catch_only(FinalStanza) do |final|
          @blather << final.stanza
        end
        final_sent.catch do |err|
          err = RuntimeError.new(err) if err.is_a?(String)
          send_final_error(err)
          EMPromise.reject(err)
        end
      end

      # Tags +err+ with a singleton replied? method returning true, then
      # finishes with the formatted error as an :error note and pushes the
      # resulting final stanza to the blather client.
      def send_final_error(err)
        err.define_singleton_method(:replied?) { true }

        note = @format_error.call(err)
        finish(note, type: :error).catch_only(FinalStanza) do |final|
          @blather << final.stanza
        end
      end
    end

    attr_reader :node, :name

    # node::          node the command is registered under
    # name::          display name
    # customer_repo:: handed to each Execution (default: CustomerRepo.new)
    # list_for::      callable taking keywords; the default is truthy-to-true
    #                 on the tel: keyword
    # format_error::  callable turning an error into text; the default uses
    #                 #message when available, otherwise #to_s
    # blk::           handler run with each Execution
    def initialize(
      node,
      name,
      customer_repo: CustomerRepo.new,
      list_for: ->(tel:, **) { !!tel },
      format_error: ->(e) { e.respond_to?(:message) ? e.message : e.to_s },
      &blk
    )
      @node = node
      @name = name
      @blk = blk
      @customer_repo = customer_repo
      @list_for = list_for
      @format_error = format_error
    end

    # Installs a command handler on +blather+ using +guards+; each matching
    # iq gets a fresh Execution that runs the stored block. Returns self.
    def register(blather, guards: [:execute?, { node: @node, sessionid: nil }])
      blather.command(*guards) do |iq|
        run = Execution.new(@customer_repo, blather, @format_error, iq)
        run.execute(&@blk)
      end
      self
    end

    # Calls the list_for callable with the given keywords and returns its
    # result.
    def list_for?(**kwargs)
      @list_for.(**kwargs)
    end
  end