1# frozen_string_literal: true
2
3require "digest"
4require "forwardable"
5require "multibases"
6require "multihashes"
7require "roda"
8require "thin"
9require "sentry-ruby"
10
11require_relative "lib/call_attempt_repo"
12require_relative "lib/cdr"
13require_relative "lib/oob"
14require_relative "lib/rev_ai"
15require_relative "lib/roda_capture"
16require_relative "lib/roda_em_promise"
17require_relative "lib/rack_fiber"
18
19class OGMDownload
20 def initialize(url)
21 @digest = Digest::SHA512.new
22 @f = Tempfile.open("ogm")
23 @req = EM::HttpRequest.new(url, tls: { verify_peer: true })
24 end
25
26 def download
27 http = @req.aget
28 http.stream do |chunk|
29 @digest << chunk
30 @f.write chunk
31 end
32 http.then { @f.close }.catch do |e|
33 @f.close!
34 EMPromise.reject(e)
35 end
36 end
37
38 def cid
39 Multibases.encode(
40 "base58btc",
41 [1, 85].pack("C*") + Multihashes.encode(@digest.digest, "sha2-512")
42 ).pack.to_s
43 end
44
45 def path
46 @f.path
47 end
48end
49
50# rubocop:disable Metrics/ClassLength
51class Web < Roda
52 use Rack::Fiber unless ENV["ENV"] == "test" # Must go first!
53 use Sentry::Rack::CaptureExceptions
54 plugin :json_parser
55 plugin :type_routing
56 plugin :public
57 plugin :render, engine: "slim"
58 plugin RodaCapture
59 plugin RodaEMPromise # Must go last!
60
61 class << self
62 attr_reader :customer_repo, :log, :outbound_transfers
63
64 def run(log, *listen_on)
65 plugin :common_logger, log, method: :info
66 @outbound_transfers = {}
67 Thin::Logging.logger = log
68 Thin::Server.start(
69 *listen_on,
70 freeze.app,
71 signals: false
72 )
73 end
74 end
75
76 extend Forwardable
77 def_delegators :'self.class', :outbound_transfers
78 def_delegators :request, :params
79
80 def log
81 opts[:common_logger]
82 end
83
84 def log_error(e)
85 log.error(
86 "Error raised during #{request.fullpath}: #{e.class}",
87 e,
88 loggable_params
89 )
90 if e.is_a?(::Exception)
91 Sentry.capture_exception(e)
92 else
93 Sentry.capture_message(e.to_s)
94 end
95 end
96
97 def loggable_params
98 params.dup.tap do |p|
99 p.delete("to")
100 p.delete("from")
101 end
102 end
103
104 def customer_repo(**kwargs)
105 kwargs[:set_user] = Sentry.method(:set_user) unless kwargs[:set_user]
106 opts[:customer_repo] || CustomerRepo.new(**kwargs)
107 end
108
109 def call_attempt_repo
110 opts[:call_attempt_repo] || CallAttemptRepo.new
111 end
112
113 def rev_ai
114 RevAi.new
115 end
116
117 TEL_CANDIDATES = {
118 "Restricted" => "14",
119 "anonymous" => "15",
120 "Anonymous" => "16",
121 "unavailable" => "17",
122 "Unavailable" => "18"
123 }.freeze
124
125 def sanitize_tel_candidate(candidate)
126 if candidate.length < 3
127 "13;phone-context=anonymous.phone-context.soprani.ca"
128 elsif candidate[0] == "+" && /\A\d+\z/.match(candidate[1..-1])
129 candidate
130 else
131 "#{TEL_CANDIDATES.fetch(candidate, '19')}" \
132 ";phone-context=anonymous.phone-context.soprani.ca"
133 end
134 end
135
136 def from_jid
137 Blather::JID.new(
138 sanitize_tel_candidate(params["from"]),
139 CONFIG[:component][:jid]
140 )
141 end
142
143 def inbound_calls_path(suffix, customer_id=nil, call_id: nil)
144 [
145 "/inbound/calls/#{call_id || params['callId']}",
146 suffix
147 ].compact.join("/") +
148 (customer_id ? "?customer_id=#{customer_id}" : "")
149 end
150
151 def url(path)
152 "#{request.base_url}#{path}"
153 end
154
155 def modify_call(call_id)
156 body = Bandwidth::ApiModifyCallRequest.new
157 yield body
158 BANDWIDTH_VOICE.modify_call(
159 CONFIG[:creds][:account],
160 call_id,
161 body: body
162 )
163 rescue Bandwidth::ApiErrorResponseException
164 # If call does not exist, don't need to hang up or send to voicemail
165 # Other side must have hung up already
166 raise $! unless $!.response_code.to_s == "404"
167 end
168
169 def start_transcription(customer, call_id, media_url)
170 return unless customer.transcription_enabled
171
172 rev_ai.language_id(
173 media_url,
174 url(inbound_calls_path("voicemail/language_id", call_id: call_id)),
175 from_jid: from_jid,
176 customer_id: customer.customer_id
177 )
178 end
179
180 route do |r|
181 r.on "inbound" do
182 r.on "calls" do
183 r.post "status" do
184 if params["eventType"] == "disconnect"
185 if (outbound_leg = outbound_transfers.delete(params["callId"]))
186 modify_call(outbound_leg) do |call|
187 call.state = "completed"
188 end
189 end
190
191 customer_repo.find_by_tel(params["to"]).then do |customer|
192 CDR.for_inbound(customer.customer_id, params).save
193 end
194 end
195 "OK"
196 end
197
198 r.on :call_id do |call_id|
199 r.post "transfer_complete" do
200 outbound_leg = outbound_transfers.delete(call_id)
201 if params["cause"] == "hangup" && params["tag"] == "connected"
202 log.info "Normal hangup, now end #{call_id}", loggable_params
203 modify_call(call_id) { |call| call.state = "completed" }
204 elsif !outbound_leg
205 log.debug "Inbound disconnected", loggable_params
206 else
207 log.debug "Go to voicemail", loggable_params
208 modify_call(call_id) do |call|
209 call.redirect_url = url inbound_calls_path(:voicemail)
210 end
211 end
212 ""
213 end
214
215 r.on "voicemail" do
216 r.post "audio" do
217 duration = Time.parse(params["endTime"]) -
218 Time.parse(params["startTime"])
219 next "OK<5" unless duration > 5
220
221 jmp_media_url = params["mediaUrl"].sub(
222 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
223 "https://jmp.chat"
224 )
225
226 customer_repo(
227 sgx_repo: Bwmsgsv2Repo.new
228 ).find_by_tel(params["to"]).then do |customer|
229 start_transcription(customer, call_id, jmp_media_url)
230
231 m = Blather::Stanza::Message.new
232 m.chat_state = nil
233 m.from = from_jid
234 m.subject = "New Voicemail"
235 m.body = jmp_media_url
236 m << OOB.new(jmp_media_url, desc: "Voicemail Recording")
237 customer.stanza_to(m)
238
239 "OK"
240 end
241 end
242
243 r.post "language_id" do
244 rev_ai.language_id_result(params).then { |result|
245 rev_ai.stt(
246 result["top_language"],
247 result.dig("metadata", "media_url"),
248 url(inbound_calls_path(
249 "voicemail/transcription",
250 call_id: call_id
251 )),
252 **result["metadata"].transform_keys(&:to_sym)
253 ).then { "OK" }
254 }.catch_only(RevAi::Failed) { |e|
255 log_error(e)
256 "Failure logged"
257 }
258 end
259
260 r.post "transcription" do
261 rev_ai.stt_result(params).then { |result|
262 next "OK" if result["text"].empty?
263
264 customer_repo.find(
265 result.dig("metadata", "customer_id")
266 ).then do |customer|
267 m = Blather::Stanza::Message.new
268 m.chat_state = nil
269 m.from = result.dig("metadata", "from_jid")
270 m.subject = "Voicemail Transcription"
271 m.body = result["text"]
272 customer.stanza_to(m)
273
274 "OK"
275 end
276 }.catch_only(RevAi::Failed) { |e|
277 log_error(e)
278 "Failure logged"
279 }
280 end
281
282 r.post do
283 customer_repo(sgx_repo: Bwmsgsv2Repo.new)
284 .find_by_tel(params["to"])
285 .then { |c|
286 EMPromise.all([c, c.ogm(params["from"])])
287 }.then do |(customer, ogm)|
288 render :voicemail, locals: {
289 ogm: ogm,
290 transcription_enabled: customer.transcription_enabled
291 }
292 end
293 end
294 end
295
296 r.post do
297 customer_repo(
298 sgx_repo: Bwmsgsv2Repo.new
299 ).find(params.fetch("customer_id")).then do |customer|
300 call_attempt_repo.find_inbound(
301 customer,
302 params["from"],
303 call_id: call_id,
304 digits: params["digits"]
305 ).then { |ca| render(*ca.to_render) }
306 end
307 end
308 end
309
310 r.post do
311 customer_repo(
312 sgx_repo: Bwmsgsv2Repo.new
313 ).find_by_tel(params["to"]).then { |customer|
314 EMPromise.all([
315 customer.customer_id, customer.fwd,
316 call_attempt_repo.find_inbound(
317 customer, params["from"], call_id: params["callId"]
318 )
319 ])
320 }.then { |(customer_id, fwd, ca)|
321 call = ca.create_call(fwd, CONFIG[:creds][:account]) { |cc|
322 cc.from = params["from"]
323 cc.application_id = params["applicationId"]
324 cc.answer_url = url inbound_calls_path(nil, customer_id)
325 cc.disconnect_url = url inbound_calls_path(:transfer_complete)
326 }
327
328 next EMPromise.reject(:voicemail) unless call
329
330 outbound_transfers[params["callId"]] = call
331 render :ring, locals: { duration: 300 }
332 }.catch { |e|
333 log_error(e) unless e == :voicemail
334 render :redirect, locals: { to: inbound_calls_path(:voicemail) }
335 }
336 end
337 end
338 end
339
340 r.on "outbound" do
341 r.on "calls" do
342 r.post "status" do
343 log.info "#{params['eventType']} #{params['callId']}", loggable_params
344 if params["eventType"] == "disconnect"
345 call_attempt_repo.ending_call(c, params["callId"])
346 CDR.for_outbound(params).save.catch(&method(:log_error))
347 end
348 "OK"
349 end
350
351 r.post do
352 from = params["from"].sub(/^\+1/, "")
353 customer_repo(
354 sgx_repo: Bwmsgsv2Repo.new
355 ).find_by_format(from).then do |c|
356 call_attempt_repo.find_outbound(
357 c,
358 params["to"],
359 call_id: params["callId"],
360 digits: params["digits"]
361 ).then do |ca|
362 r.json { ca.to_json }
363
364 call_attempt_repo.starting_call(c, params["callId"])
365 render(*ca.to_render)
366 end
367 end
368 end
369 end
370 end
371
372 r.on "ogm" do
373 r.post "start" do
374 render :record_ogm, locals: { customer_id: params["customer_id"] }
375 end
376
377 r.post do
378 jmp_media_url = params["mediaUrl"].sub(
379 /\Ahttps:\/\/voice.bandwidth.com\/api\/v2\/accounts\/\d+/,
380 "https://jmp.chat"
381 )
382 ogm = OGMDownload.new(jmp_media_url)
383 ogm.download.then do
384 File.rename(ogm.path, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
385 File.chmod(0o644, "#{CONFIG[:ogm_path]}/#{ogm.cid}")
386 customer_repo.find(params["customer_id"]).then do |customer|
387 customer.set_ogm_url("#{CONFIG[:ogm_web_root]}/#{ogm.cid}.mp3")
388 end
389 end
390 end
391 end
392
393 r.public
394 end
395end
396# rubocop:enable Metrics/ClassLength