1#!/usr/bin/env ruby
2#
3# Copyright (C) 2017-2020 Denver Gingerich <denver@ossguy.com>
4# Copyright (C) 2017 Stephen Paul Weber <singpolyma@singpolyma.net>
5#
6# This file is part of sgx-bwmsgsv2.
7#
8# sgx-bwmsgsv2 is free software: you can redistribute it and/or modify it under
9# the terms of the GNU Affero General Public License as published by the Free
10# Software Foundation, either version 3 of the License, or (at your option) any
11# later version.
12#
13# sgx-bwmsgsv2 is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
16# details.
17#
18# You should have received a copy of the GNU Affero General Public License along
19# with sgx-bwmsgsv2. If not, see <http://www.gnu.org/licenses/>.
20
21require 'blather/client/dsl'
22require 'em-hiredis'
23require 'em-http-request'
24require 'json'
25require 'securerandom'
26require 'time'
27require 'uri'
28require 'webrick'
29
30require 'goliath/api'
31require 'goliath/server'
32require 'log4r'
33
34require_relative 'em_promise'
35
# Last-resort failure handler: reports the exception, shuts the XMPP side of
# the gateway down and then stops the EventMachine reactor.
#
# @param e [Exception] the error that triggered the shutdown
def panic(e)
  puts("Shutting down gateway due to exception: #{e.message}")
  puts(e.backtrace)

  SGXbwmsgsv2.shutdown
  puts('Gateway has terminated.')

  EM.stop
end
43
# Returns the number part of +dest+ when +dest+ has the form
# "<number>;phone-context=ca-us.phone-context.soprani.ca"; returns nil for
# every other input.
#
# @param dest [String] node part of a destination JID
# @return [String, nil]
def extract_shortcode(dest)
  number, phone_context = dest.split(';', 2)
  return unless phone_context == 'phone-context=ca-us.phone-context.soprani.ca'

  number
end
48
# Tells whether +dest+ carries the "anonymous" phone-context suffix.
#
# @param dest [String] node part of a destination JID
# @return [true, false, nil] nil when +dest+ has no ";..." suffix at all,
#   otherwise whether that suffix is the anonymous phone-context
def is_anonymous_tel?(dest)
  _number, phone_context = dest.split(';', 2)
  return nil if phone_context.nil?

  phone_context == 'phone-context=anonymous.phone-context.soprani.ca'
end
53
# Blather client whose stanza handlers are all routed through #wrap_handler,
# so that a raised exception or a rejected Promise ends in panic.
class SGXClient < Blather::Client
  # Registers +block+ as a handler, wrapped by #wrap_handler.
  def register_handler(type, *guards, &block)
    super(type, *guards) { |*stanza_args| wrap_handler(*stanza_args, &block) }
  end

  # Like #register_handler, but the wrapped handler is placed at the front
  # of the handler list for +type+.
  def register_handler_before(type, *guards, &block)
    check_handler(type, guards)
    wrapped = ->(*stanza_args) { wrap_handler(*stanza_args, &block) }

    (@handlers[type] ||= []).unshift([guards, wrapped])
  end

  protected

  # Runs +block+ with +args+. A Promise result gets panic attached as its
  # rejection handler; any exception raised by the block also goes to panic.
  def wrap_handler(*args, &block)
    begin
      outcome = block.call(*args)
      outcome.catch(&method(:panic)) if outcome.is_a?(Promise)
      true # Do not run other handlers unless throw :pass
    rescue Exception => e
      panic(e)
    end
  end
end
77
# Bit offsets used with the per-user "catapult_setting_flags-<jid>" Redis key.
# TODO: keep in sync with jmp-acct_bot.rb, and eventually put in common location
module CatapultSettingFlagBits
  # bit 0
  VOICEMAIL_TRANSCRIPTION_DISABLED = 0

  # bit 1; read via GETBIT before an OOB URL is sent as MMS media
  MMS_ON_OOB_URL = 1
end
83
84module SGXbwmsgsv2
85 extend Blather::DSL
86
# Client instance with the panic-on-failure handler wrapping
# (presumably what the Blather::DSL `client` accessor returns - confirm).
@client = SGXClient.new

# Feature namespaces reported in the disco#info reply for the gateway itself;
# extended at runtime through add_gateway_feature.
@gateway_features = %w[
  http://jabber.org/protocol/disco#info
  http://jabber.org/protocol/address/
  jabber:iq:register
]
93
# Starts the gateway's XMPP client by delegating to client.run.
def self.run
  client.run
end
97
# Writes +stanza+ through the gateway's client. Public so that classes
# outside this module can write messages, too.
def self.write(stanza)
  client.write stanza
end
102
# Registers +block+ as a handler for +type+ (filtered by +guards+) ahead of
# the handlers that are already registered for that type.
def self.before_handler(type, *guards, &block)
  client.register_handler_before(type, *guards, &block)
