1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Fatal-error path used throughout the gateway: prints the exception message
# and backtrace, shuts the XMPP component down and stops the EventMachine
# reactor.
#
# e - the exception (anything responding to #message and #backtrace).
def panic(e)
  puts "Shutting down gateway due to exception: #{e.message}", e.backtrace
  SGXbwmsgsv2.shutdown
  puts 'Gateway has terminated.'
  EM.stop
end
43
# Returns the number part of a "<number>;<context>" destination when the
# context is exactly the ca-us shortcode phone-context; returns nil for any
# other (or missing) context.
def extract_shortcode(dest)
  number, context = dest.split(';', 2)
  return unless context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether a "<number>;<context>" destination carries the anonymous
# phone-context.
#
# Always returns true or false (a destination without any context yields
# false rather than nil), and no longer binds the unused number part.
def is_anonymous_tel?(dest)
  dest.split(';', 2)[1] == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose handlers are all routed through wrap_handler, so that
# any exception (or rejected promise) coming out of a handler ends in panic.
class SGXClient < Blather::Client
  # Same as the parent's register_handler, but with the handler wrapped.
  def register_handler(type, *guards, &block)
    super(type, *guards) do |*handler_args|
      wrap_handler(*handler_args, &block)
    end
  end

  # Registers a wrapped handler at the FRONT of the handler list for +type+,
  # so it is tried before handlers registered earlier.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*handler_args) { wrap_handler(*handler_args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

protected

  # Runs the handler block; a returned Promise gets panic attached as its
  # rejection handler. Exception (not just StandardError) is rescued on
  # purpose: every failure is routed to panic.
  def wrap_handler(*args, &block)
    outcome = block.call(*args)
    outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
    true # Do not run other handlers unless throw :pass
  rescue Exception => e
    panic(e)
  end
end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
# Bit offsets within the per-user "catapult_setting_flags-<jid>" Redis value.
module CatapultSettingFlagBits
  # Not read anywhere in this file.
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0
  # Read with REDIS.getbit when an outgoing stanza carries an <x><url/></x>
  # node; a value of 0 means the URL is not sent as MMS media.
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
# The component talks to the server through the panic-wrapping client.
@client = SGXClient.new

# Features reported in disco#info replies for the gateway's own JID; the
# list is extended at runtime by add_gateway_feature, so it is not frozen.
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
93
class << self
  # Delegates to the client's run.
  def run
    client.run
  end

  # so classes outside this module can write messages, too
  def write(stanza)
    client.write(stanza)
  end

  # Registers a handler that is placed ahead of the already registered
  # ones (see SGXClient#register_handler_before).
  def before_handler(type, *guards, &block)
    client.register_handler_before(type, *guards, &block)
  end
end
106
# Sends a media link to an XMPP user as a message whose body is the proxied
# URL and which also carries the URL in a XEP-0066 (OOB) <x/> element.
#
# from      - value for the message's from attribute
# to        - recipient; the caller must guarantee that it is a bare JID
# media_url - assumed to be of the form (always the case so far):
#             https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
# desc      - optional text for the OOB <desc/> element
# subject   - optional message subject
# m         - optional stanza to copy instead of building a fresh message
#
# Any exception raised here is handed to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  # everything after the 8th '/' of the Bandwidth URL is the media path
  media_path = media_url.split('/', 9)[8]
  proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' + media_path

  puts 'ORIG_URL: ' + media_url
  puts 'PROX_URL: ' + proxy_url

  # put URL in the body (so Conversations will still see it)...
  msg =
    if m
      m.copy.tap { |copied| copied.body = proxy_url }
    else
      Blather::Stanza::Message.new(to, proxy_url)
    end
  msg.from = from
  msg.subject = subject if subject

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  doc = msg.document
  oob = Nokogiri::XML::Node.new('x', doc)
  oob['xmlns'] = 'jabber:x:oob'

  url_node = Nokogiri::XML::Node.new('url', doc)
  url_node.add_child(Nokogiri::XML::Text.new(proxy_url, doc))
  oob.add_child(url_node)

  if desc
    desc_node = Nokogiri::XML::Node.new('desc', doc)
    desc_node.add_child(Nokogiri::XML::Text.new(desc, doc))
    oob.add_child(desc_node)
  end

  msg.add_child(oob)

  write(msg)
rescue Exception => e
  panic(e)
