1# frozen_string_literal: true
  2
require "digest"
require "fileutils"
require "forwardable"
require "multibases"
require "multihashes"
require "roda"
require "sentry-ruby"
require "tempfile"
require "thin"
require "time"
 11
 12require_relative "lib/call_attempt_repo"
 13require_relative "lib/trust_level_repo"
 14require_relative "lib/cdr"
 15require_relative "lib/cdr_repo"
 16require_relative "lib/oob"
 17require_relative "lib/rev_ai"
 18require_relative "lib/roda_capture"
 19require_relative "lib/roda_em_promise"
 20require_relative "lib/rack_fiber"
 21require_relative "lib/reachability_repo"
 22
 23class OGMDownload
 24	def initialize(url)
 25		@digest = Digest::SHA512.new
 26		@f = Tempfile.open("ogm")
 27		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 28	end
 29
 30	def download
 31		http = @req.aget
 32		http.stream do |chunk|
 33			@digest << chunk
 34			@f.write chunk
 35		end
 36		http.then { @f.close }.catch do |e|
 37			@f.close!
 38			EMPromise.reject(e)
 39		end
 40	end
 41
 42	def cid
 43		Multibases.encode(
 44			"base58btc",
 45			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 46		).pack.to_s
 47	end
 48
 49	def path
 50		@f.path
 51	end
 52end
 53
 54# rubocop:disable Metrics/ClassLength
 55class Web < Roda
	# Middleware and plugin set-up. Two entries are order-sensitive, as flagged
	# inline: Rack::Fiber first (skipped when ENV["ENV"] is "test") and
	# RodaEMPromise last.
	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
	# Sentry's Rack middleware for capturing exceptions.
	use Sentry::Rack::CaptureExceptions
	plugin :json_parser
	plugin :type_routing
	plugin :public
	# Templates are rendered with the slim engine.
	plugin :render, engine: "slim"
	plugin RodaCapture
	plugin RodaEMPromise # Must go last!
 64
	class << self
		# Class-level readers. Only @outbound_transfers is assigned in the code
		# visible here (by .run); the other two read whatever was set elsewhere.
		attr_reader :customer_repo, :log, :outbound_transfers

		# Wires up logging, resets the in-memory table of outbound transfer
		# legs, freezes the app and hands it to Thin.
		#
		# @param log logger used for both request logging and Thin itself
		# @param listen_on arguments forwarded to Thin::Server.start ahead of
		#   the app (address/port or socket)
		def run(log, *listen_on)
			plugin :common_logger, log, method: :info
			@outbound_transfers = {}
			Thin::Logging.logger = log

			# The app must be frozen only after the logger plugin is loaded.
			rack_app = freeze.app
			Thin::Server.start(*listen_on, rack_app, signals: false)
		end
	end
 79
	extend Forwardable
	# Instances share the class-level outbound_transfers table.
	def_delegators :'self.class', :outbound_transfers
	# `params` is shorthand for `request.params`.
	def_delegators :request, :params
 83
	# Logger stored under opts[:common_logger] (Web.run loads the
	# :common_logger plugin with the logger it is given).
	def log
		opts[:common_logger]
	end
 87
	# Logs a failure for the current request and forwards it to Sentry.
	#
	# @param e an Exception, or any other value a promise was rejected with;
	#   exceptions are captured as such, anything else as a message (`e.to_s`)
	# @return whatever the Sentry call returns
	def log_error(e)
		summary = "Error raised during #{request.fullpath}: #{e.class}"
		log.error(summary, e, loggable_params)
		return Sentry.capture_exception(e) if e.is_a?(::Exception)

		Sentry.capture_message(e.to_s)
	end
