1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36def panic(e)
37 puts "Shutting down gateway due to exception: #{e.message}"
38 puts e.backtrace
39 SGXbwmsgsv2.shutdown
40 puts 'Gateway has terminated.'
41 EM.stop
42end
43
44def extract_shortcode(dest)
45 num, context = dest.split(';', 2)
46 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
47end
48
49def is_anonymous_tel?(dest)
50 num, context = dest.split(';', 2)
51 context && context == 'phone-context=anonymous.phone-context.soprani.ca'
52end
53
54class SGXClient < Blather::Client
55 def register_handler(type, *guards, &block)
56 super(type, *guards) { |*args| wrap_handler(*args, &block) }
57 end
58
59 def register_handler_before(type, *guards, &block)
60 check_handler(type, guards)
61 handler = lambda { |*args| wrap_handler(*args, &block) }
62
63 @handlers[type] ||= []
64 @handlers[type].unshift([guards, handler])
65 end
66
67protected
68
69 def wrap_handler(*args, &block)
70 v = block.call(*args)
71 v.catch(&method(:panic)) if v.is_a?(Promise)
72 true # Do not run other handlers unless throw :pass
73 rescue Exception => e
74 panic(e)
75 end
76end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
79module CatapultSettingFlagBits
80 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
81 MMS_ON_OOB_URL = 1
82end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 client.run
96 end
97
98 # so classes outside this module can write messages, too
99 def self.write(stanza)
100 client.write(stanza)
101 end
102
103 def self.before_handler(type, *guards, &block)
104 client.register_handler_before(type, *guards, &block)
105 end
106
107 def self.send_media(from, to, media_url, desc=nil, subject=nil)
108 # we assume media_url is of the form (always the case so far):
109 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
110
111 # the caller must guarantee that 'to' is a bare JID
112 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
113 media_url.split('/', 8)[7]
114
115 puts 'ORIG_URL: ' + media_url
116 puts 'PROX_URL: ' + proxy_url
117
118 # put URL in the body (so Conversations will still see it)...
119 msg = Blather::Stanza::Message.new(to, proxy_url)
120 msg.from = from
121 msg.subject = subject if subject
122
123 # ...but also provide URL in XEP-0066 (OOB) fashion
124 # TODO: confirm client supports OOB or don't send this
125 x = Nokogiri::XML::Node.new 'x', msg.document
126 x['xmlns'] = 'jabber:x:oob'
127
128 urln = Nokogiri::XML::Node.new 'url', msg.document
129 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
130 urln.add_child(urlc)
131 x.add_child(urln)
132
133 if desc
134 descn = Nokogiri::XML::Node.new('desc', msg.document)
135 descc = Nokogiri::XML::Text.new(desc, msg.document)
136 descn.add_child(descc)
137 x.add_child(descn)
138 end
139
140 msg.add_child(x)
141
142 write(msg)
143 rescue Exception => e
144 panic(e)
145 end
146
147 def self.error_msg(orig, query_node, type, name, text=nil)
148 orig.type = :error
149
150 error = Nokogiri::XML::Node.new 'error', orig.document
151 error['type'] = type
152 orig.add_child(error)
153
154 suberr = Nokogiri::XML::Node.new name, orig.document
155 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
156 error.add_child(suberr)
157
158 orig.add_child(query_node) if query_node
159
160 # TODO: add some explanatory xml:lang='en' text (see text param)
161 puts "RESPONSE3: #{orig.inspect}"
162 return orig
163 end
164
165 # workqueue_count MUST be 0 or else Blather uses threads!
166 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
167
168 def self.pass_on_message(m, users_num, jid)
169 # setup delivery receipt; similar to a reply
170 rcpt = ReceiptMessage.new(m.from.stripped)
171 rcpt.from = m.to
172
173 # pass original message (before sending receipt)
174 m.to = jid
175 m.from = "#{users_num}@#{ARGV[0]}"
176
177 puts 'XRESPONSE0: ' + m.inspect
178 write_to_stream m
179
180 # send a delivery receipt back to the sender
181 # TODO: send only when requested per XEP-0184
182 # TODO: pass receipts from target if supported
183
184 # TODO: put in member/instance variable
185 rcpt['id'] = SecureRandom.uuid
186 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
187 rcvd['xmlns'] = 'urn:xmpp:receipts'
188 rcvd['id'] = m.id
189 rcpt.add_child(rcvd)
190
191 puts 'XRESPONSE1: ' + rcpt.inspect
192 write_to_stream rcpt
193 end
194
195 def self.call_catapult(
196 token, secret, m, pth, body=nil,
197 head={}, code=[200], respond_with=:body
198 )
199 # TODO: need to make a separate thing for voice.bw.c eventually
200 EM::HttpRequest.new(
201 "https://messaging.bandwidth.com/#{pth}"
202 ).public_send(
203 m,
204 head: {
205 'Authorization' => [token, secret]
206 }.merge(head),
207 body: body
208 ).then { |http|
209 puts "API response to send: #{http.response} with code"\
210 " response.code #{http.response_header.status}"
211
212 if code.include?(http.response_header.status)
213 case respond_with
214 when :body
215 http.response
216 when :headers
217 http.response_header
218 else
219 http
220 end
221 else
222 EMPromise.reject(http.response_header.status)
223 end
224 }
225 end
226
227 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
228 usern)
229
230 xn = s.children.find { |v| v.element_name == "x" }
231 if not xn
232 to_catapult(s, nil, num_dest, user_id, token, secret,
233 usern)
234 return
235 end
236 puts "MMSOOB: found an x node - checking for url node..."
237
238 # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
239 # is probably fine, though, as non-OOB <x><url/></x> unlikely
240
241 un = xn.children.find { |v| v.element_name == "url" }
242 if not un
243 puts "MMSOOB: no url node found so process as normal"
244 to_catapult(s, nil, num_dest, user_id, token, secret,
245 usern)
246 return
247 end
248 puts "MMSOOB: found a url node - checking if to make MMS..."
249
250 REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
251 CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|
252
253 puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
254 if 0 == oob_on
255 puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
256 to_catapult(s, nil, num_dest, user_id, token,
257 secret, usern)
258 next
259 end
260
261 # TODO: check size of file at un.text and shrink if need
262
263 body = s.respond_to?(:body) ? s.body : ''
264 puts "MMSOOB: url text is '#{un.text}'"
265 puts "MMSOOB: the body is '#{body.to_s.strip}'"
266
267 # some clients send URI in both body & <url/> so delete
268 if un.text == body.to_s.strip
269 puts "MMSOOB: url matches body so deleting body"
270 s.body = ''
271 end
272
273 puts "MMSOOB: sending MMS since found OOB & user asked"
274 to_catapult(s, un.text, num_dest, user_id, token,
275 secret, usern)
276 }
277 end
278
279 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
280 body = s.respond_to?(:body) ? s.body : ''
281 if murl.to_s.empty? && body.to_s.strip.empty?
282 return EMPromise.reject(
283 [:modify, 'policy-violation']
284 )
285 end
286
287 extra = {}
288 if murl
289 extra = { media: murl }
290 end
291
292 call_catapult(
293 token,
294 secret,
295 :post,
296 "api/v2/users/#{user_id}/messages",
297 JSON.dump(extra.merge(
298 from: usern,
299 to: num_dest,
300 text: body,
301 applicationId: ARGV[4],
302 tag:
303 # callbacks need id and resourcepart
304 WEBrick::HTTPUtils.escape(s.id.to_s) +
305 ' ' +
306 WEBrick::HTTPUtils.escape(
307 s.from.resource.to_s
308 )
309 )),
310 {'Content-Type' => 'application/json'},
311 [201]
312 ).catch {
313 # TODO: add text; mention code number
314 EMPromise.reject(
315 [:cancel, 'internal-server-error']
316 )
317 }
318
319 t = Time.now
320 tai_timestamp = `./tai`.strip
321 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
322 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
323 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
324
325 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
326 usern).then { |total|
327
328 t = Time.now
329 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
330 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
331 }
332 end
333
334 def self.validate_num(num)
335 EMPromise.resolve(num.to_s).then { |num_dest|
336 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
337 next num_dest if num_dest[0] == '+'
338 shortcode = extract_shortcode(num_dest)
339 next shortcode if shortcode
340 end
341
342 if is_anonymous_tel?(num_dest)
343 EMPromise.reject([:cancel, 'gone'])
344 else
345 numbers = num_dest.split(',')
346 if numbers.length > 1
347 # TODO: validate each element of numbers
348
349 if numbers.sort != numbers
350 # TODO: send regular msg re err
351
352 # TODO: sorted JID in gone body
353 next EMPromise.reject([:cancel,
354 'gone'])
355 end
356
357 next numbers
358 end
359
360 # TODO: text re num not (yet) supportd/implmentd
361 EMPromise.reject([:cancel, 'item-not-found'])
362 end
363 }
364 end
365
366 def self.fetch_catapult_cred_for(jid)
367 cred_key = "catapult_cred-#{jid.stripped}"
368 REDIS.lrange(cred_key, 0, 3).then { |creds|
369 if creds.length < 4
370 # TODO: add text re credentials not registered
371 EMPromise.reject(
372 [:auth, 'registration-required']
373 )
374 else
375 creds
376 end
377 }
378 end
379
380 message :error? do |m|
381 # TODO: report it somewhere/somehow - eat for now so no err loop
382 puts "EATERROR1: #{m.inspect}"
383 end
384
385 message :body do |m|
386 EMPromise.all([
387 validate_num(m.to.node),
388 fetch_catapult_cred_for(m.from)
389 ]).then { |(num_dest, creds)|
390 jid_key = "catapult_jid-#{num_dest}"
391 REDIS.get(jid_key).then { |jid|
392 [jid, num_dest] + creds
393 }
394 }.then { |(jid, num_dest, *creds)|
395 # if destination user is in the system pass on directly
396 if jid
397 pass_on_message(m, creds.last, jid)
398 else
399 to_catapult_possible_oob(m, num_dest, *creds)
400 end
401 }.catch { |e|
402 if e.is_a?(Array) && e.length == 2
403 write_to_stream error_msg(m.reply, m.body, *e)
404 else
405 EMPromise.reject(e)
406 end
407 }
408 end
409
410 def self.user_cap_identities
411 [{category: 'client', type: 'sms'}]
412 end
413
414 # TODO: must re-add stuff so can do ad-hoc commands
415 def self.user_cap_features
416 [
417 "urn:xmpp:receipts",
418 ]
419 end
420
421 def self.add_gateway_feature(feature)
422 @gateway_features << feature
423 @gateway_features.uniq!
424 end
425
426 subscription :request? do |p|
427 puts "PRESENCE1: #{p.inspect}"
428
429 # subscriptions are allowed from anyone - send reply immediately
430 msg = Blather::Stanza::Presence.new
431 msg.to = p.from
432 msg.from = p.to
433 msg.type = :subscribed
434
435 puts 'RESPONSE5a: ' + msg.inspect
436 write_to_stream msg
437
438 # send a <presence> immediately; not automatically probed for it
439 # TODO: refactor so no "presence :probe? do |p|" duplicate below
440 caps = Blather::Stanza::Capabilities.new
441 # TODO: user a better node URI (?)
442 caps.node = 'http://catapult.sgx.soprani.ca/'
443 caps.identities = user_cap_identities
444 caps.features = user_cap_features
445
446 msg = caps.c
447 msg.to = p.from
448 msg.from = p.to.to_s + '/sgx'
449
450 puts 'RESPONSE5b: ' + msg.inspect
451 write_to_stream msg
452
453 # need to subscribe back so Conversations displays images inline
454 msg = Blather::Stanza::Presence.new
455 msg.to = p.from.to_s.split('/', 2)[0]
456 msg.from = p.to.to_s.split('/', 2)[0]
457 msg.type = :subscribe
458
459 puts 'RESPONSE5c: ' + msg.inspect
460 write_to_stream msg
461 end
462
463 presence :probe? do |p|
464 puts 'PRESENCE2: ' + p.inspect
465
466 caps = Blather::Stanza::Capabilities.new
467 # TODO: user a better node URI (?)
468 caps.node = 'http://catapult.sgx.soprani.ca/'
469 caps.identities = user_cap_identities
470 caps.features = user_cap_features
471
472 msg = caps.c
473 msg.to = p.from
474 msg.from = p.to.to_s + '/sgx'
475
476 puts 'RESPONSE6: ' + msg.inspect
477 write_to_stream msg
478 end
479
480 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
481 # respond to capabilities request for an sgx-bwmsgsv2 number JID
482 if i.to.node
483 # TODO: confirm the node URL is expected using below
484 #puts "XR[node]: #{xpath_result[0]['node']}"
485
486 msg = i.reply
487 msg.node = i.node
488 msg.identities = user_cap_identities
489 msg.features = user_cap_features
490
491 puts 'RESPONSE7: ' + msg.inspect
492 write_to_stream msg
493 next
494 end
495
496 # respond to capabilities request for sgx-bwmsgsv2 itself
497 msg = i.reply
498 msg.node = i.node
499 msg.identities = [{
500 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
501 type: 'sms', category: 'gateway'
502 }]
503 msg.features = @gateway_features
504 write_to_stream msg
505 end
506
507 def self.check_then_register(i, *creds)
508 jid_key = "catapult_jid-#{creds.last}"
509 bare_jid = i.from.stripped
510 cred_key = "catapult_cred-#{bare_jid}"
511
512 REDIS.get(jid_key).then { |existing_jid|
513 if existing_jid && existing_jid != bare_jid
514 # TODO: add/log text: credentials exist already
515 EMPromise.reject([:cancel, 'conflict'])
516 end
517 }.then {
518 REDIS.lrange(cred_key, 0, 3)
519 }.then { |existing_creds|
520 # TODO: add/log text: credentials exist already
521 if existing_creds.length == 4 && creds != existing_creds
522 EMPromise.reject([:cancel, 'conflict'])
523 elsif existing_creds.length < 4
524 REDIS.rpush(cred_key, *creds).then { |length|
525 if length != 4
526 EMPromise.reject([
527 :cancel,
528 'internal-server-error'
529 ])
530 end
531 }
532 end
533 }.then {
534 # not necessary if existing_jid non-nil, easier this way
535 REDIS.set(jid_key, bare_jid)
536 }.then { |result|
537 if result != 'OK'
538 # TODO: add txt re push failure
539 EMPromise.reject(
540 [:cancel, 'internal-server-error']
541 )
542 end
543 }.then {
544 write_to_stream i.reply
545 }
546 end
547
548 def self.creds_from_registration_query(qn)
549 xn = qn.children.find { |v| v.element_name == "x" }
550
551 if xn
552 xn.children.each_with_object({}) do |field, h|
553 next if field.element_name != "field"
554 val = field.children.find { |v|
555 v.element_name == "value"
556 }
557
558 case field['var']
559 when 'nick'
560 h[:user_id] = val.text
561 when 'username'
562 h[:api_token] = val.text
563 when 'password'
564 h[:api_secret] = val.text
565 when 'phone'
566 h[:phone_num] = val.text
567 else
568 # TODO: error
569 puts "?: #{field['var']}"
570 end
571 end
572 else
573 qn.children.each_with_object({}) do |field, h|
574 case field.element_name
575 when "nick"
576 h[:user_id] = field.text
577 when "username"
578 h[:api_token] = field.text
579 when "password"
580 h[:api_secret] = field.text
581 when "phone"
582 h[:phone_num] = field.text
583 end
584 end
585 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
586 end
587
588 def self.process_registration(i, qn)
589 EMPromise.resolve(
590 qn.children.find { |v| v.element_name == "remove" }
591 ).then { |rn|
592 if rn
593 puts "received <remove/> - ignoring for now..."
594 EMPromise.reject(:done)
595 else
596 creds_from_registration_query(qn)
597 end
598 }.then { |user_id, api_token, api_secret, phone_num|
599 if phone_num[0] == '+'
600 [user_id, api_token, api_secret, phone_num]
601 else
602 # TODO: add text re number not (yet) supported
603 EMPromise.reject([:cancel, 'item-not-found'])
604 end
605 }.then { |user_id, api_token, api_secret, phone_num|
606 # TODO: find way to verify #{phone_num}, too
607 call_catapult(
608 api_token,
609 api_secret,
610 :get,
611 "api/v2/users/#{user_id}/media"
612 ).then { |response|
613 params = JSON.parse(response)
614 # TODO: confirm params is array - could be empty
615
616 puts "register got str #{response.to_s[0..999]}"
617
618 check_then_register(
619 i,
620 user_id,
621 api_token,
622 api_secret,
623 phone_num
624 )
625 }
626 }.catch { |e|
627 EMPromise.reject(case e
628 when 401
629 # TODO: add text re bad credentials
630 [:auth, 'not-authorized']
631 when 404
632 # TODO: add text re number not found or disabled
633 [:cancel, 'item-not-found']
634 when Integer
635 [:modify, 'not-acceptable']
636 else
637 e
638 end)
639 }
640 end
641
642 def self.registration_form(orig, existing_number=nil)
643 msg = Nokogiri::XML::Node.new 'query', orig.document
644 msg['xmlns'] = 'jabber:iq:register'
645
646 if existing_number
647 msg.add_child(
648 Nokogiri::XML::Node.new(
649 'registered', msg.document
650 )
651 )
652 end
653
654 # TODO: update "User Id" x2 below (to "accountId"?), and others?
655 n1 = Nokogiri::XML::Node.new(
656 'instructions', msg.document
657 )
658 n1.content = "Enter the information from your Account "\
659 "page as well as the Phone Number\nin your "\
660 "account you want to use (ie. '+12345678901')"\
661 ".\nUser Id is nick, API Token is username, "\
662 "API Secret is password, Phone Number is phone"\
663 ".\n\nThe source code for this gateway is at "\
664 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
665 "\nCopyright (C) 2017-2020 Denver Gingerich "\
666 "and others, licensed under AGPLv3+."
667 n2 = Nokogiri::XML::Node.new 'nick', msg.document
668 n3 = Nokogiri::XML::Node.new 'username', msg.document
669 n4 = Nokogiri::XML::Node.new 'password', msg.document
670 n5 = Nokogiri::XML::Node.new 'phone', msg.document
671 n5.content = existing_number.to_s
672 msg.add_child(n1)
673 msg.add_child(n2)
674 msg.add_child(n3)
675 msg.add_child(n4)
676 msg.add_child(n5)
677
678 x = Blather::Stanza::X.new :form, [
679 {
680 required: true, type: :"text-single",
681 label: 'User Id', var: 'nick'
682 },
683 {
684 required: true, type: :"text-single",
685 label: 'API Token', var: 'username'
686 },
687 {
688 required: true, type: :"text-private",
689 label: 'API Secret', var: 'password'
690 },
691 {
692 required: true, type: :"text-single",
693 label: 'Phone Number', var: 'phone',
694 value: existing_number.to_s
695 }
696 ]
697 x.title = 'Register for '\
698 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
699 x.instructions = "Enter the details from your Account "\
700 "page as well as the Phone Number\nin your "\
701 "account you want to use (ie. '+12345678901')"\
702 ".\n\nThe source code for this gateway is at "\
703 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
704 "\nCopyright (C) 2017-2020 Denver Gingerich "\
705 "and others, licensed under AGPLv3+."
706 msg.add_child(x)
707
708 orig.add_child(msg)
709
710 return orig
711 end
712
713 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
714 puts "IQ: #{i.inspect}"
715
716 case i.type
717 when :set
718 process_registration(i, qn)
719 when :get
720 bare_jid = i.from.stripped
721 cred_key = "catapult_cred-#{bare_jid}"
722 REDIS.lindex(cred_key, 3).then { |existing_number|
723 reply = registration_form(i.reply, existing_number)
724 puts "RESPONSE2: #{reply.inspect}"
725 write_to_stream reply
726 }
727 else
728 # Unknown IQ, ignore for now
729 EMPromise.reject(:done)
730 end.catch { |e|
731 if e.is_a?(Array) && e.length == 2
732 write_to_stream error_msg(i.reply, qn, *e)
733 elsif e != :done
734 EMPromise.reject(e)
735 end
736 }.catch(&method(:panic))
737 end
738
739 iq :get? do |i|
740 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
741 end
742
743 iq :set? do |i|
744 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
745 end
746end
747
748class ReceiptMessage < Blather::Stanza
749 def self.new(to=nil)
750 node = super :message
751 node.to = to
752 node
753 end
754end
755
756class WebhookHandler < Goliath::API
757 use Goliath::Rack::Params
758
759 def response(env)
760 # TODO: add timestamp grab here, and MUST include ./tai version
761
762 puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
763
764 users_num = ''
765 others_num = ''
766 if params['direction'] == 'in'
767 users_num = params['to']
768 others_num = params['from']
769 elsif params['direction'] == 'out'
770 users_num = params['from']
771 others_num = params['to']
772 else
773 # TODO: exception or similar
774 puts "big problem: '" + params['direction'] + "'" + body
775 return [200, {}, "OK"]
776 end
777
778 puts 'BODY - messageId: ' + params['messageId'] +
779 ', eventType: ' + params['eventType'] +
780 ', time: ' + params['time'] +
781 ', direction: ' + params['direction'] +
782 ', state: ' + params['state'] +
783 ', deliveryState: ' + (params['deliveryState'] ?
784 params['deliveryState'] : 'NONE') +
785 ', deliveryCode: ' + (params['deliveryCode'] ?
786 params['deliveryCode'] : 'NONE') +
787 ', deliveryDesc: ' + (params['deliveryDescription'] ?
788 params['deliveryDescription'] : 'NONE') +
789 ', tag: ' + (params['tag'] ? params['tag'] : 'NONE') +
790 ', media: ' + (params['media'] ? params['media'].to_s :
791 'NONE')
792
793 if others_num[0] != '+'
794 # TODO: check that others_num actually a shortcode first
795 others_num +=
796 ';phone-context=ca-us.phone-context.soprani.ca'
797 end
798
799 jid_key = "catapult_jid-#{users_num}"
800 bare_jid = REDIS.get(jid_key).promise.sync
801
802 if !bare_jid
803 puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
804
805 # TODO: likely not appropriate; give error to BW API?
806 # TODO: add text re credentials not being registered
807 #write_to_stream error_msg(m.reply, m.body, :auth,
808 # 'registration-required')
809 return [200, {}, "OK"]
810 end
811
812 msg = ''
813 case params['direction']
814 when 'in'
815 text = ''
816 case params['eventType']
817 when 'sms'
818 text = params['text']
819 when 'mms'
820 has_media = false
821 params['media'].each do |media_url|
822 if not media_url.end_with?(
823 '.smil', '.txt', '.xml'
824 )
825
826 has_media = true
827 SGXbwmsgsv2.send_media(
828 others_num + '@' +
829 ARGV[0],
830 bare_jid, media_url
831 )
832 end
833 end
834
835 if params['text'].empty?
836 if not has_media
837 text = '[suspected group msg '\
838 'with no text (odd)]'
839 end
840 else
841 text = if has_media
842 # TODO: write/use a caption XEP
843 params['text']
844 else
845 '[suspected group msg '\
846 '(recipient list not '\
847 'available) with '\
848 'following text] ' +
849 params['text']
850 end
851 end
852
853 # ie. if text param non-empty or had no media
854 if not text.empty?
855 msg = Blather::Stanza::Message.new(
856 bare_jid, text)
857 msg.from = others_num + '@' + ARGV[0]
858 SGXbwmsgsv2.write(msg)
859 end
860
861 return [200, {}, "OK"]
862 else
863 text = "unknown type (#{params['eventType']})"\
864 " with text: " + params['text']
865
866 # TODO: log/notify of this properly
867 puts text
868 end
869
870 msg = Blather::Stanza::Message.new(bare_jid, text)
871 else # per prior switch, this is: params['direction'] == 'out'
872 tag_parts = params['tag'].split(/ /, 2)
873 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
874 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
875
876 case params['deliveryState']
877 when 'not-delivered'
878 # create a bare message like the one user sent
879 msg = Blather::Stanza::Message.new(
880 others_num + '@' + ARGV[0])
881 msg.from = bare_jid + '/' + resourcepart
882 msg['id'] = id
883
884 # create an error reply to the bare message
885 msg = Blather::StanzaError.new(
886 msg,
887 'recipient-unavailable',
888 :wait
889 ).to_node
890 when 'delivered'
891 msg = ReceiptMessage.new(bare_jid)
892
893 # TODO: put in member/instance variable
894 msg['id'] = SecureRandom.uuid
895
896 # TODO: send only when requested per XEP-0184
897 rcvd = Nokogiri::XML::Node.new(
898 'received',
899 msg.document
900 )
901 rcvd['xmlns'] = 'urn:xmpp:receipts'
902 rcvd['id'] = id
903 msg.add_child(rcvd)
904 when 'waiting'
905 # can't really do anything with it; nice to know
906 puts "message with id #{id} waiting"
907 return [200, {}, "OK"]
908 else
909 # TODO: notify somehow of unknown state receivd?
910 puts "message with id #{id} has "\
911 "other state #{params['deliveryState']}"
912 return [200, {}, "OK"]
913 end
914
915 puts "RESPONSE4: #{msg.inspect}"
916 end
917
918 msg.from = others_num + '@' + ARGV[0]
919 SGXbwmsgsv2.write(msg)
920
921 [200, {}, "OK"]
922
923 rescue Exception => e
924 puts 'Shutting down gateway due to exception 013: ' + e.message
925 SGXbwmsgsv2.shutdown
926 puts 'Gateway has terminated.'
927 EM.stop
928 end
929end
930
931at_exit do
932 $stdout.sync = true
933
934 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
935 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
936
937 if ARGV.size != 7
938 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
939 "<component_password> <server_hostname> "\
940 "<server_port> <application_id> "\
941 "<http_listen_port> <mms_proxy_prefix_url>"
942 exit 0
943 end
944
945 t = Time.now
946 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
947
948 EM.run do
949 REDIS = EM::Hiredis.connect
950
951 SGXbwmsgsv2.run
952
953 # required when using Prosody otherwise disconnects on 6-hour inactivity
954 EM.add_periodic_timer(3600) do
955 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
956 msg.from = ARGV[0]
957 SGXbwmsgsv2.write(msg)
958 end
959
960 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
961 server.api = WebhookHandler.new
962 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
963 server.logger = Log4r::Logger.new('goliath')
964 server.logger.add(Log4r::StdoutOutputter.new('console'))
965 server.logger.level = Log4r::INFO
966 server.start do
967 ["INT", "TERM"].each do |sig|
968 trap(sig) do
969 EM.defer do
970 puts 'Shutting down gateway...'
971 SGXbwmsgsv2.shutdown
972
973 puts 'Gateway has terminated.'
974 EM.stop
975 end
976 end
977 end
978 end
979 end
980end