1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Last-resort failure handler: reports the exception on stdout, shuts the
# XMPP component down and stops the EventMachine reactor.
#
# e - the exception that triggered the shutdown (must respond to #message
#     and #backtrace).
def panic(e)
  headline = "Shutting down gateway due to exception: #{e.message}"
  puts(headline)
  puts(e.backtrace)

  SGXbwmsgsv2.shutdown
  puts('Gateway has terminated.')
  EM.stop
end
43
# Returns the number part of a "<num>;<context>" destination when the
# context marks it as a CA/US short code; returns nil for anything else.
#
# dest - String such as "12345;phone-context=ca-us.phone-context.soprani.ca"
def extract_shortcode(dest)
  number, phone_context = dest.split(';', 2)
  return unless phone_context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether a "<num>;<context>" destination carries the anonymous
# phone-context, i.e. it stands for a sender with no callable number.
#
# dest - String destination (the node part of the target JID).
#
# Always returns true or false (previously nil leaked out when the
# destination had no ";context" part).
def is_anonymous_tel?(dest)
  _number, phone_context = dest.split(';', 2)
  phone_context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers are all routed through #wrap_handler, so
# that an exception (or a rejected Promise) coming out of any handler
# brings the gateway down via the top-level panic method.
class SGXClient < Blather::Client
  # Same contract as Blather::Client#register_handler; the handler block is
  # wrapped before being registered.
  def register_handler(type, *guards, &block)
    guarded = proc { |*args| wrap_handler(*args, &block) }
    super(type, *guards, &guarded)
  end

  # Registers a wrapped handler at the FRONT of the handler list for
  # `type`, so it is consulted before handlers registered earlier.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    handler = ->(*args) { wrap_handler(*args, &block) }

    (@handlers[type] ||= []).unshift([guards, handler])
  end

protected

  # Runs the handler block. A Promise result gets panic attached as its
  # rejection callback; any raised exception goes straight to panic.
  def wrap_handler(*args, &block)
    begin
      outcome = block.call(*args)
      outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
      true # Do not run other handlers unless throw :pass
    rescue Exception => e
      panic(e)
    end
  end
end
77
# Named positions for per-user setting flags.
# NOTE(review): the values look like bit indexes into a flags field; no
# code in this file reads them - confirm against jmp-acct_bot.rb.
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
module CatapultSettingFlagBits
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0

  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
# Module-level state: the wrapped Blather client used by the DSL, and the
# feature namespaces advertised in disco#info replies for the gateway JID.
@client = SGXClient.new
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
93
# Starts the XMPP component by running the underlying client.
def self.run
  # TODO: read/save ARGV[7] creds to local variables
  client.run
end
98
# Public entry point for sending a stanza through the component's client,
# so that classes outside this module can write messages, too.
#
# stanza - the stanza handed to the client's #write.
def self.write(stanza)
  client.write(stanza)
