1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Fail-stop handler: prints the exception's message and backtrace, asks
# SGXbwmsgsv2 to shut down, then stops the EventMachine reactor.
#
# e - object responding to #message and #backtrace (normally an Exception).
#
# Returns whatever EM.stop returns.
def panic(e)
  headline = "Shutting down gateway due to exception: #{e.message}"
  puts(headline)
  puts(e.backtrace)

  SGXbwmsgsv2.shutdown
  puts('Gateway has terminated.')

  EM.stop
end
43
# Returns the part of +dest+ before the first ';' when the remainder is
# exactly the ca-us short-code phone-context; returns nil otherwise.
def extract_shortcode(dest)
  number, phone_context = dest.split(';', 2)
  return nil unless phone_context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether +dest+ carries the "anonymous" phone-context.
#
# Returns nil when +dest+ has no ';'-separated context at all, otherwise
# true/false depending on whether the context is the anonymous one.
def is_anonymous_tel?(dest)
  phone_context = dest.split(';', 2)[1]
  return nil if phone_context.nil?

  phone_context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers are fail-stop: every handler block is run
# through #wrap_handler, which hands any raised exception (or any rejection
# of a returned Promise) to the top-level panic method.
class SGXClient < Blather::Client
  # Same as the superclass registration, except the block is wrapped.
  def register_handler(type, *guards, &block)
    super(type, *guards) { |*stanza_args| wrap_handler(*stanza_args, &block) }
  end

  # Registers a wrapped handler at the FRONT of the handler list for +type+
  # so that it is consulted before previously registered handlers.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*stanza_args) { wrap_handler(*stanza_args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

  protected

  # Runs +block+ with +args+. A returned Promise gets panic attached as its
  # rejection handler; any exception raised by the block goes to panic too.
  def wrap_handler(*args, &block)
    begin
      outcome = block.call(*args)
      outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
      true # Do not run other handlers unless throw :pass
    rescue Exception => e
      panic(e)
    end
  end
end
77
# Bit offsets used with the per-user "catapult_setting_flags-<jid>" Redis key.
#
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
module CatapultSettingFlagBits
  # bit 0
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0
  # bit 1: read via REDIS.getbit before turning an OOB URL into an MMS
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
# Module-level state: the fail-stop client instance, and the feature list
# reported for the gateway itself in disco#info replies (the list is
# mutated by add_gateway_feature, so it must not be frozen).
@client = SGXClient.new
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
93
# Starts the XMPP client by delegating to client.run.
def self.run
  client.run
end
97
# Public entry point for sending a stanza, so that code outside this module
# can write messages too. Delegates to client.write.
def self.write(stanza)
  client.write(stanza)
end
102
# Registers +block+ as a handler for +type+ (filtered by +guards+) through
# the client's register_handler_before.
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
106
# Sends +to+ a message whose body is a proxied URL for +media_url+, with the
# same URL attached as an XEP-0066 (jabber:x:oob) <x><url/></x> payload.
#
# from      - value assigned to the message's from
# to        - recipient; the caller must guarantee it is a bare JID
# media_url - assumed to look like (always the case so far):
#             https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
# desc      - optional text for an OOB <desc/> child
# subject   - optional message subject
#
# Any exception raised while building or writing the stanza goes to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil)
  # proxy prefix (ARGV[6]) + escaped JID + '/' + 8th '/'-separated URL part
  proxy_prefix = ARGV[6] + WEBrick::HTTPUtils.escape(to)
  proxy_url = proxy_prefix + '/' + media_url.split('/', 8)[7]

  puts 'ORIG_URL: ' + media_url
  puts 'PROX_URL: ' + proxy_url

  # put URL in the body (so Conversations will still see it)...
  msg = Blather::Stanza::Message.new(to, proxy_url)
  msg.from = from
  msg.subject = subject if subject

  # builds <name>text</name> inside the message's document
  text_element = lambda { |name, text|
    element = Nokogiri::XML::Node.new(name, msg.document)
    element.add_child(Nokogiri::XML::Text.new(text, msg.document))
    element
  }

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  oob = Nokogiri::XML::Node.new('x', msg.document)
  oob['xmlns'] = 'jabber:x:oob'
  oob.add_child(text_element.call('url', proxy_url))
  oob.add_child(text_element.call('desc', desc)) if desc

  msg.add_child(oob)

  write(msg)
