1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# Copyright (C) 2020 Denver Gingerich <denver@ossguy.com>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'redis'
22require 'hiredis'
23require 'net/http'
24require 'time'
25
# Write log output straight through instead of holding it in a buffer.
$stdout.sync = true

# Startup banner.  The commit hash is whatever `git rev-parse HEAD` prints
# when run from the current working directory (trailing newline included).
commit_hash = `git rev-parse HEAD`
puts "Redis queue to SGX HTTP request translator - for Bandwidth API V2 SGX\n" \
  "==>> last commit of this version is #{commit_hash}\n"
30
# Exactly three command-line arguments are required; anything else prints the
# usage line and stops with status 0.
# note that <redis_queue_suffix> should match h2r-bwmsgsv2's $queue_name
unless ARGV.size == 3
  puts 'Usage: r2s-bwmsgsv2.rb <redis_queue_suffix> <sgx_hostname> ' \
    '<sgx_https_port>'
  exit 0
end

# On SIGINT or SIGTERM, say which signal arrived and exit with status 0.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    puts "Translator has terminated due to SIG#{signal_name}."
    exit 0
  end
end
44
# Log the start time as <unix seconds>.<nanoseconds>.
started_at = Time.now
puts format("LOG %d.%09d: starting...\n\n", started_at.to_i, started_at.nsec)

# Connect with the hiredis driver; all other connection settings are defaults.
redis = Redis.new(driver: :hiredis)

# Refuse to run while the pending list for this queue suffix still holds
# entries: report its length and stop with status 1.
pending_key = "pending_messages-#{ARGV[0]}"
pending_len = redis.llen(pending_key)
unless pending_len.zero?
  puts "Translator terminated due to non-empty pending queue of size #{pending_len}"
  exit 1
end
56
# Main translator loop.  Each iteration handles exactly one message:
#   1. move it from the incoming list to the pending list,
#   2. increment the per-day message counter,
#   3. POST the MSG part of the blob to the SGX over HTTP,
#   4. archive the timestamp-annotated blob under a per-day, per-index key,
#   5. remove it from the pending list after consistency checks.
# Inconsistent Redis state terminates the process with exit status 2-5.
loop do
  # The message stays on the pending list until it has been archived below.
  timestamps_plus_json_blob = redis.brpoplpush(
    "incoming_messages-#{ARGV[0]}", "pending_messages-#{ARGV[0]}"
  )

  t = Time.now
  ts = format('%d.%09d', t.to_i, t.nsec)
  # ./tai is an external helper resolved relative to the working directory;
  # its stripped output is used verbatim as the TAI timestamp.
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts format("LOG %s, %s: handling message sent on %s ...\n",
    ts, tai_timestamp, tai_yyyymmdd)

  # WARNING: since not atomic with setex() below, translator is singleton
  day_msg_count = redis.incr(
    "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total"
  )
  # TODO: confirm day_msg_count is integer > 0 (otherwise likely an error)
  # unsure how to best error here: bail would cause infinite bounce; eml?

  t = Time.now
  puts format("LOG %d.%09d: total msgs for %s-%s now at %s\n",
    t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count)

  # TODO: print less stuff in here
  puts "TODO - got some stuff: " + timestamps_plus_json_blob
  puts "TODO - daymsgcount: '#{day_msg_count}'"

  t2 = Time.now
  ts2 = format('%d.%09d', t2.to_i, t2.nsec)
  tai_timestamp2 = `./tai`.strip

  # Prefix the received blob with this translator's own timestamps: the
  # pre-send pair (040) first, then the received pair (030).
  new_json_blob =
    '"ts_040_tai-translator_pre_send":' + tai_timestamp2 +
    ',"ts_040_unix-translator_pre_send":' + ts2 +
    ',"ts_030_tai-translator_received":' + tai_timestamp +
    ',"ts_030_unix-translator_received":' + ts +
    ',' + timestamps_plus_json_blob

  # NOTE(review): the usage text names the third argument <sgx_https_port>,
  # but the request is made over plain http - confirm which is intended.
  uri = URI("http://#{ARGV[1]}:#{ARGV[2]}/")
  req = Net::HTTP::Post.new(uri, 'Content-Type' => 'application/json')
  # The body is everything after the first 'G' in the blob, minus the two
  # characters that follow it.
  # NOTE(review): presumably the first 'G' is the end of a "MSG" key written
  # by the producer - verify; a blob with no 'G' raises NoMethodError here.
  req.body = new_json_blob.split('G', 2)[1][2..-1] # only use MSG part
  begin
    res = Net::HTTP.start(uri.hostname, uri.port) do |http|
      http.request(req)
    end
  # list of exceptions is from https://stackoverflow.com/a/5370726
  rescue Timeout::Error, Errno::EINVAL, Errno::ECONNRESET, EOFError,
         Net::HTTPBadResponse, Net::HTTPHeaderSyntaxError,
         Net::ProtocolError => e

    # A failed send is only logged; the message is still archived and removed
    # from the pending list below.
    puts "TODO problem '#{e}' - now we should try again..."
    # TODO: actually try again
  else
    puts "TODO - res: '#{res.code}' '#{res.message}': '#{res.body}'"
  end

  # TODO: add following functionality:
  # if SGX gives back anything but response.code 200, retry 5 times-ish
  # if SGX failed to respond after retries, email or other notify
  # and back off with even longer retries (can't do anything else really)

  archive_key = "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"

  exists = redis.exists(archive_key)
  # Depending on the installed redis-rb version, exists() returns either a
  # boolean or an Integer count of matching keys.  0 is truthy in Ruby, so the
  # Integer form must be compared explicitly; otherwise every message would
  # look like a duplicate and terminate the translator.
  already_archived = exists.is_a?(Integer) ? exists.positive? : exists
  if already_archived
    puts "Translator terminated since archive message at index " \
      "#{day_msg_count} for day #{tai_yyyymmdd}" \
      " unexpectedly exists already, with retval '#{exists}'"
    exit 2
  end

  # WARNING: since not atomic with incr() above, translator is singleton
  # 259200 seconds = 3 days of retention for the archived blob.
  rv1 = redis.setex(archive_key, 259_200, new_json_blob)
  if rv1 != 'OK'
    puts "Translator terminated since archive message at index " \
      "#{day_msg_count} for day #{tai_yyyymmdd}" \
      " could not be saved, with return value '#{rv1}'"
    exit 3
  end

  # Exactly one entry (the message moved there above) must be pending.
  pending_len = redis.llen("pending_messages-#{ARGV[0]}")
  if pending_len != 1
    puts "Translator terminated since pending queue (at index " \
      "#{day_msg_count} for day #{tai_yyyymmdd}" \
      ") has unexpected non-1 length of #{pending_len}"
    exit 4
  end

  # Remove the pending entry and make sure it is the message just handled.
  pending_item = redis.rpop("pending_messages-#{ARGV[0]}")
  if timestamps_plus_json_blob != pending_item
    puts "Translator terminated since archived item (at index " \
      "#{day_msg_count} for day #{tai_yyyymmdd}" \
      ") does not match pending item '#{pending_item}'"
    exit 5
  end
end