web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/cdr_repo"
 14require_relative "lib/oob"
 15require_relative "lib/rev_ai"
 16require_relative "lib/roda_capture"
 17require_relative "lib/roda_em_promise"
 18require_relative "lib/rack_fiber"
 19require_relative "lib/reachability_repo"
 20
 21class OGMDownload
 22	def initialize(url)
 23		@digest = Digest::SHA512.new
 24		@f = Tempfile.open("ogm")
 25		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 26	end
 27
 28	def download
 29		http = @req.aget
 30		http.stream do |chunk|
 31			@digest << chunk
 32			@f.write chunk
 33		end
 34		http.then { @f.close }.catch do |e|
 35			@f.close!
 36			EMPromise.reject(e)
 37		end
 38	end
 39
 40	def cid
 41		Multibases.encode(
 42			"base58btc",
 43			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 44		).pack.to_s
 45	end
 46
 47	def path
 48		@f.path
 49	end
 50end
 51
 52# rubocop:disable Metrics/ClassLength
 53class Web < Roda
 54	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 55	use Sentry::Rack::CaptureExceptions
 56	plugin :json_parser
 57	plugin :type_routing
 58	plugin :public
 59	plugin :render, engine: "slim"
 60	plugin RodaCapture
 61	plugin RodaEMPromise # Must go last!
 62
 63	class << self
 64		attr_reader :customer_repo, :log, :outbound_transfers
 65
 66		def run(log, *listen_on)
 67			plugin :common_logger, log, method: :info
 68			@outbound_transfers = {}
 69			Thin::Logging.logger = log
 70			Thin::Server.start(
 71				*listen_on,
 72				freeze.app,
 73				signals: false
 74			)
 75		end
 76	end
 77
 78	extend Forwardable
 79	def_delegators :'self.class', :outbound_transfers
 80	def_delegators :request, :params
 81
 82	def log
 83		opts[:common_logger]
 84	end
 85
 86	def log_error(e)
 87		log.error(
 88			"Error raised during #{request.fullpath}: #{e.class}",
 89			e,
 90			loggable_params
 91		)
 92		if e.is_a?(::Exception)
 93			Sentry.capture_exception(e)
 94		else
 95			Sentry.capture_message(e.to_s)
 96		end
 97	end
 98
 99	def loggable_params