rescue Exception => e
  panic(e)
end
146
# Turns the stanza +orig+ into an error stanza and returns it.
#
# orig       - stanza to modify in place (typically a reply)
# query_node - optional payload appended after the <error/> element
# type       - value for the <error/> element's type attribute
# name       - element name of the condition, placed in the
#              urn:ietf:params:xml:ns:xmpp-stanzas namespace
# text       - currently unused (see TODO below)
def self.error_msg(orig, query_node, type, name, text=nil)
  orig.type = :error

  error_node = Nokogiri::XML::Node.new('error', orig.document)
  error_node['type'] = type
  orig.add_child(error_node)

  condition = Nokogiri::XML::Node.new(name, orig.document)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error_node.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
164
# Passes the first four command-line arguments to Blather's setup (per the
# usage text: component JID, component password, server hostname, server
# port); missing arguments are passed as nil.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
167
# Delivers message +m+ directly to +jid+, re-addressed as coming from
# "<users_num>@<component JID>", then sends a XEP-0184-style receipt back to
# the original sender.
#
# m         - the incoming message stanza (its to/from are rewritten)
# users_num - value used as the node part of the new from address
# jid       - JID the message is passed on to
def self.pass_on_message(m, users_num, jid)
  # setup delivery receipt; similar to a reply
  # (must be built BEFORE m's addresses are rewritten below)
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts "XRESPONSE0: #{m.inspect}"
  write_to_stream m

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts "XRESPONSE1: #{receipt.inspect}"
  write_to_stream receipt
end
194
# Issues an HTTP request to https://messaging.bandwidth.com/<pth>.
#
# token, secret - sent as the 'Authorization' header value [token, secret]
# m             - request method name, invoked via public_send (:get, :post)
# pth           - path appended to the base URL
# body          - optional request body
# head          - extra request headers, merged over the Authorization one
# code          - list of HTTP statuses treated as success
# respond_with  - :body for the response body, :headers for the response
#                 header, anything else for the whole http object
#
# Returns a promise that yields the selected part of the response, or is
# rejected with the HTTP status when it is not listed in +code+.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  request = EM::HttpRequest.new("https://messaging.bandwidth.com/#{pth}")
  headers = { 'Authorization' => [token, secret] }.merge(head)

  request.public_send(m, head: headers, body: body).then { |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code"\
      " response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
226
# Sends stanza +s+ through to_catapult, attaching the stanza's OOB URL as
# MMS media when the sender has the MMS_ON_OOB_URL setting bit turned on.
#
# s        - outgoing message stanza
# num_dest - destination number(s)
# user_id, token, secret, usern - the sender's stored credentials
#
# Returns the value of to_catapult on EVERY path (directly, or through the
# Redis promise) so that a rejection from to_catapult reaches the caller's
# error handling; previously three of the four paths returned nil and the
# rejection was silently dropped.
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)

  xn = s.children.find { |v| v.element_name == "x" }
  unless xn
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found an x node - checking for url node..."

  # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
  # is probably fine, though, as non-OOB <x><url/></x> unlikely

  un = xn.children.find { |v| v.element_name == "url" }
  unless un
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
    CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|

    puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
    if 0 == oob_on
      puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
      # hand the send's promise back to the chain instead of nil
      next to_catapult(s, nil, num_dest, user_id, token,
        secret, usern)
    end

    # TODO: check size of file at un.text and shrink if need

    body = s.respond_to?(:body) ? s.body : ''
    puts "MMSOOB: url text is '#{un.text}'"
    puts "MMSOOB: the body is '#{body.to_s.strip}'"

    # some clients send URI in both body & <url/> so delete
    if un.text == body.to_s.strip
      puts "MMSOOB: url matches body so deleting body"
      s.body = ''
    end

    puts "MMSOOB: sending MMS since found OOB & user asked"
    to_catapult(s, un.text, num_dest, user_id, token,
      secret, usern)
  }