end
150
# Turns +orig+ into an XMPP error stanza and returns it.
#
# orig       - stanza to convert (mutated in place)
# query_node - optional payload appended after the <error/> element
# type       - value of the <error/> element's type attribute
# name       - element name of the condition placed inside <error/>
# text       - currently unused (see TODO below)
def self.error_msg(orig, query_node, type, name, text=nil)
  orig.type = :error
  doc = orig.document

  error = Nokogiri::XML::Node.new('error', doc)
  error['type'] = type
  orig.add_child(error)

  condition = Nokogiri::XML::Node.new(name, doc)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
168
# Configure the component from the first four command-line arguments.
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
171
# Delivers a message directly to another user of this gateway and then
# sends a delivery receipt back to the original sender.
#
# m         - the incoming message stanza (its to/from are rewritten)
# users_num - the sender's phone number, used as the new from node
# jid       - JID of the destination user
def self.pass_on_message(m, users_num, jid)
  # setup delivery receipt; similar to a reply (must be built before the
  # original's to/from are overwritten below)
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = [users_num, ARGV[0]].join('@')

  puts 'XRESPONSE0: ' + m.inspect
  write_to_stream m

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts 'XRESPONSE1: ' + receipt.inspect
  write_to_stream receipt
end
198
# Performs an HTTP request against the Bandwidth API.
#
# token, secret - passed as the Authorization header value
# m             - HTTP verb as a symbol (:get, :post, ...)
# pth           - looks like one of:
#                   "api/v2/users/#{user_id}/[endpoint_name]"
#                   "v1/users/#{user_id}/[endpoint_name]"
# body          - optional request body
# head          - extra request headers (merged over Authorization)
# code          - list of HTTP status codes treated as success
# respond_with  - :body, :headers, or anything else for the whole response
#
# Returns a promise; on an unexpected status it is rejected with that
# status code.
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  base_url =
    if pth.start_with?('api/v2/users')
      'https://messaging.bandwidth.com/'
    elsif pth.start_with?('v1/users')
      # begin hack for running V2 messages along with V1 voice
      # TODO: set token and secret to vals provided at startup
      'https://api.catapult.inetwork.com/'
    else
      # TODO: error
      ''
    end

  headers = { 'Authorization' => [token, secret] }.merge(head)
  request = EM::HttpRequest.new(base_url + pth)

  request.public_send(m, head: headers, body: body).then { |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code " \
      "response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  }
end
246
# Sends stanza +s+ through to_catapult, as an MMS when it carries an
# <x><url/></x> (OOB) node and the sender has the MMS_ON_OOB_URL flag set,
# otherwise as a plain text message.
#
# s        - the outgoing message stanza
# num_dest - destination phone number
# user_id, token, secret, usern - the sender's stored credentials
#
# Always returns what to_catapult returns (directly, or as the result of
# the Redis flag lookup promise), so that a rejection from to_catapult
# reaches the caller instead of being dropped.
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)

  xn = s.children.find { |v| v.element_name == "x" }
  unless xn
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found an x node - checking for url node..."

  # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
  #  is probably fine, though, as non-OOB <x><url/></x> unlikely

  un = xn.children.find { |v| v.element_name == "url" }
  unless un
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
    CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|

    puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
    if 0 == oob_on
      puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
      # `next` with a value: hand the send promise to the chain
      next to_catapult(s, nil, num_dest, user_id, token,
        secret, usern)
    end

    # TODO: check size of file at un.text and shrink if need

    body = s.respond_to?(:body) ? s.body : ''
    puts "MMSOOB: url text is '#{un.text}'"
    puts "MMSOOB: the body is '#{body.to_s.strip}'"

    # some clients send URI in both body & <url/> so delete
    if un.text == body.to_s.strip
      puts "MMSOOB: url matches body so deleting body"
      s.body = ''
    end

    puts "MMSOOB: sending MMS since found OOB & user asked"
    to_catapult(s, un.text, num_dest, user_id, token,
      secret, usern)
  }