end
106
# Sends a message whose body is a proxied media URL and which also carries
# that URL in an XEP-0066 (OOB) <x/> element.
#
# @param from      sender to set on the outgoing message
# @param to        recipient; the caller must guarantee this is a bare JID
# @param media_url [String] assumed to be of the form (always the case so far)
#   https://messaging.bandwidth.com/api/v2/users/[u]/media/[path]
# @param desc      optional text for the OOB <desc/> element
# @param subject   optional message subject
# @param m         optional message to copy instead of creating a fresh one
#
# Any exception raised here is handed to panic.
def self.send_media(from, to, media_url, desc=nil, subject=nil, m=nil)
  escaped_to = WEBrick::HTTPUtils.escape(to)
  # everything after the 8th '/' of media_url is the media path
  media_path = media_url.split('/', 9)[8]
  proxy_url = ARGV[6] + escaped_to + '/' + media_path

  puts "ORIG_URL: #{media_url}"
  puts "PROX_URL: #{proxy_url}"

  # put URL in the body (so Conversations will still see it)...
  msg = Blather::Stanza::Message.new(to, proxy_url)
  msg = m.copy.tap { |copy| copy.body = proxy_url } if m
  msg.from = from
  msg.subject = subject if subject

  # ...but also provide URL in XEP-0066 (OOB) fashion
  # TODO: confirm client supports OOB or don't send this
  doc = msg.document
  oob = Nokogiri::XML::Node.new('x', doc)
  oob['xmlns'] = 'jabber:x:oob'

  url_node = Nokogiri::XML::Node.new('url', doc)
  url_node.add_child(Nokogiri::XML::Text.new(proxy_url, doc))
  oob.add_child(url_node)

  if desc
    desc_node = Nokogiri::XML::Node.new('desc', doc)
    desc_node.add_child(Nokogiri::XML::Text.new(desc, doc))
    oob.add_child(desc_node)
  end

  msg.add_child(oob)

  write(msg)
rescue Exception => e
  panic(e)
end
150
# Converts +orig+ into an error stanza and returns it.
#
# @param orig       stanza to mutate (its type becomes :error)
# @param query_node node appended after the <error/> element when non-nil
# @param type       value for the <error/> element's "type" attribute
# @param name       element name of the error condition; it is placed in the
#   urn:ietf:params:xml:ns:xmpp-stanzas namespace
# @param text       currently unused
# @return +orig+
def self.error_msg(orig, query_node, type, name, text=nil)
  orig.type = :error
  doc = orig.document

  error_node = Nokogiri::XML::Node.new('error', doc)
  error_node['type'] = type
  orig.add_child(error_node)

  condition = Nokogiri::XML::Node.new(name, doc)
  condition['xmlns'] = 'urn:ietf:params:xml:ns:xmpp-stanzas'
  error_node.add_child(condition)

  orig.add_child(query_node) if query_node

  # TODO: add some explanatory xml:lang='en' text (see text param)
  puts "RESPONSE3: #{orig.inspect}"
  orig
end
168
# Configure the component connection from the first four command-line
# arguments (missing ones are passed as nil).
# workqueue_count MUST be 0 or else Blather uses threads!
setup(*ARGV.values_at(0, 1, 2, 3), nil, nil, workqueue_count: 0)
171
# Forwards +m+ directly to +jid+ and then sends a delivery receipt back to
# the original sender.
#
# @param m         message stanza; its to/from are rewritten in place
# @param users_num number used as the node part of the new sender address
# @param jid       JID the message is delivered to
def self.pass_on_message(m, users_num, jid)
  # setup delivery receipt; similar to a reply. It must be addressed
  # before m.to / m.from are overwritten below.
  receipt = ReceiptMessage.new(m.from.stripped)
  receipt.from = m.to

  # pass original message (before sending receipt)
  m.to = jid
  m.from = "#{users_num}@#{ARGV[0]}"

  puts "XRESPONSE0: #{m.inspect}"
  write_to_stream(m)

  # send a delivery receipt back to the sender
  # TODO: send only when requested per XEP-0184
  # TODO: pass receipts from target if supported

  # TODO: put in member/instance variable
  receipt['id'] = SecureRandom.uuid
  received = Nokogiri::XML::Node.new('received', receipt.document)
  received['xmlns'] = 'urn:xmpp:receipts'
  received['id'] = m.id
  receipt.add_child(received)

  puts "XRESPONSE1: #{receipt.inspect}"
  write_to_stream(receipt)
end
198
# Issues an HTTP request against https://messaging.bandwidth.com/.
#
# @param token, secret  sent as the 'Authorization' header pair
# @param m              HTTP verb, used as the method name on the request
# @param pth            path appended to the base URL
# @param body           request body
# @param head           extra headers merged over the Authorization header
# @param code           list of status codes treated as success
# @param respond_with   :body, :headers, or anything else for the whole
#   response object
# @return a promise for the selected part of the response; it is rejected
#   with the status code when that code is not in +code+
def self.call_catapult(
  token, secret, m, pth, body=nil,
  head={}, code=[200], respond_with=:body
)
  # TODO: need to make a separate thing for voice.bw.c eventually
  request = EM::HttpRequest.new("https://messaging.bandwidth.com/#{pth}")
  headers = { 'Authorization' => [token, secret] }.merge(head)

  request.public_send(m, head: headers, body: body).then do |http|
    status = http.response_header.status
    puts "API response to send: #{http.response} with code response.code #{status}"

    next EMPromise.reject(status) unless code.include?(status)

    case respond_with
    when :body then http.response
    when :headers then http.response_header
    else http
    end
  end