end
278
# Posts stanza +s+ to the messaging API and bumps the sender's daily usage
# counter in Redis.
#
# s        - outgoing message stanza (body read only if it responds to it)
# murl     - optional media URL to attach
# num_dest - destination number(s)
# user_id, token, secret - API credentials
# usern    - the sender's phone number (the "from" of the API call)
#
# Returns a promise. It is rejected with [:modify, 'policy-violation'] when
# there is neither media nor a non-blank body, and with
# [:cancel, 'internal-server-error'] when the API call does not answer 201.
# The send promise used to be discarded (only the usage-counter promise was
# returned), so API failures were never reported; both are now combined.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && body.to_s.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  extra = murl ? { media: murl } : {}

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) + ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  send_result = call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject(
      [:cancel, 'internal-server-error']
    )
  }

  # usage accounting is started right away, as before (it does not wait
  # for the API response)
  t = Time.now
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
    [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

  usage_result = REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
    usern).then { |total|

    t = Time.now
    puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
      [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
  }

  # a failure of either the send or the counter update reaches the caller
  EMPromise.all([send_result, usage_result])
end
333
# Works out the destination number(s) for message +m+.
#
# When +m+ is addressed to the component itself (ARGV[0]), the numbers are
# read from the 'uri' attributes of the children of its <addresses/> child
# (see https://wiki.soprani.ca/SGX/GroupMMS) and returned as a plain Array;
# a missing <addresses/> gives a promise rejected with
# [:cancel, 'item-not-found'].
#
# Otherwise the node part of m.to is examined and a promise is returned
# that yields a '+'-prefixed number, a short code, or an already-sorted
# list of comma-separated numbers; it is rejected with [:cancel, 'gone']
# (anonymous context or unsorted list) or [:cancel, 'item-not-found'].
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    addresses = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless addresses

    puts "ADRXEP: found an addresses node - iterate addrs.."

    nums = addresses.children.map { |address|
      num = ''
      type = ''
      address.attributes.each do |attr_name, attr_value|
        case attr_name
        when 'type'
          # TODO: error unless the value is 'to'
          type = attr_value.to_s
        when 'uri'
          # TODO: error unless the value starts with 'sms:'
          num = attr_value.to_s[4..-1]
          # TODO: confirm num validates
        else
          # TODO: error - unexpected name
        end
      end
      if num.empty? || type.empty?
        # TODO: error
      end
      num
    }
    return nums
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    next EMPromise.reject([:cancel, 'gone']) if is_anonymous_tel?(num_dest)

    numbers = num_dest.split(',')
    if numbers.length > 1
      # TODO: validate each element of numbers

      # TODO: send regular msg re err
      # TODO: sorted JID in gone body
      next EMPromise.reject([:cancel, 'gone']) if numbers.sort != numbers

      next numbers
    end

    # TODO: text re num not (yet) supportd/implmentd
    EMPromise.reject([:cancel, 'item-not-found'])
  }
end
404
# Reads the first four entries of the Redis list "catapult_cred-<bare jid>".
#
# Returns a promise yielding that list, or rejected with
# [:auth, 'registration-required'] when fewer than four entries exist.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then { |creds|
    next creds unless creds.length < 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  }
end
418
# Error-typed messages are only logged and otherwise swallowed.
# TODO: report it somewhere/somehow - eat for now so no err loop
message(:error?) { |m| puts "EATERROR1: #{m.inspect}" }
423
# Handles every message that has a body: validates the destination and
# loads the sender's credentials, then either passes the message straight
# to another registered user (when the destination maps to a JID in Redis)
# or sends it out through to_catapult_possible_oob. A two-element rejection
# reason is sent back to the sender as an error stanza; anything else is
# re-rejected.
message :body do |m|
  lookups = [validate_num(m), fetch_catapult_cred_for(m.from)]

  EMPromise.all(lookups).then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest] + creds
    }
  }.then { |(jid, num_dest, *creds)|
    # if destination user is in the system pass on directly
    next pass_on_message(m, creds.last, jid) if jid

    to_catapult_possible_oob(m, num_dest, *creds)
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && e.length == 2

    write_to_stream error_msg(m.reply, m.body, *e)
  }
end
448
# Identity list advertised for number JIDs: a single client/sms identity.
def self.user_cap_identities
  sms_client = { category: 'client', type: 'sms' }
  [sms_client]
