1# frozen_string_literal: true
2
require "digest"
require "fileutils"
require "forwardable"
require "multibases"
require "multihashes"
require "roda"
require "sentry-ruby"
require "tempfile"
require "thin"
require "time"
11
12require_relative "lib/bandwidth_tn_order"
13require_relative "lib/call_attempt_repo"
14require_relative "lib/trust_level_repo"
15require_relative "lib/cdr"
16require_relative "lib/cdr_repo"
17require_relative "lib/oob"
18require_relative "lib/rev_ai"
19require_relative "lib/roda_capture"
20require_relative "lib/roda_em_promise"
21require_relative "lib/rack_fiber"
22require_relative "lib/reachability_repo"
23
# Streams a remote recording into a temporary file while feeding every chunk
# into a SHA-512 digest, so the finished file can be named after its content.
class OGMDownload
  # @param url [String] location of the recording to fetch (TLS peer
  #   verification is enabled on the request)
  def initialize(url)
    @sha512 = Digest::SHA512.new
    @tempfile = Tempfile.open("ogm")
    @request = EM::HttpRequest.new(url, tls: { verify_peer: true })
  end

  # Starts the asynchronous GET and writes each received chunk to the
  # temporary file.
  #
  # @return the promise chain built on the request: the file is closed on
  #   success; on failure it is closed and unlinked and the error is
  #   re-rejected
  def download
    response = @request.aget
    response.stream do |chunk|
      @sha512 << chunk
      @tempfile.write chunk
    end
    response.then { @tempfile.close }.catch do |error|
      @tempfile.close!
      EMPromise.reject(error)
    end
  end

  # Content identifier built from the SHA-512 of everything hashed so far.
  #
  # @return [String] base58btc multibase string
  def cid
    multihash = Multihashes.encode(@sha512.digest, "sha2-512")
    # NOTE(review): the two prefix bytes are presumably CID version 1 and the
    # "raw" multicodec (0x55) -- confirm against the CID spec.
    prefix = [1, 85].pack("C*")
    Multibases.encode("base58btc", prefix + multihash).pack.to_s
  end

  # @return [String] filesystem path of the temporary file
  def path
    @tempfile.path
  end
end
54
55# rubocop:disable Metrics/ClassLength
56class Web < Roda
# Rack middleware stack. Rack::Fiber has to be the first entry and is left
# out when ENV["ENV"] is "test".
use Rack::Fiber if ENV["ENV"] != "test" # Must go first!
use Sentry::Rack::CaptureExceptions

# Roda plugins, in load order.
%i[json_parser type_routing public].each { |name| plugin name }
plugin :render, engine: "slim"
plugin RodaCapture
plugin RodaEMPromise # Must go last!
65
class << self
  attr_reader :customer_repo
  attr_reader :log
  attr_reader :outbound_transfers

  # Registers the common_logger plugin with +log+, resets the outbound
  # transfer registry, then freezes the app and serves it with Thin.
  #
  # @param log logger used for request logging and for Thin itself
  # @param listen_on arguments forwarded to Thin::Server.start ahead of
  #   the Rack app
  def run(log, *listen_on)
    plugin :common_logger, log, method: :info
    @outbound_transfers = {}
    Thin::Logging.logger = log
    rack_app = freeze.app
    Thin::Server.start(*listen_on, rack_app, signals: false)
  end
end
80
# Instance-level shortcuts: +outbound_transfers+ is read from the class,
# +params+ from the current request.
extend Forwardable
def_delegator :'self.class', :outbound_transfers
def_delegator :request, :params
84
# Logger stored under the :common_logger option (presumably the one
# registered by Web.run).
def log
  opts[:common_logger]
end
88
# Logs a failure for the current request and forwards it to Sentry.
#
# @param e the failure; Exception instances are reported as exceptions,
#   anything else is reported as a message via its #to_s
def log_error(e)
  log.error(
    "Error raised during #{request.fullpath}: #{e.class}", e, loggable_params
  )

  case e
  when ::Exception
    Sentry.capture_exception(e)
  else
    Sentry.capture_message(e.to_s)
  end