end
298
# Submits stanza +s+ to the Bandwidth messages endpoint and bumps the
# sender's daily usage counter in Redis.
#
# s        - the outgoing message stanza
# murl     - media URL to attach, or nil for a text-only message
# num_dest - destination phone number
# user_id, token, secret - the sender's API credentials
# usern    - the sender's phone number
#
# Returns a promise that is
#   * rejected with [:modify, 'policy-violation'] when there is neither a
#     media URL nor a non-blank body,
#   * rejected with [:cancel, 'internal-server-error'] when the API call
#     fails (previously this rejection was discarded and the sender was
#     never told), and
#   * otherwise settled once both the API call and the counter update are.
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && body.to_s.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  extra = murl ? { media: murl } : {}

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) + ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  sent = call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject(
      [:cancel, 'internal-server-error']
    )
  }

  # usage is counted as soon as the request has been issued, under the date
  # derived from the ./tai helper's output
  t = Time.now
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
    [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

  counted = REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
    usern).then { |total|

    t = Time.now
    puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
      [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
  }

  # surface a failure of either step to the caller
  EMPromise.all([sent, counted])
end
353
# Works out the destination number(s) of message +m+.
#
# When +m+ is addressed to the gateway itself (m.to == ARGV[0]) the numbers
# come from the <addresses/> child, see https://wiki.soprani.ca/SGX/GroupMMS;
# in that case a plain Array of numbers is returned (or a rejected promise
# when there is no <addresses/> node).
#
# Otherwise the node part of m.to is the destination and a promise is
# returned: it yields a "+..." number or a recognised shortcode, and is
# rejected with [:cancel, 'gone'] for anonymous senders or
# [:cancel, 'item-not-found'] for anything else.
def self.validate_num(m)
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless an

    puts "ADRXEP: found an addresses node - iterate addrs.."

    nums = an.children.map do |e|
      num = ''
      type = ''
      e.attributes.each do |attr_name, attr|
        case attr_name
        when 'type'
          if attr != 'to'
            # TODO: error
          end
          type = attr.to_s
        when 'uri'
          if !attr.to_s.start_with?('sms:')
            # TODO: error
          end
          # drop the leading "sms:"
          num = attr.to_s[4..-1]
          # TODO: confirm num validates
        else
          # TODO: error - unexpected name
        end
      end
      if num.empty? || type.empty?
        # TODO: error
      end
      num
    end
    return nums
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
      next num_dest if num_dest[0] == '+'

      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    # TODO: text re num not (yet) supported/implemented
    reason = is_anonymous_tel?(num_dest) ? 'gone' : 'item-not-found'
    EMPromise.reject([:cancel, reason])
  }
end
409
# Loads the four stored credential entries for +jid+ (bare form) from the
# "catapult_cred-<jid>" Redis list.
#
# Returns a promise for the list; it is rejected with
# [:auth, 'registration-required'] when fewer than four entries exist.
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then { |creds|
    next creds if creds.length >= 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  }
end
423
# Error-type messages are only logged, never answered.
message :error? do |m|
  # TODO: report it somewhere/somehow - eat for now so no err loop
  puts 'EATERROR1: ' + m.inspect
end
428
# Handles every message with a body: resolves destination and sender
# credentials, then either hands the message to a local user or submits it
# to the API. Two-element Array rejections become XMPP error replies; any
# other rejection is passed on.
message :body do |m|
  lookups = [validate_num(m), fetch_catapult_cred_for(m.from)]

  EMPromise.all(lookups).then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest] + creds
    }
  }.then { |(jid, num_dest, *creds)|
    # if destination user is in the system pass on directly
    next pass_on_message(m, creds.last, jid) if jid

    to_catapult_possible_oob(m, num_dest, *creds)
  }.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && e.length == 2

    write_to_stream error_msg(m.reply, m.body, *e)
  }
end
453
# Identity list reported for number JIDs in capabilities / disco#info
# responses.
def self.user_cap_identities
  sms_client = { category: 'client', type: 'sms' }
  [sms_client]
end
457
# Feature list reported for number JIDs in capabilities / disco#info
# responses.
# TODO: must re-add stuff so can do ad-hoc commands
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
464
# Adds +feature+ to the gateway's advertised feature list, keeping the list
# free of duplicates. Returns whatever Array#uniq! returns (nil when the
# feature was new, the list when a duplicate had to be removed).
def self.add_gateway_feature(feature)
  @gateway_features.push(feature).uniq!
