web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "forwardable"
  5require "multibases"
  6require "multihashes"
  7require "roda"
  8require "thin"
  9require "sentry-ruby"
 10
 11require_relative "lib/call_attempt_repo"
 12require_relative "lib/cdr"
 13require_relative "lib/cdr_repo"
 14require_relative "lib/oob"
 15require_relative "lib/rev_ai"
 16require_relative "lib/roda_capture"
 17require_relative "lib/roda_em_promise"
 18require_relative "lib/rack_fiber"
 19
 20class OGMDownload
 21	def initialize(url)
 22		@digest = Digest::SHA512.new
 23		@f = Tempfile.open("ogm")
 24		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 25	end
 26
 27	def download
 28		http = @req.aget
 29		http.stream do |chunk|
 30			@digest << chunk
 31			@f.write chunk
 32		end
 33		http.then { @f.close }.catch do |e|
 34			@f.close!
 35			EMPromise.reject(e)
 36		end
 37	end
 38
 39	def cid
 40		Multibases.encode(
 41			"base58btc",
 42			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 43		).pack.to_s
 44	end
 45
 46	def path
 47		@f.path
 48	end
 49end
 50
 51# rubocop:disable Metrics/ClassLength
 52class Web < Roda
 53	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 54	use Sentry::Rack::CaptureExceptions
 55	plugin :json_parser
 56	plugin :type_routing
 57	plugin :public
 58	plugin :render, engine: "slim"
 59	plugin RodaCapture
 60	plugin RodaEMPromise # Must go last!
 61
 62	class << self
 63		attr_reader :customer_repo, :log, :outbound_transfers
 64
 65		def run(log, *listen_on)
 66			plugin :common_logger, log, method: :info
 67			@outbound_transfers = {}
 68			Thin::Logging.logger = log
 69			Thin::Server.start(
 70				*listen_on,
 71				freeze.app,
 72				signals: false
 73			)
 74		end
 75	end
 76
 77	extend Forwardable
 78	def_delegators :'self.class', :outbound_transfers
 79	def_delegators :request, :params
 80
 81	def log
 82		opts[:common_logger]
 83	end
 84
 85	def log_error(e)
 86		log.error(
 87			"Error raised during #{request.fullpath}: #{e.class}",
 88			e,
 89			loggable_params
 90		)
 91		if e.is_a?(::Exception)
 92			Sentry.capture_exception(e)
 93		else
 94			Sentry.capture_message(e.to_s)
 95		end
 96	end
 97
 98	def loggable_params
 99		params.dup.tap do |p|