end
101
# Copy of the request params with the "to" and "from" entries removed, for
# use as log context. The request params themselves are left untouched.
def loggable_params
  filtered = params.dup
  %w[to from].each { |key| filtered.delete(key) }
  filtered
end
108
# Customer repository: the one supplied through opts[:customer_repo] when
# present, otherwise a new CustomerRepo built from +kwargs+.
#
# @param kwargs options for CustomerRepo.new; :set_user defaults to
#   Sentry.set_user
def customer_repo(**kwargs)
  kwargs[:set_user] ||= Sentry.method(:set_user)
  opts[:customer_repo] || CustomerRepo.new(**kwargs)
end
113
# Trust level repository: opts[:trust_level_repo] when set, otherwise a new
# TrustLevelRepo built from +kwargs+.
def trust_level_repo(**kwargs)
  injected = opts[:trust_level_repo]
  return injected if injected

  TrustLevelRepo.new(**kwargs)
end
117
# Reachability repository: opts[:reachability_repo] when set, otherwise a
# new ReachabilityRepo::Voice built from +kwargs+.
def reachability_repo(**kwargs)
  injected = opts[:reachability_repo]
  return injected if injected

  ReachabilityRepo::Voice.new(**kwargs)
end
121
# Looks up the customer for the "to" param using +sgx_repo+. A
# CustomerRepo::NotFound rejection is passed through unchanged; any other
# failure is logged and the lookup is retried with a TrivialBackendSgxRepo
# built from +kwargs+.
#
# @param sgx_repo repository used for the first attempt
# @param kwargs options for the fallback TrivialBackendSgxRepo
# @return promise for the customer
def find_by_tel_with_fallback(sgx_repo:, **kwargs)
  customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch do |error|
    if error.is_a?(CustomerRepo::NotFound)
      EMPromise.reject(error)
    else
      log_error(error)
      fallback_sgx = TrivialBackendSgxRepo.new(**kwargs)
      customer_repo(sgx_repo: fallback_sgx).find_by_tel(params["to"])
    end
  end
end
132
# Call attempt repository: opts[:call_attempt_repo] when set, otherwise a
# new CallAttemptRepo.
def call_attempt_repo
  injected = opts[:call_attempt_repo]
  return injected if injected

  CallAttemptRepo.new
end
136
# CDR repository: opts[:cdr_repo] when set, otherwise a new CDRRepo.
def cdr_repo
  injected = opts[:cdr_repo]
  return injected if injected

  CDRRepo.new
end
140
# New RevAi client whose logger is a child of the request logger carrying
# the loggable request params.
def rev_ai
  scoped_logger = log.child(loggable_params)
  RevAi.new(logger: scoped_logger)
end
144
# Numeric codes substituted for well-known non-numeric caller identities.
TEL_CANDIDATES = {
  "Restricted" => "14",
  "anonymous" => "15",
  "Anonymous" => "16",
  "unavailable" => "17",
  "Unavailable" => "18"
}.freeze

# Suffix appended to every substituted (non-E.164) caller code.
ANONYMOUS_PHONE_CONTEXT =
  ";phone-context=anonymous.phone-context.soprani.ca".freeze

# Turns a caller identity into something usable as a JID node.
#
# * fewer than 3 characters (including nil/empty) -> code "13" + context
# * "+" followed only by digits -> returned unchanged
# * anything else -> its TEL_CANDIDATES code, or "19" when unknown,
#   + context
#
# @param candidate [String, nil] raw caller identity; nil is treated like
#   an empty string rather than raising
# @return [String]
def sanitize_tel_candidate(candidate)
  candidate = candidate.to_s
  return "13#{ANONYMOUS_PHONE_CONTEXT}" if candidate.length < 3
  return candidate if candidate.match?(/\A\+\d+\z/)

  "#{TEL_CANDIDATES.fetch(candidate, '19')}#{ANONYMOUS_PHONE_CONTEXT}"
