1#!/usr/bin/env ruby
2#
3# Copyright (C) 2020 Denver Gingerich <denver@ossguy.com>
4#
5# This file is part of sgx-bwmsgsv2.
6#
7# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
8# the terms of the GNU Affero General Public License as published by the Free
9# Software Foundation, either version 3 of the License, or (at your option) any
10# later version.
11#
12# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
13# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
15# details.
16#
17# You should have received a copy of the GNU Affero General Public License along
18# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
19
20require 'redis'
21require 'hiredis'
22require 'net/http'
23require 'json'
24require 'time'
25
26$stdout.sync = true
27
28puts "Redis queue to SGX HTTP request translator - for Bandwidth API V2 SGX\n"\
29 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
30
31if ARGV.size != 3
32 # note that <redis_queue_suffix> should match h2r-bwmsgsv2's $queue_name
33 puts "Usage: r2s-bwmsgsv2.rb <redis_queue_suffix> <sgx_hostname> "\
34 "<sgx_https_port>"
35 exit 0
36end
37
38["INT", "TERM"].each do |sig|
39 trap(sig) do
40 puts "Translator has terminated due to SIG#{sig}."
41 exit 0
42 end
43end
44
45t = Time.now
46puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
47
48redis = Redis.new(:driver => :hiredis)
49
50pending_len = redis.llen('pending_messages-' + ARGV[0])
51if pending_len != 0
52 puts 'Translator terminated due to non-empty pending queue of size ' +
53 pending_len.to_s
54 exit 1
55end
56
57while true
58 timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' +
59 ARGV[0], 'pending_messages-' + ARGV[0])
60
61 t = Time.now
62 tai_timestamp = `./tai`.strip
63 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
64 puts "LOG %d.%09d, %s: msg [TODO: ID] sent on %s - incrementing\n" %
65 [t.to_i, t.nsec, tai_timestamp, tai_yyyymmdd]
66
67 # WARNING: since not atomic with setex() below, translator is singleton
68 day_msg_count = redis.incr(
69 "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total"
70 )
71
72 t = Time.now
73 puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" %
74 [t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count]
75
76 # TODO: print less stuff in here
77 puts "TODO - got some stuff: " + timestamps_plus_json_blob
78 puts "TODO - daymsgcount: '#{day_msg_count}'"
79
80 # TODO: add some timestamps to timestamps_plus_json_blob (our own)
81
82 # TODO: fix so sending timestamps_plus_json_blob to SGX, and via POST
83 response = Net::HTTP.get_response(
84 URI("https://#{ARGV[1]}:#{ARGV[2]}/r2tst.php")
85 )
86
87 puts "TODO - response.code = #{response.code} and body #{response.body}"
88
89 # if SGX gives back anything but response.code 200, retry 5 times-ish
90 # if SGX failed to respond after retries, email or other notify
91 # and back off with even longer retries (can't do anything else really)
92
93 # if got HTTP 200, then:
94
95 exists = redis.exists(
96 "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"
97 )
98 if exists
99 puts 'Translator terminated since archive message at index ' +
100 day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
101 " unexpectedly exists already, with retval '#{exists}'"
102 exit 2
103 end
104
105 # WARNING: since not atomic with incr() above, translator is singleton
106 rv1 = redis.setex(
107 "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}",
108 259200, timestamps_plus_json_blob
109 )
110 if rv1 != 'OK'
111 puts 'Translator terminated since archive message at index ' +
112 day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
113 " could not be saved, with return value '#{rv1}'"
114 exit 3
115 end
116
117 pending_len = redis.llen('pending_messages-' + ARGV[0])
118 if pending_len != 1
119 puts 'Translator terminated since pending queue (at index ' +
120 day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
121 ') has unexpected non-1 length of ' + pending_len.to_s
122 exit 4
123 end
124
125 pending_item = redis.rpop('pending_messages-' + ARGV[0])
126 if timestamps_plus_json_blob != pending_item
127 puts 'Translator terminated since archived item (at index ' +
128 day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s +
129 ") does not match pending item '#{pending_item}'"
130 exit 5
131 end
132end