web.rb

  1# frozen_string_literal: true
  2
  3require "digest"
  4require "fileutils"
  5require "forwardable"
  6require "multibases"
  7require "multihashes"
  8require "roda"
  9require "sentry-ruby"
 10require "thin"
 11
 12require_relative "lib/bandwidth_tn_order"
 13require_relative "lib/call_attempt_repo"
 14require_relative "lib/trust_level_repo"
 15require_relative "lib/cdr"
 16require_relative "lib/cdr_repo"
 17require_relative "lib/oob"
 18require_relative "lib/rev_ai"
 19require_relative "lib/roda_capture"
 20require_relative "lib/roda_em_promise"
 21require_relative "lib/rack_fiber"
 22require_relative "lib/reachability_repo"
 23
 24class OGMDownload
 25	def initialize(url)
 26		@digest = Digest::SHA512.new
 27		@f = Tempfile.open("ogm")
 28		@req = EM::HttpRequest.new(url, tls: { verify_peer: true })
 29	end
 30
 31	def download
 32		http = @req.aget
 33		http.stream do |chunk|
 34			@digest << chunk
 35			@f.write chunk
 36		end
 37		http.then { @f.close }.catch do |e|
 38			@f.close!
 39			EMPromise.reject(e)
 40		end
 41	end
 42
 43	def cid
 44		Multibases.encode(
 45			"base58btc",
 46			[1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
 47		).pack.to_s
 48	end
 49
 50	def path
 51		@f.path
 52	end
 53end
 54
 55# rubocop:disable Metrics/ClassLength
 56class Web < Roda
 57	use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
 58	use Sentry::Rack::CaptureExceptions
 59	plugin :json_parser
 60	plugin :type_routing
 61	plugin :public
 62	plugin :render, engine: "slim"
 63	plugin RodaCapture
 64	plugin RodaEMPromise # Must go last!
 65
 66	class << self
 67		attr_reader :customer_repo, :log, :outbound_transfers
 68
 69		def run(log, *listen_on)
 70			plugin :common_logger, log, method: :info
 71			@outbound_transfers = {}
 72			Thin::Logging.logger = log
 73			Thin::Server.start(
 74				*listen_on,
 75				freeze.app,
 76				signals: false
 77			)
 78		end
 79	end
 80
 81	extend Forwardable
 82	def_delegators :'self.class', :outbound_transfers
 83	def_delegators :request, :params
 84
 85	def log
 86		opts[:common_logger]
 87	end
 88
 89	def log_error(e)
 90		log.error(
 91			"Error raised during #{request.fullpath}: #{e.class}",
 92			e,
 93			loggable_params
 94		)
 95		if e.is_a?(::Exception)
 96			Sentry.capture_exception(e)
 97		else
 98			Sentry.capture_message(e.to_s)
 99		end
100	end
101
102	def loggable_params
103		params.dup.tap do |p|
104			p.delete("to")
105			p.delete("from")
106		end
107	end
108
109	def customer_repo(**kwargs)
110		kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
111		opts[:customer_repo] || CustomerRepo.new(**kwargs)
112	end
113
114	def trust_level_repo(**kwargs)
115		opts[:trust_level_repo] || TrustLevelRepo.new(**kwargs)
116	end
117
118	def reachability_repo(**kwargs)
119		opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
120	end
121
122	def find_by_tel_with_fallback(sgx_repo:, **kwargs)
123		customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
124			next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
125
126			log_error(e)
127			customer_repo(
128				sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
129			).find_by_tel(params["to"])
130		}
131	end
132
133	def call_attempt_repo
134		opts[:call_attempt_repo] || CallAttemptRepo.new
135	end
136
137	def cdr_repo
138		opts[:cdr_repo] || CDRRepo.new
139	end
140
141	def rev_ai
142		RevAi.new(logger: log.child(loggable_params))
143	end
144
145	TEL_CANDIDATES = {
146		"Restricted" => "14",
147		"anonymous" => "15",
148		"Anonymous" => "16",
149		"unavailable" => "17",
150		"Unavailable" => "18"
151	}.freeze
152
153	def sanitize_tel_candidate(candidate)
154		if candidate.length < 3
155			"13;phone-context=anonymous.phone-context.soprani.ca"
156		elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
157			candidate
158		else
159			"#{TEL_CANDIDATES.fetch(candidate, '19')}" \
160				";phone-context=anonymous.phone-context.soprani.ca"
161		end
162	end
163
164	def from_jid
165		Blather::JID.new(
166			sanitize_tel_candidate(params["from"]),
167			CONFIG[:component][:jid]
168		)
169	end
170
171	def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
172		[
173			"/inbound/calls/#{call_id || params['callId']}",
174			suffix
175		].compact.join("/") +
176			(customer_id ? "?customer_id=#{customer_id}" : "")
177	end
178
179	def url(path)
180		"#{request.base_url}#{path}"
181	end
182
183	def modify_call(call_id)
184		body = Bandwidth::ApiModifyCallRequest.new
185		yield body
186		BANDWIDTH_VOICE.modify_call(
187			CONFIG[:creds][:account],
188			call_id,
189			body: body
190		)
191	rescue Bandwidth::APIException
192		# If call does not exist, don't need to hang up or send to voicemail
193		# Other side must have hung up already
194		raise $! unless [404, 409].include?($!.response_code)
195	end
196
197	def start_transcription(customer, call_id, media_url)
198		return unless customer.transcription_enabled
199
200		rev_ai.language_id(
201			media_url,
202			url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
203			from_jid: from_jid,
204			customer_id: customer.customer_id
205		)
206	end
207
208	def call_inputs(customer, from, call_id)
209		EMPromise.all([
210			customer.customer_id, customer.fwd,
211			call_attempt_repo.find_inbound(customer, from, call_id: call_id)
212		])
213	end
214
215	def json_call(customer, fwd, call_attempt)
216		raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
217
218		customer.ogm(params["from"]).then do |ogm|
219			call_attempt.as_json.merge(
220				fwd: fwd,
221				ogm: ogm
222			).to_json
223		end
224	end
225
226	def transfer_complete_url(customer_id, call_id, tries)
227		url(
228			inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
229		) + (tries ? "&tries=#{tries}" : "")
230	end
231
232	def create_call(customer, from, call_id, application_id, tries: nil)
233		call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
234			request.json { json_call(customer, fwd, ca) }
235			ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
236				cc.from = from
237				cc.application_id = application_id
238				cc.answer_url = url inbound_calls_path(nil, customer_id)
239				cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
240			end
241		end
242	end
243
244	def inbound_from
245		if params["from"] && params["from"] =~ /\A\+?\d+\Z/
246			params["from"]
247		else
248			log.info "Inbound call with unusual from: #{params['from']}"
249			TEL_CANDIDATES.fetch(params["from"], "19")
250		end
251	end
252
253	def hangup
254		request.json { {}.to_json }
255
256		render :hangup
257	end
258
259	route do |r|
260		r.get "healthcheck" do
261			"OK"
262		end
263
264		r.on "inbound" do
265			r.on "calls" do
266				r.post "status" do
267					if params["eventType"] == "disconnect"
268						if (outbound_leg = outbound_transfers.delete(params["callId"]))
269							modify_call(outbound_leg) do |call|
270								call.state = "completed"
271							end
272						end
273
274						customer_repo.find_by_tel(params["to"]).then do |customer|
275							cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
276						end
277					end
278					"OK"
279				end
280
281				r.on :call_id do |call_id|
282					r.post "transfer_complete" do
283						outbound_leg = outbound_transfers.delete(call_id)
284						if params["cause"] == "hangup" && params["tag"] == "connected"
285							log.info "Normal hangup, now end #{call_id}", loggable_params
286							modify_call(call_id) { |call| call.state = "completed" }
287						elsif !outbound_leg
288							log.debug "Inbound disconnected", loggable_params
289						elsif params["cause"] == "error" && params["tries"].to_i < 15
290							log.info "2nd leg error, retry", loggable_params
291							customer_repo(
292								sgx_repo: Bwmsgsv2Repo.new
293							).find(params["customer_id"]).then { |customer|
294								create_call(
295									customer, params["from"], call_id, params["applicationId"],
296									tries: params["tries"].to_i + 1
297								).then { |call|
298									outbound_transfers[params["callId"]] = call
299								}.catch(&log.method(:error))
300							}
301						else
302							log.debug "Go to voicemail", loggable_params
303							modify_call(call_id) do |call|
304								call.redirect_url = url inbound_calls_path(:voicemail)
305							end
306						end
307						""
308					end
309
310					r.on "voicemail" do
311						r.post "audio" do
312							duration = Time.parse(params["endTime"]) -
313							           Time.parse(params["startTime"])
314							next "OK<5" unless duration > 5
315
316							jmp_media_url = params["mediaUrl"].sub(
317								/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
318								"https://jmp.chat"
319							)
320
321							find_by_tel_with_fallback(
322								sgx_repo: Bwmsgsv2Repo.new,
323								transcription_enabled: false
324							).then do |customer|
325								start_transcription(customer, call_id, jmp_media_url)
326
327								m = Blather::Stanza::Message.new
328								m.chat_state = nil
329								m.from = from_jid
330								m.subject = "New Voicemail"
331								m << OOB.new(jmp_media_url)
332								customer.stanza_to(m)
333
334								"OK"
335							end
336						end
337
338						r.post "language_id" do
339							rev_ai.language_id_result(params).then { |result|
340								rev_ai.stt(
341									result["top_language"],
342									result.dig("metadata", "media_url"),
343									url(inbound_calls_path(
344										"voicemail/transcription",
345										call_id: call_id
346									)),
347									**result["metadata"].transform_keys(&:to_sym)
348								).then { "OK" }
349							}.catch_only(RevAi::Failed) { |e|
350								log_error(e)
351								"Failure logged"
352							}
353						end
354
355						r.post "transcription" do
356							rev_ai.stt_result(params, request.url).then { |result|
357								next "OK" if result["text"].to_s.empty?
358
359								customer_repo.find(
360									result.dig("metadata", "customer_id")
361								).then do |customer|
362									m = Blather::Stanza::Message.new
363									m.chat_state = nil
364									m.from = result.dig("metadata", "from_jid")
365									m.subject = "Voicemail Transcription"
366									m.body = result["text"]
367									customer.stanza_to(m)
368
369									"OK"
370								end
371							}.catch_only(RevAi::Failed) { |e|
372								log_error(e)
373								"Failure logged"
374							}
375						end
376
377						r.post do
378							find_by_tel_with_fallback(
379								sgx_repo: Bwmsgsv2Repo.new,
380								ogm_url: nil
381							).then { |c|
382								c.ogm(params["from"])
383							}.then { |ogm|
384								next hangup unless ogm
385
386								render :voicemail, locals: { ogm: ogm }
387							}.catch_only(CustomerRepo::NotFound) {
388								render "inbound/no_customer"
389							}
390						end
391					end
392
393					r.post do
394						customer_repo(
395							sgx_repo: Bwmsgsv2Repo.new
396						).find(params.fetch("customer_id")).then do |customer|
397							call_attempt_repo.find_inbound(
398								customer,
399								params["from"],
400								call_id: call_id,
401								digits: params["digits"]
402							).then { |ca| render(*ca.to_render) }
403						end
404					end
405				end
406
407				r.post do
408					customer_repo(
409						sgx_repo: Bwmsgsv2Repo.new
410					).find_by_tel(params["to"]).then { |customer|
411						reachability_repo.find(customer, params["from"]).then do |reach|
412							reach.filter(if_yes: ->(_) { hangup }) do
413								create_call(
414									customer,
415									inbound_from,
416									params["callId"],
417									params["applicationId"]
418								).then { |call|
419									next EMPromise.reject(:voicemail) unless call
420
421									outbound_transfers[params["callId"]] = call
422									render :ring, locals: { duration: 300 }
423								}
424							end
425						end
426					}.catch_only(CustomerFwd::InfiniteTimeout) { |e|
427						render :forward, locals: { fwd: e.fwd, from: params["from"] }
428					}.catch { |e|
429						log_error(e) unless e == :voicemail
430						r.json { { error: e.to_s }.to_json }
431						render :redirect, locals: { to: inbound_calls_path(:voicemail) }
432					}
433				end
434			end
435		end
436
437		r.on "outbound" do
438			r.on "calls" do
439				r.post "status" do
440					log.info "#{params['eventType']} #{params['callId']}", loggable_params
441					if params["eventType"] == "disconnect"
442						from = params["from"].sub(/^(?:\+|c)/, "")
443
444						customer_repo.find_by_format(from).then { |customer|
445							trust_level_repo.find(customer).then { |tl| [customer, tl] }
446						}.then { |(customer, trust_level)|
447							next "OK" unless trust_level.write_cdr?
448
449							customer_id = customer.customer_id
450							call_attempt_repo.ending_call(customer_id, params["callId"])
451							cdr_repo
452								.put(CDR.for_outbound(customer_id, params))
453								.catch(&method(:log_error))
454
455							"OK"
456						}
457					else
458						"OK"
459					end
460				end
461
462				r.post do
463					from = params["from"].sub(/^(?:\+|c)/, "")
464					customer_repo(
465						sgx_repo: Bwmsgsv2Repo.new
466					).find_by_format(from).then { |c|
467						call_attempt_repo.find_outbound(
468							c,
469							params["to"],
470							call_id: params["callId"],
471							digits: params["digits"]
472						).then do |ca|
473							r.json { ca.to_json }
474
475							call_attempt_repo.starting_call(c, params["callId"]).then do
476								render(*ca.to_render)
477							end
478						end
479					}.catch_only(CustomerRepo::NotFound) {
480						log.info "No customer for outbound call matching: #{params['from']}"
481						render "outbound/no_customer"
482					}
483				end
484			end
485		end
486
487		r.on "ogm" do
488			r.post "start" do
489				render :record_ogm, locals: { customer_id: params["customer_id"] }
490			end
491
492			r.post do
493				jmp_media_url = params["mediaUrl"].sub(
494					/\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
495					"https://jmp.chat"
496				)
497				ogm = OGMDownload.new(jmp_media_url)
498				ogm.download.then do
499					FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
500					File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
501					customer_repo.find(params["customer_id"]).then do |customer|
502						customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
503					end
504				end
505			end
506		end
507
508		r.on "orders" do
509			token = r.env["HTTP_AUTHORIZATION"].to_s.sub(/\ABearer\s+/, "")
510
511			r.json do
512				if (orderer = CONFIG[:bulk_order_tokens][token.to_sym])
513					r.get :order_id do |order_id|
514						BandwidthTNOrder.get(order_id).then do |order|
515							if order.status == :complete
516								Transaction.new(
517									customer_id: orderer[:customer_id],
518									transaction_id: order.id,
519									amount: order.tels.length * -1.75,
520									note: "Bulk order",
521									ignore_duplicate: true
522								).insert.then do
523									{
524										id: order.id,
525										status: order.status,
526										tels: order.tels
527									}.to_json
528								end
529							else
530								{ id: order.id, status: order.status }.to_json
531							end
532						end
533					end
534
535					r.on "tels" do
536						r.on :tel, method: :delete do |tel|
537							tn_repo = BandwidthTnRepo.new
538							tn = tn_repo.find(tel)
539							if tn&.dig(:sip_peer, :peer_id).to_s == orderer[:peer_id]
540								tn_repo.disconnect(tel, orderer[:customer_id])
541								{ status: "disconnected" }.to_json
542							else
543								response.status = 401
544								{ error: "Number not found" }.to_json
545							end
546						end
547					end
548
549					r.post do
550						customer_repo.find(orderer[:customer_id]).then do |customer|
551							if customer.balance >= 1.75 * params["quantity"].to_i
552								BandwidthTNOrder.create_custom(
553									name: "Bulk order",
554									customer_order_id: orderer[:customer_id],
555									peer_id: orderer[:peer_id],
556									state_search_and_order_type: {
557										quantity: params["quantity"].to_i,
558										state: ["WA", "CA", "TX", "IL", "NY", "FL"].sample
559									}
560								).then do |order|
561									{ id: order.id }.to_json
562								end
563							else
564								response.status = 402
565								{ error: "Balance too low" }.to_json
566							end
567						end
568					end
569				else
570					response.status = 401
571					{ error: "Bad token" }.to_json
572				end
573			end
574		end
575
576		r.public
577	end
578end
579# rubocop:enable Metrics/ClassLength