1# frozen_string_literal: true
2
require "digest"
require "forwardable"
require "tempfile"
require "time"
require "multibases"
require "multihashes"
require "roda"
require "thin"
require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/cdr_repo"
14require_relative "lib/oob"
15require_relative "lib/rev_ai"
16require_relative "lib/roda_capture"
17require_relative "lib/roda_em_promise"
18require_relative "lib/rack_fiber"
19require_relative "lib/reachability_repo"
20
# Streams a recording from a URL into a temporary file while hashing it, so
# the finished file can be stored under a content-derived identifier.
class OGMDownload
  # @param url [String] location the recording is fetched from
  def initialize(url)
    @digest = Digest::SHA512.new
    @f = Tempfile.open("ogm")
    # The payload is audio: write chunks verbatim, with no newline or
    # encoding translation applied by the IO layer.
    @f.binmode
    @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
  end

  # Starts the request. Every received chunk is fed to the digest and
  # appended to the temp file.
  #
  # On success the temp file is closed and kept on disk; on failure it is
  # closed and unlinked and the rejection is passed on to the caller.
  #
  # @return the result of the then/catch chain built on the request
  def download
    http = @req.aget
    http.stream do |chunk|
      @digest << chunk
      @f.write chunk
    end
    http.then { @f.close }.catch do |e|
      @f.close!
      EMPromise.reject(e)
    end
  end

  # Identifier derived from the SHA-512 of everything streamed so far,
  # encoded as base58btc. Only meaningful once the download has finished.
  #
  # @return [String]
  def cid
    Multibases.encode(
      "base58btc",
      # Two prefix bytes (1, 85) followed by the sha2-512 multihash;
      # presumably CID version 1 plus the raw codec (0x55) -- TODO confirm.
      [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
    ).pack.to_s
  end

  # @return [String] filesystem path of the temp file
  def path
    @f.path
  end
end
51
52# rubocop:disable Metrics/ClassLength
53class Web < Roda
  # Rack middleware. Rack::Fiber is skipped when ENV["ENV"] is "test".
  use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
  use Sentry::Rack::CaptureExceptions
  # Roda plugins. RodaCapture and RodaEMPromise are presumably the local
  # plugins loaded from lib/roda_capture and lib/roda_em_promise.
  plugin :json_parser
  plugin :type_routing
  plugin :public
  plugin :render, engine: "slim"
  plugin RodaCapture
  plugin RodaEMPromise # Must go last!
62
63 class << self
64 attr_reader :customer_repo, :log, :outbound_transfers
65
66 def run(log, *listen_on)
67 plugin :common_logger, log, method: :info
68 @outbound_transfers = {}
69 Thin::Logging.logger = log
70 Thin::Server.start(
71 *listen_on,
72 freeze.app,
73 signals: false
74 )
75 end
76 end
77
  extend Forwardable
  # Instance-level shortcut to the class-level +outbound_transfers+ registry
  # (the Hash assigned in +Web.run+).
  def_delegators :'self.class', :outbound_transfers
  # +params+ is read from the current request.
  def_delegators :request, :params
81
  # Logger for this app, read from the +:common_logger+ app option
  # (presumably populated by the common_logger plugin loaded in +Web.run+).
  def log
    opts[:common_logger]
  end
85
86 def log_error(e)
87 log.error(
88 "Error raised during #{request.fullpath}: #{e.class}",
89 e,
90 loggable_params
91 )
92 if e.is_a?(::Exception)
93 Sentry.capture_exception(e)
94 else
95 Sentry.capture_message(e.to_s)
96 end
97 end
98
99 def loggable_params
100 params.dup.tap do |p|
101 p.delete("to")
102 p.delete("from")
103 end
104 end
105
106 def customer_repo(**kwargs)
107 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
108 opts[:customer_repo] || CustomerRepo.new(**kwargs)
109 end
110
111 def reachability_repo(**kwargs)
112 opts[:reachability_repo] || ReachabilityRepo::Voice.new(**kwargs)
113 end
114
115 def find_by_tel_with_fallback(sgx_repo:, **kwargs)
116 customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
117 next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
118
119 log_error(e)
120 customer_repo(
121 sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
122 ).find_by_tel(params["to"])
123 }
124 end
125
126 def call_attempt_repo
127 opts[:call_attempt_repo] || CallAttemptRepo.new
128 end
129
130 def cdr_repo
131 opts[:cdr_repo] || CDRRepo.new
132 end
133
134 def rev_ai
135 RevAi.new(logger: log.child(loggable_params))
136 end
137
138 TEL_CANDIDATES = {
139 "Restricted" => "14",
140 "anonymous" => "15",
141 "Anonymous" => "16",
142 "unavailable" => "17",
143 "Unavailable" => "18"
144 }.freeze
145
146 def sanitize_tel_candidate(candidate)
147 if candidate.length < 3
148 "13;phone-context=anonymous.phone-context.soprani.ca"
149 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
150 candidate
151 else
152 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
153 ";phone-context=anonymous.phone-context.soprani.ca"
154 end
155 end
156
157 def from_jid
158 Blather::JID.new(
159 sanitize_tel_candidate(params["from"]),
160 CONFIG[:component][:jid]
161 )
162 end
163
164 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
165 [
166 "/inbound/calls/#{call_id || params['callId']}",
167 suffix
168 ].compact.join("/") +
169 (customer_id ? "?customer_id=#{customer_id}" : "")
170 end
171
172 def url(path)
173 "#{request.base_url}#{path}"
174 end
175
176 def modify_call(call_id)
177 body = Bandwidth::ApiModifyCallRequest.new
178 yield body
179 BANDWIDTH_VOICE.modify_call(
180 CONFIG[:creds][:account],
181 call_id,
182 body: body
183 )
184 rescue Bandwidth::APIException
185 # If call does not exist, don't need to hang up or send to voicemail
186 # Other side must have hung up already
187 raise $! unless [404, 409].include?($!.response_code)
188 end
189
190 def start_transcription(customer, call_id, media_url)
191 return unless customer.transcription_enabled
192
193 rev_ai.language_id(
194 media_url,
195 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
196 from_jid: from_jid,
197 customer_id: customer.customer_id
198 )
199 end
200
201 def call_inputs(customer, from, call_id)
202 EMPromise.all([
203 customer.customer_id, customer.fwd,
204 call_attempt_repo.find_inbound(customer, from, call_id: call_id)
205 ])
206 end
207
208 def create_call(customer, from, call_id, application_id, tries: nil)
209 call_inputs(customer, from, call_id).then do |(customer_id, fwd, ca)|
210 ca.create_call(fwd, CONFIG[:creds][:account]) do |cc|
211 cc.from = from
212 cc.application_id = application_id
213 cc.answer_url = url inbound_calls_path(nil, customer_id)
214 cc.disconnect_url = url(
215 inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
216 ) + (tries ? "&tries=#{tries}" : "")
217 end
218 end
219 end
220
221 route do |r|
222 r.get "healthcheck" do
223 "OK"
224 end
225
226 r.on "inbound" do
227 r.on "calls" do
228 r.post "status" do
229 if params["eventType"] == "disconnect"
230 if (outbound_leg = outbound_transfers.delete(params["callId"]))
231 modify_call(outbound_leg) do |call|
232 call.state = "completed"
233 end
234 end
235
236 customer_repo.find_by_tel(params["to"]).then do |customer|
237 cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
238 end
239 end
240 "OK"
241 end
242
243 r.on :call_id do |call_id|
244 r.post "transfer_complete" do
245 outbound_leg = outbound_transfers.delete(call_id)
246 if params["cause"] == "hangup" && params["tag"] == "connected"
247 log.info "Normal hangup, now end #{call_id}", loggable_params
248 modify_call(call_id) { |call| call.state = "completed" }
249 elsif !outbound_leg
250 log.debug "Inbound disconnected", loggable_params
251 elsif params["cause"] == "error" && params["tries"].to_i < 15
252 log.info "2nd leg error, retry", loggable_params
253 customer_repo(
254 sgx_repo: Bwmsgsv2Repo.new
255 ).find(params["customer_id"]).then { |customer|
256 create_call(
257 customer, params["from"], call_id, params["applicationId"],
258 tries: params["tries"].to_i + 1
259 ).then { |call|
260 outbound_transfers[params["callId"]] = call
261 }.catch(&log.method(:error))
262 }
263 else
264 log.debug "Go to voicemail", loggable_params
265 modify_call(call_id) do |call|
266 call.redirect_url = url inbound_calls_path(:voicemail)
267 end
268 end
269 ""
270 end
271
272 r.on "voicemail" do
273 r.post "audio" do
274 duration = Time.parse(params["endTime"]) -
275 Time.parse(params["startTime"])
276 next "OK<5" unless duration > 5
277
278 jmp_media_url = params["mediaUrl"].sub(
279 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
280 "https://jmp.chat"
281 )
282
283 find_by_tel_with_fallback(
284 sgx_repo: Bwmsgsv2Repo.new,
285 transcription_enabled: false
286 ).then do |customer|
287 start_transcription(customer, call_id, jmp_media_url)
288
289 m = Blather::Stanza::Message.new
290 m.chat_state = nil
291 m.from = from_jid
292 m.subject = "New Voicemail"
293 m << OOB.new(jmp_media_url)
294 customer.stanza_to(m)
295
296 "OK"
297 end
298 end
299
300 r.post "language_id" do
301 rev_ai.language_id_result(params).then { |result|
302 rev_ai.stt(
303 result["top_language"],
304 result.dig("metadata", "media_url"),
305 url(inbound_calls_path(
306 "voicemail/transcription",
307 call_id: call_id
308 )),
309 **result["metadata"].transform_keys(&:to_sym)
310 ).then { "OK" }
311 }.catch_only(RevAi::Failed) { |e|
312 log_error(e)
313 "Failure logged"
314 }
315 end
316
317 r.post "transcription" do
318 rev_ai.stt_result(params, request.url).then { |result|
319 next "OK" if result["text"].to_s.empty?
320
321 customer_repo.find(
322 result.dig("metadata", "customer_id")
323 ).then do |customer|
324 m = Blather::Stanza::Message.new
325 m.chat_state = nil
326 m.from = result.dig("metadata", "from_jid")
327 m.subject = "Voicemail Transcription"
328 m.body = result["text"]
329 customer.stanza_to(m)
330
331 "OK"
332 end
333 }.catch_only(RevAi::Failed) { |e|
334 log_error(e)
335 "Failure logged"
336 }
337 end
338
339 r.post do
340 find_by_tel_with_fallback(
341 sgx_repo: Bwmsgsv2Repo.new,
342 ogm_url: nil
343 ).then { |c|
344 c.ogm(params["from"])
345 }.then { |ogm|
346 next render :hangup unless ogm
347
348 render :voicemail, locals: { ogm: ogm }
349 }.catch_only(CustomerRepo::NotFound) {
350 render "inbound/no_customer"
351 }
352 end
353 end
354
355 r.post do
356 customer_repo(
357 sgx_repo: Bwmsgsv2Repo.new
358 ).find(params.fetch("customer_id")).then do |customer|
359 call_attempt_repo.find_inbound(
360 customer,
361 params["from"],
362 call_id: call_id,
363 digits: params["digits"]
364 ).then { |ca| render(*ca.to_render) }
365 end
366 end
367 end
368
369 r.post do
370 customer_repo(
371 sgx_repo: Bwmsgsv2Repo.new
372 ).find_by_tel(params["to"]).then { |customer|
373 reachability_repo.find(customer, params["from"]).then do |reach|
374 reach.filter(if_yes: ->(_) { render :hangup }) do
375 create_call(
376 customer,
377 params["from"],
378 params["callId"],
379 params["applicationId"]
380 ).then { |call|
381 next EMPromise.reject(:voicemail) unless call
382
383 outbound_transfers[params["callId"]] = call
384 render :ring, locals: { duration: 300 }
385 }
386 end
387 end
388 }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
389 render :forward, locals: { fwd: e.fwd, from: params["from"] }
390 }.catch { |e|
391 log_error(e) unless e == :voicemail
392 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
393 }
394 end
395 end
396 end
397
398 r.on "outbound" do
399 r.on "calls" do
400 r.post "status" do
401 log.info "#{params['eventType']} #{params['callId']}", loggable_params
402 if params["eventType"] == "disconnect"
403 customer_id = params["from"].sub(/^(?:\+1|c)/, "")
404 call_attempt_repo.ending_call(customer_id, params["callId"])
405 cdr_repo
406 .put(CDR.for_outbound(customer_id, params))
407 .catch(&method(:log_error))
408 end
409 "OK"
410 end
411
412 r.post do
413 from = params["from"].sub(/^(?:\+1|c)/, "")
414 customer_repo(
415 sgx_repo: Bwmsgsv2Repo.new
416 ).find_by_format(from).then { |c|
417 call_attempt_repo.find_outbound(
418 c,
419 params["to"],
420 call_id: params["callId"],
421 digits: params["digits"]
422 ).then do |ca|
423 r.json { ca.to_json }
424
425 call_attempt_repo.starting_call(c, params["callId"])
426 render(*ca.to_render)
427 end
428 }.catch_only(CustomerRepo::NotFound) {
429 render "outbound/no_customer"
430 }
431 end
432 end
433 end
434
435 r.on "ogm" do
436 r.post "start" do
437 render :record_ogm, locals: { customer_id: params["customer_id"] }
438 end
439
440 r.post do
441 jmp_media_url = params["mediaUrl"].sub(
442 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
443 "https://jmp.chat"
444 )
445 ogm = OGMDownload.new(jmp_media_url)
446 ogm.download.then do
447 File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
448 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
449 customer_repo.find(params["customer_id"]).then do |customer|
450 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
451 end
452 end
453 end
454 end
455
456 r.public
457 end
458end
459# rubocop:enable Metrics/ClassLength