end
163
# JID for the caller: the sanitized "from" param as the node and the
# configured component JID as the domain.
def from_jid
  node = sanitize_tel_candidate(params["from"])
  Blather::JID.new(node, CONFIG[:component][:jid])
end
170
# Builds a path under /inbound/calls/<call id>.
#
# @param suffix path segment(s) appended after the call id; nil for none
# @param customer_id when truthy, added as a customer_id query parameter
# @param call_id call id to use; defaults to the "callId" request param
# @return [String]
def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
  segments = ["/inbound/calls/#{call_id || params['callId']}"]
  segments << suffix unless suffix.nil?
  query = customer_id ? "?customer_id=#{customer_id}" : ""
  segments.join("/") + query
end
178
# Absolute URL for +path+, prefixed with the current request's base URL.
#
# @param path [String]
# @return [String]
def url(path)
  request.base_url.to_s + path.to_s
end
182
# Sends a modify-call request for +call_id+ to Bandwidth.
#
# Yields a fresh Bandwidth::ApiModifyCallRequest so the caller can fill it
# in before it is sent. API errors with response code 404 or 409 are
# swallowed (the method then returns nil); all others are re-raised.
#
# @param call_id id of the call to modify
def modify_call(call_id)
  modification = Bandwidth::ApiModifyCallRequest.new
  yield modification
  account = CONFIG[:creds][:account]
  BANDWIDTH_VOICE.modify_call(account, call_id, body: modification)
rescue Bandwidth::APIException => e
  # If call does not exist, don't need to hang up or send to voicemail
  # Other side must have hung up already
  raise unless [404, 409].include?(e.response_code)
end
196
# Kicks off language identification for a voicemail recording, unless the
# customer has transcription disabled.
#
# @param customer responds to #transcription_enabled and #customer_id
# @param call_id id used to build the language_id callback URL
# @param media_url URL of the recording
# @return nil when transcription is disabled, otherwise whatever
#   RevAi#language_id returns
def start_transcription(customer, call_id, media_url)
  return unless customer.transcription_enabled

  client = rev_ai
  callback_path = inbound_calls_path("voicemail/language_id", call_id: call_id)
  client.language_id(
    media_url,
    url(callback_path),
    from_jid: from_jid,
    customer_id: customer.customer_id
  )
end
207
# Gathers what is needed to place the forwarding leg of an inbound call.
#
# @return promise (EMPromise.all) for [customer_id, fwd, call_attempt]
def call_inputs(customer, from, call_id)
  customer_id = customer.customer_id
  fwd = customer.fwd
  attempt = call_attempt_repo.find_inbound(customer, from, call_id: call_id)
  EMPromise.all([customer_id, fwd, attempt])
end
214
# JSON description of a call attempt, available only to callers that
# present the component secret in the "secret" param.
#
# The secret is checked with a constant-time comparison so response timing
# does not reveal how much of a guessed secret was correct.
#
# @param customer responds to #ogm(from) returning a promise
# @param fwd forwarding settings merged in under :fwd
# @param call_attempt responds to #as_json
# @return promise for the JSON string
# @raise [RuntimeError] "Not allowed" when the secret does not match
def json_call(customer, fwd, call_attempt)
  expected = CONFIG[:component][:secret].to_s
  unless Rack::Utils.secure_compare(expected, params["secret"].to_s)
    raise "Not allowed"
  end

  customer.ogm(params["from"]).then do |ogm|
    call_attempt.as_json.merge(fwd: fwd, ogm: ogm).to_json
  end
end
225
# Absolute transfer_complete callback URL for a call, with the retry count
# appended as a "tries" query parameter when one is given.
#
# @param customer_id added to the URL as the customer_id query parameter
# @param call_id id of the call the callback refers to
# @param tries retry count, or nil/false to omit it
# @return [String]
def transfer_complete_url(customer_id, call_id, tries)
  path = inbound_calls_path(:transfer_complete, customer_id, call_id: call_id)
  callback = url(path)
  return callback unless tries

  "#{callback}&tries=#{tries}"