end
230
# Sends stanza +s+ through to_catapult, as MMS when it carries an
# <x><url/></x> child and the sender has the MMS_ON_OOB_URL flag set,
# otherwise as a plain message.
#
# Every path returns what to_catapult returns (directly, or through the
# Redis promise), so a rejection from to_catapult reaches the caller's
# promise chain instead of being dropped.
#
# @param s        stanza being sent
# @param num_dest destination number(s)
# @param user_id, token, secret, usern  credentials passed on to to_catapult
def self.to_catapult_possible_oob(s, num_dest, user_id, token, secret,
  usern)

  xn = s.children.find { |v| v.element_name == "x" }
  unless xn
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found an x node - checking for url node..."

  # TODO: also check for xmlns='jabber:x:oob' in <x/> - the below
  #  is probably fine, though, as non-OOB <x><url/></x> unlikely

  un = xn.children.find { |v| v.element_name == "url" }
  unless un
    puts "MMSOOB: no url node found so process as normal"
    return to_catapult(s, nil, num_dest, user_id, token, secret,
      usern)
  end
  puts "MMSOOB: found a url node - checking if to make MMS..."

  REDIS.getbit("catapult_setting_flags-#{s.from.stripped}",
    CatapultSettingFlagBits::MMS_ON_OOB_URL).then { |oob_on|

    puts "MMSOOB: found MMS_ON_OOB_URL value is '#{oob_on}'"
    if 0 == oob_on
      puts "MMSOOB: MMS_ON_OOB_URL off so no MMS send"
      # hand the promise back so failures are not lost
      next to_catapult(s, nil, num_dest, user_id, token,
        secret, usern)
    end

    # TODO: check size of file at un.text and shrink if need

    body = s.respond_to?(:body) ? s.body : ''
    puts "MMSOOB: url text is '#{un.text}'"
    puts "MMSOOB: the body is '#{body.to_s.strip}'"

    # some clients send URI in both body & <url/> so delete
    if un.text == body.to_s.strip
      puts "MMSOOB: url matches body so deleting body"
      s.body = ''
    end

    puts "MMSOOB: sending MMS since found OOB & user asked"
    to_catapult(s, un.text, num_dest, user_id, token,
      secret, usern)
  }
end
282
# Posts stanza +s+ to the messaging API and increments the sender's daily
# usage counter in Redis.
#
# @param s        stanza being sent; its body (if any) becomes the text
# @param murl     media URL to attach, or nil
# @param num_dest destination number(s)
# @param user_id, token, secret  API credentials
# @param usern    the sender's number
# @return a promise combining the API request and the usage increment. It is
#   rejected with [:modify, 'policy-violation'] when there is neither media
#   nor non-blank text, and with [:cancel, 'internal-server-error'] when the
#   API call fails. (Assumes EMPromise.all rejects as soon as one member
#   rejects - confirm against em_promise.)
def self.to_catapult(s, murl, num_dest, user_id, token, secret, usern)
  body = s.respond_to?(:body) ? s.body : ''
  if murl.to_s.empty? && body.to_s.strip.empty?
    return EMPromise.reject(
      [:modify, 'policy-violation']
    )
  end

  extra = murl ? { media: murl } : {}

  # callbacks need id and resourcepart
  tag = WEBrick::HTTPUtils.escape(s.id.to_s) +
    ' ' +
    WEBrick::HTTPUtils.escape(s.from.resource.to_s)

  # Keep the request promise: previously it was discarded, so the rejection
  # produced below could never reach the caller.
  sent = call_catapult(
    token,
    secret,
    :post,
    "api/v2/users/#{user_id}/messages",
    JSON.dump(extra.merge(
      from: usern,
      to: num_dest,
      text: body,
      applicationId: ARGV[4],
      tag: tag
    )),
    {'Content-Type' => 'application/json'},
    [201]
  ).catch {
    # TODO: add text; mention code number
    EMPromise.reject(
      [:cancel, 'internal-server-error']
    )
  }

  # usage is counted as soon as the request has been issued
  t = Time.now
  tai_timestamp = `./tai`.strip
  tai_yyyymmdd = Time.at(tai_timestamp.to_i).strftime('%Y%m%d')
  puts "SMU %d.%09d, %s: msg for %s sent on %s - incrementing\n" %
    [t.to_i, t.nsec, tai_timestamp, usern, tai_yyyymmdd]

  counted = REDIS.incr('usage_messages-' + tai_yyyymmdd + '-' +
    usern).then { |total|

    t = Time.now
    puts "SMU %d.%09d: total msgs for %s-%s now at %s\n" %
      [t.to_i, t.nsec, tai_yyyymmdd, usern, total]
  }

  EMPromise.all([sent, counted])
