1# frozen_string_literal: true
2
3require "digest"
4require "forwardable"
5require "multibases"
6require "multihashes"
7require "roda"
8require "thin"
9require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/oob"
14require_relative "lib/rev_ai"
15require_relative "lib/roda_capture"
16require_relative "lib/roda_em_promise"
17require_relative "lib/rack_fiber"
18
19class OGMDownload
20 def initialize(url)
21 @digest = Digest::SHA512.new
22 @f = Tempfile.open("ogm")
23 @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
24 end
25
26 def download
27 http = @req.aget
28 http.stream do |chunk|
29 @digest << chunk
30 @f.write chunk
31 end
32 http.then { @f.close }.catch do |e|
33 @f.close!
34 EMPromise.reject(e)
35 end
36 end
37
38 def cid
39 Multibases.encode(
40 "base58btc",
41 [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
42 ).pack.to_s
43 end
44
45 def path
46 @f.path
47 end
48end
49
50# rubocop:disable Metrics/ClassLength
51class Web < Roda
52 use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
53 use Sentry::Rack::CaptureExceptions
54 plugin :json_parser
55 plugin :type_routing
56 plugin :public
57 plugin :render, engine: "slim"
58 plugin RodaCapture
59 plugin RodaEMPromise # Must go last!
60
61 class << self
62 attr_reader :customer_repo, :log, :outbound_transfers
63
64 def run(log, *listen_on)
65 plugin :common_logger, log, method: :info
66 @outbound_transfers = {}
67 Thin::Logging.logger = log
68 Thin::Server.start(
69 *listen_on,
70 freeze.app,
71 signals: false
72 )
73 end
74 end
75
76 extend Forwardable
77 def_delegators :'self.class', :outbound_transfers
78 def_delegators :request, :params
79
80 def log
81 opts[:common_logger]
82 end
83
84 def log_error(e)
85 log.error(
86 "Error raised during #{request.fullpath}: #{e.class}",
87 e,
88 loggable_params
89 )
90 if e.is_a?(::Exception)
91 Sentry.capture_exception(e)
92 else
93 Sentry.capture_message(e.to_s)
94 end
95 end
96
97 def loggable_params
98 params.dup.tap do |p|
99 p.delete("to")
100 p.delete("from")
101 end
102 end
103
104 def customer_repo(**kwargs)
105 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106 opts[:customer_repo] || CustomerRepo.new(**kwargs)
107 end
108
109 def find_by_tel_with_fallback(sgx_repo:, **kwargs)
110 customer_repo(sgx_repo: sgx_repo).find_by_tel(params["to"]).catch { |e|
111 next EMPromise.reject(e) if e.is_a?(CustomerRepo::NotFound)
112
113 log_error(e)
114 customer_repo(
115 sgx_repo: TrivialBackendSgxRepo.new(**kwargs)
116 ).find_by_tel(params["to"])
117 }
118 end
119
120 def call_attempt_repo
121 opts[:call_attempt_repo] || CallAttemptRepo.new
122 end
123
124 def rev_ai
125 RevAi.new(logger: log.child(loggable_params))
126 end
127
128 TEL_CANDIDATES = {
129 "Restricted" => "14",
130 "anonymous" => "15",
131 "Anonymous" => "16",
132 "unavailable" => "17",
133 "Unavailable" => "18"
134 }.freeze
135
136 def sanitize_tel_candidate(candidate)
137 if candidate.length < 3
138 "13;phone-context=anonymous.phone-context.soprani.ca"
139 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
140 candidate
141 else
142 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
143 ";phone-context=anonymous.phone-context.soprani.ca"
144 end
145 end
146
147 def from_jid
148 Blather::JID.new(
149 sanitize_tel_candidate(params["from"]),
150 CONFIG[:component][:jid]
151 )
152 end
153
154 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
155 [
156 "/inbound/calls/#{call_id || params['callId']}",
157 suffix
158 ].compact.join("/") +
159 (customer_id ? "?customer_id=#{customer_id}" : "")
160 end
161
162 def url(path)
163 "#{request.base_url}#{path}"
164 end
165
166 def modify_call(call_id)
167 body = Bandwidth::ApiModifyCallRequest.new
168 yield body
169 BANDWIDTH_VOICE.modify_call(
170 CONFIG[:creds][:account],
171 call_id,
172 body: body
173 )
174 rescue Bandwidth::ApiErrorResponseException
175 # If call does not exist, don't need to hang up or send to voicemail
176 # Other side must have hung up already
177 raise $! unless $!.response_code.to_s == "404"
178 end
179
180 def start_transcription(customer, call_id, media_url)
181 return unless customer.transcription_enabled
182
183 rev_ai.language_id(
184 media_url,
185 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
186 from_jid: from_jid,
187 customer_id: customer.customer_id
188 )
189 end
190
191 route do |r|
192 r.on "inbound" do
193 r.on "calls" do
194 r.post "status" do
195 if params["eventType"] == "disconnect"
196 if (outbound_leg = outbound_transfers.delete(params["callId"]))
197 modify_call(outbound_leg) do |call|
198 call.state = "completed"
199 end
200 end
201
202 customer_repo.find_by_tel(params["to"]).then do |customer|
203 CDR.for_inbound(customer.customer_id, params).save
204 end
205 end
206 "OK"
207 end
208
209 r.on :call_id do |call_id|
210 r.post "transfer_complete" do
211 outbound_leg = outbound_transfers.delete(call_id)
212 if params["cause"] == "hangup" && params["tag"] == "connected"
213 log.info "Normal hangup, now end #{call_id}", loggable_params
214 modify_call(call_id) { |call| call.state = "completed" }
215 elsif !outbound_leg
216 log.debug "Inbound disconnected", loggable_params
217 else
218 log.debug "Go to voicemail", loggable_params
219 modify_call(call_id) do |call|
220 call.redirect_url = url inbound_calls_path(:voicemail)
221 end
222 end
223 ""
224 end
225
226 r.on "voicemail" do
227 r.post "audio" do
228 duration = Time.parse(params["endTime"]) -
229 Time.parse(params["startTime"])
230 next "OK<5" unless duration > 5
231
232 jmp_media_url = params["mediaUrl"].sub(
233 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
234 "https://jmp.chat"
235 )
236
237 find_by_tel_with_fallback(
238 sgx_repo: Bwmsgsv2Repo.new,
239 transcription_enabled: false
240 ).then do |customer|
241 start_transcription(customer, call_id, jmp_media_url)
242
243 m = Blather::Stanza::Message.new
244 m.chat_state = nil
245 m.from = from_jid
246 m.subject = "New Voicemail"
247 m.body = jmp_media_url
248 m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
249 customer.stanza_to(m)
250
251 "OK"
252 end
253 end
254
255 r.post "language_id" do
256 rev_ai.language_id_result(params).then { |result|
257 rev_ai.stt(
258 result["top_language"],
259 result.dig("metadata", "media_url"),
260 url(inbound_calls_path(
261 "voicemail/transcription",
262 call_id: call_id
263 )),
264 **result["metadata"].transform_keys(&:to_sym)
265 ).then { "OK" }
266 }.catch_only(RevAi::Failed) { |e|
267 log_error(e)
268 "Failure logged"
269 }
270 end
271
272 r.post "transcription" do
273 rev_ai.stt_result(params, request.url).then { |result|
274 next "OK" if result["text"].to_s.empty?
275
276 customer_repo.find(
277 result.dig("metadata", "customer_id")
278 ).then do |customer|
279 m = Blather::Stanza::Message.new
280 m.chat_state = nil
281 m.from = result.dig("metadata", "from_jid")
282 m.subject = "Voicemail Transcription"
283 m.body = result["text"]
284 customer.stanza_to(m)
285
286 "OK"
287 end
288 }.catch_only(RevAi::Failed) { |e|
289 log_error(e)
290 "Failure logged"
291 }
292 end
293
294 r.post do
295 find_by_tel_with_fallback(
296 sgx_repo: Bwmsgsv2Repo.new,
297 ogm_url: nil
298 ).then { |c|
299 c.ogm(params["from"]) if c.fwd.voicemail_enabled?
300 }.then { |ogm|
301 next render :hangup unless ogm
302
303 render :voicemail, locals: { ogm: ogm }
304 }
305 end
306 end
307
308 r.post do
309 customer_repo(
310 sgx_repo: Bwmsgsv2Repo.new
311 ).find(params.fetch("customer_id")).then do |customer|
312 call_attempt_repo.find_inbound(
313 customer,
314 params["from"],
315 call_id: call_id,
316 digits: params["digits"]
317 ).then { |ca| render(*ca.to_render) }
318 end
319 end
320 end
321
322 r.post do
323 customer_repo(
324 sgx_repo: Bwmsgsv2Repo.new
325 ).find_by_tel(params["to"]).then { |customer|
326 EMPromise.all([
327 customer.customer_id, customer.fwd,
328 call_attempt_repo.find_inbound(
329 customer, params["from"], call_id: params["callId"]
330 )
331 ])
332 }.then { |(customer_id, fwd, ca)|
333 call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
334 cc.from = params["from"]
335 cc.application_id = params["applicationId"]
336 cc.answer_url = url inbound_calls_path(nil, customer_id)
337 cc.disconnect_url = url inbound_calls_path(:transfer_complete)
338 }
339
340 next EMPromise.reject(:voicemail) unless call
341
342 outbound_transfers[params["callId"]] = call
343 render :ring, locals: { duration: 300 }
344 }.catch_only(CustomerFwd::InfiniteTimeout) { |e|
345 render :forward, locals: { fwd: e.fwd, from: params["from"] }
346 }.catch { |e|
347 log_error(e) unless e == :voicemail
348 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
349 }
350 end
351 end
352 end
353
354 r.on "outbound" do
355 r.on "calls" do
356 r.post "status" do
357 log.info "#{params['eventType']} #{params['callId']}", loggable_params
358 if params["eventType"] == "disconnect"
359 call_attempt_repo.ending_call(c, params["callId"])
360 CDR.for_outbound(params).save.catch(&method(:log_error))
361 end
362 "OK"
363 end
364
365 r.post do
366 from = params["from"].sub(/^\+1/, "")
367 customer_repo(
368 sgx_repo: Bwmsgsv2Repo.new
369 ).find_by_format(from).then do |c|
370 call_attempt_repo.find_outbound(
371 c,
372 params["to"],
373 call_id: params["callId"],
374 digits: params["digits"]
375 ).then do |ca|
376 r.json { ca.to_json }
377
378 call_attempt_repo.starting_call(c, params["callId"])
379 render(*ca.to_render)
380 end
381 end
382 end
383 end
384 end
385
386 r.on "ogm" do
387 r.post "start" do
388 render :record_ogm, locals: { customer_id: params["customer_id"] }
389 end
390
391 r.post do
392 jmp_media_url = params["mediaUrl"].sub(
393 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
394 "https://jmp.chat"
395 )
396 ogm = OGMDownload.new(jmp_media_url)
397 ogm.download.then do
398 File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
399 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
400 customer_repo.find(params["customer_id"]).then do |customer|
401 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
402 end
403 end
404 end
405 end
406
407 r.public
408 end
409end
410# rubocop:enable Metrics/ClassLength