web.rb

  1# frozen_string_literal: true
  2
require "digest"
require "fileutils"
require "forwardable"
require "multibases"
require "multihashes"
require "roda"
require "sentry-ruby"
require "tempfile"
require "thin"
require "time"
 11
 12require_relative "lib/call_attempt_repo"
 13require_relative "lib/cdr"
 14require_relative "lib/cdr_repo"
 15require_relative "lib/oob"
 16require_relative "lib/rev_ai"
 17require_relative "lib/roda_capture"
 18require_relative "lib/roda_em_promise"
 19require_relative "lib/rack_fiber"
 20require_relative "lib/reachability_repo"
 21
 22class OGMDownload
 23	def initialize(url)
 24		@digest = Digest::SHA512.new
 25		@f = Tempfile.open("ogm")
 26		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 27	end
 28
 29	def download
 30		http = @req.aget
 31		http.stream do |chunk|
 32			@digest << chunk
 33			@f.write chunk
 34		end
 35		http.then { @f.close }.catch do |e|
 36			@f.close!
 37			EMPromise.reject(e)
 38		end
 39	end
 40
 41	def cid
 42		Multibases.encode(
 43			"base58btc",
 44			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 45		).pack.to_s
 46	end
 47
 48	def path
 49		@f.path
 50	end
 51end
 52
 53# rubocop:disable Metrics/ClassLength
 54class Web < Roda
 55	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 56	use Sentry::Rack::CaptureExceptions
 57	plugin :json_parser
 58	plugin :type_routing
 59	plugin :public
 60	plugin :render, engine: "slim"
 61	plugin RodaCapture
 62	plugin RodaEMPromise # Must go last!
 63
 64	class << self
 65		attr_reader :customer_repo, :log, :outbound_transfers
 66
 67		def run(log, *listen_on)
 68			plugin :common_logger, log, method: :info
 69			@outbound_transfers = {}
 70			Thin::Logging.logger = log
 71			Thin::Server.start(
 72				*listen_on,
 73				freeze.app,
 74				signals: false
 75			)
 76		end
 77	end
 78
 79	extend Forwardable
 80	def_delegators :'self.class', :outbound_transfers
 81	def_delegators :request, :params
 82
 83	def log
 84		opts[:common_logger]
 85	end
 86
 87	def log_error(e)
 88		log.error(
 89			"Error raised during #{request.fullpath}: #{e.class}",
 90			e,
 91			loggable_params
 92		)
 93		if e.is_a?(::Exception)
 94			Sentry.capture_exception(e)
 95		else
 96			Sentry.capture_message(e.to_s)
 97		end
 98	end
 99