end
337
# Works out the destination number(s) of message +m+.
#
# When +m+ is addressed to the gateway domain itself (ARGV[0]) the numbers
# come from the "uri" attributes of the children of its <addresses/> element
# (see https://wiki.soprani.ca/SGX/GroupMMS) and are returned as a plain
# Array, one entry per child. Otherwise the node part of m.to is used and a
# promise is returned.
#
# @param m message stanza
# @return [Array<String>] numbers (group case), or a promise for the number /
#   shortcode; rejections are [:cancel, 'item-not-found'] or [:cancel, 'gone']
def self.validate_num(m)
  # if sent to SGX domain use https://wiki.soprani.ca/SGX/GroupMMS
  if m.to == ARGV[0]
    an = m.children.find { |v| v.element_name == "addresses" }
    return EMPromise.reject([:cancel, 'item-not-found']) unless an
    puts "ADRXEP: found an addresses node - iterate addrs.."

    return an.children.map { |e|
      num = ''
      e.attributes.each do |attr_name, attr_value|
        next unless attr_name == 'uri'
        # The first four characters are assumed to be "sms:". The
        # trailing to_s keeps a too-short uri from producing nil, which
        # used to raise NoMethodError further down.
        # TODO: error if type is not 'to' or uri lacks the 'sms:' prefix
        # TODO: confirm num validates
        num = attr_value.to_s[4..-1].to_s
      end
      num
    }
  end

  # if not sent to SGX domain, then assume destination is in 'to'
  EMPromise.resolve(m.to.node.to_s).then { |num_dest|
    # \z (not \Z) so that a trailing newline is not accepted
    if num_dest =~ /\A\+?[0-9]+(?:;.*)?\z/
      next num_dest if num_dest[0] == '+'
      shortcode = extract_shortcode(num_dest)
      next shortcode if shortcode
    end

    if is_anonymous_tel?(num_dest)
      EMPromise.reject([:cancel, 'gone'])
    else
      # TODO: text re num not (yet) supported/implemented
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }
end
393
# Loads the first four entries of the Redis list
# "catapult_cred-<bare jid>" for +jid+.
#
# @return a promise for the four-element list; rejected with
#   [:auth, 'registration-required'] when fewer than four entries exist
def self.fetch_catapult_cred_for(jid)
  REDIS.lrange("catapult_cred-#{jid.stripped}", 0, 3).then do |creds|
    next creds unless creds.length < 4

    # TODO: add text re credentials not registered
    EMPromise.reject([:auth, 'registration-required'])
  end
end
407
# Incoming error messages are only logged.
# TODO: report it somewhere/somehow - eat for now so no err loop
message(:error?) { |m| puts "EATERROR1: #{m.inspect}" }
412
# Handles every message that has a body: resolves the destination and the
# sender's credentials, then either passes the message straight to another
# registered user or sends it out through the API. A two-element Array
# rejection is answered with an error stanza; anything else is re-rejected.
message :body do |m|
  lookups = EMPromise.all([
    validate_num(m),
    fetch_catapult_cred_for(m.from)
  ])

  resolved = lookups.then { |(num_dest, creds)|
    REDIS.get("catapult_jid-#{num_dest}").then { |jid|
      [jid, num_dest] + creds
    }
  }

  delivered = resolved.then { |(jid, num_dest, *creds)|
    # if destination user is in the system pass on directly
    next pass_on_message(m, creds.last, jid) if jid

    to_catapult_possible_oob(m, num_dest, *creds)
  }

  delivered.catch { |e|
    next EMPromise.reject(e) unless e.is_a?(Array) && e.length == 2

    write_to_stream error_msg(m.reply, m.body, *e)
  }
end
437
# Identity list used in capability and disco#info replies for number JIDs.
#
# @return [Array<Hash>] a single client/sms identity
def self.user_cap_identities
  sms_client = { category: 'client', type: 'sms' }
  [sms_client]
end
441
# Feature list used in capability and disco#info replies for number JIDs.
# TODO: must re-add stuff so can do ad-hoc commands
#
# @return [Array<String>]
def self.user_cap_features
  %w[urn:xmpp:receipts]
end
448
# Adds +feature+ to the gateway's feature list, keeping entries unique.
#
# @return the result of Array#uniq! (nil when +feature+ was not a duplicate)
def self.add_gateway_feature(feature)
  @gateway_features.push(feature)
  @gateway_features.uniq!
end
453
# Answers a subscription request with three stanzas: an approval, a
# capabilities presence from the "/sgx" resource, and a subscription request
# in the opposite direction.
subscription :request? do |request|
  puts "PRESENCE1: #{request.inspect}"

  # subscriptions are allowed from anyone - send reply immediately
  approval = Blather::Stanza::Presence.new
  approval.to = request.from
  approval.from = request.to
  approval.type = :subscribed

  puts "RESPONSE5a: #{approval.inspect}"
  write_to_stream approval

  # send a <presence> immediately; not automatically probed for it
  # TODO: refactor so no "presence :probe? do |p|" duplicate below
  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  announcement = caps.c
  announcement.to = request.from
  announcement.from = "#{request.to}/sgx"

  puts "RESPONSE5b: #{announcement.inspect}"
  write_to_stream announcement

  # need to subscribe back so Conversations displays images inline
  subscribe_back = Blather::Stanza::Presence.new
  subscribe_back.to = request.from.to_s.split('/', 2).first
  subscribe_back.from = request.to.to_s.split('/', 2).first
  subscribe_back.type = :subscribe

  puts "RESPONSE5c: #{subscribe_back.inspect}"
  write_to_stream subscribe_back
