Limit call concurrency for outbound SIP

Stephen Paul Weber created

Change summary

lib/call_attempt.rb      | 13 ++++++-------
lib/call_attempt_repo.rb | 38 ++++++++++++++++++++++++++++++++------
lib/trust_level.rb       | 12 ++++++------
test/test_helper.rb      | 25 +++++++++++++++++++++++++
web.rb                   |  2 ++
5 files changed, 71 insertions(+), 19 deletions(-)

Detailed changes

lib/call_attempt.rb 🔗

@@ -6,13 +6,12 @@ require_relative "tts_template"
 require_relative "low_balance"
 
 class CallAttempt
-	def self.for(customer, rate, usage, trust_level, direction:, **kwargs)
-		kwargs.merge!(direction: direction)
+	def self.for(customer, rate, usage, supported, **kwargs)
 		credit = [customer.minute_limit.to_d - usage, 0].max + customer.balance
-		if !rate || !trust_level.support_call?(rate)
-			Unsupported.new(direction: direction)
+		if !supported
+			Unsupported.new(**kwargs.slice(:direction))
 		elsif credit < rate * 10
-			NoBalance.for(customer, rate, usage, trust_level, **kwargs)
+			NoBalance.for(customer, rate, usage, supported, **kwargs)
 		else
 			for_ask_or_go(customer, rate, usage, credit, **kwargs)
 		end
@@ -97,12 +96,12 @@ class CallAttempt
 	end
 
 	class NoBalance
-		def self.for(customer, rate, usage, trust_level, direction:, **kwargs)
+		def self.for(customer, rate, usage, supported, direction:, **kwargs)
 			LowBalance.for(customer).then(&:notify!).then do |amount|
 				if amount&.positive?
 					CallAttempt.for(
 						customer.with_balance(customer.balance + amount),
-						rate, usage, trust_level, direction: direction, **kwargs
+						rate, usage, supported, direction: direction, **kwargs
 					)
 				else
 					NoBalance.new(balance: customer.balance, direction: direction)

lib/call_attempt_repo.rb 🔗

@@ -34,20 +34,46 @@ class CallAttemptRepo
 		)
 	end
 
+	def starting_call(customer, call_id)
+		redis.sadd(
+			"jmp_customer_ongoing_calls-#{customer.customer_id}",
+			call_id
+		).then do
+			redis.expire(
+				"jmp_customer_ongoing_calls-#{customer.customer_id}",
+				60 * 60
+			)
+		end
+	end
+
+	def ending_call(customer, call_id)
+		redis.srem(
+			"jmp_customer_ongoing_calls-#{customer.customer_id}",
+			call_id
+		)
+	end
+
 protected
 
 	def find(customer, other_tel, direction:, **kwargs)
-		EMPromise.all([
-			find_rate(customer.plan_name, other_tel, direction),
-			find_usage(customer.customer_id),
-			TrustLevelRepo.new(db: db, redis: redis).find(customer)
-		]).then do |(rate, usage, trust_level)|
+		find_all(customer, other_tel, direction).then do |(rate, usage, tl, c)|
 			CallAttempt.for(
-				customer, rate, usage, trust_level, direction: direction, **kwargs
+				customer, rate, usage,
+				rate && tl.support_call?(rate, c || 0),
+				direction: direction, **kwargs
 			)
 		end
 	end
 
+	def find_all(customer, other_tel, direction)
+		EMPromise.all([
+			find_rate(customer.plan_name, other_tel, direction),
+			find_usage(customer.customer_id),
+			TrustLevelRepo.new(db: db, redis: redis).find(customer),
+			redis.scard("jmp_customer_ongoing_calls-#{customer.customer_id}")
+		])
+	end
+
 	def find_usage(customer_id)
 		db.query_one(<<~SQL, customer_id, default: { a: 0 }).then { |r| r[:a] }
 			SELECT COALESCE(SUM(charge), 0) AS a FROM cdr_with_charge

lib/trust_level.rb 🔗

@@ -34,8 +34,8 @@ module TrustLevel
 			new if manual == "Basement" || (!manual && settled_amount < 10)
 		end
 
-		def support_call?(rate)
-			rate <= 0.02
+		def support_call?(rate, concurrency)
+			rate <= 0.02 && concurrency < 1
 		end
 	end
 
@@ -44,8 +44,8 @@ module TrustLevel
 			new if manual == "Paragon" || (!manual && settled_amount > 60)
 		end
 
-		def support_call?(*)
-			true
+		def support_call?(_, concurrency)
+			concurrency < 10
 		end
 	end
 
@@ -67,8 +67,8 @@ module TrustLevel
 			@max_rate = EXPENSIVE_ROUTE.fetch(plan_name, 0.1)
 		end
 
-		def support_call?(rate)
-			rate <= @max_rate
+		def support_call?(rate, concurrency)
+			rate <= @max_rate && concurrency < 4
 		end
 	end
 end

test/test_helper.rb 🔗

@@ -199,6 +199,31 @@ class FakeRedis
 		get(key).then { |v| v.to_i.to_s(2)[bit].to_i }
 	end
 
+	def hget(key, field)
+		@values.dig(key, field)
+	end
+
+	def hincrby(key, field, incrby)
+		@values[key] ||= {}
+		@values[key][field] ||= 0
+		@values[key][field] += incrby
+	end
+
+	def sadd(key, member)
+		@values[key] ||= Set.new
+		@values[key] << member
+	end
+
+	def srem(key, member)
+		@values[key].delete(member)
+	end
+
+	def scard(key)
+		@values[key]&.size || 0
+	end
+
+	def expire(_, _); end
+
 	def exists(*keys)
 		EMPromise.resolve(
 			@values.select { |k, _| keys.include? k }.size

web.rb 🔗

@@ -299,6 +299,7 @@ class Web < Roda
 				r.post "status" do
 					log.info "#{params['eventType']} #{params['callId']}", loggable_params
 					if params["eventType"] == "disconnect"
+						call_attempt_repo.ending_call(c, params["callId"])
 						CDR.for_outbound(params).save.catch(&method(:log_error))
 					end
 					"OK"
@@ -317,6 +318,7 @@ class Web < Roda
 						).then do |ca|
 							r.json { ca.to_json }
 
+							call_attempt_repo.starting_call(c, params["callId"])
 							render(*ca.to_render)
 						end
 					end