100	def loggable_params
101		params.dup.tap do |p|
102			p.delete("to")
103			p.delete("from")
104		end
105	end
106
107	def customer_repo(**kwargs)
108		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
109		opts[:customer_repo] || CustomerRepo.new(**kwargs)
110	end
111
112	def reachability_repo(**kwargs)
113		opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
114	end
115
116	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
117		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
118			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
119
120			log_error(e)
121			customer_repo(
122				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
123			).find_by_tel(params["to"])
124		}
125	end
126
127	def call_attempt_repo
128		opts[:call_attempt_repo] || CallAttemptRepo.new
129	end
130
131	def cdr_repo
132		opts[:cdr_repo] || CDRRepo.new
133	end
134
135	def rev_ai
136		RevAi.new(logger: log.child(loggable_params))
137	end
138
139	TEL_CANDIDATES = {
140		"Restricted" => "14",
141		"anonymous" => "15",
142		"Anonymous" => "16",
143		"unavailable" => "17",
144		"Unavailable" => "18"
145	}.freeze
146
147	def sanitize_tel_candidate(candidate)
148		if candidate.length < 3
149			"13;phone-context=anonymous.phone-context.soprani.ca"
150		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
151			candidate
152		else
153			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
154				";phone-context=anonymous.phone-context.soprani.ca"
155		end
156	end
157
158	def from_jid
159		Blather::JID.new(
160			sanitize_tel_candidate(params["from"]),
161			CONFIG[:component][:jid]
162		)
163	end
164
165	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
166		[
167			"/inbound/calls/#{call_id || params['callId']}",
168			suffix
169		].compact.join("/") +
170			(customer_id ? "?customer_id=#{customer_id}" : "")
171	end
172
173	def url(path)
174		"#{request.base_url}#{path}"
175	end
176
177	def modify_call(call_id)
178		body = Bandwidth::ApiModifyCallRequest.new
179		yield body
180		BANDWIDTH_VOICE.modify_call(
181			CONFIG[:creds][:account],
182			call_id,
183			body: body
184		)
185	rescue Bandwidth::APIException
186		# If call does not exist, don't need to hang up or send to voicemail
187		# Other side must have hung up already
188		raise $! unless [404, 409].include?($!.response_code)
189	end
190
191	def start_transcription(customer, call_id, media_url)
192		return unless customer.transcription_enabled
193
194		rev_ai.language_id(
195			media_url,
196			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
197			from_jid: from_jid,
198			customer_id: customer.customer_id
199		)
200	end
201
202	def call_inputs(customer, from, call_id)
203		EMPromise.all([
204			customer.customer_id, customer.fwd,
205			call_attempt_repo.find_inbound(customer, from, call_id: call_id)
206		])
207	end
208
209	def json_call(customer, fwd, call_attempt)
210		raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
211
212		customer.ogm(params["from"]).then do |ogm|
213			call_attempt.as_json.merge(
214				fwd: fwd,
215				ogm: ogm
216			).to_json
217		end
218	end
219
220	def transfer_complete_url(customer_id, call_id, tries)
221		url(
222			inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
223		) + (tries ? "&tries=#{tries}" : "")
224	end
225
226	def create_call(customer, from, call_id, application_id, tries: nil)
227		call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
228			request.json { json_call(customer, fwd, ca) }
229			ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
230				cc.from = from
231				cc.application_id = application_id
232				cc.answer_url = url inbound_calls_path(nil, customer_id)
233				cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
234			end
235		end
236	end
237
238	def inbound_from
239		if params["from"] && params["from"] =~ /\A\+?\d+\Z/
240			params["from"]
241		else
242			log.info "Inbound call with unusual from: #{params['from']}"
243			TEL_CANDIDATES.fetch(params["from"], "19")
244		end
245	end
246
247	def hangup
248		request.json { {}.to_json }
249
250		render :hangup
251	end
252
253	route do |r|
254		r.get "healthcheck" do
255			"OK"
256		end
257
258		r.on "inbound" do
259			r.on "calls" do
260				r.post "status" do
261					if params["eventType"] == "disconnect"
262						if (outbound_leg = outbound_transfers.delete(params["callId"]))
263							modify_call(outbound_leg) do |call|
264								call.state = "completed"
265							end
266						end
267
268						customer_repo.find_by_tel(params["to"]).then do |customer|
269							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
270						end
271					end
272					"OK"
273				end
274
275				r.on :call_id do |call_id|
276					r.post "transfer_complete" do
277						outbound_leg = outbound_transfers.delete(call_id)
278						if params["cause"] == "hangup" && params["tag"] == "connected"
279							log.info "Normal hangup, now end #{call_id}", loggable_params
280							modify_call(call_id) { |call| call.state = "completed" }
281						elsif !outbound_leg
282							log.debug "Inbound disconnected", loggable_params
283						elsif params["cause"] == "error" && params["tries"].to_i < 15
284							log.info "2nd leg error, retry", loggable_params
285							customer_repo(
286								sgx_repo: Bwmsgsv2Repo.new
287							).find(params["customer_id"]).then { |customer|
288								create_call(
289									customer, params["from"], call_id, params["applicationId"],
290									tries: params["tries"].to_i + 1
291								).then { |call|
292									outbound_transfers[params["callId"]] = call
293								}.catch(&log.method(:error))
294							}
295						else
296							log.debug "Go to voicemail", loggable_params
297							modify_call(call_id) do |call|
298								call.redirect_url = url inbound_calls_path(:voicemail)
299							end
300						end
301						""
302					end
303
304					r.on "voicemail" do
305						r.post "audio" do
306							duration = Time.parse(params["endTime"]) -
307							           Time.parse(params["startTime"])
308							next "OK<5" unless duration > 5
309
310							jmp_media_url = params["mediaUrl"].sub(
311								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
312								"https://jmp.chat"
313							)
314
315							find_by_tel_with_fallback(
316								sgx_repo: Bwmsgsv2Repo.new,
317								transcription_enabled: false
318							).then do |customer|
319								start_transcription(customer, call_id, jmp_media_url)
320
321								m = Blather::Stanza::Message.new
322								m.chat_state = nil
323								m.from = from_jid
324								m.subject = "New Voicemail"
325								m << OOB.new(jmp_media_url)
326								customer.stanza_to(m)
327
328								"OK"
329							end
330						end
331
332						r.post "language_id" do
333							rev_ai.language_id_result(params).then { |result|
334								rev_ai.stt(
335									result["top_language"],
336									result.dig("metadata", "media_url"),
337									url(inbound_calls_path(
338										"voicemail/transcription",
339										call_id: call_id
340									)),
341									**result["metadata"].transform_keys(&:to_sym)
342								).then { "OK" }
343							}.catch_only(RevAi::Failed) { |e|
344								log_error(e)
345								"Failure logged"
346							}
347						end
348
349						r.post "transcription" do
350							rev_ai.stt_result(params, request.url).then { |result|
351								next "OK" if result["text"].to_s.empty?
352
353								customer_repo.find(
354									result.dig("metadata", "customer_id")
355								).then do |customer|
356									m = Blather::Stanza::Message.new
357									m.chat_state = nil
358									m.from = result.dig("metadata", "from_jid")
359									m.subject = "Voicemail Transcription"
360									m.body = result["text"]
361									customer.stanza_to(m)
362
363									"OK"
364								end
365							}.catch_only(RevAi::Failed) { |e|
366								log_error(e)
367								"Failure logged"
368							}
369						end
370
371						r.post do
372							find_by_tel_with_fallback(
373								sgx_repo: Bwmsgsv2Repo.new,
374								ogm_url: nil
375							).then { |c|
376								c.ogm(params["from"])
377							}.then { |ogm|
378								next hangup unless ogm
379
380								render :voicemail, locals: { ogm: ogm }
381							}.catch_only(CustomerRepo::NotFound) {
382								render "inbound/no_customer"
383							}
384						end
385					end
386
387					r.post do
388						customer_repo(
389							sgx_repo: Bwmsgsv2Repo.new
390						).find(params.fetch("customer_id")).then do |customer|
391							call_attempt_repo.find_inbound(
392								customer,
393								params["from"],
394								call_id: call_id,
395								digits: params["digits"]
396							).then { |ca| render(*ca.to_render) }
397						end
398					end
399				end
400
401				r.post do
402					customer_repo(
403						sgx_repo: Bwmsgsv2Repo.new
404					).find_by_tel(params["to"]).then { |customer|
405						reachability_repo.find(customer, params["from"]).then do |reach|
406							reach.filter(if_yes: ->(_) { hangup }) do
407								create_call(
408									customer,
409									inbound_from,
410									params["callId"],
411									params["applicationId"]
412								).then { |call|
413									next EMPromise.reject(:voicemail) unless call
414
415									outbound_transfers[params["callId"]] = call
416									render :ring, locals: { duration: 300 }
417								}
418							end
419						end
420					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
421						render :forward, locals: { fwd: e.fwd, from: params["from"] }
422					}.catch { |e|
423						log_error(e) unless e == :voicemail
424						r.json { { error: e.to_s }.to_json }
425						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
426					}
427				end
428			end
429		end
430
431		r.on "outbound" do
432			r.on "calls" do
433				r.post "status" do
434					log.info "#{params['eventType']} #{params['callId']}", loggable_params
435					if params["eventType"] == "disconnect"
436						customer_id = params["from"].sub(/^(?:\+|c)/, "")
437						call_attempt_repo.ending_call(customer_id, params["callId"])
438						cdr_repo
439							.put(CDR.for_outbound(customer_id, params))
440							.catch(&method(:log_error))
441					end
442					"OK"
443				end
444
445				r.post do
446					from = params["from"].sub(/^(?:\+|c)/, "")
447					customer_repo(
448						sgx_repo: Bwmsgsv2Repo.new
449					).find_by_format(from).then { |c|
450						call_attempt_repo.find_outbound(
451							c,
452							params["to"],
453							call_id: params["callId"],
454							digits: params["digits"]
455						).then do |ca|
456							r.json { ca.to_json }
457
458							call_attempt_repo.starting_call(c, params["callId"])
459							render(*ca.to_render)
460						end
461					}.catch_only(CustomerRepo::NotFound) {
462						render "outbound/no_customer"
463					}
464				end
465			end
466		end
467
468		r.on "ogm" do
469			r.post "start" do
470				render :record_ogm, locals: { customer_id: params["customer_id"] }
471			end
472
473			r.post do
474				jmp_media_url = params["mediaUrl"].sub(
475					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
476					"https://jmp.chat"
477				)
478				ogm = OGMDownload.new(jmp_media_url)
479				ogm.download.then do
480					FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
481					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
482					customer_repo.find(params["customer_id"]).then do |customer|
483						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
484					end
485				end
486			end
487		end
488
489		r.public
490	end
491end
492# rubocop:enable Metrics/ClassLength