end
490
# Answers a presence probe with a capabilities presence sent from the
# "/sgx" resource of the probed JID.
presence :probe? do |probe|
  puts "PRESENCE2: #{probe.inspect}"

  caps = Blather::Stanza::Capabilities.new
  # TODO: user a better node URI (?)
  caps.node = 'http://catapult.sgx.soprani.ca/'
  caps.identities = user_cap_identities
  caps.features = user_cap_features

  answer = caps.c
  answer.to = probe.from
  answer.from = "#{probe.to}/sgx"

  puts "RESPONSE6: #{answer.inspect}"
  write_to_stream answer
end
507
# Answers disco#info queries. A query addressed to a JID with a node part
# gets the per-number identities/features; a query addressed to the bare
# gateway domain gets the gateway identity and @gateway_features.
iq '/iq/ns:query', ns: 'http://jabber.org/protocol/disco#info' do |i|
  # respond to capabilities request for an sgx-bwmsgsv2 number JID
  if i.to.node
    # TODO: confirm the node URL is expected using below
    #puts "XR[node]: #{xpath_result[0]['node']}"

    number_reply = i.reply
    number_reply.node = i.node
    number_reply.identities = user_cap_identities
    number_reply.features = user_cap_features

    puts "RESPONSE7: #{number_reply.inspect}"
    write_to_stream number_reply
    next
  end

  # respond to capabilities request for sgx-bwmsgsv2 itself
  gateway_identity = {
    name: 'Soprani.ca Gateway to XMPP - Bandwidth API V2',
    type: 'sms', category: 'gateway'
  }

  gateway_reply = i.reply
  gateway_reply.node = i.node
  gateway_reply.identities = [gateway_identity]
  gateway_reply.features = @gateway_features
  write_to_stream gateway_reply
end
534
# Stores +creds+ for the sender of +i+ and maps the phone number (the last
# credential) to the sender's bare JID, then replies to +i+.
#
# Rejections: [:cancel, 'conflict'] when the number already belongs to a
# different JID or the JID already has different credentials;
# [:cancel, 'internal-server-error'] when a Redis write gives an unexpected
# result.
#
# @param i     the registration IQ
# @param creds user id, API token, API secret, phone number
def self.check_then_register(i, *creds)
  bare_jid = i.from.stripped
  jid_key = "catapult_jid-#{creds.last}"
  cred_key = "catapult_cred-#{bare_jid}"

  REDIS.get(jid_key).then { |existing_jid|
    # TODO: add/log text: credentials exist already
    EMPromise.reject([:cancel, 'conflict']) if existing_jid &&
      existing_jid != bare_jid
  }.then {
    REDIS.lrange(cred_key, 0, 3)
  }.then { |existing_creds|
    stored = existing_creds.length

    # TODO: add/log text: credentials exist already
    if stored == 4 && creds != existing_creds
      EMPromise.reject([:cancel, 'conflict'])
    elsif stored < 4
      REDIS.rpush(cred_key, *creds).then { |length|
        EMPromise.reject(
          [:cancel, 'internal-server-error']
        ) if length != 4
      }
    end
  }.then {
    # not necessary if existing_jid non-nil, easier this way
    REDIS.set(jid_key, bare_jid)
  }.then { |result|
    # TODO: add txt re push failure
    EMPromise.reject([:cancel, 'internal-server-error']) if result != 'OK'
  }.then {
    write_to_stream i.reply
  }
end
575
# Extracts the registration credentials from a jabber:iq:register query.
#
# When the query has an <x/> child, values are read from its <field/>
# children (selected by their "var" attribute); otherwise they are read from
# the query's direct children named nick / username / password / phone.
#
# A <field/> without a <value/> child yields nil for that credential instead
# of raising.
#
# @param qn the <query/> node
# @return [Array] [user_id, api_token, api_secret, phone_num]; entries are
#   nil when not supplied
def self.creds_from_registration_query(qn)
  xn = qn.children.find { |v| v.element_name == "x" }

  found =
    if xn
      xn.children.each_with_object({}) do |field, h|
        next if field.element_name != "field"
        val = field.children.find { |v|
          v.element_name == "value"
        }
        text = val ? val.text : nil

        case field['var']
        when 'nick'
          h[:user_id] = text
        when 'username'
          h[:api_token] = text
        when 'password'
          h[:api_secret] = text
        when 'phone'
          h[:phone_num] = text
        else
          # TODO: error
          puts "?: #{field['var']}"
        end
      end
    else
      qn.children.each_with_object({}) do |field, h|
        case field.element_name
        when "nick"
          h[:user_id] = field.text
        when "username"
          h[:api_token] = field.text
        when "password"
          h[:api_secret] = field.text
        when "phone"
          h[:phone_num] = field.text
        end
      end
    end

  found.values_at(:user_id, :api_token, :api_secret, :phone_num)