end
103
# Registers a handler that is placed ahead of the already-registered
# handlers for `type` (see SGXClient#register_handler_before).
#
# type   - handler type symbol
# guards - guard arguments forwarded unchanged
# block  - the handler body
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
107
# Relays a Bandwidth-hosted media file to an XMPP user as a link through
# the MMS proxy whose URL prefix is given in ARGV[6].
#
# from      - value assigned to the outgoing message's `from`
# to        - bare JID of the recipient (the caller must guarantee it is
#             bare); also used as the user segment of the proxy URL
# media_url - Bandwidth media URL (V1 or V2 form, see below)
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional message to copy instead of building a fresh one
#
# Returns nil without sending anything when media_url is not a recognised
# Bandwidth media URL or carries no media path. Any exception raised while
# building or sending the stanza is handed to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  # we assume media_url is one of these (always the case so far):
  #  https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
  #  https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
  usr, pth =
    if media_url.start_with?('https://api.catapult.inetwork.com/v1/users/')
      # TODO: MUST fix this TERRIBLE hack
      #  there must be a catapult_cred-<usr> key with V1 creds
      ['v1', media_url.split('/', 8)[7]]
    elsif media_url.start_with?('https://messaging.bandwidth.com/api/v2/users/')
      [to, media_url.split('/', 9)[8]]
    end

  # Covers an unknown prefix as well as a known prefix with nothing after
  # the user segment; the latter used to raise (String + nil) and take the
  # whole gateway down through panic.
  if pth.nil? || pth.empty?
    puts "ERROR2: unrecognized media_url: '#{media_url}'"
    return
  end

  proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(usr) + '/' + pth

  puts 'ORIG_URL: ' + media_url
  puts 'PROX_URL: ' + proxy_url

  # put URL in the body (so Conversations will still see it)...
  if m
    msg = m.copy
    msg.body = proxy_url
  else
    msg = Blather::Stanza::Message.new(to, proxy_url)
  end
  msg.from = from
  msg.subject = subject if subject

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  x = Nokogiri::XML::Node.new('x', msg.document)
  x['xmlns'] = 'jabber:x:oob'

  urln = Nokogiri::XML::Node.new('url', msg.document)
  urln.add_child(Nokogiri::XML::Text.new(proxy_url, msg.document))
  x.add_child(urln)

  if desc
    descn = Nokogiri::XML::Node.new('desc', msg.document)
    descn.add_child(Nokogiri::XML::Text.new(desc, msg.document))
    x.add_child(descn)
  end

  msg.add_child(x)

  write(msg)
rescue Exception => e
  panic(e)
end
171
# Turns a reply stanza into an error reply and returns it.
#
# orig       - the reply stanza to convert (modified in place)
# query_node - node appended after the <error/> element, or nil for none
# type       - value for the <error/> element's `type` attribute
# name       - element name of the error condition
# text       - currently unused (see TODO below)
def self.error_msg(orig, query_node, type, name, text=nil)
  orig.type = :error

  error = Nokogiri::XML::Node.new('error', orig.document)
  error['type'] = type
  orig.add_child(error)

  condition = Nokogiri::XML::Node.new(name, orig.document)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
189
# Connection parameters come from the first four command-line arguments.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
192
# Delivers a message straight to another user of this gateway and sends a
# delivery receipt back to the original sender.
#
# m         - the incoming message (its to/from are rewritten in place)
# users_num - the sender's number, used as the node of the new `from`
# jid       - JID of the destination user
def self.pass_on_message(m, users_num, jid)
  # the receipt is addressed from the ORIGINAL to/from values, so it has to
  # be set up before the message is re-addressed below
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts "XRESPONSE0: #{m.inspect}"
  write_to_stream(m)

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts "XRESPONSE1: #{receipt.inspect}"
  write_to_stream(receipt)