100			p.delete("to")
101			p.delete("from")
102		end
103	end
104
105	def customer_repo(**kwargs)
106		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
107		opts[:customer_repo] || CustomerRepo.new(**kwargs)
108	end
109
110	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
111		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
112			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
113
114			log_error(e)
115			customer_repo(
116				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
117			).find_by_tel(params["to"])
118		}
119	end
120
121	def call_attempt_repo
122		opts[:call_attempt_repo] || CallAttemptRepo.new
123	end
124
125	def cdr_repo
126		opts[:cdr_repo] || CDRRepo.new
127	end
128
129	def rev_ai
130		RevAi.new(logger: log.child(loggable_params))
131	end
132
133	TEL_CANDIDATES = {
134		"Restricted" => "14",
135		"anonymous" => "15",
136		"Anonymous" => "16",
137		"unavailable" => "17",
138		"Unavailable" => "18"
139	}.freeze
140
141	def sanitize_tel_candidate(candidate)
142		if candidate.length < 3
143			"13;phone-context=anonymous.phone-context.soprani.ca"
144		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
145			candidate
146		else
147			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
148				";phone-context=anonymous.phone-context.soprani.ca"
149		end
150	end
151
152	def from_jid
153		Blather::JID.new(
154			sanitize_tel_candidate(params["from"]),
155			CONFIG[:component][:jid]
156		)
157	end
158
159	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
160		[
161			"/inbound/calls/#{call_id || params['callId']}",
162			suffix
163		].compact.join("/") +
164			(customer_id ? "?customer_id=#{customer_id}" : "")
165	end
166
167	def url(path)
168		"#{request.base_url}#{path}"
169	end
170
171	def modify_call(call_id)
172		body = Bandwidth::ApiModifyCallRequest.new
173		yield body
174		BANDWIDTH_VOICE.modify_call(
175			CONFIG[:creds][:account],
176			call_id,
177			body: body
178		)
179	rescue Bandwidth::ApiErrorResponseException
180		# If call does not exist, don't need to hang up or send to voicemail
181		# Other side must have hung up already
182		raise $! unless $!.response_code.to_s == "404"
183	end
184
185	def start_transcription(customer, call_id, media_url)
186		return unless customer.transcription_enabled
187
188		rev_ai.language_id(
189			media_url,
190			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
191			from_jid: from_jid,
192			customer_id: customer.customer_id
193		)
194	end
195
196	route do |r|
197		r.on "inbound" do
198			r.on "calls" do
199				r.post "status" do
200					if params["eventType"] == "disconnect"
201						if (outbound_leg = outbound_transfers.delete(params["callId"]))
202							modify_call(outbound_leg) do |call|
203								call.state = "completed"
204							end
205						end
206
207						customer_repo.find_by_tel(params["to"]).then do |customer|
208							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
209						end
210					end
211					"OK"
212				end
213
214				r.on :call_id do |call_id|
215					r.post "transfer_complete" do
216						outbound_leg = outbound_transfers.delete(call_id)
217						if params["cause"] == "hangup" && params["tag"] == "connected"
218							log.info "Normal hangup, now end #{call_id}", loggable_params
219							modify_call(call_id) { |call| call.state = "completed" }
220						elsif !outbound_leg
221							log.debug "Inbound disconnected", loggable_params
222						else
223							log.debug "Go to voicemail", loggable_params
224							modify_call(call_id) do |call|
225								call.redirect_url = url inbound_calls_path(:voicemail)
226							end
227						end
228						""
229					end
230
231					r.on "voicemail" do
232						r.post "audio" do
233							duration = Time.parse(params["endTime"]) -
234							           Time.parse(params["startTime"])
235							next "OK<5" unless duration > 5
236
237							jmp_media_url = params["mediaUrl"].sub(
238								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
239								"https://jmp.chat"
240							)
241
242							find_by_tel_with_fallback(
243								sgx_repo: Bwmsgsv2Repo.new,
244								transcription_enabled: false
245							).then do |customer|
246								start_transcription(customer, call_id, jmp_media_url)
247
248								m = Blather::Stanza::Message.new
249								m.chat_state = nil
250								m.from = from_jid
251								m.subject = "New Voicemail"
252								m.body = jmp_media_url
253								m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
254								customer.stanza_to(m)
255
256								"OK"
257							end
258						end
259
260						r.post "language_id" do
261							rev_ai.language_id_result(params).then { |result|
262								rev_ai.stt(
263									result["top_language"],
264									result.dig("metadata", "media_url"),
265									url(inbound_calls_path(
266										"voicemail/transcription",
267										call_id: call_id
268									)),
269									**result["metadata"].transform_keys(&:to_sym)
270								).then { "OK" }
271							}.catch_only(RevAi::Failed) { |e|
272								log_error(e)
273								"Failure logged"
274							}
275						end
276
277						r.post "transcription" do
278							rev_ai.stt_result(params, request.url).then { |result|
279								next "OK" if result["text"].to_s.empty?
280
281								customer_repo.find(
282									result.dig("metadata", "customer_id")
283								).then do |customer|
284									m = Blather::Stanza::Message.new
285									m.chat_state = nil
286									m.from = result.dig("metadata", "from_jid")
287									m.subject = "Voicemail Transcription"
288									m.body = result["text"]
289									customer.stanza_to(m)
290
291									"OK"
292								end
293							}.catch_only(RevAi::Failed) { |e|
294								log_error(e)
295								"Failure logged"
296							}
297						end
298
299						r.post do
300							find_by_tel_with_fallback(
301								sgx_repo: Bwmsgsv2Repo.new,
302								ogm_url: nil
303							).then { |c|
304								c.ogm(params["from"]) if c.fwd.voicemail_enabled?
305							}.then { |ogm|
306								next render :hangup unless ogm
307
308								render :voicemail, locals: { ogm: ogm }
309							}.catch_only(CustomerRepo::NotFound) {
310								render "inbound/no_customer"
311							}
312						end
313					end
314
315					r.post do
316						customer_repo(
317							sgx_repo: Bwmsgsv2Repo.new
318						).find(params.fetch("customer_id")).then do |customer|
319							call_attempt_repo.find_inbound(
320								customer,
321								params["from"],
322								call_id: call_id,
323								digits: params["digits"]
324							).then { |ca| render(*ca.to_render) }
325						end
326					end
327				end
328
329				r.post do
330					customer_repo(
331						sgx_repo: Bwmsgsv2Repo.new
332					).find_by_tel(params["to"]).then { |customer|
333						EMPromise.all([
334							customer.customer_id, customer.fwd,
335							call_attempt_repo.find_inbound(
336								customer, params["from"], call_id: params["callId"]
337							)
338						])
339					}.then { |(customer_id, fwd, ca)|
340						call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
341							cc.from = params["from"]
342							cc.application_id = params["applicationId"]
343							cc.answer_url = url inbound_calls_path(nil, customer_id)
344							cc.disconnect_url = url inbound_calls_path(:transfer_complete)
345						}
346
347						next EMPromise.reject(:voicemail) unless call
348
349						outbound_transfers[params["callId"]] = call
350						render :ring, locals: { duration: 300 }
351					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
352						render :forward, locals: { fwd: e.fwd, from: params["from"] }
353					}.catch { |e|
354						log_error(e) unless e == :voicemail
355						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
356					}
357				end
358			end
359		end
360
361		r.on "outbound" do
362			r.on "calls" do
363				r.post "status" do
364					log.info "#{params['eventType']} #{params['callId']}", loggable_params
365					if params["eventType"] == "disconnect"
366						call_attempt_repo.ending_call(c, params["callId"])
367						cdr_repo.put(CDR.for_outbound(params)).catch(&method(:log_error))
368					end
369					"OK"
370				end
371
372				r.post do
373					from = params["from"].sub(/^(?:\+|c)/, "")
374					from = from.sub(/^1/, "") if from.length > 10
375					customer_repo(
376						sgx_repo: Bwmsgsv2Repo.new
377					).find_by_format(from).then { |c|
378						call_attempt_repo.find_outbound(
379							c,
380							params["to"],
381							call_id: params["callId"],
382							digits: params["digits"]
383						).then do |ca|
384							r.json { ca.to_json }
385
386							call_attempt_repo.starting_call(c, params["callId"])
387							render(*ca.to_render)
388						end
389					}.catch_only(CustomerRepo::NotFound) {
390						render "outbound/no_customer"
391					}
392				end
393			end
394		end
395
396		r.on "ogm" do
397			r.post "start" do
398				render :record_ogm, locals: { customer_id: params["customer_id"] }
399			end
400
401			r.post do
402				jmp_media_url = params["mediaUrl"].sub(
403					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
404					"https://jmp.chat"
405				)
406				ogm = OGMDownload.new(jmp_media_url)
407				ogm.download.then do
408					File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
409					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
410					customer_repo.find(params["customer_id"]).then do |customer|
411						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
412					end
413				end
414			end
415		end
416
417		r.public
418	end
419end
420# rubocop:enable Metrics/ClassLength