1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
36def panic(e)
37 puts "Shutting down gateway due to exception: #{e.message}"
38 puts e.backtrace
39 SGXbwmsgsv2.shutdown
40 puts 'Gateway has terminated.'
41 EM.stop
42end
43
44def extract_shortcode(dest)
45 num, context = dest.split(';', 2)
46 num if context && context == 'phone-context=ca-us.phone-context.soprani.ca'
47end
48
49def is_anonymous_tel?(dest)
50 num, context = dest.split(';', 2)
51 context && context == 'phone-context=anonymous.phone-context.soprani.ca'
52end
53
54class SGXClient < Blather::Client
55 def register_handler(type, *guards, &block)
56 super(type, *guards) { |*args| wrap_handler(*args, &block) }
57 end
58
59 def register_handler_before(type, *guards, &block)
60 check_handler(type, guards)
61 handler = lambda { |*args| wrap_handler(*args, &block) }
62
63 @handlers[type] ||= []
64 @handlers[type].unshift([guards, handler])
65 end
66
67protected
68
69 def wrap_handler(*args, &block)
70 v = block.call(*args)
71 v.catch(&method(:panic)) if v.is_a?(Promise)
72 true # Do not run other handlers unless throw :pass
73 rescue Exception => e
74 panic(e)
75 end
76end
77
78# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
79module CatapultSettingFlagBits
80 VOICEMAIL_TRANSCRIPTION_DISABLED = 0
81 MMS_ON_OOB_URL = 1
82end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
87 @client = SGXClient.new
88 @gateway_features = [
89 "http://jabber.org/protocol/disco#info",
90 "http://jabber.org/protocol/address/",
91 "jabber:iq:register"
92 ]
93
94 def self.run
95 client.run
96 end
97
98 # so classes outside this module can write messages, too
99 def self.write(stanza)
100 client.write(stanza)
101 end
102
103 def self.before_handler(type, *guards, &block)
104 client.register_handler_before(type, *guards, &block)
105 end
106
107 def self.send_media(from, to, media_url, desc=nil, subject=nil)
108 # we assume media_url is of the form (always the case so far):
109 # https://api.catapult.inetwork.com/v1/users/[uid]/media/[file]
110
111 # the caller must guarantee that 'to' is a bare JID
112 proxy_url = ARGV[6] + WEBrick::HTTPUtils.escape(to) + '/' +
113 media_url.split('/', 8)[7]
114
115 puts 'ORIG_URL: ' + media_url
116 puts 'PROX_URL: ' + proxy_url
117
118 # put URL in the body (so Conversations will still see it)...
119 msg = Blather::Stanza::Message.new(to, proxy_url)
120 msg.from = from
121 msg.subject = subject if subject
122
123 # ...but also provide URL in XEP-0066 (OOB) fashion
124 # TODO: confirm client supports OOB or don't send this
125 x = Nokogiri::XML::Node.new 'x', msg.document
126 x['xmlns'] = 'jabber:x:oob'
127
128 urln = Nokogiri::XML::Node.new 'url', msg.document
129 urlc = Nokogiri::XML::Text.new proxy_url, msg.document
130 urln.add_child(urlc)
131 x.add_child(urln)
132
133 if desc
134 descn = Nokogiri::XML::Node.new('desc', msg.document)
135 descc = Nokogiri::XML::Text.new(desc, msg.document)
136 descn.add_child(descc)
137 x.add_child(descn)
138 end
139
140 msg.add_child(x)
141
142 write(msg)
143 rescue Exception => e
144 panic(e)
145 end
146
147 def self.error_msg(orig, query_node, type, name, text=nil)
148 orig.type = :error
149
150 error = Nokogiri::XML::Node.new 'error', orig.document
151 error['type'] = type
152 orig.add_child(error)
153
154 suberr = Nokogiri::XML::Node.new name, orig.document
155 suberr['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
156 error.add_child(suberr)
157
158 orig.add_child(query_node) if query_node
159
160 # TODO: add some explanatory xml:lang='en' text (see text param)
161 puts "RESPONSE3: #{orig.inspect}"
162 return orig
163 end
164
165 # workqueue_count MUST be 0 or else Blather uses threads!
166 setup ARGV[0], ARGV[1], ARGV[2], ARGV[3], nil, nil, workqueue_count: 0
167
168 def self.pass_on_message(m, users_num, jid)
169 # setup delivery receipt; similar to a reply
170 rcpt = ReceiptMessage.new(m.from.stripped)
171 rcpt.from = m.to
172
173 # pass original message (before sending receipt)
174 m.to = jid
175 m.from = "#{users_num}@#{ARGV[0]}"
176
177 puts 'XRESPONSE0: ' + m.inspect
178 write_to_stream m
179
180 # send a delivery receipt back to the sender
181 # TODO: send only when requested per XEP-0184
182 # TODO: pass receipts from target if supported
183
184 # TODO: put in member/instance variable
185 rcpt['id'] = SecureRandom.uuid
186 rcvd = Nokogiri::XML::Node.new 'received', rcpt.document
187 rcvd['xmlns'] = 'urn:xmpp:receipts'
188 rcvd['id'] = m.id
189 rcpt.add_child(rcvd)
190
191 puts 'XRESPONSE1: ' + rcpt.inspect
192 write_to_stream rcpt
193 end
194
195 def self.call_catapult(
196 token, secret, m, pth, body=nil,
197 head={}, code=[200], respond_with=:body
198 )
199 # TODO: need to make a separate thing for voice.bw.c eventually
200 EM::HttpRequest.new(
201 "https://messaging.bandwidth.com/#{pth}"
202 ).public_send(
203 m,
204 head: {
205 'Authorization' => [token, secret]
206 }.merge(head),
207 body: body
208 ).then { |http|
209 puts "API response to send: #{http.response} with code"\
210 " response.code #{http.response_header.status}"
211
212 if code.include?(http.response_header.status)
213 case respond_with
214 when :body
215 http.response
216 when :headers
217 http.response_header
218 else
219 http
220 end
221 else
222 EMPromise.reject(http.response_header.status)
223 end
224 }
225 end
226
227 def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
228 usern)
229
230 xn = s.children.find { |v| v.element_name == "x" }
231 if not xn
232 to_catapult(s, nil, num_dest, user_id, token, secret,
233 usern)
234 return
235 end
236 puts "MMSOOB: found an x node - checking for url node..."
237
238 # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
239 # is probably fine, though, as non-OOB <x><url/></x> unlikely
240
241 un = xn.children.find { |v| v.element_name == "url" }
242 if not un
243 puts "MMSOOB: no url node found so process as normal"
244 to_catapult(s, nil, num_dest, user_id, token, secret,
245 usern)
246 return
247 end
248 puts "MMSOOB: found a url node - checking if to make MMS..."
249
250 REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
251 CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|
252
253 puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
254 if 0 == oob_on
255 puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
256 to_catapult(s, nil, num_dest, user_id, token,
257 secret, usern)
258 next
259 end
260
261 # TODO: check size of file at un.text and shrink if need
262
263 body = s.respond_to?(:body) ? s.body : ''
264 puts "MMSOOB: url text is '#{un.text}'"
265 puts "MMSOOB: the body is '#{body.to_s.strip}'"
266
267 # some clients send URI in both body & <url/> so delete
268 if un.text == body.to_s.strip
269 puts "MMSOOB: url matches body so deleting body"
270 s.body = ''
271 end
272
273 puts "MMSOOB: sending MMS since found OOB & user asked"
274 to_catapult(s, un.text, num_dest, user_id, token,
275 secret, usern)
276 }
277 end
278
279 def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
280 body = s.respond_to?(:body) ? s.body : ''
281 if murl.to_s.empty? && body.to_s.strip.empty?
282 return EMPromise.reject(
283 [:modify, 'policy-violation']
284 )
285 end
286
287 extra = {}
288 if murl
289 extra = { media: murl }
290 end
291
292 call_catapult(
293 token,
294 secret,
295 :post,
296 "api/v2/users/#{user_id}/messages",
297 JSON.dump(extra.merge(
298 from: usern,
299 to: num_dest,
300 text: body,
301 applicationId: ARGV[4],
302 tag:
303 # callbacks need id and resourcepart
304 WEBrick::HTTPUtils.escape(s.id.to_s) +
305 ' ' +
306 WEBrick::HTTPUtils.escape(
307 s.from.resource.to_s
308 )
309 )),
310 {'Content-Type' => 'application/json'},
311 [201]
312 ).catch {
313 # TODO: add text; mention code number
314 EMPromise.reject(
315 [:cancel, 'internal-server-error']
316 )
317 }
318
319 t = Time.now
320 tai_timestamp = `./tai`.strip
321 tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
322 puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
323 [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]
324
325 REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
326 usern).then { |total|
327
328 t = Time.now
329 puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
330 [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
331 }
332 end
333
334 def self.validate_num(m)
335 # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
336 if m.to == ARGV[0]
337 an = m.children.find { |v| v.element_name == "addresses"
338 }
339 if not an
340 return EMPromise.reject([:cancel,
341 'item-not-found'])
342 end
343 puts "ADRXEP: found an addresses node - iterate addrs.."
344
345 nums = []
346 an.children.each do |e|
347 num = ''
348 type = ''
349 e.attributes.each do |c|
350 if c[0] == 'type'
351 if c[1] != 'to'
352 # TODO: error
353 end
354 type = c[1].to_s
355 elsif c[0] == 'uri'
356 if !c[1].to_s.start_with? 'sms:'
357 # TODO: error
358 end
359 num = c[1].to_s[4..-1]
360 # TODO: confirm num validates
361 else
362 # TODO: error - unexpected name
363 end
364 end
365 if num.empty? or type.empty?
366 # TODO: error
367 end
368 nums << num
369 end
370 return nums
371 end
372
373 # if not sent to SGX domain, then assume destination is in 'to'
374 EMPromise.resolve(m.to.node.to_s).then { |num_dest|
375 if num_dest =~ /\A\+?[0-9]+(?:;.*)?\Z/
376 next num_dest if num_dest[0] == '+'
377 shortcode = extract_shortcode(num_dest)
378 next shortcode if shortcode
379 end
380
381 if is_anonymous_tel?(num_dest)
382 EMPromise.reject([:cancel, 'gone'])
383 else
384 # TODO: text re num not (yet) supportd/implmentd
385 EMPromise.reject([:cancel, 'item-not-found'])
386 end
387 }
388 end
389
390 def self.fetch_catapult_cred_for(jid)
391 cred_key = "catapult_cred-#{jid.stripped}"
392 REDIS.lrange(cred_key, 0, 3).then { |creds|
393 if creds.length < 4
394 # TODO: add text re credentials not registered
395 EMPromise.reject(
396 [:auth, 'registration-required']
397 )
398 else
399 creds
400 end
401 }
402 end
403
404 message :error? do |m|
405 # TODO: report it somewhere/somehow - eat for now so no err loop
406 puts "EATERROR1: #{m.inspect}"
407 end
408
409 message :body do |m|
410 EMPromise.all([
411 validate_num(m),
412 fetch_catapult_cred_for(m.from)
413 ]).then { |(num_dest, creds)|
414 jid_key = "catapult_jid-#{num_dest}"
415 REDIS.get(jid_key).then { |jid|
416 [jid, num_dest] + creds
417 }
418 }.then { |(jid, num_dest, *creds)|
419 # if destination user is in the system pass on directly
420 if jid
421 pass_on_message(m, creds.last, jid)
422 else
423 to_catapult_possible_oob(m, num_dest, *creds)
424 end
425 }.catch { |e|
426 if e.is_a?(Array) && e.length == 2
427 write_to_stream error_msg(m.reply, m.body, *e)
428 else
429 EMPromise.reject(e)
430 end
431 }
432 end
433
434 def self.user_cap_identities
435 [{category: 'client', type: 'sms'}]
436 end
437
438 # TODO: must re-add stuff so can do ad-hoc commands
439 def self.user_cap_features
440 [
441 "urn:xmpp:receipts",
442 ]
443 end
444
445 def self.add_gateway_feature(feature)
446 @gateway_features << feature
447 @gateway_features.uniq!
448 end
449
450 subscription :request? do |p|
451 puts "PRESENCE1: #{p.inspect}"
452
453 # subscriptions are allowed from anyone - send reply immediately
454 msg = Blather::Stanza::Presence.new
455 msg.to = p.from
456 msg.from = p.to
457 msg.type = :subscribed
458
459 puts 'RESPONSE5a: ' + msg.inspect
460 write_to_stream msg
461
462 # send a <presence> immediately; not automatically probed for it
463 # TODO: refactor so no "presence :probe? do |p|" duplicate below
464 caps = Blather::Stanza::Capabilities.new
465 # TODO: user a better node URI (?)
466 caps.node = 'http://catapult.sgx.soprani.ca/'
467 caps.identities = user_cap_identities
468 caps.features = user_cap_features
469
470 msg = caps.c
471 msg.to = p.from
472 msg.from = p.to.to_s + '/sgx'
473
474 puts 'RESPONSE5b: ' + msg.inspect
475 write_to_stream msg
476
477 # need to subscribe back so Conversations displays images inline
478 msg = Blather::Stanza::Presence.new
479 msg.to = p.from.to_s.split('/', 2)[0]
480 msg.from = p.to.to_s.split('/', 2)[0]
481 msg.type = :subscribe
482
483 puts 'RESPONSE5c: ' + msg.inspect
484 write_to_stream msg
485 end
486
487 presence :probe? do |p|
488 puts 'PRESENCE2: ' + p.inspect
489
490 caps = Blather::Stanza::Capabilities.new
491 # TODO: user a better node URI (?)
492 caps.node = 'http://catapult.sgx.soprani.ca/'
493 caps.identities = user_cap_identities
494 caps.features = user_cap_features
495
496 msg = caps.c
497 msg.to = p.from
498 msg.from = p.to.to_s + '/sgx'
499
500 puts 'RESPONSE6: ' + msg.inspect
501 write_to_stream msg
502 end
503
504 iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
505 # respond to capabilities request for an sgx-bwmsgsv2 number JID
506 if i.to.node
507 # TODO: confirm the node URL is expected using below
508 #puts "XR[node]: #{xpath_result[0]['node']}"
509
510 msg = i.reply
511 msg.node = i.node
512 msg.identities = user_cap_identities
513 msg.features = user_cap_features
514
515 puts 'RESPONSE7: ' + msg.inspect
516 write_to_stream msg
517 next
518 end
519
520 # respond to capabilities request for sgx-bwmsgsv2 itself
521 msg = i.reply
522 msg.node = i.node
523 msg.identities = [{
524 name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
525 type: 'sms', category: 'gateway'
526 }]
527 msg.features = @gateway_features
528 write_to_stream msg
529 end
530
531 def self.check_then_register(i, *creds)
532 jid_key = "catapult_jid-#{creds.last}"
533 bare_jid = i.from.stripped
534 cred_key = "catapult_cred-#{bare_jid}"
535
536 REDIS.get(jid_key).then { |existing_jid|
537 if existing_jid && existing_jid != bare_jid
538 # TODO: add/log text: credentials exist already
539 EMPromise.reject([:cancel, 'conflict'])
540 end
541 }.then {
542 REDIS.lrange(cred_key, 0, 3)
543 }.then { |existing_creds|
544 # TODO: add/log text: credentials exist already
545 if existing_creds.length == 4 && creds != existing_creds
546 EMPromise.reject([:cancel, 'conflict'])
547 elsif existing_creds.length < 4
548 REDIS.rpush(cred_key, *creds).then { |length|
549 if length != 4
550 EMPromise.reject([
551 :cancel,
552 'internal-server-error'
553 ])
554 end
555 }
556 end
557 }.then {
558 # not necessary if existing_jid non-nil, easier this way
559 REDIS.set(jid_key, bare_jid)
560 }.then { |result|
561 if result != 'OK'
562 # TODO: add txt re push failure
563 EMPromise.reject(
564 [:cancel, 'internal-server-error']
565 )
566 end
567 }.then {
568 write_to_stream i.reply
569 }
570 end
571
572 def self.creds_from_registration_query(qn)
573 xn = qn.children.find { |v| v.element_name == "x" }
574
575 if xn
576 xn.children.each_with_object({}) do |field, h|
577 next if field.element_name != "field"
578 val = field.children.find { |v|
579 v.element_name == "value"
580 }
581
582 case field['var']
583 when 'nick'
584 h[:user_id] = val.text
585 when 'username'
586 h[:api_token] = val.text
587 when 'password'
588 h[:api_secret] = val.text
589 when 'phone'
590 h[:phone_num] = val.text
591 else
592 # TODO: error
593 puts "?: #{field['var']}"
594 end
595 end
596 else
597 qn.children.each_with_object({}) do |field, h|
598 case field.element_name
599 when "nick"
600 h[:user_id] = field.text
601 when "username"
602 h[:api_token] = field.text
603 when "password"
604 h[:api_secret] = field.text
605 when "phone"
606 h[:phone_num] = field.text
607 end
608 end
609 end.values_at(:user_id, :api_token, :api_secret, :phone_num)
610 end
611
612 def self.process_registration(i, qn)
613 EMPromise.resolve(
614 qn.children.find { |v| v.element_name == "remove" }
615 ).then { |rn|
616 if rn
617 puts "received <remove/> - ignoring for now..."
618 EMPromise.reject(:done)
619 else
620 creds_from_registration_query(qn)
621 end
622 }.then { |user_id, api_token, api_secret, phone_num|
623 if phone_num[0] == '+'
624 [user_id, api_token, api_secret, phone_num]
625 else
626 # TODO: add text re number not (yet) supported
627 EMPromise.reject([:cancel, 'item-not-found'])
628 end
629 }.then { |user_id, api_token, api_secret, phone_num|
630 # TODO: find way to verify #{phone_num}, too
631 call_catapult(
632 api_token,
633 api_secret,
634 :get,
635 "api/v2/users/#{user_id}/media"
636 ).then { |response|
637 params = JSON.parse(response)
638 # TODO: confirm params is array - could be empty
639
640 puts "register got str #{response.to_s[0..999]}"
641
642 check_then_register(
643 i,
644 user_id,
645 api_token,
646 api_secret,
647 phone_num
648 )
649 }
650 }.catch { |e|
651 EMPromise.reject(case e
652 when 401
653 # TODO: add text re bad credentials
654 [:auth, 'not-authorized']
655 when 404
656 # TODO: add text re number not found or disabled
657 [:cancel, 'item-not-found']
658 when Integer
659 [:modify, 'not-acceptable']
660 else
661 e
662 end)
663 }
664 end
665
666 def self.registration_form(orig, existing_number=nil)
667 msg = Nokogiri::XML::Node.new 'query', orig.document
668 msg['xmlns'] = 'jabber:iq:register'
669
670 if existing_number
671 msg.add_child(
672 Nokogiri::XML::Node.new(
673 'registered', msg.document
674 )
675 )
676 end
677
678 # TODO: update "User Id" x2 below (to "accountId"?), and others?
679 n1 = Nokogiri::XML::Node.new(
680 'instructions', msg.document
681 )
682 n1.content = "Enter the information from your Account "\
683 "page as well as the Phone Number\nin your "\
684 "account you want to use (ie. '+12345678901')"\
685 ".\nUser Id is nick, API Token is username, "\
686 "API Secret is password, Phone Number is phone"\
687 ".\n\nThe source code for this gateway is at "\
688 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
689 "\nCopyright (C) 2017-2020 Denver Gingerich "\
690 "and others, licensed under AGPLv3+."
691 n2 = Nokogiri::XML::Node.new 'nick', msg.document
692 n3 = Nokogiri::XML::Node.new 'username', msg.document
693 n4 = Nokogiri::XML::Node.new 'password', msg.document
694 n5 = Nokogiri::XML::Node.new 'phone', msg.document
695 n5.content = existing_number.to_s
696 msg.add_child(n1)
697 msg.add_child(n2)
698 msg.add_child(n3)
699 msg.add_child(n4)
700 msg.add_child(n5)
701
702 x = Blather::Stanza::X.new :form, [
703 {
704 required: true, type: :"text-single",
705 label: 'User Id', var: 'nick'
706 },
707 {
708 required: true, type: :"text-single",
709 label: 'API Token', var: 'username'
710 },
711 {
712 required: true, type: :"text-private",
713 label: 'API Secret', var: 'password'
714 },
715 {
716 required: true, type: :"text-single",
717 label: 'Phone Number', var: 'phone',
718 value: existing_number.to_s
719 }
720 ]
721 x.title = 'Register for '\
722 'Soprani.ca Gateway to XMPP - Bandwidth API V2'
723 x.instructions = "Enter the details from your Account "\
724 "page as well as the Phone Number\nin your "\
725 "account you want to use (ie. '+12345678901')"\
726 ".\n\nThe source code for this gateway is at "\
727 "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 ."\
728 "\nCopyright (C) 2017-2020 Denver Gingerich "\
729 "and others, licensed under AGPLv3+."
730 msg.add_child(x)
731
732 orig.add_child(msg)
733
734 return orig
735 end
736
737 iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
738 puts "IQ: #{i.inspect}"
739
740 case i.type
741 when :set
742 process_registration(i, qn)
743 when :get
744 bare_jid = i.from.stripped
745 cred_key = "catapult_cred-#{bare_jid}"
746 REDIS.lindex(cred_key, 3).then { |existing_number|
747 reply = registration_form(i.reply, existing_number)
748 puts "RESPONSE2: #{reply.inspect}"
749 write_to_stream reply
750 }
751 else
752 # Unknown IQ, ignore for now
753 EMPromise.reject(:done)
754 end.catch { |e|
755 if e.is_a?(Array) && e.length == 2
756 write_to_stream error_msg(i.reply, qn, *e)
757 elsif e != :done
758 EMPromise.reject(e)
759 end
760 }.catch(&method(:panic))
761 end
762
763 iq :get? do |i|
764 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
765 end
766
767 iq :set? do |i|
768 write_to_stream error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
769 end
770end
771
772class ReceiptMessage < Blather::Stanza
773 def self.new(to=nil)
774 node = super :message
775 node.to = to
776 node
777 end
778end
779
780class WebhookHandler < Goliath::API
781 use Goliath::Rack::Params
782
783 def response(env)
784 # TODO: add timestamp grab here, and MUST include ./tai version
785
786 puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s
787
788 # TODO: process each message in list, not just first one
789 jparams = params['_json'][0]['message']
790
791 type = params['_json'][0]['type']
792
793 users_num = ''
794 others_num = ''
795 if jparams['direction'] == 'in'
796 users_num = jparams['owner']
797 others_num = jparams['from']
798 elsif jparams['direction'] == 'out'
799 users_num = jparams['from']
800 others_num = jparams['owner']
801 else
802 # TODO: exception or similar
803 puts "big prob: '" + jparams['direction'] + "'" + body
804 return [200, {}, "OK"]
805 end
806
807 puts 'BODY - messageId: ' + jparams['id'] +
808 ', eventType: ' + type +
809 ', time: ' + jparams['time'] +
810 ', direction: ' + jparams['direction'] +
811 #', state: ' + jparams['state'] +
812 ', deliveryState: ' + (jparams['deliveryState'] ?
813 jparams['deliveryState'] : 'NONE') +
814 ', deliveryCode: ' + (jparams['deliveryCode'] ?
815 jparams['deliveryCode'] : 'NONE') +
816 ', deliveryDesc: ' + (jparams['deliveryDescription'] ?
817 jparams['deliveryDescription'] : 'NONE') +
818 ', tag: ' + (jparams['tag'] ? jparams['tag'] : 'NONE') +
819 ', media: ' + (jparams['media'] ?
820 jparams['media'].to_s : 'NONE')
821
822 if others_num[0] != '+'
823 # TODO: check that others_num actually a shortcode first
824 others_num +=
825 ';phone-context=ca-us.phone-context.soprani.ca'
826 end
827
828 jid_key = "catapult_jid-#{users_num}"
829 bare_jid = REDIS.get(jid_key).promise.sync
830
831 if !bare_jid
832 puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"
833
834 # TODO: likely not appropriate; give error to BW API?
835 # TODO: add text re credentials not being registered
836 #write_to_stream error_msg(m.reply, m.body, :auth,
837 # 'registration-required')
838 return [200, {}, "OK"]
839 end
840
841 msg = nil
842 case jparams['direction']
843 when 'in'
844 text = ''
845 case type
846 when 'sms'
847 text = jparams['text']
848 when 'mms'
849 has_media = false
850
851 if jparams['text'].empty?
852 if not has_media
853 text = '[suspected group msg '\
854 'with no text (odd)]'
855 end
856 else
857 text = if has_media
858 # TODO: write/use a caption XEP
859 jparams['text']
860 else
861 '[suspected group msg '\
862 '(recipient list not '\
863 'available) with '\
864 'following text] ' +
865 jparams['text']
866 end
867 end
868
869 # ie. if text param non-empty or had no media
870 if not text.empty?
871 msg = Blather::Stanza::Message.new(
872 bare_jid, text)
873 msg.from = others_num + '@' + ARGV[0]
874 SGXbwmsgsv2.write(msg)
875 end
876
877 return [200, {}, "OK"]
878 when 'message-received'
879 # TODO: handle group chat, and fix above
880 text = jparams['text']
881
882 if jparams['to'].length > 1
883 msg = Blather::Stanza::Message.new(
884 'cheogram.com', text) # TODO
885
886 addrs = Nokogiri::XML::Node.new(
887 'addresses', msg.document)
888 addrs['xmlns'] = 'http://jabber.org/' +
889 'protocol/address'
890
891 addr1 = Nokogiri::XML::Node.new(
892 'address', msg.document)
893 addr1['type'] = 'to'
894 addr1['jid'] = bare_jid
895 addrs.add_child(addr1)
896
897 # TODO: actually do N, instead of just 1
898 addrn = Nokogiri::XML::Node.new(
899 'address', msg.document)
900 addrn['type'] = 'to'
901 addrn['uri'] = "sms:#{jparams['to'][1]}"
902 addrn['delivered'] = 'true'
903 addrs.add_child(addrn)
904
905 msg.add_child(addrs)
906
907 # TODO: delete
908 puts "RESPONSE9: #{msg.inspect}"
909 end
910
911 jparams['media'].each do |media_url|
912 if not media_url.end_with?(
913 '.smil', '.txt', '.xml'
914 )
915
916 has_media = true
917 SGXbwmsgsv2.send_media(
918 others_num + '@' +
919 ARGV[0],
920 bare_jid, media_url
921 )
922 end
923 end unless not jparams['media']
924 else
925 text = "unknown type (#{type})"\
926 " with text: " + jparams['text']
927
928 # TODO: log/notify of this properly
929 puts text
930 end
931
932 if not msg
933 msg = Blather::Stanza::Message.new(bare_jid,
934 text)
935 end
936 else # per prior switch, this is: jparams['direction'] == 'out'
937 tag_parts = jparams['tag'].split(/ /, 2)
938 id = WEBrick::HTTPUtils.unescape(tag_parts[0])
939 resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1])
940
941 case type
942 when 'message-failed'
943 # create a bare message like the one user sent
944 msg = Blather::Stanza::Message.new(
945 others_num + '@' + ARGV[0])
946 msg.from = bare_jid + '/' + resourcepart
947 msg['id'] = id
948
949 # TODO: remove this hack
950 if jparams['to'].length > 1
951 puts "WARN! group no rcpt: #{users_num}"
952 return [200, {}, "OK"]
953 end
954
955 # create an error reply to the bare message
956 msg = Blather::StanzaError.new(
957 msg,
958 'recipient-unavailable',
959 :wait
960 ).to_node
961
962 # TODO: make prettier: this should be done above
963 others_num = params['_json'][0]['to']
964 when 'message-delivered'
965 # TODO: remove this hack
966 if jparams['to'].length > 1
967 puts "WARN! group no rcpt: #{users_num}"
968 return [200, {}, "OK"]
969 end
970
971 msg = ReceiptMessage.new(bare_jid)
972
973 # TODO: put in member/instance variable
974 msg['id'] = SecureRandom.uuid
975
976 # TODO: send only when requested per XEP-0184
977 rcvd = Nokogiri::XML::Node.new(
978 'received',
979 msg.document
980 )
981 rcvd['xmlns'] = 'urn:xmpp:receipts'
982 rcvd['id'] = id
983 msg.add_child(rcvd)
984
985 # TODO: make prettier: this should be done above
986 others_num = params['_json'][0]['to']
987 when 'waiting'
988 # TODO: update above label
989 # can't really do anything with it; nice to know
990 puts "message with id #{id} waiting"
991 return [200, {}, "OK"]
992 else
993 # TODO: notify somehow of unknown state receivd?
994 puts "message with id #{id} has "\
995 "other type #{type}"
996 return [200, {}, "OK"]
997 end
998
999 puts "RESPONSE4: #{msg.inspect}"
1000 end
1001
1002 msg.from = others_num + '@' + ARGV[0]
1003 SGXbwmsgsv2.write(msg)
1004
1005 [200, {}, "OK"]
1006
1007 rescue Exception => e
1008 puts 'Shutting down gateway due to exception 013: ' + e.message
1009 SGXbwmsgsv2.shutdown
1010 puts 'Gateway has terminated.'
1011 EM.stop
1012 end
1013end
1014
1015at_exit do
1016 $stdout.sync = true
1017
1018 puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n"\
1019 "==>> last commit of this version is " + `git rev-parse HEAD` + "\n"
1020
1021 if ARGV.size != 7
1022 puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
1023 "<component_password> <server_hostname> "\
1024 "<server_port> <application_id> "\
1025 "<http_listen_port> <mms_proxy_prefix_url>"
1026 exit 0
1027 end
1028
1029 t = Time.now
1030 puts "LOG %d.%09d: starting...\n\n" % [t.to_i, t.nsec]
1031
1032 EM.run do
1033 REDIS = EM::Hiredis.connect
1034
1035 SGXbwmsgsv2.run
1036
1037 # required when using Prosody otherwise disconnects on 6-hour inactivity
1038 EM.add_periodic_timer(3600) do
1039 msg = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
1040 msg.from = ARGV[0]
1041 SGXbwmsgsv2.write(msg)
1042 end
1043
1044 server = Goliath::Server.new('127.0.0.1', ARGV[5].to_i)
1045 server.api = WebhookHandler.new
1046 server.app = Goliath::Rack::Builder.build(server.api.class, server.api)
1047 server.logger = Log4r::Logger.new('goliath')
1048 server.logger.add(Log4r::StdoutOutputter.new('console'))
1049 server.logger.level = Log4r::INFO
1050 server.start do
1051 ["INT", "TERM"].each do |sig|
1052 trap(sig) do
1053 EM.defer do
1054 puts 'Shutting down gateway...'
1055 SGXbwmsgsv2.shutdown
1056
1057 puts 'Gateway has terminated.'
1058 EM.stop
1059 end
1060 end
1061 end
1062 end
1063 end
1064end