end
219
# Issues one HTTP request against the Bandwidth API.
#
# token, secret - credentials placed in the Authorization header
# m             - HTTP verb as a symbol (the request method to invoke)
# pth           - path, one of:
#                   "api/v2/users/#{user_id}/[endpoint_name]"
#                   "v1/users/#{user_id}/[endpoint_name]"
# body          - request body, if any
# head          - extra request headers (merged over Authorization)
# code          - list of status codes treated as success
# respond_with  - :body, :headers, or anything else for the whole client
#
# Returns the request's promise chain: it yields the selected part of the
# response on an accepted status and is rejected with the status otherwise.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  base_url =
    if pth.start_with?('api/v2/users')
      'https://messaging.bandwidth.com/'
    elsif pth.start_with?('v1/users')
      # begin hack for running V2 messages along with V1 voice
      # TODO: set token and secret to vals provided at startup
      # TODO: replace pth's user_id with user_id from ARGV[7]
      #  -> pth = "v1/users/#{new_id}/" + pth.split('/', 4)[3]
      'https://api.catapult.inetwork.com/'
    else
      # TODO: error
      ''
    end

  headers = { 'Authorization' => [token, secret] }.merge(head)
  request = EM::HttpRequest.new(base_url + pth)

  request.public_send(m, head: headers, body: body).then { |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
269
# Sends a message through the Bandwidth API, as an MMS when the stanza
# carries an XEP-0066 out-of-band URL and as plain text otherwise.
#
# s        - the message stanza (its body may be rewritten, see below)
# num_dest - destination number(s)
# user_id, token, secret - Bandwidth credentials
# usern    - the sending user's number
#
# Returns whatever to_catapult returns.
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)

  un = s.at("oob|x > oob|url", oob: "jabber:x:oob")
  unless un
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret, usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  # TODO: check size of file at un.text and shrink if need

  media_url = un.text

  # a stanza with a <url/> but no body text must not raise here
  body = s.respond_to?(:body) ? s.body.to_s : ''

  # some clients send URI in both body & <url/> so delete
  if s.respond_to?(:body=)
    s.body = body.sub(/\s*#{Regexp.escape(media_url)}\s*$/, '')
  end

  puts "MMSOOB: url text is '#{media_url}'"
  puts "MMSOOB: the body is '#{body.strip}'"

  puts "MMSOOB: sending MMS since found OOB & user asked"
  to_catapult(s, media_url, num_dest, user_id, token, secret, usern)
end
296
# Posts one outgoing message to the Bandwidth V2 messages endpoint and
# bumps the sender's per-day usage counter in Redis.
#
# s        - the message stanza (body, id and sender resource are used)
# murl     - media URL to attach, or nil for a text-only message
# num_dest - destination number(s), passed through as the API's `to`
# user_id, token, secret - Bandwidth credentials
# usern    - the sending user's number (API `from` and counter key)
#
# Returns a promise that
#   * is rejected with [:modify, 'policy-violation'] when there is neither
#     text nor media to send,
#   * is rejected with [:cancel, 'internal-server-error'] when the API call
#     fails (this rejection used to be dropped because only the Redis
#     promise was returned),
#   * is rejected with the Redis error if the counter update fails,
#   * otherwise resolves to the API response body.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && body.to_s.strip.empty?
    return EMPromise.reject([:modify, 'policy-violation'])
  end

  extra = murl ? { media: murl } : {}

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) + ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  sent = call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject([:cancel, 'internal-server-error'])
  }

  # As before, the counter is bumped as soon as the request is issued,
  # whether or not the API later accepts the message.
  t = Time.now
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
    [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

  counted = REDIS.incr(
    'usage_messages-' + tai_yyyymmdd + '-' + usern
  ).then { |total|
    t = Time.now
    puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
      [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
  }

  # surface a failure of either step to the caller
  EMPromise.all([sent, counted]).then { |(response, _)| response }
end
351
# Works out the destination number(s) of an outgoing message.
#
# m - the message stanza
#
# Returns a promise in every case:
#   * message addressed to the gateway's own domain (ARGV[0]): resolves to
#     the list of numbers taken from the `sms:` uris of the <address/>
#     children of its <addresses/> element (see
#     https://wiki.soprani.ca/SGX/GroupMMS); rejected with
#     [:cancel, 'item-not-found'] when there is no <addresses/> element,
#     when an <address/> has no usable `sms:` uri, or when no address is
#     present at all
#   * otherwise: resolves to the node part of `m.to` when it is a
#     "+"-prefixed number or a short code; rejected with [:cancel, 'gone']
#     for the anonymous context and [:cancel, 'item-not-found'] for
#     everything else
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless an
    puts "ADRXEP: found an addresses node - iterate addrs.."

    nums = []
    an.children.each do |e|
      # skip whitespace/text nodes and anything that is not an <address/>;
      # these used to end up as empty strings in the recipient list
      next unless e.element_name == 'address'

      # TODO: also validate the 'type' attribute (expected: 'to')
      uri = e['uri'].to_s
      num = uri.start_with?('sms:') ? uri[4..-1] : ''
      # TODO: confirm num validates

      # refuse the whole message rather than silently dropping (or
      # mangling) one recipient of a group
      return EMPromise.reject([:cancel, 'item-not-found']) if num.empty?

      nums << num
    end

    return EMPromise.reject([:cancel, 'item-not-found']) if nums.empty?
    return EMPromise.resolve(nums)
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'
      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if is_anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supportd/implmentd
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
407
# Looks up the stored credentials list for a JID.
#
# jid - a JID object; its bare form selects the "catapult_cred-<jid>" key
#
# Returns a promise for the first four list entries, rejected with
# [:auth, 'registration-required'] when fewer than four are stored.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then { |creds|
    next creds if creds.length >= 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  }
end
421
# Error-typed messages are only logged; nothing is sent in response.
message(:error?) { |m|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  puts "EATERROR1: #{m.inspect}"
}
426
# Handles every message that has a body: resolves the destination and the
# sender's credentials, then either hands the message directly to another
# gateway user or sends it out through the Bandwidth API.
#
# Rejections shaped like [type, condition] are answered with an error
# stanza; any other rejection is passed on to whoever holds the returned
# promise.
message :body do |m|
  lookups = [validate_num(m), fetch_catapult_cred_for(m.from)]

  EMPromise.all(lookups).then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest].concat(creds)
    }
  }.then { |(jid, num_dest, *creds)|
    # without a registered JID for the destination there is no first
    # credential entry to look at
    next [jid, num_dest].concat(creds).push(nil) unless jid

    REDIS.lrange("catapult_cred-#{jid}", 0, 0).then { |other_user|
      [jid, num_dest].concat(creds).concat(other_user)
    }
  }.then { |(jid, num_dest, *creds, other_user)|
    # if destination user is in the system pass on directly
    if other_user && !other_user.start_with?('u-')
      pass_on_message(m, creds.last, jid)
    else
      to_catapult_possible_oob(m, num_dest, *creds)
    end
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && e.length == 2

    write_to_stream error_msg(m.reply, m.body, *e)
  }