end
615
# Handles a jabber:iq:register "set" request: reads the credentials from
# +qn+, checks them with a GET on the user's media endpoint, and stores them
# via check_then_register.
#
# Rejections: :done for a <remove/> request (ignored for now);
# [:cancel, 'item-not-found'] when the phone number is missing or does not
# start with '+', or when the API answers 404; [:auth, 'not-authorized'] on
# 401; [:modify, 'not-acceptable'] for any other status code; every other
# error is passed on unchanged.
#
# @param i  the registration IQ
# @param qn its <query/> node
def self.process_registration(i, qn)
  EMPromise.resolve(
    qn.children.find { |v| v.element_name == "remove" }
  ).then { |rn|
    if rn
      puts "received <remove/> - ignoring for now..."
      EMPromise.reject(:done)
    else
      creds_from_registration_query(qn)
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # to_s: a request without a phone field gives nil here, and indexing
    # nil used to raise
    if phone_num.to_s.start_with?('+')
      [user_id, api_token, api_secret, phone_num]
    else
      # TODO: add text re number not (yet) supported
      EMPromise.reject([:cancel, 'item-not-found'])
    end
  }.then { |user_id, api_token, api_secret, phone_num|
    # TODO: find way to verify #{phone_num}, too
    call_catapult(
      api_token,
      api_secret,
      :get,
      "api/v2/users/#{user_id}/media"
    ).then { |response|
      # parsed only to make sure the reply is JSON
      # TODO: confirm result is array - could be empty
      JSON.parse(response)

      puts "register got str #{response.to_s[0..999]}"

      check_then_register(
        i,
        user_id,
        api_token,
        api_secret,
        phone_num
      )
    }
  }.catch { |e|
    EMPromise.reject(case e
    when 401
      # TODO: add text re bad credentials
      [:auth, 'not-authorized']
    when 404
      # TODO: add text re number not found or disabled
      [:cancel, 'item-not-found']
    when Integer
      [:modify, 'not-acceptable']
    else
      e
    end)
  }
end
669
# Appends a jabber:iq:register <query/> to +orig+ containing the plain
# registration fields and an equivalent data form.
#
# @param orig            stanza to extend (usually the IQ reply)
# @param existing_number number already registered, if any; it adds a
#   <registered/> marker and pre-fills the phone field
# @return +orig+
def self.registration_form(orig, existing_number=nil)
  query = Nokogiri::XML::Node.new('query', orig.document)
  query['xmlns'] = 'jabber:iq:register'
  doc = query.document

  query.add_child(Nokogiri::XML::Node.new('registered', doc)) if existing_number

  # TODO: update "User Id" x2 below (to "accountId"?), and others?
  instructions = Nokogiri::XML::Node.new('instructions', doc)
  instructions.content = [
    "Enter the information from your Account ",
    "page as well as the Phone Number\nin your ",
    "account you want to use (ie. '+12345678901')",
    ".\nUser Id is nick, API Token is username, ",
    "API Secret is password, Phone Number is phone",
    ".\n\nThe source code for this gateway is at ",
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .",
    "\nCopyright (C) 2017-2020 Denver Gingerich ",
    "and others, licensed under AGPLv3+."
  ].join

  phone = Nokogiri::XML::Node.new('phone', doc)
  phone.content = existing_number.to_s

  # child order: instructions, nick, username, password, phone
  query.add_child(instructions)
  %w[nick username password].each do |field_name|
    query.add_child(Nokogiri::XML::Node.new(field_name, doc))
  end
  query.add_child(phone)

  form_fields = [
    {
      required: true, type: :"text-single",
      label: 'User Id', var: 'nick'
    },
    {
      required: true, type: :"text-single",
      label: 'API Token', var: 'username'
    },
    {
      required: true, type: :"text-private",
      label: 'API Secret', var: 'password'
    },
    {
      required: true, type: :"text-single",
      label: 'Phone Number', var: 'phone',
      value: existing_number.to_s
    }
  ]
  form = Blather::Stanza::X.new(:form, form_fields)
  form.title = 'Register for '\
    'Soprani.ca Gateway to XMPP - Bandwidth API V2'
  form.instructions = [
    "Enter the details from your Account ",
    "page as well as the Phone Number\nin your ",
    "account you want to use (ie. '+12345678901')",
    ".\n\nThe source code for this gateway is at ",
    "https://gitlab.com/soprani.ca/sgx-bwmsgsv2 .",
    "\nCopyright (C) 2017-2020 Denver Gingerich ",
    "and others, licensed under AGPLv3+."
  ].join
  query.add_child(form)

  orig.add_child(query)

  orig
end
740
# Handles jabber:iq:register IQs: "set" runs the registration, "get" replies
# with the registration form (pre-filled with the stored number, if any) and
# every other type is ignored. A two-element Array rejection is answered
# with an error stanza; any other failure except :done ends in panic.
iq '/iq/ns:query', ns: 'jabber:iq:register' do |i, qn|
  puts "IQ: #{i.inspect}"

  outcome =
    case i.type
    when :set
      process_registration(i, qn)
    when :get
      REDIS.lindex("catapult_cred-#{i.from.stripped}", 3).then { |existing_number|
        reply = registration_form(i.reply, existing_number)
        puts "RESPONSE2: #{reply.inspect}"
        write_to_stream reply
      }
    else
      # Unknown IQ, ignore for now
      EMPromise.reject(:done)
    end

  outcome.catch { |e|
    if e.is_a?(Array) && e.length == 2
      write_to_stream error_msg(i.reply, qn, *e)
    elsif e != :done
      EMPromise.reject(e)
    end
  }.catch(&method(:panic))