end
452
# Feature list advertised for number JIDs.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
459
# Appends +feature+ to the gateway's feature list and removes duplicates in
# place. Returns what Array#uniq! returns: nil when nothing was removed,
# the list itself otherwise.
def self.add_gateway_feature(feature)
  @gateway_features.push(feature)
  @gateway_features.uniq!
end
464
# Answers a presence subscription request with three stanzas, in order:
# an approval, the capabilities presence from the "/sgx" resource, and a
# subscription request in the opposite direction.
subscription :request? do |p|
  puts "PRESENCE1: #{p.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed

  puts "RESPONSE5a: #{approval.inspect}"
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  caps_presence = caps.c
  caps_presence.to = p.from
  caps_presence.from = "#{p.to}/sgx"

  puts "RESPONSE5b: #{caps_presence.inspect}"
  write_to_stream caps_presence

  # need to subscribe back so Conversations displays images inline
  # (both addresses are cut at the first '/')
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = p.from.to_s.split('/', 2)[0]
  subscribe_back.from = p.to.to_s.split('/', 2)[0]
  subscribe_back.type = :subscribe

  puts "RESPONSE5c: #{subscribe_back.inspect}"
  write_to_stream subscribe_back
end
501
# Answers a presence probe with the capabilities presence, sent from the
# probed address plus the "/sgx" resource.
presence :probe? do |p|
  puts "PRESENCE2: #{p.inspect}"

  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  reply = caps.c
  reply.to = p.from
  reply.from = "#{p.to}/sgx"

  puts "RESPONSE6: #{reply.inspect}"
  write_to_stream reply
end
518
# Answers disco#info queries. A query addressed to a JID with a node part
# (a number JID) gets the per-user identities/features; a query addressed
# to the component itself gets the gateway identity and feature list.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  addressed_to_number = i.to.node

  msg = i.reply
  msg.node = i.node

  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if addressed_to_number
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"

    msg.identities = user_cap_identities
    msg.features = user_cap_features

    puts "RESPONSE7: #{msg.inspect}"
    write_to_stream msg
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  msg.identities = [{
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }]
  msg.features = @gateway_features
  write_to_stream msg
end
545
# Stores a registration in Redis after checking for conflicts, then writes
# the success reply for +i+.
#
# i     - the registration IQ (its sender's bare JID becomes the owner)
# creds - user id, API token, API secret, phone number (in that order; the
#         last element names the "catapult_jid-<number>" key)
#
# Returns a promise. It is rejected with [:cancel, 'conflict'] when the
# number belongs to a different JID or the JID already has different
# credentials stored, and with [:cancel, 'internal-server-error'] when a
# Redis write does not give the expected result.
def self.check_then_register(i, *creds)
  jid_key = "catapult_jid-#{creds.last}"
  # Use the JID's string form: Redis hands back Strings, and a String never
  # compares equal to a non-String object, so comparing against the JID
  # object itself reported a conflict even for the owner's own JID.
  bare_jid = i.from.stripped.to_s
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    if existing_jid && existing_jid != bare_jid
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    # TODO: add/log text: credentials exist already
    if existing_creds.length == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif existing_creds.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        if length != 4
          EMPromise.reject([
            :cancel,
            'internal-server-error'
          ])
        end
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    if result != 'OK'
      # TODO: add txt re push failure
      EMPromise.reject(
        [:cancel, 'internal-server-error']
      )
    end
  }.then {
    write_to_stream i.reply
  }
end
586
# Extracts registration credentials from the <query/> node +qn+.
#
# When the query has an <x/> child, values come from its <field/> children
# (selected by their 'var' attribute); otherwise they come from the query's
# own <nick/>, <username/>, <password/> and <phone/> children.
#
# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil when
# the corresponding field is absent or, in the form case, has no <value/>
# child (that case used to raise NoMethodError on nil).
def self.creds_from_registration_query(qn)
  xn = qn.children.find { |v| v.element_name == "x" }

  if xn
    xn.children.each_with_object({}) do |field, h|
      next if field.element_name != "field"
      val = field.children.find { |v|
        v.element_name == "value"
      }
      # a <field/> without a <value/> is treated like an absent field
      text = val ? val.text : nil

      case field['var']
      when 'nick'
        h[:user_id] = text
      when 'username'
        h[:api_token] = text
      when 'password'
        h[:api_secret] = text
      when 'phone'
        h[:phone_num] = text
      else
        # TODO: error
        puts "?: #{field['var']}"
      end
    end
  else
    qn.children.each_with_object({}) do |field, h|
      case field.element_name
      when "nick"
        h[:user_id] = field.text
      when "username"
        h[:api_token] = field.text
      when "password"
        h[:api_secret] = field.text
      when "phone"
        h[:phone_num] = field.text
      end
    end
  end.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