end
460
# Identities advertised for a user's number JID: a single "client"
# identity of type "sms". A fresh array is built on every call.
def self.user_cap_identities
  sms_client = { category: 'client', type: 'sms' }
  [sms_client]
end
464
# Feature namespaces advertised for a user's number JID.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
471
# Adds a feature namespace to the list advertised for the gateway itself,
# keeping the list free of duplicates.
#
# Returns what Array#uniq! returns: nil when the feature was new, the
# de-duplicated list when it was already present.
def self.add_gateway_feature(feature)
  (@gateway_features << feature).uniq!
end
476
# Answers a presence subscription request with three stanzas, in order:
# approval, the number's capabilities presence, and a subscription request
# in the opposite direction.
subscription :request? do |p|
  puts "PRESENCE1: #{p.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed

  puts "RESPONSE5a: #{approval.inspect}"
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new.tap do |c|
    # TODO: user a better node URI (?)
    c.node = 'http://catapult.sgx.soprani.ca/'
    c.identities = user_cap_identities
    c.features = user_cap_features
  end

  available = caps.c
  available.to = p.from
  available.from = "#{p.to}/sgx"

  puts "RESPONSE5b: #{available.inspect}"
  write_to_stream available

  # need to subscribe back so Conversations displays images inline
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = p.from.to_s.split('/', 2)[0]
  subscribe_back.from = p.to.to_s.split('/', 2)[0]
  subscribe_back.type = :subscribe

  puts "RESPONSE5c: #{subscribe_back.inspect}"
  write_to_stream subscribe_back
end
513
# Answers a presence probe with the number's capabilities presence, sent
# from the probed JID's "/sgx" resource.
presence :probe? do |p|
  puts "PRESENCE2: #{p.inspect}"

  caps = Blather::Stanza::Capabilities.new.tap do |c|
    # TODO: user a better node URI (?)
    c.node = 'http://catapult.sgx.soprani.ca/'
    c.identities = user_cap_identities
    c.features = user_cap_features
  end

  reply = caps.c
  reply.to = p.from
  reply.from = "#{p.to}/sgx"

  puts "RESPONSE6: #{reply.inspect}"
  write_to_stream reply
end
530
# Answers disco#info queries. A query addressed to a JID with a node part
# (a number) gets the per-user identity/features; a query addressed to the
# bare gateway domain gets the gateway identity and feature list.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # TODO: return error if i.type is :set - if it is :reply or
  #  :error it should be ignored (as the below does currently);
  #  review specification to see how to handle other type values
  unless i.type == :get
    puts 'DISCO iq rcvd, of non-get type "' + i.type.to_s +
      '" for message "' + i.inspect + '"; ignoring...'
    next
  end

  addressed_to_number = i.to.node

  msg = i.reply
  msg.node = i.node

  if addressed_to_number
    # respond to capabilities request for an sgx-bwmsgsv2 number JID
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"
    msg.identities = user_cap_identities
    msg.features = user_cap_features

    puts "RESPONSE7: #{msg.inspect}"
    write_to_stream msg
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  msg.identities = [{
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }]
  msg.features = @gateway_features
  write_to_stream msg
