1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36def panic(e)
37 puts "Shutting down gateway due to exception: #{e.message}"
38 puts e.backtrace
39 SGXbwmsgsv2.shutdown
40 puts 'Gateway has terminated.'
41 EM.stop
42end
43
44def extract_shortcode(dest)
45 num, context = dest.split(';', 2)
46 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
47end
48
49def is_anonymous_tel?(dest)
50 num, context = dest.split(';', 2)
51 context && context == 'phone-context=anonymous.phone-context.soprani.ca'
52end
53
54class SGXClient < Blather::Client
55 def register_handler(type, *guards, &block)
56 super(type, *guards) { |*args| wrap_handler(*args, &block) }
57 end
58
59 def register_handler_before(type, *guards, &block)
60 check_handler(type, guards)
61 handler = lambda { |*args| wrap_handler(*args, &block) }
62
63 @handlers[type] ||= []
64 @handlers[type].unshift([guards, handler])
65 end
66
67protected
68
69 def wrap_handler(*args, &block)
70 v = block.call(*args)
71 v.catch(&method(:panic)) if v.is_a?(Promise)
72 true # Do not run other handlers unless throw :pass
73 rescue Exception => e
74 panic(e)
75 end
76end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
79module CatapultSettingFlagBits
80 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
81 MMS_ON_OOB_URL = 1
82end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 client.run
96 end
97
98 # so classes outside this module can write messages, too
99 def self.write(stanza)
100 client.write(stanza)
101 end
102
103 def self.before_handler(type, *guards, &block)
104 client.register_handler_before(type, *guards, &block)
105 end
106
107 def self.send_media(from, to, media_url, desc=nil, subject=nil)
108 # we assume media_url is of the form (always the case so far):
109 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
110
111 # the caller must guarantee that 'to' is a bare JID
112 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
113 media_url.split('/', 8)[7]
114
115 puts 'ORIG_URL: ' + media_url
116 puts 'PROX_URL: ' + proxy_url
117
118 # put URL in the body (so Conversations will still see it)...
119 msg = Blather::Stanza::Message.new(to, proxy_url)
120 msg.from = from
121 msg.subject = subject if subject
122
123 # ...but also provide URL in XEP-0066 (OOB) fashion
124 # TODO: confirm client supports OOB or don't send this
125 x = Nokogiri::XML::Node.new 'x', msg.document
126 x['xmlns'] = 'jabber:x:oob'
127
128 urln = Nokogiri::XML::Node.new 'url', msg.document
129 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
130 urln.add_child(urlc)
131 x.add_child(urln)
132
133 if desc
134 descn = Nokogiri::XML::Node.new('desc', msg.document)
135 descc = Nokogiri::XML::Text.new(desc, msg.document)
136 descn.add_child(descc)
137 x.add_child(descn)
138 end
139
140 msg.add_child(x)
141
142 write(msg)
143 rescue Exception => e
144 panic(e)
145 end
146
147 def self.error_msg(orig, query_node, type, name, text=nil)
148 orig.type = :error
149
150 error = Nokogiri::XML::Node.new 'error', orig.document
151 error['type'] = type
152 orig.add_child(error)
153
154 suberr = Nokogiri::XML::Node.new name, orig.document
155 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
156 error.add_child(suberr)
157
158 orig.add_child(query_node) if query_node
159
160 # TODO: add some explanatory xml:lang='en' text (see text param)
161 puts "RESPONSE3: #{orig.inspect}"
162 return orig
163 end
164
165 # workqueue_count MUST be 0 or else Blather uses threads!
166 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
167
168 def self.pass_on_message(m, users_num, jid)
169 # setup delivery receipt; similar to a reply
170 rcpt = ReceiptMessage.new(m.from.stripped)
171 rcpt.from = m.to
172
173 # pass original message (before sending receipt)
174 m.to = jid
175 m.from = "#{users_num}@#{ARGV[0]}"
176
177 puts 'XRESPONSE0: ' + m.inspect
178 write_to_stream m
179
180 # send a delivery receipt back to the sender
181 # TODO: send only when requested per XEP-0184
182 # TODO: pass receipts from target if supported
183
184 # TODO: put in member/instance variable
185 rcpt['id'] = SecureRandom.uuid
186 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
187 rcvd['xmlns'] = 'urn:xmpp:receipts'
188 rcvd['id'] = m.id
189 rcpt.add_child(rcvd)
190
191 puts 'XRESPONSE1: ' + rcpt.inspect
192 write_to_stream rcpt
193 end
194
195 def self.call_catapult(
196 token, secret, m, pth, body=nil,
197 head={}, code=[200], respond_with=:body
198 )
199 # TODO: need to make a separate thing for voice.bw.c eventually
200 EM::HttpRequest.new(
201 "https://messaging.bandwidth.com/#{pth}"
202 ).public_send(
203 m,
204 head: {
205 'Authorization' => [token, secret]
206 }.merge(head),
207 body: body
208 ).then { |http|
209 puts "API response to send: #{http.response} with code"\
210 " response.code #{http.response_header.status}"
211
212 if code.include?(http.response_header.status)
213 case respond_with
214 when :body
215 http.response
216 when :headers
217 http.response_header
218 else
219 http
220 end
221 else
222 EMPromise.reject(http.response_header.status)
223 end
224 }
225 end
226
227 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
228 usern)
229
230 xn = s.children.find { |v| v.element_name == "x" }
231 if not xn
232 to_catapult(s, nil, num_dest, user_id, token, secret,
233 usern)
234 return
235 end
236 puts "MMSOOB: found an x node - checking for url node..."
237
238 # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
239 # is probably fine, though, as non-OOB <x><url/></x> unlikely
240
241 un = xn.children.find { |v| v.element_name == "url" }
242 if not un
243 puts "MMSOOB: no url node found so process as normal"
244 to_catapult(s, nil, num_dest, user_id, token, secret,
245 usern)
246 return
247 end
248 puts "MMSOOB: found a url node - checking if to make MMS..."
249
250 REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
251 CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|
252
253 puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
254 if 0 == oob_on
255 puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
256 to_catapult(s, nil, num_dest, user_id, token,
257 secret, usern)
258 next
259 end
260
261 # TODO: check size of file at un.text and shrink if need
262
263 body = s.respond_to?(:body) ? s.body : ''
264 puts "MMSOOB: url text is '#{un.text}'"
265 puts "MMSOOB: the body is '#{body.to_s.strip}'"
266
267 # some clients send URI in both body & <url/> so delete
268 if un.text == body.to_s.strip
269 puts "MMSOOB: url matches body so deleting body"
270 s.body = ''
271 end
272
273 puts "MMSOOB: sending MMS since found OOB & user asked"
274 to_catapult(s, un.text, num_dest, user_id, token,
275 secret, usern)
276 }
277 end
278
279 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
280 body = s.respond_to?(:body) ? s.body : ''
281 if murl.to_s.empty? && body.to_s.strip.empty?
282 return EMPromise.reject(
283 [:modify, 'policy-violation']
284 )
285 end
286
287 extra = {}
288 if murl
289 extra = { media: murl }
290 end
291
292 call_catapult(
293 token,
294 secret,
295 :post,
296 "api/v2/users/#{user_id}/messages",
297 JSON.dump(extra.merge(
298 from: usern,
299 to: num_dest,
300 text: body,
301 applicationId: ARGV[4],
302 tag:
303 # callbacks need id and resourcepart
304 WEBrick::HTTPUtils.escape(s.id.to_s) +
305 ' ' +
306 WEBrick::HTTPUtils.escape(
307 s.from.resource.to_s
308 )
309 )),
310 {'Content-Type' => 'application/json'},
311 [201]
312 ).catch {
313 # TODO: add text; mention code number
314 EMPromise.reject(
315 [:cancel, 'internal-server-error']
316 )
317 }
318
319 t = Time.now
320 tai_timestamp = `./tai`.strip
321 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
322 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
323 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
324
325 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
326 usern).then { |total|
327
328 t = Time.now
329 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
330 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
331 }
332 end
333
334 def self.validate_num(m)
335 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
336 if m.to == ARGV[0]
337 an = m.children.find { |v| v.element_name == "addresses"
338 }
339 if not an
340 return EMPromise.reject([:cancel,
341 'item-not-found'])
342 end
343 puts "ADRXEP: found an addresses node - iterate addrs.."
344
345 nums = []
346 an.children.each do |e|
347 num = ''
348 type = ''
349 e.attributes.each do |c|
350 if c[0] == 'type'
351 if c[1] != 'to'
352 # TODO: error
353 end
354 type = c[1].to_s
355 elsif c[0] == 'uri'
356 if !c[1].to_s.start_with? 'sms:'
357 # TODO: error
358 end
359 num = c[1].to_s[4..-1]
360 # TODO: confirm num validates
361 else
362 # TODO: error - unexpected name
363 end
364 end
365 if num.empty? or type.empty?
366 # TODO: error
367 end
368 nums << num
369 end
370 return nums
371 end
372
373 # if not sent to SGX domain, then assume destination is in 'to'
374 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
375 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
376 next num_dest if num_dest[0] == '+'
377 shortcode = extract_shortcode(num_dest)
378 next shortcode if shortcode
379 end
380
381 if is_anonymous_tel?(num_dest)
382 EMPromise.reject([:cancel, 'gone'])
383 else
384 # TODO: text re num not (yet) supportd/implmentd
385 EMPromise.reject([:cancel, 'item-not-found'])
386 end
387 }
388 end
389
390 def self.fetch_catapult_cred_for(jid)
391 cred_key = "catapult_cred-#{jid.stripped}"
392 REDIS.lrange(cred_key, 0, 3).then { |creds|
393 if creds.length < 4
394 # TODO: add text re credentials not registered
395 EMPromise.reject(
396 [:auth, 'registration-required']
397 )
398 else
399 creds
400 end
401 }
402 end
403
404 message :error? do |m|
405 # TODO: report it somewhere/somehow - eat for now so no err loop
406 puts "EATERROR1: #{m.inspect}"
407 end
408
409 message :body do |m|
410 EMPromise.all([
411 validate_num(m),
412 fetch_catapult_cred_for(m.from)
413 ]).then { |(num_dest, creds)|
414 jid_key = "catapult_jid-#{num_dest}"
415 REDIS.get(jid_key).then { |jid|
416 [jid, num_dest] + creds
417 }
418 }.then { |(jid, num_dest, *creds)|
419 # if destination user is in the system pass on directly
420 if jid
421 pass_on_message(m, creds.last, jid)
422 else
423 to_catapult_possible_oob(m, num_dest, *creds)
424 end
425 }.catch { |e|
426 if e.is_a?(Array) && e.length == 2
427 write_to_stream error_msg(m.reply, m.body, *e)
428 else
429 EMPromise.reject(e)
430 end
431 }
432 end
433
434 def self.user_cap_identities
435 [{category: 'client', type: 'sms'}]
436 end
437
438 # TODO: must re-add stuff so can do ad-hoc commands
439 def self.user_cap_features
440 [
441 "urn:xmpp:receipts",
442 ]
443 end
444
445 def self.add_gateway_feature(feature)
446 @gateway_features << feature
447 @gateway_features.uniq!
448 end
449
450 subscription :request? do |p|
451 puts "PRESENCE1: #{p.inspect}"
452
453 # subscriptions are allowed from anyone - send reply immediately
454 msg = Blather::Stanza::Presence.new
455 msg.to = p.from
456 msg.from = p.to
457 msg.type = :subscribed
458
459 puts 'RESPONSE5a: ' + msg.inspect
460 write_to_stream msg
461
462 # send a <presence> immediately; not automatically probed for it
463 # TODO: refactor so no "presence :probe? do |p|" duplicate below
464 caps = Blather::Stanza::Capabilities.new
465 # TODO: user a better node URI (?)
466 caps.node = 'http://catapult.sgx.soprani.ca/'
467 caps.identities = user_cap_identities
468 caps.features = user_cap_features
469
470 msg = caps.c
471 msg.to = p.from
472 msg.from = p.to.to_s + '/sgx'
473
474 puts 'RESPONSE5b: ' + msg.inspect
475 write_to_stream msg
476
477 # need to subscribe back so Conversations displays images inline
478 msg = Blather::Stanza::Presence.new
479 msg.to = p.from.to_s.split('/', 2)[0]
480 msg.from = p.to.to_s.split('/', 2)[0]
481 msg.type = :subscribe
482
483 puts 'RESPONSE5c: ' + msg.inspect
484 write_to_stream msg
485 end
486
487 presence :probe? do |p|
488 puts 'PRESENCE2: ' + p.inspect
489
490 caps = Blather::Stanza::Capabilities.new
491 # TODO: user a better node URI (?)
492 caps.node = 'http://catapult.sgx.soprani.ca/'
493 caps.identities = user_cap_identities
494 caps.features = user_cap_features
495
496 msg = caps.c
497 msg.to = p.from
498 msg.from = p.to.to_s + '/sgx'
499
500 puts 'RESPONSE6: ' + msg.inspect
501 write_to_stream msg
502 end
503
504 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
505 # respond to capabilities request for an sgx-bwmsgsv2 number JID
506 if i.to.node
507 # TODO: confirm the node URL is expected using below
508 #puts "XR[node]: #{xpath_result[0]['node']}"
509
510 msg = i.reply
511 msg.node = i.node
512 msg.identities = user_cap_identities
513 msg.features = user_cap_features
514
515 puts 'RESPONSE7: ' + msg.inspect
516 write_to_stream msg
517 next
518 end
519
520 # respond to capabilities request for sgx-bwmsgsv2 itself
521 msg = i.reply
522 msg.node = i.node
523 msg.identities = [{
524 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
525 type: 'sms', category: 'gateway'
526 }]
527 msg.features = @gateway_features
528 write_to_stream msg
529 end
530
531 def self.check_then_register(i, *creds)
532 jid_key = "catapult_jid-#{creds.last}"
533 bare_jid = i.from.stripped
534 cred_key = "catapult_cred-#{bare_jid}"
535
536 REDIS.get(jid_key).then { |existing_jid|
537 if existing_jid && existing_jid != bare_jid
538 # TODO: add/log text: credentials exist already
539 EMPromise.reject([:cancel, 'conflict'])
540 end
541 }.then {
542 REDIS.lrange(cred_key, 0, 3)
543 }.then { |existing_creds|
544 # TODO: add/log text: credentials exist already
545 if existing_creds.length == 4 && creds != existing_creds
546 EMPromise.reject([:cancel, 'conflict'])
547 elsif existing_creds.length < 4
548 REDIS.rpush(cred_key, *creds).then { |length|
549 if length != 4
550 EMPromise.reject([
551 :cancel,
552 'internal-server-error'
553 ])
554 end
555 }
556 end
557 }.then {
558 # not necessary if existing_jid non-nil, easier this way
559 REDIS.set(jid_key, bare_jid)
560 }.then { |result|
561 if result != 'OK'
562 # TODO: add txt re push failure
563 EMPromise.reject(
564 [:cancel, 'internal-server-error']
565 )
566 end
567 }.then {
568 write_to_stream i.reply
569 }
570 end
571
572 def self.creds_from_registration_query(qn)
573 xn = qn.children.find { |v| v.element_name == "x" }
574
575 if xn
576 xn.children.each_with_object({}) do |field, h|
577 next if field.element_name != "field"
578 val = field.children.find { |v|
579 v.element_name == "value"
580 }
581
582 case field['var']
583 when 'nick'
584 h[:user_id] = val.text
585 when 'username'
586 h[:api_token] = val.text
587 when 'password'
588 h[:api_secret] = val.text
589 when 'phone'
590 h[:phone_num] = val.text
591 else
592 # TODO: error
593 puts "?: #{field['var']}"
594 end
595 end
596 else
597 qn.children.each_with_object({}) do |field, h|
598 case field.element_name
599 when "nick"
600 h[:user_id] = field.text
601 when "username"
602 h[:api_token] = field.text
603 when "password"
604 h[:api_secret] = field.text
605 when "phone"
606 h[:phone_num] = field.text
607 end
608 end
609 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
610 end
611
612 def self.process_registration(i, qn)
613 EMPromise.resolve(
614 qn.children.find { |v| v.element_name == "remove" }
615 ).then { |rn|
616 if rn
617 puts "received <remove/> - ignoring for now..."
618 EMPromise.reject(:done)
619 else
620 creds_from_registration_query(qn)
621 end
622 }.then { |user_id, api_token, api_secret, phone_num|
623 if phone_num[0] == '+'
624 [user_id, api_token, api_secret, phone_num]
625 else
626 # TODO: add text re number not (yet) supported
627 EMPromise.reject([:cancel, 'item-not-found'])
628 end
629 }.then { |user_id, api_token, api_secret, phone_num|
630 # TODO: find way to verify #{phone_num}, too
631 call_catapult(
632 api_token,
633 api_secret,
634 :get,
635 "api/v2/users/#{user_id}/media"
636 ).then { |response|
637 params = JSON.parse(response)
638 # TODO: confirm params is array - could be empty
639
640 puts "register got str #{response.to_s[0..999]}"
641
642 check_then_register(
643 i,
644 user_id,
645 api_token,
646 api_secret,
647 phone_num
648 )
649 }
650 }.catch { |e|
651 EMPromise.reject(case e
652 when 401
653 # TODO: add text re bad credentials
654 [:auth, 'not-authorized']
655 when 404
656 # TODO: add text re number not found or disabled
657 [:cancel, 'item-not-found']
658 when Integer
659 [:modify, 'not-acceptable']
660 else
661 e
662 end)
663 }
664 end
665
666 def self.registration_form(orig, existing_number=nil)
667 msg = Nokogiri::XML::Node.new 'query', orig.document
668 msg['xmlns'] = 'jabber:iq:register'
669
670 if existing_number
671 msg.add_child(
672 Nokogiri::XML::Node.new(
673 'registered', msg.document
674 )
675 )
676 end
677
678 # TODO: update "User Id" x2 below (to "accountId"?), and others?
679 n1 = Nokogiri::XML::Node.new(
680 'instructions', msg.document
681 )
682 n1.content = "Enter the information from your Account "\
683 "page as well as the Phone Number\nin your "\
684 "account you want to use (ie. '+12345678901')"\
685 ".\nUser Id is nick, API Token is username, "\
686 "API Secret is password, Phone Number is phone"\
687 ".\n\nThe source code for this gateway is at "\
688 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
689 "\nCopyright (C) 2017-2020 Denver Gingerich "\
690 "and others, licensed under AGPLv3+."
691 n2 = Nokogiri::XML::Node.new 'nick', msg.document
692 n3 = Nokogiri::XML::Node.new 'username', msg.document
693 n4 = Nokogiri::XML::Node.new 'password', msg.document
694 n5 = Nokogiri::XML::Node.new 'phone', msg.document
695 n5.content = existing_number.to_s
696 msg.add_child(n1)
697 msg.add_child(n2)
698 msg.add_child(n3)
699 msg.add_child(n4)
700 msg.add_child(n5)
701
702 x = Blather::Stanza::X.new :form, [
703 {
704 required: true, type: :"text-single",
705 label: 'User Id', var: 'nick'
706 },
707 {
708 required: true, type: :"text-single",
709 label: 'API Token', var: 'username'
710 },
711 {
712 required: true, type: :"text-private",
713 label: 'API Secret', var: 'password'
714 },
715 {
716 required: true, type: :"text-single",
717 label: 'Phone Number', var: 'phone',
718 value: existing_number.to_s
719 }
720 ]
721 x.title = 'Register for '\
722 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
723 x.instructions = "Enter the details from your Account "\
724 "page as well as the Phone Number\nin your "\
725 "account you want to use (ie. '+12345678901')"\
726 ".\n\nThe source code for this gateway is at "\
727 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
728 "\nCopyright (C) 2017-2020 Denver Gingerich "\
729 "and others, licensed under AGPLv3+."
730 msg.add_child(x)
731
732 orig.add_child(msg)
733
734 return orig
735 end
736
737 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
738 puts "IQ: #{i.inspect}"
739
740 case i.type
741 when :set
742 process_registration(i, qn)
743 when :get
744 bare_jid = i.from.stripped
745 cred_key = "catapult_cred-#{bare_jid}"
746 REDIS.lindex(cred_key, 3).then { |existing_number|
747 reply = registration_form(i.reply, existing_number)
748 puts "RESPONSE2: #{reply.inspect}"
749 write_to_stream reply
750 }
751 else
752 # Unknown IQ, ignore for now
753 EMPromise.reject(:done)
754 end.catch { |e|
755 if e.is_a?(Array) && e.length == 2
756 write_to_stream error_msg(i.reply, qn, *e)
757 elsif e != :done
758 EMPromise.reject(e)
759 end
760 }.catch(&method(:panic))
761 end
762
763 iq :get? do |i|
764 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
765 end
766
767 iq :set? do |i|
768 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
769 end
770end
771
772class ReceiptMessage < Blather::Stanza
773 def self.new(to=nil)
774 node = super :message
775 node.to = to
776 node
777 end
778end
779
780class WebhookHandler < Goliath::API
781 use Goliath::Rack::Params
782
783 def response(env)
784 # TODO: add timestamp grab here, and MUST include ./tai version
785
786 puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
787
788 users_num = ''
789 others_num = ''
790 if params['direction'] == 'in'
791 users_num = params['to']
792 others_num = params['from']
793 elsif params['direction'] == 'out'
794 users_num = params['from']
795 others_num = params['to']
796 else
797 # TODO: exception or similar
798 puts "big problem: '" + params['direction'] + "'" + body
799 return [200, {}, "OK"]
800 end
801
802 puts 'BODY - messageId: ' + params['messageId'] +
803 ', eventType: ' + params['eventType'] +
804 ', time: ' + params['time'] +
805 ', direction: ' + params['direction'] +
806 ', state: ' + params['state'] +
807 ', deliveryState: ' + (params['deliveryState'] ?
808 params['deliveryState'] : 'NONE') +
809 ', deliveryCode: ' + (params['deliveryCode'] ?
810 params['deliveryCode'] : 'NONE') +
811 ', deliveryDesc: ' + (params['deliveryDescription'] ?
812 params['deliveryDescription'] : 'NONE') +
813 ', tag: ' + (params['tag'] ? params['tag'] : 'NONE') +
814 ', media: ' + (params['media'] ? params['media'].to_s :
815 'NONE')
816
817 if others_num[0] != '+'
818 # TODO: check that others_num actually a shortcode first
819 others_num +=
820 ';phone-context=ca-us.phone-context.soprani.ca'
821 end
822
823 jid_key = "catapult_jid-#{users_num}"
824 bare_jid = REDIS.get(jid_key).promise.sync
825
826 if !bare_jid
827 puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
828
829 # TODO: likely not appropriate; give error to BW API?
830 # TODO: add text re credentials not being registered
831 #write_to_stream error_msg(m.reply, m.body, :auth,
832 # 'registration-required')
833 return [200, {}, "OK"]
834 end
835
836 msg = ''
837 case params['direction']
838 when 'in'
839 text = ''
840 case params['eventType']
841 when 'sms'
842 text = params['text']
843 when 'mms'
844 has_media = false
845 params['media'].each do |media_url|
846 if not media_url.end_with?(
847 '.smil', '.txt', '.xml'
848 )
849
850 has_media = true
851 SGXbwmsgsv2.send_media(
852 others_num + '@' +
853 ARGV[0],
854 bare_jid, media_url
855 )
856 end
857 end
858
859 if params['text'].empty?
860 if not has_media
861 text = '[suspected group msg '\
862 'with no text (odd)]'
863 end
864 else
865 text = if has_media
866 # TODO: write/use a caption XEP
867 params['text']
868 else
869 '[suspected group msg '\
870 '(recipient list not '\
871 'available) with '\
872 'following text] ' +
873 params['text']
874 end
875 end
876
877 # ie. if text param non-empty or had no media
878 if not text.empty?
879 msg = Blather::Stanza::Message.new(
880 bare_jid, text)
881 msg.from = others_num + '@' + ARGV[0]
882 SGXbwmsgsv2.write(msg)
883 end
884
885 return [200, {}, "OK"]
886 else
887 text = "unknown type (#{params['eventType']})"\
888 " with text: " + params['text']
889
890 # TODO: log/notify of this properly
891 puts text
892 end
893
894 msg = Blather::Stanza::Message.new(bare_jid, text)
895 else # per prior switch, this is: params['direction'] == 'out'
896 tag_parts = params['tag'].split(/ /, 2)
897 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
898 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
899
900 case params['deliveryState']
901 when 'not-delivered'
902 # create a bare message like the one user sent
903 msg = Blather::Stanza::Message.new(
904 others_num + '@' + ARGV[0])
905 msg.from = bare_jid + '/' + resourcepart
906 msg['id'] = id
907
908 # create an error reply to the bare message
909 msg = Blather::StanzaError.new(
910 msg,
911 'recipient-unavailable',
912 :wait
913 ).to_node
914 when 'delivered'
915 msg = ReceiptMessage.new(bare_jid)
916
917 # TODO: put in member/instance variable
918 msg['id'] = SecureRandom.uuid
919
920 # TODO: send only when requested per XEP-0184
921 rcvd = Nokogiri::XML::Node.new(
922 'received',
923 msg.document
924 )
925 rcvd['xmlns'] = 'urn:xmpp:receipts'
926 rcvd['id'] = id
927 msg.add_child(rcvd)
928 when 'waiting'
929 # can't really do anything with it; nice to know
930 puts "message with id #{id} waiting"
931 return [200, {}, "OK"]
932 else
933 # TODO: notify somehow of unknown state receivd?
934 puts "message with id #{id} has "\
935 "other state #{params['deliveryState']}"
936 return [200, {}, "OK"]
937 end
938
939 puts "RESPONSE4: #{msg.inspect}"
940 end
941
942 msg.from = others_num + '@' + ARGV[0]
943 SGXbwmsgsv2.write(msg)
944
945 [200, {}, "OK"]
946
947 rescue Exception => e
948 puts 'Shutting down gateway due to exception 013: ' + e.message
949 SGXbwmsgsv2.shutdown
950 puts 'Gateway has terminated.'
951 EM.stop
952 end
953end
954
955at_exit do
956 $stdout.sync = true
957
958 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
959 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
960
961 if ARGV.size != 7
962 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
963 "<component_password> <server_hostname> "\
964 "<server_port> <application_id> "\
965 "<http_listen_port> <mms_proxy_prefix_url>"
966 exit 0
967 end
968
969 t = Time.now
970 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
971
972 EM.run do
973 REDIS = EM::Hiredis.connect
974
975 SGXbwmsgsv2.run
976
977 # required when using Prosody otherwise disconnects on 6-hour inactivity
978 EM.add_periodic_timer(3600) do
979 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
980 msg.from = ARGV[0]
981 SGXbwmsgsv2.write(msg)
982 end
983
984 server = Goliath::Server.new('0.0.0.0', ARGV[5].to_i)
985 server.api = WebhookHandler.new
986 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
987 server.logger = Log4r::Logger.new('goliath')
988 server.logger.add(Log4r::StdoutOutputter.new('console'))
989 server.logger.level = Log4r::INFO
990 server.start do
991 ["INT", "TERM"].each do |sig|
992 trap(sig) do
993 EM.defer do
994 puts 'Shutting down gateway...'
995 SGXbwmsgsv2.shutdown
996
997 puts 'Gateway has terminated.'
998 EM.stop
999 end
1000 end
1001 end
1002 end
1003 end
1004end