Outbound calls from v2 SIP endpoint work and save a CDR

Stephen Paul Weber created

Change summary

Gemfile                |  2 +
lib/cdr.rb             | 52 ++++++++++++++++++++++++++++++
lib/rack_fiber.rb      | 29 +++++++++++++++++
lib/roda_em_promise.rb | 11 ++++++
sgx_jmp.rb             |  3 +
views/forward.slim     |  3 +
web.rb                 | 74 ++++++++++++++++++++++++++++++++++++++++++++
7 files changed, 174 insertions(+)

Detailed changes

Gemfile 🔗

@@ -14,9 +14,11 @@ gem "em_promise.rb", "~> 0.0.3"
 gem "eventmachine"
 gem "money-open-exchange-rates"
 gem "ougai"
+gem "roda"
 gem "ruby-bandwidth-iris"
 gem "sentry-ruby", "<= 4.3.1"
 gem "statsd-instrument", git: "https://github.com/singpolyma/statsd-instrument.git", branch: "graphite"
+gem "thin"
 gem "value_semantics", git: "https://github.com/singpolyma/value_semantics"
 
 group(:development) do

lib/cdr.rb 🔗

@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require "value_semantics/monkey_patched"
+
+class CDR
+	value_semantics do
+		cdr_id String
+		customer_id String
+		start Time
+		billsec Integer
+		disposition Either("NO ANSWER", "ANSWERED", "BUSY", "FAILED")
+		tel(/\A\+\d+\Z/)
+		direction Either(:inbound, :outbound)
+	end
+
+	def self.for_disconnect(event)
+		start = Time.parse(event["startTime"])
+
+		new(
+			cdr_id: "sgx-jmp/#{event['callId']}",
+			customer_id: event["from"].sub(/^\+/, ""),
+			start: start,
+			billsec: (Time.parse(event["endTime"]) - start).ceil,
+			disposition: Disposition.for(event["cause"]),
+			tel: event["to"],
+			direction: :outbound
+		)
+	end
+
+	def save
+		columns, values = to_h.to_a.transpose
+		DB.query_defer(<<~SQL, values)
+			INSERT INTO cdr (#{columns.join(',')})
+			VALUES ($1, $2, $3, $4, $5, $6, $7)
+		SQL
+	end
+
+	module Disposition
+		def self.for(cause)
+			case cause
+			when "timeout", "rejected", "cancel"
+				"NO ANSWER"
+			when "hangup"
+				"ANSWERED"
+			when "busy"
+				"BUSY"
+			else
+				"FAILED"
+			end
+		end
+	end
+end

lib/rack_fiber.rb 🔗

@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+require "fiber"
+
+module Rack
+	class Fiber
+		def initialize(app)
+			@app = app
+		end
+
+		def call(env)
+			async_callback = env.delete("async.callback")
+			EM.next_tick { run_fiber(env, async_callback) }
+			throw :async
+		end
+
+	protected
+
+		def run_fiber(env, async_callback)
+			::Fiber.new {
+				begin
+					async_callback.call(@app.call(env))
+				rescue ::Exception # rubocop:disable Lint/RescueException
+					async_callback.call([500, {}, [$!.to_s]])
+				end
+			}.resume
+		end
+	end
+end

lib/roda_em_promise.rb 🔗

@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+require "em_promise"
+
+module RodaEMPromise
+	module RequestMethods
+		def block_result(result)
+			super(EMPromise.resolve(result).sync)
+		end
+	end
+end

sgx_jmp.rb 🔗

@@ -80,6 +80,7 @@ require_relative "lib/transaction"
 require_relative "lib/tel_selections"
 require_relative "lib/session_manager"
 require_relative "lib/statsd"
+require_relative "web"
 
 ELECTRUM = Electrum.new(**CONFIG[:electrum])
 EM::Hiredis::Client.load_scripts_from("./redis_lua")
@@ -189,6 +190,8 @@ when_ready do
 		ping.from = CONFIG[:component][:jid]
 		self << ping
 	end
+
+	Web.run(LOG.child, CustomerRepo.new)
 end
 
 # workqueue_count MUST be 0 or else Blather uses threads!

web.rb 🔗

@@ -0,0 +1,74 @@
+# frozen_string_literal: true
+
+require "roda"
+require "thin"
+require "sentry-ruby"
+
+require_relative "lib/cdr"
+require_relative "lib/roda_em_promise"
+require_relative "lib/rack_fiber"
+
+class Web < Roda
+	use Rack::Fiber # Must go first!
+	use Sentry::Rack::CaptureExceptions
+	plugin :json_parser
+	plugin :render, engine: "slim"
+	plugin RodaEMPromise # Must go last!
+
+	class << self
+		attr_reader :customer_repo, :log
+	end
+
+	def customer_repo
+		Web.customer_repo
+	end
+
+	def log
+		Web.log
+	end
+
+	def params
+		request.params
+	end
+
+	def self.run(log, customer_repo)
+		plugin :common_logger, log, method: :info
+		@log = log
+		@customer_repo = customer_repo
+		Thin::Logging.logger = log
+		Thin::Server.start(
+			"::1",
+			ENV.fetch("PORT", 8080),
+			freeze.app,
+			signals: false
+		)
+	end
+
+	route do |r|
+		r.on "outbound" do
+			r.on "calls" do
+				r.post "status" do
+					loggable = params.dup.tap { |p| p.delete("to") }
+					log.info "#{params['eventType']} #{params['callId']}", loggable
+					if params["eventType"] == "disconnect"
+						CDR.for_disconnect(params).save.catch do |e|
+							log.error("Error raised during /outbound/calls/status", e, loggable)
+							Sentry.capture_exception(e)
+						end
+					end
+					"OK"
+				end
+
+				r.post do
+					customer_id = params["from"].sub(/^\+/, "")
+					customer_repo.find(customer_id).then(:registered?).then do |reg|
+						render :forward, locals: {
+							from: reg.phone,
+							to: params["to"]
+						}
+					end
+				end
+			end
+		end
+	end
+end