100
101	def loggable_params
102		params.dup.tap do |p|
103			p.delete("to")
104			p.delete("from")
105		end
106	end
107
108	def customer_repo(**kwargs)
109		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
110		opts[:customer_repo] || CustomerRepo.new(**kwargs)
111	end
112
113	def trust_level_repo(**kwargs)
114		opts[:trust_level_repo] || TrustLevelRepo.new(**kwargs)
115	end
116
117	def reachability_repo(**kwargs)
118		opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
119	end
120
121	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
122		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
123			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
124
125			log_error(e)
126			customer_repo(
127				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
128			).find_by_tel(params["to"])
129		}
130	end
131
132	def call_attempt_repo
133		opts[:call_attempt_repo] || CallAttemptRepo.new
134	end
135
136	def cdr_repo
137		opts[:cdr_repo] || CDRRepo.new
138	end
139
140	def rev_ai
141		RevAi.new(logger: log.child(loggable_params))
142	end
143
144	TEL_CANDIDATES = {
145		"Restricted" => "14",
146		"anonymous" => "15",
147		"Anonymous" => "16",
148		"unavailable" => "17",
149		"Unavailable" => "18"
150	}.freeze
151
152	def sanitize_tel_candidate(candidate)
153		if candidate.length < 3
154			"13;phone-context=anonymous.phone-context.soprani.ca"
155		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
156			candidate
157		else
158			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
159				";phone-context=anonymous.phone-context.soprani.ca"
160		end
161	end
162
163	def from_jid
164		Blather::JID.new(
165			sanitize_tel_candidate(params["from"]),
166			CONFIG[:component][:jid]
167		)
168	end
169
170	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
171		[
172			"/inbound/calls/#{call_id || params['callId']}",
173			suffix
174		].compact.join("/") +
175			(customer_id ? "?customer_id=#{customer_id}" : "")
176	end
177
	# Absolute URL for a path on this app: the current request's base_url
	# followed by `path`.
	def url(path)
		"#{request.base_url}#{path}"
	end
181
182	def modify_call(call_id)
183		body = Bandwidth::ApiModifyCallRequest.new
184		yield body
185		BANDWIDTH_VOICE.modify_call(
186			CONFIG[:creds][:account],
187			call_id,
188			body: body
189		)
190	rescue Bandwidth::APIException
191		# If call does not exist, don't need to hang up or send to voicemail
192		# Other side must have hung up already
193		raise $! unless [404, 409].include?($!.response_code)
194	end
195
196	def start_transcription(customer, call_id, media_url)
197		return unless customer.transcription_enabled
198
199		rev_ai.language_id(
200			media_url,
201			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
202			from_jid: from_jid,
203			customer_id: customer.customer_id
204		)
205	end
206
207	def call_inputs(customer, from, call_id)
208		EMPromise.all([
209			customer.customer_id, customer.fwd,
210			call_attempt_repo.find_inbound(customer, from, call_id: call_id)
211		])
212	end
213
214	def json_call(customer, fwd, call_attempt)
215		raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
216
217		customer.ogm(params["from"]).then do |ogm|
218			call_attempt.as_json.merge(
219				fwd: fwd,
220				ogm: ogm
221			).to_json
222		end
223	end
224
225	def transfer_complete_url(customer_id, call_id, tries)
226		url(
227			inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
228		) + (tries ? "&tries=#{tries}" : "")
229	end
230
231	def create_call(customer, from, call_id, application_id, tries: nil)
232		call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
233			request.json { json_call(customer, fwd, ca) }
234			ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
235				cc.from = from
236				cc.application_id = application_id
237				cc.answer_url = url inbound_calls_path(nil, customer_id)
238				cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
239			end
240		end
241	end
242
243	def inbound_from
244		if params["from"] && params["from"] =~ /\A\+?\d+\Z/
245			params["from"]
246		else
247			log.info "Inbound call with unusual from: #{params['from']}"
248			TEL_CANDIDATES.fetch(params["from"], "19")
249		end
250	end
251
	# Response that ends the call: JSON requests are offered an empty JSON
	# object via request.json; otherwise the :hangup template is rendered.
	def hangup
		request.json { {}.to_json }

		render :hangup
	end
257
258	route do |r|
		# Liveness probe: always answers "OK".
		r.get "healthcheck" do
			"OK"
		end