end
566
# Stores a user's credentials after checking that neither the phone number
# nor the JID is already bound to something else, then acknowledges the
# registration IQ.
#
# i     - the registration IQ (sender JID and reply are taken from it)
# creds - the four credential values; the last one is the phone number
#
# Returns a promise. It is rejected with [:cancel, 'conflict'] when the
# number belongs to another JID or the JID already holds different
# credentials, and with [:cancel, 'internal-server-error'] when a Redis
# write does not produce the expected result.
def self.check_then_register(i, *creds)
  jid_key = "catapult_jid-#{creds.last}"
  # Work with the String form: the value read back from Redis is a String,
  # and comparing a String with a JID object via != reports "different"
  # unless the JID class implements to_str, which made the same user's
  # re-registration look like a conflict.
  bare_jid = i.from.stripped.to_s
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    if existing_jid && existing_jid != bare_jid
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    # TODO: add/log text: credentials exist already
    if existing_creds.length == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif existing_creds.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        if length != 4
          EMPromise.reject([:cancel, 'internal-server-error'])
        end
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    if result != 'OK'
      # TODO: add txt re push failure
      EMPromise.reject([:cancel, 'internal-server-error'])
    end
  }.then {
    write_to_stream i.reply
  }
end
607
# Extracts the registration values from a jabber:iq:register <query/>.
#
# When the query contains an <x/> child, the values are read from its
# <field var="..."><value/></field> children; otherwise they are read from
# the query's direct <nick/>, <username/>, <password/> and <phone/>
# children.
#
# qn - the <query/> node
#
# Returns [user_id, api_token, api_secret, phone_num]; an entry is nil
# when the corresponding field is absent. A <field/> without a <value/>
# child is treated as absent (it used to raise NoMethodError, which ended
# in a gateway shutdown).
def self.creds_from_registration_query(qn)
  names = {
    'nick' => :user_id,
    'username' => :api_token,
    'password' => :api_secret,
    'phone' => :phone_num
  }
  found = {}

  xn = qn.children.find { |v| v.element_name == "x" }

  if xn
    xn.children.each do |field|
      next if field.element_name != "field"

      key = names[field['var']]
      unless key
        # TODO: error
        puts "?: #{field['var']}"
        next
      end

      val = field.children.find { |v| v.element_name == "value" }
      found[key] = val.text if val
    end
  else
    qn.children.each do |field|
      key = names[field.element_name]
      found[key] = field.text if key
    end
  end

  found.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
647
# Handles a jabber:iq:register "set": validates the submitted values
# against the Bandwidth API and stores them.
#
# i  - the registration IQ
# qn - its <query/> node
#
# Returns a promise. Rejections:
#   :done                          - the query contained <remove/>
#                                    (ignored for now)
#   [:cancel, 'item-not-found']    - phone number missing or not starting
#                                    with "+", or the API answered 404
#   [:auth, 'not-authorized']      - the API answered 401
#   [:modify, 'not-acceptable']    - any other API status
#   anything else                  - passed through unchanged
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # phone_num is nil when the form carried no phone field; to_s keeps
    # that from raising (which used to end in a gateway shutdown)
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # parsed only to make sure the answer is JSON; the content is unused
      # TODO: confirm result is array - could be empty
      JSON.parse(response)

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(
      case e
      when 401
        # TODO: add text re bad credentials
        [:auth, 'not-authorized']
      when 404
        # TODO: add text re number not found or disabled
        [:cancel, 'item-not-found']
      when Integer
        [:modify, 'not-acceptable']
      else
        e
      end
    )
  }