end
231
# Places the forwarding leg for an inbound call.
#
# Once the call inputs resolve, JSON requests are answered with the
# json_call payload; otherwise the call attempt is asked to create the
# call, configured with the caller, application and callback URLs.
#
# @param customer customer receiving the call
# @param from caller identity used as the new leg's "from"
# @param call_id id of the inbound call
# @param application_id Bandwidth application id for the new leg
# @param tries retry count carried into the disconnect callback URL
# @return promise for the result of the call attempt's #create_call
def create_call(customer, from, call_id, application_id, tries: nil)
  call_inputs(customer, from, call_id).then do |(customer_id, fwd, attempt)|
    request.json { json_call(customer, fwd, attempt) }

    account = CONFIG[:creds][:account]
    attempt.create_call(fwd, account) do |new_call|
      new_call.from = from
      new_call.application_id = application_id
      new_call.answer_url = url(inbound_calls_path(nil, customer_id))
      new_call.disconnect_url =
        transfer_complete_url(customer_id, call_id, tries)
    end
  end
end
243
# Caller identity to use for the forwarded leg of an inbound call.
#
# A "from" param consisting solely of an optional "+" and digits is used
# as-is. Anything else (missing, non-string, names such as "Restricted",
# or a number with trailing garbage such as a newline) is logged and
# replaced by its TEL_CANDIDATES code, or "19" when unknown.
#
# @return [String]
def inbound_from
  from = params["from"]
  # \z (not \Z) so that a trailing newline is not accepted as part of a
  # valid number.
  return from if from.is_a?(String) && from.match?(/\A\+?\d+\z/)

  log.info "Inbound call with unusual from: #{from}"
  TEL_CANDIDATES.fetch(from, "19")
end
252
# Response that ends the call: an empty JSON object for JSON requests,
# otherwise the rendered :hangup template.
def hangup
  request.json { {}.to_json }
  render(:hangup)