100		params.dup.tap do |p|
101			p.delete("to")
102			p.delete("from")
103		end
104	end
105
106	def customer_repo(**kwargs)
107		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
108		opts[:customer_repo] || CustomerRepo.new(**kwargs)
109	end
110
111	def reachability_repo(**kwargs)
112		opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
113	end
114
115	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
116		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
117			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
118
119			log_error(e)
120			customer_repo(
121				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
122			).find_by_tel(params["to"])
123		}
124	end
125
126	def call_attempt_repo
127		opts[:call_attempt_repo] || CallAttemptRepo.new
128	end
129
130	def cdr_repo
131		opts[:cdr_repo] || CDRRepo.new
132	end
133
134	def rev_ai
135		RevAi.new(logger: log.child(loggable_params))
136	end
137
138	TEL_CANDIDATES = {
139		"Restricted" => "14",
140		"anonymous" => "15",
141		"Anonymous" => "16",
142		"unavailable" => "17",
143		"Unavailable" => "18"
144	}.freeze
145
146	def sanitize_tel_candidate(candidate)
147		if candidate.length < 3
148			"13;phone-context=anonymous.phone-context.soprani.ca"
149		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
150			candidate
151		else
152			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
153				";phone-context=anonymous.phone-context.soprani.ca"
154		end
155	end
156
157	def from_jid
158		Blather::JID.new(
159			sanitize_tel_candidate(params["from"]),
160			CONFIG[:component][:jid]
161		)
162	end
163
164	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
165		[
166			"/inbound/calls/#{call_id || params['callId']}",
167			suffix
168		].compact.join("/") +
169			(customer_id ? "?customer_id=#{customer_id}" : "")
170	end
171
172	def url(path)
173		"#{request.base_url}#{path}"
174	end
175
176	def modify_call(call_id)
177		body = Bandwidth::ApiModifyCallRequest.new
178		yield body
179		BANDWIDTH_VOICE.modify_call(
180			CONFIG[:creds][:account],
181			call_id,
182			body: body
183		)
184	rescue Bandwidth::APIException
185		# If call does not exist, don't need to hang up or send to voicemail
186		# Other side must have hung up already
187		raise $! unless [404, 409].include?($!.response_code)
188	end
189
190	def start_transcription(customer, call_id, media_url)
191		return unless customer.transcription_enabled
192
193		rev_ai.language_id(
194			media_url,
195			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
196			from_jid: from_jid,
197			customer_id: customer.customer_id
198		)
199	end
200
201	def call_inputs(customer, from, call_id)
202		EMPromise.all([
203			customer.customer_id, customer.fwd,
204			call_attempt_repo.find_inbound(customer, from, call_id: call_id)
205		])
206	end
207
208	def create_call(customer, from, call_id, application_id, tries: nil)
209		call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
210			ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
211				cc.from = from
212				cc.application_id = application_id
213				cc.answer_url = url inbound_calls_path(nil, customer_id)
214				cc.disconnect_url = url(
215					inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
216				) + (tries ? "&tries=#{tries}" : "")
217			end
218		end
219	end
220
221	route do |r|
222		r.get "healthcheck" do
223			"OK"
224		end
225
226		r.on "inbound" do
227			r.on "calls" do
228				r.post "status" do
229					if params["eventType"] == "disconnect"
230						if (outbound_leg = outbound_transfers.delete(params["callId"]))
231							modify_call(outbound_leg) do |call|
232								call.state = "completed"
233							end
234						end
235
236						customer_repo.find_by_tel(params["to"]).then do |customer|
237							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
238						end
239					end
240					"OK"
241				end
242
243				r.on :call_id do |call_id|
244					r.post "transfer_complete" do
245						outbound_leg = outbound_transfers.delete(call_id)
246						if params["cause"] == "hangup" && params["tag"] == "connected"
247							log.info "Normal hangup, now end #{call_id}", loggable_params
248							modify_call(call_id) { |call| call.state = "completed" }
249						elsif !outbound_leg
250							log.debug "Inbound disconnected", loggable_params
251						elsif params["cause"] == "error" && params["tries"].to_i < 15
252							log.info "2nd leg error, retry", loggable_params
253							customer_repo(
254								sgx_repo: Bwmsgsv2Repo.new
255							).find(params["customer_id"]).then { |customer|
256								create_call(
257									customer, params["from"], call_id, params["applicationId"],
258									tries: params["tries"].to_i + 1
259								).then { |call|
260									outbound_transfers[params["callId"]] = call
261								}.catch(&log.method(:error))
262							}
263						else
264							log.debug "Go to voicemail", loggable_params
265							modify_call(call_id) do |call|
266								call.redirect_url = url inbound_calls_path(:voicemail)
267							end
268						end
269						""
270					end
271
272					r.on "voicemail" do
273						r.post "audio" do
274							duration = Time.parse(params["endTime"]) -
275							           Time.parse(params["startTime"])
276							next "OK<5" unless duration > 5
277
278							jmp_media_url = params["mediaUrl"].sub(
279								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
280								"https://jmp.chat"
281							)
282
283							find_by_tel_with_fallback(
284								sgx_repo: Bwmsgsv2Repo.new,
285								transcription_enabled: false
286							).then do |customer|
287								start_transcription(customer, call_id, jmp_media_url)
288
289								m = Blather::Stanza::Message.new
290								m.chat_state = nil
291								m.from = from_jid
292								m.subject = "New Voicemail"
293								m << OOB.new(jmp_media_url)
294								customer.stanza_to(m)
295
296								"OK"
297							end
298						end
299
300						r.post "language_id" do
301							rev_ai.language_id_result(params).then { |result|
302								rev_ai.stt(
303									result["top_language"],
304									result.dig("metadata", "media_url"),
305									url(inbound_calls_path(
306										"voicemail/transcription",
307										call_id: call_id
308									)),
309									**result["metadata"].transform_keys(&:to_sym)
310								).then { "OK" }
311							}.catch_only(RevAi::Failed) { |e|
312								log_error(e)
313								"Failure logged"
314							}
315						end
316
317						r.post "transcription" do
318							rev_ai.stt_result(params, request.url).then { |result|
319								next "OK" if result["text"].to_s.empty?
320
321								customer_repo.find(
322									result.dig("metadata", "customer_id")
323								).then do |customer|
324									m = Blather::Stanza::Message.new
325									m.chat_state = nil
326									m.from = result.dig("metadata", "from_jid")
327									m.subject = "Voicemail Transcription"
328									m.body = result["text"]
329									customer.stanza_to(m)
330
331									"OK"
332								end
333							}.catch_only(RevAi::Failed) { |e|
334								log_error(e)
335								"Failure logged"
336							}
337						end
338
339						r.post do
340							find_by_tel_with_fallback(
341								sgx_repo: Bwmsgsv2Repo.new,
342								ogm_url: nil
343							).then { |c|
344								c.ogm(params["from"])
345							}.then { |ogm|
346								next render :hangup unless ogm
347
348								render :voicemail, locals: { ogm: ogm }
349							}.catch_only(CustomerRepo::NotFound) {
350								render "inbound/no_customer"
351							}
352						end
353					end
354
355					r.post do
356						customer_repo(
357							sgx_repo: Bwmsgsv2Repo.new
358						).find(params.fetch("customer_id")).then do |customer|
359							call_attempt_repo.find_inbound(
360								customer,
361								params["from"],
362								call_id: call_id,
363								digits: params["digits"]
364							).then { |ca| render(*ca.to_render) }
365						end
366					end
367				end
368
369				r.post do
370					customer_repo(
371						sgx_repo: Bwmsgsv2Repo.new
372					).find_by_tel(params["to"]).then { |customer|
373						reachability_repo.find(customer, params["from"]).then do |reach|
374							reach.filter(if_yes: ->(_) { render :hangup }) do
375								create_call(
376									customer,
377									params["from"],
378									params["callId"],
379									params["applicationId"]
380								).then { |call|
381									next EMPromise.reject(:voicemail) unless call
382
383									outbound_transfers[params["callId"]] = call
384									render :ring, locals: { duration: 300 }
385								}
386							end
387						end
388					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
389						render :forward, locals: { fwd: e.fwd, from: params["from"] }
390					}.catch { |e|
391						log_error(e) unless e == :voicemail
392						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
393					}
394				end
395			end
396		end
397
398		r.on "outbound" do
399			r.on "calls" do
400				r.post "status" do
401					log.info "#{params['eventType']} #{params['callId']}", loggable_params
402					if params["eventType"] == "disconnect"
403						customer_id = params["from"].sub(/^(?:\+1|c)/, "")
404						call_attempt_repo.ending_call(customer_id, params["callId"])
405						cdr_repo
406							.put(CDR.for_outbound(customer_id, params))
407							.catch(&method(:log_error))
408					end
409					"OK"
410				end
411
412				r.post do
413					from = params["from"].sub(/^(?:\+1|c)/, "")
414					customer_repo(
415						sgx_repo: Bwmsgsv2Repo.new
416					).find_by_format(from).then { |c|
417						call_attempt_repo.find_outbound(
418							c,
419							params["to"],
420							call_id: params["callId"],
421							digits: params["digits"]
422						).then do |ca|
423							r.json { ca.to_json }
424
425							call_attempt_repo.starting_call(c, params["callId"])
426							render(*ca.to_render)
427						end
428					}.catch_only(CustomerRepo::NotFound) {
429						render "outbound/no_customer"
430					}
431				end
432			end
433		end
434
435		r.on "ogm" do
436			r.post "start" do
437				render :record_ogm, locals: { customer_id: params["customer_id"] }
438			end
439
440			r.post do
441				jmp_media_url = params["mediaUrl"].sub(
442					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
443					"https://jmp.chat"
444				)
445				ogm = OGMDownload.new(jmp_media_url)
446				ogm.download.then do
447					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
448					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
449					customer_repo.find(params["customer_id"]).then do |customer|
450						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
451					end
452				end
453			end
454		end
455
456		r.public
457	end
458end
459# rubocop:enable Metrics/ClassLength