end
701
# Builds the jabber:iq:register query sent in answer to a registration
# "get" and appends it to the given reply.
#
# The query contains the legacy elements (instructions, nick, username,
# password, phone) followed by an equivalent data form.
#
# orig            - the reply stanza to which the query is added
# existing_number - the number already registered for the user, if any;
#                   adds a <registered/> marker and pre-fills the phone
#
# Returns orig.
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'

  if existing_number
    query.add_child(Nokogiri::XML::Node.new('registered', query.document))
  end

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', query.document)
  instructions.content =
    "Enter the information from your Account page as well as the Phone Number\n" \
    "in your account you want to use (ie. '+12345678901').\n" \
    "User Id is nick, API Token is username, API Secret is password, " \
    "Phone Number is phone.\n\n" \
    "The source code for this gateway is at " \
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .\n" \
    "Copyright (C) 2017-2020 Denver Gingerich and others, licensed under AGPLv3+."
  query.add_child(instructions)

  %w[nick username password].each do |element|
    query.add_child(Nokogiri::XML::Node.new(element, query.document))
  end

  phone = Nokogiri::XML::Node.new('phone', query.document)
  phone.content = existing_number.to_s
  query.add_child(phone)

  fields = [
    ['User Id', 'nick', :"text-single"],
    ['API Token', 'username', :"text-single"],
    ['API Secret', 'password', :"text-private"],
    ['Phone Number', 'phone', :"text-single"]
  ].map { |label, var, type|
    { required: true, type: type, label: label, var: var }
  }
  fields.last[:value] = existing_number.to_s

  form = Blather::Stanza::X.new(:form, fields)
  form.title = 'Register for Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions =
    "Enter the details from your Account page as well as the Phone Number\n" \
    "in your account you want to use (ie. '+12345678901').\n\n" \
    "The source code for this gateway is at " \
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .\n" \
    "Copyright (C) 2017-2020 Denver Gingerich and others, licensed under AGPLv3+."
  query.add_child(form)

  orig.add_child(query)

  orig