end
469
# Answers a presence subscription request with three stanzas: approval,
# the gateway's capabilities presence, and a subscription request back.
subscription :request? do |p|
  puts "PRESENCE1: #{p.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = p.from
  approval.from = p.to
  approval.type = :subscribed

  puts 'RESPONSE5a: ' + approval.inspect
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  availability = caps.c
  availability.to = p.from
  availability.from = p.to.to_s + '/sgx'

  puts 'RESPONSE5b: ' + availability.inspect
  write_to_stream availability

  # need to subscribe back so Conversations displays images inline
  # (both addresses are cut at the first '/')
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = p.from.to_s.split('/', 2).first
  subscribe_back.from = p.to.to_s.split('/', 2).first
  subscribe_back.type = :subscribe

  puts 'RESPONSE5c: ' + subscribe_back.inspect
  write_to_stream subscribe_back
end
506
# Answers a presence probe with the gateway's capabilities presence, sent
# from the probed JID with resource "sgx".
presence :probe? do |p|
  puts 'PRESENCE2: ' + p.inspect

  caps = Blather::Stanza::Capabilities.new
  # TODO: use a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  availability = caps.c
  availability.to = p.from
  availability.from = p.to.to_s + '/sgx'

  puts 'RESPONSE6: ' + availability.inspect
  write_to_stream availability
end
523
# Answers disco#info queries, both for number JIDs (those with a node part)
# and for the gateway's own JID.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  reply = i.reply
  reply.node = i.node

  if i.to.node
    # respond to capabilities request for an sgx-bwmsgsv2 number JID
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"
    reply.identities = user_cap_identities
    reply.features = user_cap_features

    puts 'RESPONSE7: ' + reply.inspect
  else
    # respond to capabilities request for sgx-bwmsgsv2 itself
    reply.identities = [{
      name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
      type: 'sms', category: 'gateway'
    }]
    reply.features = @gateway_features
  end

  write_to_stream reply
end
550
# Stores the registration for the sender of iq +i+ and acknowledges it.
#
# i     - the registration IQ (its sender's bare JID is the owner)
# creds - user id, API token, API secret and phone number, in that order
#
# Returns a promise. It is rejected with [:cancel, 'conflict'] when the
# number belongs to another JID or the JID already has different
# credentials, and with [:cancel, 'internal-server-error'] when a Redis
# write does not give the expected result.
def self.check_then_register(i, *creds)
  bare_jid = i.from.stripped
  jid_key = "catapult_jid-#{creds.last}"
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |owner|
    next unless owner && owner != bare_jid

    # TODO: add/log text: credentials exist already
    EMPromise.reject([:cancel, 'conflict'])
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |stored|
    if stored.length < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        next if length == 4

        EMPromise.reject([:cancel, 'internal-server-error'])
      }
    elsif stored.length == 4 && creds != stored
      # TODO: add/log text: credentials exist already
      EMPromise.reject([:cancel, 'conflict'])
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    next if result == 'OK'

    # TODO: add txt re push failure
    EMPromise.reject([:cancel, 'internal-server-error'])
  }.then {
    write_to_stream i.reply
  }
end
591
# Extracts the registration credentials from a jabber:iq:register <query/>.
#
# When the query contains an <x/> child, its <field/> children are read
# (keyed by their "var" attribute, value taken from the <value/> child);
# otherwise the query's direct children are read (keyed by element name).
# In both cases: nick -> user id, username -> API token,
# password -> API secret, phone -> phone number.
#
# Returns [user_id, api_token, api_secret, phone_num]; entries that were
# not supplied are nil. A <field/> without a <value/> child now yields nil
# for that entry instead of raising NoMethodError.
def self.creds_from_registration_query(qn)
  cred_keys = {
    'nick' => :user_id,
    'username' => :api_token,
    'password' => :api_secret,
    'phone' => :phone_num
  }
  found = {}

  xn = qn.children.find { |v| v.element_name == "x" }

  if xn
    xn.children.each do |field|
      next if field.element_name != "field"

      key = cred_keys[field['var']]
      unless key
        # TODO: error
        puts "?: #{field['var']}"
        next
      end

      val = field.children.find { |v| v.element_name == "value" }
      found[key] = val && val.text
    end
  else
    qn.children.each do |field|
      key = cred_keys[field.element_name]
      found[key] = field.text if key
    end
  end

  found.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