262
263		r.on "inbound" do
264			r.on "calls" do
265				r.post "status" do
266					if params["eventType"] == "disconnect"
267						if (outbound_leg = outbound_transfers.delete(params["callId"]))
268							modify_call(outbound_leg) do |call|
269								call.state = "completed"
270							end
271						end
272
273						customer_repo.find_by_tel(params["to"]).then do |customer|
274							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
275						end
276					end
277					"OK"
278				end
279
280				r.on :call_id do |call_id|
281					r.post "transfer_complete" do
282						outbound_leg = outbound_transfers.delete(call_id)
283						if params["cause"] == "hangup" && params["tag"] == "connected"
284							log.info "Normal hangup, now end #{call_id}", loggable_params
285							modify_call(call_id) { |call| call.state = "completed" }
286						elsif !outbound_leg
287							log.debug "Inbound disconnected", loggable_params
288						elsif params["cause"] == "error" && params["tries"].to_i < 15
289							log.info "2nd leg error, retry", loggable_params
290							customer_repo(
291								sgx_repo: Bwmsgsv2Repo.new
292							).find(params["customer_id"]).then { |customer|
293								create_call(
294									customer, params["from"], call_id, params["applicationId"],
295									tries: params["tries"].to_i + 1
296								).then { |call|
297									outbound_transfers[params["callId"]] = call
298								}.catch(&log.method(:error))
299							}
300						else
301							log.debug "Go to voicemail", loggable_params
302							modify_call(call_id) do |call|
303								call.redirect_url = url inbound_calls_path(:voicemail)
304							end
305						end
306						""
307					end
308
309					r.on "voicemail" do
310						r.post "audio" do
311							duration = Time.parse(params["endTime"]) -
312							           Time.parse(params["startTime"])
313							next "OK<5" unless duration > 5
314
315							jmp_media_url = params["mediaUrl"].sub(
316								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
317								"https://jmp.chat"
318							)
319
320							find_by_tel_with_fallback(
321								sgx_repo: Bwmsgsv2Repo.new,
322								transcription_enabled: false
323							).then do |customer|
324								start_transcription(customer, call_id, jmp_media_url)
325
326								m = Blather::Stanza::Message.new
327								m.chat_state = nil
328								m.from = from_jid
329								m.subject = "New Voicemail"
330								m << OOB.new(jmp_media_url)
331								customer.stanza_to(m)
332
333								"OK"
334							end
335						end
336
337						r.post "language_id" do
338							rev_ai.language_id_result(params).then { |result|
339								rev_ai.stt(
340									result["top_language"],
341									result.dig("metadata", "media_url"),
342									url(inbound_calls_path(
343										"voicemail/transcription",
344										call_id: call_id
345									)),
346									**result["metadata"].transform_keys(&:to_sym)
347								).then { "OK" }
348							}.catch_only(RevAi::Failed) { |e|
349								log_error(e)
350								"Failure logged"
351							}
352						end
353
354						r.post "transcription" do
355							rev_ai.stt_result(params, request.url).then { |result|
356								next "OK" if result["text"].to_s.empty?
357
358								customer_repo.find(
359									result.dig("metadata", "customer_id")
360								).then do |customer|
361									m = Blather::Stanza::Message.new
362									m.chat_state = nil
363									m.from = result.dig("metadata", "from_jid")
364									m.subject = "Voicemail Transcription"
365									m.body = result["text"]
366									customer.stanza_to(m)
367
368									"OK"
369								end
370							}.catch_only(RevAi::Failed) { |e|
371								log_error(e)
372								"Failure logged"
373							}
374						end
375
376						r.post do
377							find_by_tel_with_fallback(
378								sgx_repo: Bwmsgsv2Repo.new,
379								ogm_url: nil
380							).then { |c|
381								c.ogm(params["from"])
382							}.then { |ogm|
383								next hangup unless ogm
384
385								render :voicemail, locals: { ogm: ogm }
386							}.catch_only(CustomerRepo::NotFound) {
387								render "inbound/no_customer"
388							}
389						end
390					end
391
392					r.post do
393						customer_repo(
394							sgx_repo: Bwmsgsv2Repo.new
395						).find(params.fetch("customer_id")).then do |customer|
396							call_attempt_repo.find_inbound(
397								customer,
398								params["from"],
399								call_id: call_id,
400								digits: params["digits"]
401							).then { |ca| render(*ca.to_render) }
402						end
403					end
404				end
405
				# Entry point for a new inbound call to the number in params["to"].
				#
				# On success the outbound leg is created and registered, and the
				# caller hears ringing. A CustomerFwd::InfiniteTimeout renders a
				# direct forward. Every other failure redirects the caller to
				# voicemail.
				r.post do
					customer_repo(
						sgx_repo: Bwmsgsv2Repo.new
					).find_by_tel(params["to"]).then { |customer|
						reachability_repo.find(customer, params["from"]).then do |reach|
							# NOTE(review): presumably `if_yes` runs for reachability
							# checks and the block for ordinary calls — confirm against
							# ReachabilityRepo.
							reach.filter(if_yes: ->(_) { hangup }) do
								create_call(
									customer,
									inbound_from,
									params["callId"],
									params["applicationId"]
								).then { |call|
									# No outbound leg: reject with the :voicemail marker, which
									# the final catch below handles without logging an error.
									next EMPromise.reject(:voicemail) unless call

									# Remember the leg under the inbound call id so later
									# callbacks can find it.
									outbound_transfers[params["callId"]] = call
									render :ring, locals: { duration: 300 }
								}
							end
						end
					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
						render :forward, locals: { fwd: e.fwd, from: params["from"] }
					}.catch { |e|
						log_error(e) unless e == :voicemail
						# JSON requests get the error text instead of a template.
						r.json { { error: e.to_s }.to_json }
						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
					}
				end
