add acceptor and translator, for incoming messages

Denver Gingerich created

The acceptor (h2r-bwmsgsv2.php) will receive the messages from
Bandwidth via HTTP and put them in a Redis queue.  Then the translator
(r2s-bwmsgsv2.rb) will pick them up from the Redis queue and direct
them to the SGX (sgx-bwmsgsv2.rb), removing them from the pending
queue if the hand-off was successful.

The above components were added in order to improve resiliency of the
system, especially in situations where the SGX may become unavailable
for long periods of time.  Bandwidth will send the message again if it
is not received the first time, but it no longer stores the messages
itself in V2, so the consequences of missing an HTTP request are more
severe than in V1.

Change summary

Gemfile          |  1 
h2r-bwmsgsv2.php | 49 ++++++++++++++++++++++++++++++++
r2s-bwmsgsv2.rb  | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 126 insertions(+)

Detailed changes

Gemfile 🔗

@@ -2,6 +2,7 @@ source 'https://rubygems.org'
 
 gem 'activesupport', '<5.0.0'
 gem 'blather'
+gem 'redis'
 gem 'em-hiredis'
 gem 'em-http-request'
 gem 'eventmachine'

h2r-bwmsgsv2.php 🔗

@@ -0,0 +1,49 @@
+<?php
+/*
+  Copyright (C) 2020  Denver Gingerich <denver@ossguy.com>
+
+  This file is part of sgx-bwmsgsv2.
+
+  sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
+  the terms of the GNU Affero General Public License as published by the Free
+  Software Foundation, either version 3 of the License, or (at your option) any
+  later version.
+
+  sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
+  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
+  details.
+
+  You should have received a copy of the GNU Affero General Public License along
+  with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+$time = microtime(true);
+
+$raw_data = file_get_contents('php://input');
+
+include 'settings-h2r.php';
+$redis = new Redis();
+$redis->pconnect($redis_host, $redis_port);
+
+# TODO: check for return value of FALSE and then print raw_data and return !200
+
+if (!empty($redis_auth)) {
+	# TODO: check return value to confirm login succeeded
+	$redis->auth($redis_auth);
+}
+
+# TODO: MUST add ./tai versions of $time and $time2
+
+$time2 = microtime(true);
+
+$redis->lPush('incoming_messages-'.'test1a',  # TODO: update queue name
+	'"ts_020-first_db_hit":'.$time2.',"ts_010-first_received":'.$time.',"MSG":'.$raw_data);
+
+# TODO: check for return value of FALSE and then print raw_data and return !200
+
+# TODO: test syntax error to ensure non-200 is returned
+
+print 'ok';
+
+?>

r2s-bwmsgsv2.rb 🔗

@@ -0,0 +1,76 @@
+#!/usr/bin/env ruby
+#
+# Copyright (C) 2020  Denver Gingerich <denver@ossguy.com>
+#
+# This file is part of sgx-bwmsgsv2.
+#
+# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with sgx-bwmsgsv2.  If not, see <http://www.gnu.org/licenses/>.
+
+require 'redis'
+require 'hiredis'
+require 'net/http'
+require 'json'
+require 'time'
+
+$stdout.sync = true
+
+puts "Redis queue to SGX HTTP request translator - for Bandwidth API V2 SGX\n"\
+	"==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
+
+if ARGV.size != 4
+	puts "Usage: r2s-bwmsgsv2.rb <redis_input_queue_key> "\
+		"<redis_pending_queue_key> <sgx_hostname> <sgx_http_port>"
+	exit 0
+end
+
+["INT", "TERM"].each do |sig|
+	trap(sig) do
+		puts "Translator has terminated due to SIG#{sig}."
+		exit 0
+	end
+end
+
+t = Time.now
+puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
+
+redis = Redis.new(:driver => :hiredis)
+
+while true
+	timestamps_plus_json_blob = redis.brpoplpush(ARGV[0], ARGV[1])
+	# TODO: add timestamping here, and MUST include a ./tai call with it
+
+	# TODO: print less stuff in here
+	puts "TODO - got some stuff: " + timestamps_plus_json_blob
+
+	# add some timestamps to timestamps_plus_json_blob (our own)
+
+	# TODO: fix so sending timestamps_plus_json_blob to SGX, and via POST
+	response = Net::HTTP.get_response(
+		URI("https://#{ARGV[2]}:#{ARGV[3]}/r2tst.php")
+	)
+
+	puts "TODO - response.code = #{response.code} and body #{response.body}"
+
+	# if SGX gives back anything but response.code 200, retry 5 times-ish
+	# if SGX failed to respond after retries, email or other notify
+	#  and back off with even longer retries (can't do anything else really)
+
+	# if got HTTP 200, then:
+
+	# redis.set(...) with expiration of 3 days, including timestamps
+
+	# confirm ARGV[1] key has one element and e == timestamps_plus_json_blob
+	#  if so, then rpop ARGV[1] and discard
+	#  if not, then email or other notify, and probably exit with failure
+end