end
772
# Entry point for jabber:iq:register IQs: "set" runs the registration,
# "get" answers with the registration form, anything else is ignored.
#
# Rejections shaped like [type, condition] become an error reply; :done
# means "nothing more to do"; everything else ends in panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      cred_key = "catapult_cred-#{i.from.stripped}"
      REDIS.lindex(cred_key, 3).then { |existing_number|
        reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{reply.inspect}"
        write_to_stream reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
798
# Fallback for IQ "get" requests: answer with feature-not-implemented,
# echoing the request's children.
iq :get? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
802
# Fallback for IQ "set" requests: answer with feature-not-implemented,
# echoing the request's children.
iq :set? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
806end
807
# A bare <message/> stanza used to carry XEP-0184 delivery receipts; the
# callers add the <received/> child themselves.
class ReceiptMessage < Blather::Stanza
  # to - recipient of the receipt (optional)
  def self.new(to=nil)
    super(:message).tap { |node| node.to = to }
  end
end
815
# Goliath endpoint receiving Bandwidth V2 messaging callbacks and turning
# them into XMPP stanzas for the registered user:
#   * incoming messages become chat messages (media is relayed through
#     SGXbwmsgsv2.send_media),
#   * "message-delivered" callbacks become delivery receipts,
#   * "message-failed" callbacks become error replies.
#
# Every request is answered with 200 "OK". Any exception raised while
# handling a callback shuts the gateway down.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s

    if params.empty?
      puts 'PARAMS empty!'
      return [200, {}, "OK"]
    end

    # TODO: process each message in list, not just first one
    event = params['_json'][0]
    jparams = event['message']
    type = event['type']

    users_num = ''
    others_num = ''
    case jparams['direction']
    when 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    when 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      # (this line used to append an undefined `body`, and raised on a
      # missing direction; either way the gateway was shut down)
      puts "big prob: '#{jparams['direction']}'"
      return [200, {}, "OK"]
    end

    # interpolation instead of String#+ so that absent or non-String
    # fields cannot raise while merely logging
    puts "BODY - messageId: #{jparams['id']}" \
      ", eventType: #{type}" \
      ", time: #{jparams['time']}" \
      ", direction: #{jparams['direction']}" \
      ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
      ", deliveryCode: #{jparams['deliveryCode'] || 'NONE'}" \
      ", deliveryDesc: #{jparams['deliveryDescription'] || 'NONE'}" \
      ", tag: #{jparams['tag'] || 'NONE'}" \
      ", media: #{jparams['media'] || 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    unless bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #	'registration-required')
      return [200, {}, "OK"]
    end

    msg = nil
    case jparams['direction']
    when 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # Media is not inspected in this branch (the old has_media flag
        # was never set), so the message is always reported as a
        # suspected group message.
        original_text = jparams['text'].to_s
        text =
          if original_text.empty?
            '[suspected group msg with no text (odd)]'
          else
            '[suspected group msg (recipient list not ' \
              'available) with following text] ' + original_text
          end

        msg = Blather::Stanza::Message.new(bare_jid, text)
        msg.from = others_num + '@' + ARGV[0]
        SGXbwmsgsv2.write(msg)

        return [200, {}, "OK"]
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = Blather::Stanza::Message.new(
            'cheogram.com', text) # TODO

          addrs = Nokogiri::XML::Node.new('addresses', msg.document)
          addrs['xmlns'] = 'http://jabber.org/' + 'protocol/address'

          addr1 = Nokogiri::XML::Node.new('address', msg.document)
          addr1['type'] = 'to'
          addr1['jid'] = bare_jid
          addrs.add_child(addr1)

          jparams['to'].each do |receiver|
            # already there in addr1
            next if receiver == users_num

            addrn = Nokogiri::XML::Node.new('address', msg.document)
            addrn['type'] = 'to'
            addrn['uri'] = "sms:#{receiver}"
            addrn['delivered'] = 'true'
            addrs.add_child(addrn)
          end

          msg.add_child(addrs)

          # TODO: delete
          puts "RESPONSE9: #{msg.inspect}"
        end

        if jparams['media']
          jparams['media'].each do |media_url|
            # skip the MMS envelope/text parts, relay everything else
            next if media_url.end_with?('.smil', '.txt', '.xml')

            SGXbwmsgsv2.send_media(
              others_num + '@' + ARGV[0],
              bare_jid, media_url,
              nil, nil, msg
            )
          end
        end
      else
        text = "unknown type (#{type})"\
          " with text: " + jparams['text']

        # TODO: log/notify of this properly
        puts text
      end

      msg = Blather::Stanza::Message.new(bare_jid, text) unless msg
    else # per prior switch, this is: jparams['direction'] == 'out'
      # The tag is written by this gateway as "<escaped id> <escaped
      # resource>". A callback without such a tag does not belong to a
      # stanza we can answer, so it is skipped (it used to raise and shut
      # the gateway down).
      tag_parts = jparams['tag'].to_s.split(/ /, 2)
      if tag_parts.length < 2
        puts "WARN! callback without usable tag for #{users_num}"
        return [200, {}, "OK"]
      end
      id = WEBrick::HTTPUtils.unescape(tag_parts[0])
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return [200, {}, "OK"]
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node

        # TODO: make prettier: this should be done above
        others_num = event['to']
      when 'message-delivered'
        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new('received', msg.document)
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = event['to']
      else
        # TODO: notify somehow of unknown state receivd?
        puts "message with id #{id} has "\
          "other type #{type}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end
end
1053
# Program entry point, run from at_exit so that everything above is
# defined first: prints the banner, checks the argument count, then starts
# the EventMachine reactor with the Redis connection, the XMPP component,
# a keep-alive ping timer and the webhook HTTP server.
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n" \
    "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"

  unless [7, 8].include?(ARGV.size)
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> " \
      "<component_password> <server_hostname> " \
      "<server_port> <application_id> " \
      "<http_listen_port> <mms_proxy_prefix_url> [V1_creds_file]"
    exit 0
  end

  started_at = Time.now
  puts "LOG %d.%09d: starting...\n\n" % [started_at.to_i, started_at.nsec]

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)

    logger = Log4r::Logger.new('goliath')
    server.logger = logger
    logger.add(Log4r::StdoutOutputter.new('console'))
    logger.level = Log4r::INFO

    server.start do
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end