433			end
434		end
435
436		r.on "outbound" do
437			r.on "calls" do
438				r.post "status" do
439					log.info "#{params['eventType']} #{params['callId']}", loggable_params
440					if params["eventType"] == "disconnect"
441						from = params["from"].sub(/^(?:\+|c)/, "")
442
443						customer_repo.find_by_format(from).then { |customer|
444							trust_level_repo.find(customer).then { |tl| [customer, tl] }
445						}.then { |(customer, trust_level)|
446							next "OK" unless trust_level.write_cdr?
447
448							customer_id = customer.customer_id
449							call_attempt_repo.ending_call(customer_id, params["callId"])
450							cdr_repo
451								.put(CDR.for_outbound(customer_id, params))
452								.catch(&method(:log_error))
453
454							"OK"
455						}
456					else
457						"OK"
458					end
459				end
460
461				r.post do
462					from = params["from"].sub(/^(?:\+|c)/, "")
463					customer_repo(
464						sgx_repo: Bwmsgsv2Repo.new
465					).find_by_format(from).then { |c|
466						call_attempt_repo.find_outbound(
467							c,
468							params["to"],
469							call_id: params["callId"],
470							digits: params["digits"]
471						).then do |ca|
472							r.json { ca.to_json }
473
474							call_attempt_repo.starting_call(c, params["callId"]).then do
475								render(*ca.to_render)
476							end
477						end
478					}.catch_only(CustomerRepo::NotFound) {
479						render "outbound/no_customer"
480					}
481				end
482			end
483		end
484
485		r.on "ogm" do
			# Renders the :record_ogm template for the customer given in
			# params["customer_id"].
			r.post "start" do
				render :record_ogm, locals: { customer_id: params["customer_id"] }
			end
489
490			r.post do
491				jmp_media_url = params["mediaUrl"].sub(
492					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
493					"https://jmp.chat"
494				)
495				ogm = OGMDownload.new(jmp_media_url)
496				ogm.download.then do
497					FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
498					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
499					customer_repo.find(params["customer_id"]).then do |customer|
500						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
501					end
502				end
503			end
504		end
505
		# Last resort: hand the request to the :public plugin loaded above.
		r.public
507	end
508end
509# rubocop:enable Metrics/ClassLength