end
766
# Fallback for "get" IQs: answer with cancel / feature-not-implemented.
iq :get? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
770
# Fallback for "set" IQs: answer with cancel / feature-not-implemented.
iq :set? do |i|
  refusal = error_msg(i.reply, i.children, 'cancel', 'feature-not-implemented')
  write_to_stream refusal
end
774end
775
# A bare <message/> stanza used as the carrier for delivery receipts.
class ReceiptMessage < Blather::Stanza
  # Builds a :message stanza addressed to +to+.
  def self.new(to=nil)
    super(:message).tap { |stanza| stanza.to = to }
  end
end
783
# Goliath endpoint for the messaging API's callbacks. Each callback is turned
# into an XMPP stanza (message, delivery receipt or error) and written
# through SGXbwmsgsv2.
class WebhookHandler < Goliath::API
  use Goliath::Rack::Params

  # Processes the first event of the posted JSON list.
  #
  # Inbound events become messages (or media messages) from
  # "<other number>@<component>" to the registered JID of the user's number;
  # outbound events become a delivery receipt ('message-delivered') or an
  # error stanza ('message-failed').
  #
  # @return [Array] [200, {}, "OK"] on every handled path. Any exception
  #   shuts the gateway down instead.
  def response(env)
    # TODO: add timestamp grab here, and MUST include ./tai version

    puts 'ENV: ' + env.reject{ |k| k == 'params' }.to_s

    # TODO: process each message in list, not just first one
    event = params['_json'][0]
    jparams = event['message']

    type = event['type']

    users_num = ''
    others_num = ''
    if jparams['direction'] == 'in'
      users_num = jparams['owner']
      others_num = jparams['from']
    elsif jparams['direction'] == 'out'
      users_num = jparams['from']
      others_num = jparams['owner']
    else
      # TODO: exception or similar
      # (this line used to append an undefined `body`, which raised
      # NameError and took the gateway down)
      puts "big prob: '#{jparams['direction']}'"
      return [200, {}, "OK"]
    end

    # interpolation rather than '+', so nil or numeric fields cannot raise
    puts "BODY - messageId: #{jparams['id']}" \
      ", eventType: #{type}" \
      ", time: #{jparams['time']}" \
      ", direction: #{jparams['direction']}" \
      ", deliveryState: #{jparams['deliveryState'] || 'NONE'}" \
      ", deliveryCode: #{jparams['deliveryCode'] || 'NONE'}" \
      ", deliveryDesc: #{jparams['deliveryDescription'] || 'NONE'}" \
      ", tag: #{jparams['tag'] || 'NONE'}" \
      ", media: #{jparams['media'] || 'NONE'}"

    if others_num[0] != '+'
      # TODO: check that others_num actually a shortcode first
      others_num +=
        ';phone-context=ca-us.phone-context.soprani.ca'
    end

    jid_key = "catapult_jid-#{users_num}"
    bare_jid = REDIS.get(jid_key).promise.sync

    if !bare_jid
      puts "jid_key (#{jid_key}) DNE; BW API misconfigured?"

      # TODO: likely not appropriate; give error to BW API?
      # TODO: add text re credentials not being registered
      #write_to_stream error_msg(m.reply, m.body, :auth,
      #	'registration-required')
      return [200, {}, "OK"]
    end

    msg = nil
    case jparams['direction']
    when 'in'
      text = ''
      case type
      when 'sms'
        text = jparams['text']
      when 'mms'
        # Media is never detected in this branch, so the text is always
        # one of the two "suspected group msg" notices.
        # TODO: write/use a caption XEP
        text = if jparams['text'].empty?
          '[suspected group msg '\
          'with no text (odd)]'
        else
          '[suspected group msg '\
          '(recipient list not '\
          'available) with '\
          'following text] ' +
          jparams['text']
        end

        msg = Blather::Stanza::Message.new(bare_jid, text)
        msg.from = others_num + '@' + ARGV[0]
        SGXbwmsgsv2.write(msg)

        return [200, {}, "OK"]
      when 'message-received'
        # TODO: handle group chat, and fix above
        text = jparams['text']

        if jparams['to'].length > 1
          msg = Blather::Stanza::Message.new(
            'cheogram.com', text) # TODO

          addrs = Nokogiri::XML::Node.new(
            'addresses', msg.document)
          addrs['xmlns'] = 'http://jabber.org/' +
            'protocol/address'

          addr1 = Nokogiri::XML::Node.new(
            'address', msg.document)
          addr1['type'] = 'to'
          addr1['jid'] = bare_jid
          addrs.add_child(addr1)

          jparams['to'].each do |receiver|
            # already there in addr1
            next if receiver == users_num

            addrn = Nokogiri::XML::Node.new(
              'address', msg.document)
            addrn['type'] = 'to'
            addrn['uri'] = "sms:#{receiver}"
            addrn['delivered'] = 'true'
            addrs.add_child(addrn)
          end

          msg.add_child(addrs)

          # TODO: delete
          puts "RESPONSE9: #{msg.inspect}"
        end

        # every attachment except .smil/.txt/.xml goes out as its own
        # media message
        (jparams['media'] || []).each do |media_url|
          next if media_url.end_with?('.smil', '.txt', '.xml')

          SGXbwmsgsv2.send_media(
            others_num + '@' + ARGV[0],
            bare_jid, media_url,
            nil, nil, msg
          )
        end
      else
        text = "unknown type (#{type})"\
          " with text: #{jparams['text']}"

        # TODO: log/notify of this properly
        puts text
      end

      if not msg
        msg = Blather::Stanza::Message.new(bare_jid,
          text)
      end
    else # per prior switch, this is: jparams['direction'] == 'out'
      # The tag carries the escaped stanza id and resourcepart separated
      # by a space. Without it the callback cannot be matched to a stanza.
      tag = jparams['tag']
      unless tag
        puts "callback for #{users_num} has no tag - ignoring"
        return [200, {}, "OK"]
      end

      tag_parts = tag.split(/ /, 2)
      id = WEBrick::HTTPUtils.unescape(tag_parts[0].to_s)
      resourcepart = WEBrick::HTTPUtils.unescape(tag_parts[1].to_s)

      # TODO: remove this hack
      if jparams['to'].length > 1
        puts "WARN! group no rcpt: #{users_num}"
        return [200, {}, "OK"]
      end

      case type
      when 'message-failed'
        # create a bare message like the one user sent
        msg = Blather::Stanza::Message.new(
          others_num + '@' + ARGV[0])
        msg.from = bare_jid + '/' + resourcepart
        msg['id'] = id

        # TODO: add 'errorCode' and/or 'description' val
        # create an error reply to the bare message
        msg = Blather::StanzaError.new(
          msg,
          'recipient-unavailable',
          :wait
        ).to_node

        # TODO: make prettier: this should be done above
        others_num = event['to']
      when 'message-delivered'

        msg = ReceiptMessage.new(bare_jid)

        # TODO: put in member/instance variable
        msg['id'] = SecureRandom.uuid

        # TODO: send only when requested per XEP-0184
        rcvd = Nokogiri::XML::Node.new(
          'received',
          msg.document
        )
        rcvd['xmlns'] = 'urn:xmpp:receipts'
        rcvd['id'] = id
        msg.add_child(rcvd)

        # TODO: make prettier: this should be done above
        others_num = event['to']
      else
        # TODO: notify somehow of unknown state received?
        puts "message with id #{id} has "\
          "other type #{type}"
        return [200, {}, "OK"]
      end

      puts "RESPONSE4: #{msg.inspect}"
    end

    msg.from = others_num + '@' + ARGV[0]
    SGXbwmsgsv2.write(msg)

    [200, {}, "OK"]

  rescue Exception => e
    puts 'Shutting down gateway due to exception 013: ' + e.message
    SGXbwmsgsv2.shutdown
    puts 'Gateway has terminated.'
    EM.stop
  end