626
# Handles a jabber:iq:register "set": extracts the credentials from +qn+,
# checks them with a GET against the API, and stores the registration via
# check_then_register.
#
# i  - the registration IQ
# qn - its <query/> node
#
# Returns a promise. Rejection reasons: :done when the query contains
# <remove/> (ignored for now), [:cancel, 'item-not-found'] when the phone
# number is missing or does not start with '+' or the API answers 404,
# [:auth, 'not-authorized'] for 401, [:modify, 'not-acceptable'] for any
# other Integer status; every other reason is passed through unchanged.
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # to_s: a registration without any phone field leaves phone_num nil,
    # which must give an error reply rather than raise
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # parsed only to check the response is valid JSON
      # TODO: confirm params is array - could be empty
      JSON.parse(response)

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(case e
    when 401
      # TODO: add text re bad credentials
      [:auth, 'not-authorized']
    when 404
      # TODO: add text re number not found or disabled
      [:cancel, 'item-not-found']
    when Integer
      [:modify, 'not-acceptable']
    else
      e
    end)
  }
end
680
# Adds a jabber:iq:register <query/> to the stanza +orig+ and returns it.
#
# The query carries <registered/> (only when +existing_number+ is given),
# <instructions/>, empty <nick/>, <username/>, <password/> elements, a
# <phone/> element holding +existing_number+, and an equivalent data form
# with the same four fields.
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'

  if existing_number
    query.add_child(Nokogiri::XML::Node.new('registered', query.document))
  end

  # text shared by the plain instructions and the data form instructions
  account_hint = "page as well as the Phone Number\nin your "\
    "account you want to use (ie. '+12345678901')."
  source_note = "\n\nThe source code for this gateway is at "\
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
    "\nCopyright (C) 2017-2020 Denver Gingerich "\
    "and others, licensed under AGPLv3+."

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', query.document)
  instructions.content = "Enter the information from your Account "\
    "#{account_hint}"\
    "\nUser Id is nick, API Token is username, "\
    "API Secret is password, Phone Number is phone."\
    "#{source_note}"
  query.add_child(instructions)

  %w[nick username password].each do |field_name|
    query.add_child(Nokogiri::XML::Node.new(field_name, query.document))
  end

  phone = Nokogiri::XML::Node.new('phone', query.document)
  phone.content = existing_number.to_s
  query.add_child(phone)

  form = Blather::Stanza::X.new :form, [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form.title = 'Register for '\
    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions = "Enter the details from your Account "\
    "#{account_hint}#{source_note}"
  query.add_child(form)

  orig.add_child(query)

  orig
end
751
# Handles jabber:iq:register queries: "set" goes to process_registration,
# "get" is answered with the registration form (pre-filled with the number
# stored at index 3 of the sender's credential list), other types are
# ignored. A two-element rejection reason becomes an error reply, :done is
# swallowed, and anything else ends up in panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      REDIS.lindex("catapult_cred-#{i.from.stripped}", 3).then { |existing_number|
        reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{reply.inspect}"
        write_to_stream reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
777
# Fallback for "get" IQs: reply with a cancel/feature-not-implemented
# error that echoes the request's children.
iq :get? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
781
# Fallback for "set" IQs: reply with a cancel/feature-not-implemented
# error that echoes the request's children.
iq :set? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
785end
786
# A bare <message/> stanza used as the carrier for delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Builds a :message stanza addressed to +to+ (which may be nil).
  def self.new(to=nil)
    super(:message).tap { |stanza| stanza.to = to }
  end
end
794
# Goliath endpoint for message callbacks: turns each callback into the
# matching XMPP stanza (incoming text/media, delivery receipt, or delivery
# error) and writes it with SGXbwmsgsv2.write.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Handles one callback described by +params+.
  #
  # Returns the Rack triple [200, {}, "OK"] on every handled path. Any
  # exception is treated as fatal: the gateway is shut down and the
  # EventMachine reactor stopped.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s

    users_num = ''
    others_num = ''
    if params['direction'] == 'in'
      users_num = params['to']
      others_num = params['from']
    elsif params['direction'] == 'out'
      users_num = params['from']
      others_num = params['to']
    else
      # TODO: exception or similar
      # (interpolated so a missing direction is logged, not raised on; this
      # line used to reference an undefined local)
      puts "big problem: '#{params['direction']}'"
      return [200, {}, "OK"]
    end

    # interpolation keeps this log line from raising when a parameter is
    # absent or is not a String
    puts "BODY - messageId: #{params['messageId']}" \
      ", eventType: #{params['eventType']}" \
      ", time: #{params['time']}" \
      ", direction: #{params['direction']}" \
      ", state: #{params['state']}" \
      ", deliveryState: #{params['deliveryState'] || 'NONE'}" \
      ", deliveryCode: #{params['deliveryCode'] || 'NONE'}" \
      ", deliveryDesc: #{params['deliveryDescription'] || 'NONE'}" \
      ", tag: #{params['tag'] || 'NONE'}" \
      ", media: #{params['media'] ? params['media'].to_s : 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return [200, {}, "OK"]
    end

    msg = ''
    case params['direction']
    when 'in'
      text = ''
      case params['eventType']
      when 'sms'
        text = params['text']
      when 'mms'
        has_media = false
        params['media'].each do |media_url|
          # .smil/.txt/.xml attachments are not forwarded as media
          if not media_url.end_with?(
            '.smil', '.txt', '.xml'
          )

            has_media = true
            SGXbwmsgsv2.send_media(
              others_num + '@' +
              ARGV[0],
              bare_jid, media_url
            )
          end
        end

        if params['text'].empty?
          if not has_media
            text = '[suspected group msg '\
              'with no text (odd)]'
          end
        else
          text = if has_media
            # TODO: write/use a caption XEP
            params['text']
          else
            '[suspected group msg '\
            '(recipient list not '\
            'available) with '\
            'following text] ' +
            params['text']
          end
        end

        # ie. if text param non-empty or had no media
        if not text.empty?
          msg = Blather::Stanza::Message.new(
            bare_jid, text)
          msg.from = others_num + '@' + ARGV[0]
          SGXbwmsgsv2.write(msg)
        end

        return [200, {}, "OK"]
      else
        text = "unknown type (#{params['eventType']})"\
          " with text: " + params['text']

        # TODO: log/notify of this properly
        puts text
      end

      msg = Blather::Stanza::Message.new(bare_jid, text)
    else # per prior switch, this is: params['direction'] == 'out'
      # tag is "<escaped stanza id> <escaped resourcepart>"
      tag_parts = params['tag'].split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0])
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

      case params['deliveryState']
      when 'not-delivered'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node
      when 'delivered'
        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new(
          'received',
          msg.document
        )
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)
      when 'waiting'
        # can't really do anything with it; nice to know
        puts "message with id #{id} waiting"
        return [200, {}, "OK"]
      else
        # TODO: notify somehow of unknown state receivd?
        puts "message with id #{id} has "\
          "other state #{params['deliveryState']}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end
end
969
# Program entry point, run from at_exit: prints a banner, checks that
# exactly seven command-line arguments were given, then runs the
# EventMachine reactor with the Redis connection, the XMPP client, an
# hourly ping, and the Goliath webhook server.
at_exit do
  $stdout.sync = true

  commit = `git rev-parse HEAD`
  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is #{commit}\n"

  unless ARGV.size == 7
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url>"
    exit 0
  end

  started = Time.now
  puts format("LOG %d.%09d: starting...\n\n", started.to_i, started.nsec)

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    # webhook listener on all interfaces, port taken from ARGV[5]
    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)

    logger = Log4r::Logger.new('goliath')
    server.logger = logger
    logger.add(Log4r::StdoutOutputter.new('console'))
    logger.level = Log4r::INFO

    server.start do
      %w[INT TERM].each do |sig|
        trap(sig) do
          # the shutdown itself is handed to EM.defer
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end