631
# Handles a registration "set" request: checks the submitted credentials
# against the API (by listing the user's media) and then stores them.
#
# i  - the registration IQ
# qn - its <query/> node
#
# Returns a promise. Rejections: :done for a <remove/> request (ignored for
# now); [:cancel, 'item-not-found'] when the phone number is missing or
# does not start with '+', or when the API answers 404;
# [:auth, 'not-authorized'] on 401; [:modify, 'not-acceptable'] for any
# other HTTP status; anything else is passed through unchanged.
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # to_s: a request without a phone number must be refused like any
    # other unsupported number, not raise (which would end in panic)
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # parsed only to make sure the reply is valid JSON
      # TODO: confirm result is array - could be empty
      JSON.parse(response)

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(case e
    when 401
      # TODO: add text re bad credentials
      [:auth, 'not-authorized']
    when 404
      # TODO: add text re number not found or disabled
      [:cancel, 'item-not-found']
    when Integer
      [:modify, 'not-acceptable']
    else
      e
    end)
  }
end
685
# Appends a jabber:iq:register <query/> to +orig+ containing both the plain
# registration fields and an equivalent data form, then returns +orig+.
#
# orig            - the IQ reply to fill in
# existing_number - the number already registered, if any; when given, a
#                   <registered/> marker is added and the phone field is
#                   pre-filled
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'
  doc = query.document

  if existing_number
    query.add_child(Nokogiri::XML::Node.new('registered', doc))
  end

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', doc)
  instructions.content =
    "Enter the information from your Account page as well as the " \
    "Phone Number\n" \
    "in your account you want to use (ie. '+12345678901').\n" \
    "User Id is nick, API Token is username, API Secret is password, " \
    "Phone Number is phone.\n\n" \
    "The source code for this gateway is at " \
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .\n" \
    "Copyright (C) 2017-2020 Denver Gingerich and others, " \
    "licensed under AGPLv3+."
  query.add_child(instructions)

  # plain fields, in the order nick, username, password, phone
  %w[nick username password].each do |tag|
    query.add_child(Nokogiri::XML::Node.new(tag, doc))
  end
  phone = Nokogiri::XML::Node.new('phone', doc)
  phone.content = existing_number.to_s
  query.add_child(phone)

  form = Blather::Stanza::X.new :form, [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form.title = 'Register for Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions =
    "Enter the details from your Account page as well as the " \
    "Phone Number\n" \
    "in your account you want to use (ie. '+12345678901').\n\n" \
    "The source code for this gateway is at " \
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .\n" \
    "Copyright (C) 2017-2020 Denver Gingerich and others, " \
    "licensed under AGPLv3+."
  query.add_child(form)

  orig.add_child(query)

  orig
end
756
# Handles jabber:iq:register queries: "set" registers, "get" returns the
# registration form, anything else is ignored. Two-element Array rejections
# become XMPP error replies, :done is swallowed, everything else ends in
# panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      cred_key = "catapult_cred-#{i.from.stripped}"
      # index 3 of the credential list is the phone number
      REDIS.lindex(cred_key, 3).then { |existing_number|
        reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{reply.inspect}"
        write_to_stream reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
782
# Fallback for "get" IQs: reply with feature-not-implemented.
iq :get? do |i|
  refusal = error_msg(
    i.reply, i.children, 'cancel', 'feature-not-implemented'
  )
  write_to_stream refusal
end
786
# Fallback for "set" IQs: reply with feature-not-implemented.
iq :set? do |i|
  refusal = error_msg(
    i.reply, i.children, 'cancel', 'feature-not-implemented'
  )
  write_to_stream refusal
end
790end
791
# A bare <message/> stanza (no body) used to carry delivery receipts.
class ReceiptMessage < Blather::Stanza
  # to - optional recipient for the new stanza
  def self.new(to=nil)
    super(:message).tap { |stanza| stanza.to = to }
  end
end
799
# Goliath endpoint for the messaging API's callbacks. Each callback is
# translated into XMPP stanzas written through SGXbwmsgsv2: incoming
# messages (and their media) for the owning user, and receipts or errors
# for messages the user sent.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Processes the first event of the posted JSON list.
  #
  # Returns a [200, {}, "OK"] Rack response in all handled cases. Any
  # exception shuts the gateway down and stops the reactor.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject { |k| k == 'params' }.to_s

    # TODO: process each message in list, not just first one
    jparams = params['_json'][0]['message']

    type = params['_json'][0]['type']

    users_num = ''
    others_num = ''
    case jparams['direction']
    when 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    when 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      # (interpolated so that a missing direction is logged rather than
      # raising; this branch used to reference an undefined local)
      puts "big prob: '#{jparams['direction']}'"
      return [200, {}, "OK"]
    end

    puts 'BODY - messageId: ' + jparams['id'] +
      ', eventType: ' + type +
      ', time: ' + jparams['time'] +
      ', direction: ' + jparams['direction'] +
      #', state: ' + jparams['state'] +
      ', deliveryState: ' + (jparams['deliveryState'] || 'NONE') +
      ', deliveryCode: ' + (jparams['deliveryCode'] || 'NONE') +
      ', deliveryDesc: ' + (jparams['deliveryDescription'] || 'NONE') +
      ', tag: ' + (jparams['tag'] || 'NONE') +
      ', media: ' + (jparams['media'] ? jparams['media'].to_s : 'NONE')

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #  'registration-required')
      return [200, {}, "OK"]
    end

    msg = nil
    case jparams['direction']
    when 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # no media information is looked at for this type, so the message
        # is always reported as a suspected group message
        # TODO: write/use a caption XEP
        text =
          if jparams['text'].empty?
            '[suspected group msg with no text (odd)]'
          else
            '[suspected group msg (recipient list not available) ' \
              'with following text] ' + jparams['text']
          end

        msg = Blather::Stanza::Message.new(bare_jid, text)
        msg.from = others_num + '@' + ARGV[0]
        SGXbwmsgsv2.write(msg)

        return [200, {}, "OK"]
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = group_message(bare_jid, users_num, text,
            jparams['to'])
        end

        (jparams['media'] || []).each do |media_url|
          # these attachments are not passed on as media
          next if media_url.end_with?('.smil', '.txt', '.xml')

          SGXbwmsgsv2.send_media(
            others_num + '@' + ARGV[0],
            bare_jid, media_url,
            nil, nil, msg
          )
        end
      else
        text = "unknown type (#{type})"\
          " with text: " + jparams['text']

        # TODO: log/notify of this properly
        puts text
      end

      msg ||= Blather::Stanza::Message.new(bare_jid, text)
    else # per prior switch, this is: jparams['direction'] == 'out'
      # the tag was set when sending: "<escaped id> <escaped resource>"
      tag_parts = jparams['tag'].split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0])
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return [200, {}, "OK"]
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node

        # TODO: make prettier: this should be done above
        others_num = params['_json'][0]['to']
      when 'message-delivered'
        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new(
          'received',
          msg.document
        )
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = params['_json'][0]['to']
      else
        # TODO: notify somehow of unknown state received?
        puts "message with id #{id} has "\
          "other type #{type}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end

  private

  # Builds the message used for a text with several recipients: an
  # <addresses/> element lists the user (by JID) and every other recipient
  # (by sms: URI, marked as delivered).
  def group_message(bare_jid, users_num, text, recipients)
    msg = Blather::Stanza::Message.new(
      'cheogram.com', text) # TODO

    addrs = Nokogiri::XML::Node.new('addresses', msg.document)
    addrs['xmlns'] = 'http://jabber.org/protocol/address'

    own = Nokogiri::XML::Node.new('address', msg.document)
    own['type'] = 'to'
    own['jid'] = bare_jid
    addrs.add_child(own)

    recipients.each do |receiver|
      # the user's own number is already covered by the jid address
      next if receiver == users_num

      other = Nokogiri::XML::Node.new('address', msg.document)
      other['type'] = 'to'
      other['uri'] = "sms:#{receiver}"
      other['delivered'] = 'true'
      addrs.add_child(other)
    end

    msg.add_child(addrs)

    # TODO: delete
    puts "RESPONSE9: #{msg.inspect}"

    msg
  end
end
1032
# Program entry point, run when the script finishes loading: prints the
# banner, checks the argument count, then starts the reactor with the Redis
# connection, the XMPP component, a periodic ping and the webhook server.
at_exit do
  $stdout.sync = true

  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
    "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"

  unless ARGV.size == 7
    puts 'Usage: sgx-bwmsgsv2.rb <component_jid> ' \
      '<component_password> <server_hostname> ' \
      '<server_port> <application_id> ' \
      '<http_listen_port> <mms_proxy_prefix_url>'
    exit 0
  end

  started_at = Time.now
  puts format("LOG %d.%09d: starting...\n\n", started_at.to_i,
    started_at.nsec)

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    # webhook listener, bound to the loopback address only
    server = Goliath::Server.new('127.0.0.1', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
    server.logger = Log4r::Logger.new('goliath')
    server.logger.add(Log4r::StdoutOutputter.new('console'))
    server.logger.level = Log4r::INFO
    server.start do
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end