end
258
# Prefix of Bandwidth voice API media URLs (host plus account id). The dots
# are escaped so only the literal host name matches.
BANDWIDTH_MEDIA_PREFIX =
  %r{\Ahttps://voice\.bandwidth\.com/api/v2/accounts/\d+}.freeze

# Rewrites a Bandwidth media URL so that it points at https://jmp.chat
# instead of the Bandwidth API account prefix. URLs that do not start with
# that prefix are returned unchanged.
def rewrite_media_url(media_url)
  media_url.sub(BANDWIDTH_MEDIA_PREFIX, "https://jmp.chat")
end

# Routing tree:
#   GET  /healthcheck
#   POST /inbound/calls/...   inbound call callbacks and voicemail
#   POST /outbound/calls/...  outbound call callbacks
#   POST /ogm/...             outgoing message recording
#   /orders/...               bulk number orders (JSON, bearer token)
#   anything else             static files via the :public plugin
route do |r|
  r.get "healthcheck" do
    "OK"
  end

  r.on "inbound" do
    r.on "calls" do
      r.post "status" do
        if params["eventType"] == "disconnect"
          # End the forwarded leg, if one is still tracked for this call
          if (outbound_leg = outbound_transfers.delete(params["callId"]))
            modify_call(outbound_leg) do |call|
              call.state = "completed"
            end
          end

          customer_repo.find_by_tel(params["to"]).then do |customer|
            cdr_repo.put(CDR.for_inbound(customer.customer_id, params))
          end
        end
        "OK"
      end

      r.on :call_id do |call_id|
        r.post "transfer_complete" do
          outbound_leg = outbound_transfers.delete(call_id)
          if params["cause"] == "hangup" && params["tag"] == "connected"
            log.info "Normal hangup, now end #{call_id}", loggable_params
            modify_call(call_id) { |call| call.state = "completed" }
          elsif !outbound_leg
            log.debug "Inbound disconnected", loggable_params
          elsif params["cause"] == "error" && params["tries"].to_i < 15
            log.info "2nd leg error, retry", loggable_params
            customer_repo(
              sgx_repo: Bwmsgsv2Repo.new
            ).find(params["customer_id"]).then { |customer|
              create_call(
                customer, params["from"], call_id, params["applicationId"],
                tries: params["tries"].to_i + 1
              ).then { |call|
                outbound_transfers[params["callId"]] = call
              }.catch(&log.method(:error))
            }
          else
            log.debug "Go to voicemail", loggable_params
            modify_call(call_id) do |call|
              call.redirect_url = url inbound_calls_path(:voicemail)
            end
          end
          ""
        end

        r.on "voicemail" do
          r.post "audio" do
            duration = Time.parse(params["endTime"]) -
                       Time.parse(params["startTime"])
            # Recordings of 5 seconds or less are not delivered
            next "OK<5" unless duration > 5

            jmp_media_url = rewrite_media_url(params["mediaUrl"])

            find_by_tel_with_fallback(
              sgx_repo: Bwmsgsv2Repo.new,
              transcription_enabled: false
            ).then do |customer|
              start_transcription(customer, call_id, jmp_media_url)

              m = Blather::Stanza::Message.new
              m.chat_state = nil
              m.from = from_jid
              m.subject = "New Voicemail"
              m << OOB.new(jmp_media_url)
              customer.stanza_to(m)

              "OK"
            end
          end

          r.post "language_id" do
            rev_ai.language_id_result(params).then { |result|
              rev_ai.stt(
                result["top_language"],
                result.dig("metadata", "media_url"),
                url(inbound_calls_path(
                  "voicemail/transcription",
                  call_id: call_id
                )),
                **result["metadata"].transform_keys(&:to_sym)
              ).then { "OK" }
            }.catch_only(RevAi::Failed) { |e|
              log_error(e)
              "Failure logged"
            }
          end

          r.post "transcription" do
            rev_ai.stt_result(params, request.url).then { |result|
              next "OK" if result["text"].to_s.empty?

              customer_repo.find(
                result.dig("metadata", "customer_id")
              ).then do |customer|
                m = Blather::Stanza::Message.new
                m.chat_state = nil
                m.from = result.dig("metadata", "from_jid")
                m.subject = "Voicemail Transcription"
                m.body = result["text"]
                customer.stanza_to(m)

                "OK"
              end
            }.catch_only(RevAi::Failed) { |e|
              log_error(e)
              "Failure logged"
            }
          end

          r.post do
            find_by_tel_with_fallback(
              sgx_repo: Bwmsgsv2Repo.new,
              ogm_url: nil
            ).then { |c|
              c.ogm(params["from"])
            }.then { |ogm|
              next hangup unless ogm

              render :voicemail, locals: { ogm: ogm }
            }.catch_only(CustomerRepo::NotFound) {
              render "inbound/no_customer"
            }
          end
        end

        r.post do
          customer_repo(
            sgx_repo: Bwmsgsv2Repo.new
          ).find(params.fetch("customer_id")).then do |customer|
            call_attempt_repo.find_inbound(
              customer,
              params["from"],
              call_id: call_id,
              digits: params["digits"]
            ).then { |ca| render(*ca.to_render) }
          end
        end
      end

      r.post do
        customer_repo(
          sgx_repo: Bwmsgsv2Repo.new
        ).find_by_tel(params["to"]).then { |customer|
          reachability_repo.find(customer, params["from"]).then do |reach|
            reach.filter(if_yes: ->(_) { hangup }) do
              create_call(
                customer,
                inbound_from,
                params["callId"],
                params["applicationId"]
              ).then { |call|
                next EMPromise.reject(:voicemail) unless call

                outbound_transfers[params["callId"]] = call
                render :ring, locals: { duration: 300 }
              }
            end
          end
        }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
          render :forward, locals: { fwd: e.fwd, from: params["from"] }
        }.catch { |e|
          # :voicemail is the expected "no call was placed" signal, not an
          # error worth reporting
          log_error(e) unless e == :voicemail
          r.json { { error: e.to_s }.to_json }
          render :redirect, locals: { to: inbound_calls_path(:voicemail) }
        }
      end
    end
  end

  r.on "outbound" do
    r.on "calls" do
      r.post "status" do
        log.info "#{params['eventType']} #{params['callId']}", loggable_params
        if params["eventType"] == "disconnect"
          # Drop a single leading "+" or "c" from the caller
          from = params["from"].sub(/\A(?:\+|c)/, "")

          customer_repo.find_by_format(from).then { |customer|
            trust_level_repo.find(customer).then { |tl| [customer, tl] }
          }.then { |(customer, trust_level)|
            next "OK" unless trust_level.write_cdr?

            customer_id = customer.customer_id
            call_attempt_repo.ending_call(customer_id, params["callId"])
            cdr_repo
              .put(CDR.for_outbound(customer_id, params))
              .catch(&method(:log_error))

            "OK"
          }
        else
          "OK"
        end
      end

      r.post do
        # Drop a single leading "+" or "c" from the caller
        from = params["from"].sub(/\A(?:\+|c)/, "")
        customer_repo(
          sgx_repo: Bwmsgsv2Repo.new
        ).find_by_format(from).then { |c|
          call_attempt_repo.find_outbound(
            c,
            params["to"],
            call_id: params["callId"],
            digits: params["digits"]
          ).then do |ca|
            r.json { ca.to_json }

            call_attempt_repo.starting_call(c, params["callId"]).then do
              render(*ca.to_render)
            end
          end
        }.catch_only(CustomerRepo::NotFound) {
          render "outbound/no_customer"
        }
      end
    end
  end

  r.on "ogm" do
    r.post "start" do
      render :record_ogm, locals: { customer_id: params["customer_id"] }
    end

    r.post do
      jmp_media_url = rewrite_media_url(params["mediaUrl"])
      ogm = OGMDownload.new(jmp_media_url)
      ogm.download.then do
        # Store the recording under its content id and make it readable
        FileUtils.mv(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
        File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
        customer_repo.find(params["customer_id"]).then do |customer|
          customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
        end
      end
    end
  end

  r.on "orders" do
    token = r.env["HTTP_AUTHORIZATION"].to_s.sub(/\ABearer\s+/, "")

    r.json do
      if (orderer = CONFIG[:bulk_order_tokens][token.to_sym])
        r.get :order_id do |order_id|
          BandwidthTNOrder.get(order_id).then do |order|
            if order.status == :complete
              # Charge for the completed order; ignore_duplicate is passed
              # so repeated polling of the same order id is tolerated
              Transaction.new(
                customer_id: orderer[:customer_id],
                transaction_id: order.id,
                amount: order.tels.length * -1.75,
                note: "Bulk order",
                ignore_duplicate: true
              ).insert.then do
                {
                  id: order.id,
                  status: order.status,
                  tels: order.tels
                }.to_json
              end
            else
              { id: order.id, status: order.status }.to_json
            end
          end
        end

        r.on "tels" do
          r.on :tel, method: :delete do |tel|
            tn_repo = BandwidthTnRepo.new
            tn = tn_repo.find(tel)
            # Only numbers on the orderer's own SIP peer may be disconnected
            if tn&.dig(:sip_peer, :peer_id).to_s == orderer[:peer_id]
              tn_repo.disconnect(tel, orderer[:customer_id])
              { status: "disconnected" }.to_json
            else
              response.status = 401
              { error: "Number not found" }.to_json
            end
          end
        end

        r.post do
          customer_repo.find(orderer[:customer_id]).then do |customer|
            if customer.balance >= 1.75 * params["quantity"].to_i
              BandwidthTNOrder.create_custom(
                name: "Bulk order",
                customer_order_id: orderer[:customer_id],
                peer_id: orderer[:peer_id],
                state_search_and_order_type: {
                  quantity: params["quantity"].to_i,
                  state: ["WA", "CA", "TX", "IL", "NY", "FL"].sample
                }
              ).then do |order|
                { id: order.id }.to_json
              end
            else
              response.status = 402
              { error: "Balance too low" }.to_json
            end
          end
        end
      else
        response.status = 401
        { error: "Bad token" }.to_json
      end
    end
  end

  r.public
end
577end
578# rubocop:enable Metrics/ClassLength