diff --git a/r2s-bwmsgsv2.rb b/r2s-bwmsgsv2.rb index 4122153b3af6f696583ca38d5d82de58865a4a27..a88487e9e95b7d778e788b50be1fde5b0ed5265e 100755 --- a/r2s-bwmsgsv2.rb +++ b/r2s-bwmsgsv2.rb @@ -47,6 +47,13 @@ puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec] redis = Redis.new(:driver => :hiredis) +pending_len = redis.llen('pending_messages-' + ARGV[0]) +if pending_len != 0 + puts 'Translator terminated due to non-empty pending queue of size ' + + pending_len.to_s + exit 1 +end + while true timestamps_plus_json_blob = redis.brpoplpush('incoming_messages-' + ARGV[0], 'pending_messages-' + ARGV[0]) @@ -57,6 +64,7 @@ while true puts "LOG %d.%09d, %s: msg [TODO: ID] sent on %s - incrementing\n" % [t.to_i, t.nsec, tai_timestamp, tai_yyyymmdd] + # WARNING: since not atomic with setex() below, translator is singleton day_msg_count = redis.incr( "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-total" ) @@ -65,13 +73,11 @@ while true puts "LOG %d.%09d: total msgs for %s-%s now at %s\n" % [t.to_i, t.nsec, tai_yyyymmdd, ARGV[0], day_msg_count] - # TODO: do something with day_msg_count - # TODO: print less stuff in here puts "TODO - got some stuff: " + timestamps_plus_json_blob puts "TODO - daymsgcount: '#{day_msg_count}'" - # add some timestamps to timestamps_plus_json_blob (our own) + # TODO: add some timestamps to timestamps_plus_json_blob (our own) # TODO: fix so sending timestamps_plus_json_blob to SGX, and via POST response = Net::HTTP.get_response( @@ -86,17 +92,41 @@ while true # if got HTTP 200, then: - # TODO: "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}"? - # if exists, then fail, and leave thing on pending queue (not atomic) + exists = redis.exists( + "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}" + ) + if exists + puts 'Translator terminated since archive message at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + " unexpectedly exists already, with retval '#{exists}'" + exit 2 + end + # WARNING: since not atomic with incr() above, translator is singleton rv1 = redis.setex( "archived_message-#{ARGV[0]}-#{tai_yyyymmdd}-#{day_msg_count}", 259200, timestamps_plus_json_blob ) - puts "TODO - rv1: " + rv1 - # TODO: confirm rv1 == 'OK' + if rv1 != 'OK' + puts 'Translator terminated since archive message at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + " could not be saved, with return value '#{rv1}'" + exit 3 + end + + pending_len = redis.llen('pending_messages-' + ARGV[0]) + if pending_len != 1 + puts 'Translator terminated since pending queue (at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + ') has unexpected non-1 length of ' + pending_len.to_s + exit 4 + end - # confirm pending key has one element and e == timestamps_plus_json_blob - # if so, then rpop pending_messages-#{ARGV[0]} and discard - # if not, then email or other notify, and probably exit with failure + pending_item = redis.rpop('pending_messages-' + ARGV[0]) + if timestamps_plus_json_blob != pending_item + puts 'Translator terminated since archived item (at index ' + + day_msg_count.to_s + ' for day ' + tai_yyyymmdd.to_s + + ") does not match pending item '#{pending_item}'" + exit 5 + end end