end
1016
# Program entry point, run from at_exit: prints a banner, checks the
# argument count, then starts the reactor with the Redis connection, the
# XMPP client, an hourly ping and the webhook HTTP server.
at_exit do
  $stdout.sync = true

  commit = `git rev-parse HEAD`
  puts "Soprani.ca/SMS Gateway for XMPP - Bandwidth API V2\n" \
    "==>> last commit of this version is #{commit}\n"

  unless ARGV.size == 7
    puts "Usage: sgx-bwmsgsv2.rb <component_jid> "\
      "<component_password> <server_hostname> "\
      "<server_port> <application_id> "\
      "<http_listen_port> <mms_proxy_prefix_url>"
    exit 0
  end

  started = Time.now
  puts format("LOG %d.%09d: starting...\n\n", started.to_i, started.nsec)

  EM.run do
    REDIS = EM::Hiredis.connect

    SGXbwmsgsv2.run

    # required when using Prosody otherwise disconnects on 6-hour inactivity
    EM.add_periodic_timer(3600) do
      ping = Blather::Stanza::Iq::Ping.new(:get, 'localhost')
      ping.from = ARGV[0]
      SGXbwmsgsv2.write(ping)
    end

    # webhook listener, bound to the loopback address on <http_listen_port>
    server = Goliath::Server.new('127.0.0.1', ARGV[5].to_i)
    server.api = WebhookHandler.new
    server.app = Goliath::Rack::Builder.build(server.api.class, server.api)

    logger = Log4r::Logger.new('goliath')
    server.logger = logger
    logger.add(Log4r::StdoutOutputter.new('console'))
    logger.level = Log4r::INFO

    server.start do
      %w[INT TERM].each do |sig|
        trap(sig) do
          EM.defer do
            puts 'Shutting down gateway...'
            SGXbwmsgsv2.shutdown

            puts 'Gateway has terminated.'
            EM.stop
          end
        end
      end
    end
  end
end