1# frozen_string_literal: true
2
3require "digest"
4require "fileutils"
5require "forwardable"
6require "multibases"
7require "multihashes"
8require "roda"
9require "sentry-ruby"
10require "thin"
11
12require_relative "lib/call_attempt_repo"
13require_relative "lib/trust_level_repo"
14require_relative "lib/cdr"
15require_relative "lib/cdr_repo"
16require_relative "lib/oob"
17require_relative "lib/rev_ai"
18require_relative "lib/roda_capture"
19require_relative "lib/roda_em_promise"
20require_relative "lib/rack_fiber"
21require_relative "lib/reachability_repo"
22
23class OGMDownload
24 def initialize(url)
25 @digest = Digest::SHA512.new
26 @f = Tempfile.open("ogm")
27 @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
28 end
29
30 def download
31 http = @req.aget
32 http.stream do |chunk|
33 @digest << chunk
34 @f.write chunk
35 end
36 http.then { @f.close }.catch do |e|
37 @f.close!
38 EMPromise.reject(e)
39 end
40 end
41
42 def cid
43 Multibases.encode(
44 "base58btc",
45 [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
46 ).pack.to_s
47 end
48
49 def path
50 @f.path
51 end
52end
53
54# rubocop:disable Metrics/ClassLength
55class Web < Roda
56 use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
57 use Sentry::Rack::CaptureExceptions
58 plugin :json_parser
59 plugin :type_routing
60 plugin :public
61 plugin :render, engine: "slim"
62 plugin RodaCapture
63 plugin RodaEMPromise # Must go last!
64
65 class << self
66 attr_reader :customer_repo, :log, :outbound_transfers
67
68 def run(log, *listen_on)
69 plugin :common_logger, log, method: :info
70 @outbound_transfers = {}
71 Thin::Logging.logger = log
72 Thin::Server.start(
73 *listen_on,
74 freeze.app,
75 signals: false
76 )
77 end
78 end
79
80 extend Forwardable
81 def_delegators :'self.class', :outbound_transfers
82 def_delegators :request, :params
83
84 def log
85 opts[:common_logger]
86 end
87
88 def log_error(e)
89 log.error(
90 "Error raised during #{request.fullpath}: #{e.class}",
91 e,
92 loggable_params
93 )
94 if e.is_a?(::Exception)
95 Sentry.capture_exception(e)
96 else
97 Sentry.capture_message(e.to_s)
98 end
99 end
100
101 def loggable_params
102 params.dup.tap do |p|
103 p.delete("to")
104 p.delete("from")
105 end
106 end
107
108 def customer_repo(**kwargs)
109 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
110 opts[:customer_repo] || CustomerRepo.new(**kwargs)
111 end
112
113 def trust_level_repo(**kwargs)
114 opts[:trust_level_repo] || TrustLevelRepo.new(**kwargs)
115 end
116
117 def reachability_repo(**kwargs)
118 opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
119 end
120
121 def find_by_tel_with_fallback(sgx_repo:, **kwargs)
122 customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
123 next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
124
125 log_error(e)
126 customer_repo(
127 sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
128 ).find_by_tel(params["to"])
129 }
130 end
131
132 def call_attempt_repo
133 opts[:call_attempt_repo] || CallAttemptRepo.new
134 end
135
136 def cdr_repo
137 opts[:cdr_repo] || CDRRepo.new
138 end
139
140 def rev_ai
141 RevAi.new(logger: log.child(loggable_params))
142 end
143
144 TEL_CANDIDATES = {
145 "Restricted" => "14",
146 "anonymous" => "15",
147 "Anonymous" => "16",
148 "unavailable" => "17",
149 "Unavailable" => "18"
150 }.freeze
151
152 def sanitize_tel_candidate(candidate)
153 if candidate.length < 3
154 "13;phone-context=anonymous.phone-context.soprani.ca"
155 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
156 candidate
157 else
158 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
159 ";phone-context=anonymous.phone-context.soprani.ca"
160 end
161 end
162
163 def from_jid
164 Blather::JID.new(
165 sanitize_tel_candidate(params["from"]),
166 CONFIG[:component][:jid]
167 )
168 end
169
170 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
171 [
172 "/inbound/calls/#{call_id || params['callId']}",
173 suffix
174 ].compact.join("/") +
175 (customer_id ? "?customer_id=#{customer_id}" : "")
176 end
177
178 def url(path)
179 "#{request.base_url}#{path}"
180 end
181
182 def modify_call(call_id)
183 body = Bandwidth::ApiModifyCallRequest.new
184 yield body
185 BANDWIDTH_VOICE.modify_call(
186 CONFIG[:creds][:account],
187 call_id,
188 body: body
189 )
190 rescue Bandwidth::APIException
191 # If call does not exist, don't need to hang up or send to voicemail
192 # Other side must have hung up already
193 raise $! unless [404, 409].include?($!.response_code)
194 end
195
196 def start_transcription(customer, call_id, media_url)
197 return unless customer.transcription_enabled
198
199 rev_ai.language_id(
200 media_url,
201 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
202 from_jid: from_jid,
203 customer_id: customer.customer_id
204 )
205 end
206
207 def call_inputs(customer, from, call_id)
208 EMPromise.all([
209 customer.customer_id, customer.fwd,
210 call_attempt_repo.find_inbound(customer, from, call_id: call_id)
211 ])
212 end
213
214 def json_call(customer, fwd, call_attempt)
215 raise "Not allowed" unless params["secret"] == CONFIG[:component][:secret]
216
217 customer.ogm(params["from"]).then do |ogm|
218 call_attempt.as_json.merge(
219 fwd: fwd,
220 ogm: ogm
221 ).to_json
222 end
223 end
224
225 def transfer_complete_url(customer_id, call_id, tries)
226 url(
227 inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
228 ) + (tries ? "&tries=#{tries}" : "")
229 end
230
231 def create_call(customer, from, call_id, application_id, tries: nil)
232 call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
233 request.json { json_call(customer, fwd, ca) }
234 ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
235 cc.from = from
236 cc.application_id = application_id
237 cc.answer_url = url inbound_calls_path(nil, customer_id)
238 cc.disconnect_url = transfer_complete_url(customer_id, call_id, tries)
239 end
240 end
241 end
242
243 def inbound_from
244 if params["from"] && params["from"] =~ /\A\+?\d+\Z/
245 params["from"]
246 else
247 log.info "Inbound call with unusual from: #{params['from']}"
248 TEL_CANDIDATES.fetch(params["from"], "19")
249 end
250 end
251
252 def hangup
253 request.json { {}.to_json }
254
255 render :hangup
256 end
257
258 route do |r|
259 r.get "healthcheck" do
260 "OK"
261 end
262
263 r.on "inbound" do
264 r.on "calls" do
265 r.post "status" do
266 if params["eventType"] == "disconnect"
267 if (outbound_leg = outbound_transfers.delete(params["callId"]))
268 modify_call(outbound_leg) do |call|
269 call.state = "completed"
270 end
271 end
272
273 customer_repo.find_by_tel(params["to"]).then do |customer|
274 cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
275 end
276 end
277 "OK"
278 end
279
280 r.on :call_id do |call_id|
281 r.post "transfer_complete" do
282 outbound_leg = outbound_transfers.delete(call_id)
283 if params["cause"] == "hangup" && params["tag"] == "connected"
284 log.info "Normal hangup, now end #{call_id}", loggable_params
285 modify_call(call_id) { |call| call.state = "completed" }
286 elsif !outbound_leg
287 log.debug "Inbound disconnected", loggable_params
288 elsif params["cause"] == "error" && params["tries"].to_i < 15
289 log.info "2nd leg error, retry", loggable_params
290 customer_repo(
291 sgx_repo: Bwmsgsv2Repo.new
292 ).find(params["customer_id"]).then { |customer|
293 create_call(
294 customer, params["from"], call_id, params["applicationId"],
295 tries: params["tries"].to_i + 1
296 ).then { |call|
297 outbound_transfers[params["callId"]] = call
298 }.catch(&log.method(:error))
299 }
300 else
301 log.debug "Go to voicemail", loggable_params
302 modify_call(call_id) do |call|
303 call.redirect_url = url inbound_calls_path(:voicemail)
304 end
305 end
306 ""
307 end
308
309 r.on "voicemail" do
310 r.post "audio" do
311 duration = Time.parse(params["endTime"]) -
312 Time.parse(params["startTime"])
313 next "OK<5" unless duration > 5
314
315 jmp_media_url = params["mediaUrl"].sub(
316 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
317 "https://jmp.chat"
318 )
319
320 find_by_tel_with_fallback(
321 sgx_repo: Bwmsgsv2Repo.new,
322 transcription_enabled: false
323 ).then do |customer|
324 start_transcription(customer, call_id, jmp_media_url)
325
326 m = Blather::Stanza::Message.new
327 m.chat_state = nil
328 m.from = from_jid
329 m.subject = "New Voicemail"
330 m << OOB.new(jmp_media_url)
331 customer.stanza_to(m)
332
333 "OK"
334 end
335 end
336
337 r.post "language_id" do
338 rev_ai.language_id_result(params).then { |result|
339 rev_ai.stt(
340 result["top_language"],
341 result.dig("metadata", "media_url"),
342 url(inbound_calls_path(
343 "voicemail/transcription",
344 call_id: call_id
345 )),
346 **result["metadata"].transform_keys(&:to_sym)
347 ).then { "OK" }
348 }.catch_only(RevAi::Failed) { |e|
349 log_error(e)
350 "Failure logged"
351 }
352 end
353
354 r.post "transcription" do
355 rev_ai.stt_result(params, request.url).then { |result|
356 next "OK" if result["text"].to_s.empty?
357
358 customer_repo.find(
359 result.dig("metadata", "customer_id")
360 ).then do |customer|
361 m = Blather::Stanza::Message.new
362 m.chat_state = nil
363 m.from = result.dig("metadata", "from_jid")
364 m.subject = "Voicemail Transcription"
365 m.body = result["text"]
366 customer.stanza_to(m)
367
368 "OK"
369 end
370 }.catch_only(RevAi::Failed) { |e|
371 log_error(e)
372 "Failure logged"
373 }
374 end
375
376 r.post do
377 find_by_tel_with_fallback(
378 sgx_repo: Bwmsgsv2Repo.new,
379 ogm_url: nil
380 ).then { |c|
381 c.ogm(params["from"])
382 }.then { |ogm|
383 next hangup unless ogm
384
385 render :voicemail, locals: { ogm: ogm }
386 }.catch_only(CustomerRepo::NotFound) {
387 render "inbound/no_customer"
388 }
389 end
390 end
391
392 r.post do
393 customer_repo(
394 sgx_repo: Bwmsgsv2Repo.new
395 ).find(params.fetch("customer_id")).then do |customer|
396 call_attempt_repo.find_inbound(
397 customer,
398 params["from"],
399 call_id: call_id,
400 digits: params["digits"]
401 ).then { |ca| render(*ca.to_render) }
402 end
403 end
404 end
405
406 r.post do
407 customer_repo(
408 sgx_repo: Bwmsgsv2Repo.new
409 ).find_by_tel(params["to"]).then { |customer|
410 reachability_repo.find(customer, params["from"]).then do |reach|
411 reach.filter(if_yes: ->(_) { hangup }) do
412 create_call(
413 customer,
414 inbound_from,
415 params["callId"],
416 params["applicationId"]
417 ).then { |call|
418 next EMPromise.reject(:voicemail) unless call
419
420 outbound_transfers[params["callId"]] = call
421 render :ring, locals: { duration: 300 }
422 }
423 end
424 end
425 }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
426 render :forward, locals: { fwd: e.fwd, from: params["from"] }
427 }.catch { |e|
428 log_error(e) unless e == :voicemail
429 r.json { { error: e.to_s }.to_json }
430 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
431 }
432 end
433 end
434 end
435
436 r.on "outbound" do
437 r.on "calls" do
438 r.post "status" do
439 log.info "#{params['eventType']} #{params['callId']}", loggable_params
440 if params["eventType"] == "disconnect"
441 from = params["from"].sub(/^(?:\+|c)/, "")
442
443 customer_repo.find_by_format(from).then { |customer|
444 trust_level_repo.find(customer).then { |tl| [customer, tl] }
445 }.then { |(customer, trust_level)|
446 next "OK" unless trust_level.write_cdr?
447
448 customer_id = customer.customer_id
449 call_attempt_repo.ending_call(customer_id, params["callId"])
450 cdr_repo
451 .put(CDR.for_outbound(customer_id, params))
452 .catch(&method(:log_error))
453
454 "OK"
455 }
456 else
457 "OK"
458 end
459 end
460
461 r.post do
462 from = params["from"].sub(/^(?:\+|c)/, "")
463 customer_repo(
464 sgx_repo: Bwmsgsv2Repo.new
465 ).find_by_format(from).then { |c|
466 call_attempt_repo.find_outbound(
467 c,
468 params["to"],
469 call_id: params["callId"],
470 digits: params["digits"]
471 ).then do |ca|
472 r.json { ca.to_json }
473
474 call_attempt_repo.starting_call(c, params["callId"]).then do
475 render(*ca.to_render)
476 end
477 end
478 }.catch_only(CustomerRepo::NotFound) {
479 render "outbound/no_customer"
480 }
481 end
482 end
483 end
484
485 r.on "ogm" do
486 r.post "start" do
487 render :record_ogm, locals: { customer_id: params["customer_id"] }
488 end
489
490 r.post do
491 jmp_media_url = params["mediaUrl"].sub(
492 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
493 "https://jmp.chat"
494 )
495 ogm = OGMDownload.new(jmp_media_url)
496 ogm.download.then do
497 FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
498 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
499 customer_repo.find(params["customer_id"]).then do |customer|
500 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
501 end
502 end
503 end
504 end
505
506 r.public
507 end
508end
509# rubocop:enable Metrics/ClassLength