diff --git a/lib/http-routes/api/create-call.js b/lib/http-routes/api/create-call.js index 60d7be36..418643e9 100644 --- a/lib/http-routes/api/create-call.js +++ b/lib/http-routes/api/create-call.js @@ -6,7 +6,8 @@ const {CallDirection, CallStatus} = require('../../utils/constants'); const { v4: uuidv4 } = require('uuid'); const SipError = require('drachtio-srf').SipError; const sysError = require('./error'); -const Requestor = require('../../utils/requestor'); +const HttpRequestor = require('../../utils/http-requestor'); +const WsRequestor = require('../../utils/ws-requestor'); const dbUtils = require('../../utils/db-utils'); router.post('/', async(req, res) => { @@ -104,11 +105,16 @@ router.post('/', async(req, res) => { * attach our requestor and notifier objects * these will be used for all http requests we make during this call */ - app.requestor = new Requestor(logger, account.account_sid, app.call_hook, account.webhook_secret); - if (app.call_status_hook) { - app.notifier = new Requestor(logger, account.account_sid, app.call_status_hook, account.webhook_secret); + if ('WS' === app.call_hook?.method) { + app.requestor = new WsRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret) ; + app.notifier = app.requestor; + } + else { + app.requestor = new HttpRequestor(logger, account.account_sid, app.call_hook, account.webhook_secret); + if (app.call_status_hook) app.notifier = new HttpRequestor(logger, account.account_sid, app.call_status_hook, + account.webhook_secret); + else app.notifier = {request: () => {}}; } - else app.notifier = {request: () => {}}; /* now launch the outdial */ try { diff --git a/lib/http-routes/api/messaging.js b/lib/http-routes/api/messaging.js index e9e955f2..c372c0c0 100644 --- a/lib/http-routes/api/messaging.js +++ b/lib/http-routes/api/messaging.js @@ -1,5 +1,6 @@ const router = require('express').Router(); -const Requestor = require('../../utils/requestor'); +const HttpRequestor = require('../../utils/http-requestor'); +const WsRequestor = require('../../utils/ws-requestor'); const CallInfo = require('../../session/call-info'); const {CallDirection} = require('../../utils/constants'); const SmsSession = require('../../session/sms-call-session'); @@ -18,7 +19,17 @@ router.post('/:partner', async(req, res) => { const app = req.body.app; const account = await lookupAccountBySid(app.accountSid); const hook = app.messaging_hook; - const requestor = new Requestor(logger, account.account_sid, hook, account.webhook_secret); + let requestor; + + if ('WS' === hook?.method) { + app.requestor = new WsRequestor(logger, account.account_sid, hook, account.webhook_secret) ; + app.notifier = app.requestor; + } + else { + app.requestor = new HttpRequestor(logger, account.account_sid, hook, account.webhook_secret); + app.notifier = {request: () => {}}; + } + const payload = { carrier: req.params.partner, messageSid: app.messageSid, @@ -33,7 +44,7 @@ router.post('/:partner', async(req, res) => { res.status(200).json({sid: req.body.messageSid}); try { - tasks = await requestor.request(hook, payload); + tasks = await requestor.request('session:new', hook, payload); logger.info({tasks}, 'response from incoming SMS webhook'); } catch (err) { logger.error({err, hook}, 'Error sending incoming SMS message'); diff --git a/lib/middleware.js b/lib/middleware.js index c039a270..51b85122 100644 --- a/lib/middleware.js +++ b/lib/middleware.js @@ -1,7 +1,8 @@ const { v4: uuidv4 } = require('uuid'); const {CallDirection} = require('./utils/constants'); const CallInfo = require('./session/call-info'); -const Requestor = require('./utils/requestor'); +const HttpRequestor = require('./utils/http-requestor'); +const WsRequestor = require('./utils/ws-requestor'); const makeTask = require('./tasks/make_task'); const parseUri = require('drachtio-srf').parseUri; const normalizeJambones = require('./utils/normalize-jambones'); @@ -142,10 +143,18 @@ module.exports = function(srf, logger) { * create a requestor that we will use for all http requests we make during the call. * also create a notifier for call status events (if not needed, its a no-op). */ - app.requestor = new Requestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret); - if (app.call_status_hook) app.notifier = new Requestor(logger, account_sid, app.call_status_hook, - accountInfo.account.webhook_secret); - else app.notifier = {request: () => {}}; + if ('WS' === app.call_hook?.method || + app.call_hook?.url.startsWith('ws://') || app.call_hook?.url.startsWith('wss://')) { + app.requestor = new WsRequestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret) ; + app.notifier = app.requestor; + app.call_hook.method = 'WS'; + } + else { + app.requestor = new HttpRequestor(logger, account_sid, app.call_hook, accountInfo.account.webhook_secret); + if (app.call_status_hook) app.notifier = new HttpRequestor(logger, account_sid, app.call_status_hook, + accountInfo.account.webhook_secret); + else app.notifier = {request: () => {}}; + } req.locals.application = app; const obj = Object.assign({}, app); @@ -176,15 +185,15 @@ module.exports = function(srf, logger) { return next(); } /* retrieve the application to execute for this inbound call */ - const params = Object.assign(app.call_hook.method === 'POST' ? {sip: req.msg} : {}, + const params = Object.assign(['POST', 'WS'].includes(app.call_hook.method) ? {sip: req.msg} : {}, req.locals.callInfo); - const json = await app.requestor.request(app.call_hook, params); + const json = await app.requestor.request('session:new', app.call_hook, params); app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata)); if (0 === app.tasks.length) throw new Error('no application provided'); next(); } catch (err) { - logger.info({err}, `Error retrieving or parsing application: ${err.message}`); - res.send(480, {headers: {'X-Reason': err.message}}); + logger.info({err}, `Error retrieving or parsing application: ${err?.message}`); + res.send(480, {headers: {'X-Reason': err?.message || 'unknown'}}); } } diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 5d087f72..b7ca275a 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -7,7 +7,8 @@ const sessionTracker = require('./session-tracker'); const makeTask = require('../tasks/make_task'); const normalizeJambones = require('../utils/normalize-jambones'); const listTaskNames = require('../utils/summarize-tasks'); -const Requestor = require('../utils/requestor'); +const HttpRequestor = require('../utils/http-requestor'); +const WsRequestor = require('../utils/ws-requestor'); const BADPRECONDITIONS = 'preconditions not met'; const CALLER_CANCELLED_ERR_MSG = 'Response not sent due to unknown transaction'; @@ -62,6 +63,8 @@ class CallSession extends Emitter { } this._pool = srf.locals.dbHelpers.pool; + + this.requestor.on('command', this._onCommand.bind(this)); } /** @@ -289,6 +292,7 @@ class CallSession extends Emitter { */ async exec() { this.logger.info({tasks: listTaskNames(this.tasks)}, `CallSession:exec starting ${this.tasks.length} tasks`); + while (this.tasks.length && !this.callGone) { const taskNum = ++this.taskIdx; const stackNum = this.stackIdx; @@ -302,7 +306,7 @@ class CallSession extends Emitter { this.logger.info(`CallSession:exec completed task #${stackNum}:${taskNum}: ${task.name}`); } catch (err) { this.currentTask = null; - if (err.message.includes(BADPRECONDITIONS)) { + if (err.message?.includes(BADPRECONDITIONS)) { this.logger.info(`CallSession:exec task #${stackNum}:${taskNum}: ${task.name}: ${err.message}`); } else { @@ -310,6 +314,16 @@ class CallSession extends Emitter { break; } } + + if (0 === this.tasks.length && this.hasStableDialog && this.requestor instanceof WsRequestor) { + try { + await this._awaitCommandsOrHangup(); + if (!this.hasStableDialog || this.callGone) break; + } catch (err) { + this.logger.info(err, 'CallSession:exec - error waiting for new commands'); + break; + } + } } // all done - cleanup @@ -368,6 +382,10 @@ class CallSession extends Emitter { this.currentTask.kill(this); this.currentTask = null; } + if (this.wakeupResolver) { + this.wakeupResolver(); + this.wakeupResolver = null; + } } /** @@ -404,29 +422,42 @@ class CallSession extends Emitter { */ async _lccCallHook(opts) { const webhooks = []; - let sd; - if (opts.call_hook) webhooks.push(this.requestor.request(opts.call_hook, this.callInfo.toJSON())); - if (opts.child_call_hook) { - /* child call hook only allowed from a connected Dial state */ - const task = this.currentTask; - sd = task.sd; - if (task && TaskName.Dial === task.name && sd) { - webhooks.push(this.requestor.request(opts.child_call_hook, sd.callInfo.toJSON())); + let sd, tasks, childTasks; + + if (opts.call_hook || opts.child_call_hook) { + if (opts.call_hook) { + webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON())); } + if (opts.child_call_hook) { + /* child call hook only allowed from a connected Dial state */ + const task = this.currentTask; + sd = task.sd; + if (task && TaskName.Dial === task.name && sd) { + webhooks.push(this.requestor.request('session:redirect', opts.child_call_hook, sd.callInfo.toJSON())); + } + } + const [tasks1, tasks2] = await Promise.all(webhooks); + if (opts.call_hook) { + tasks = tasks1; + if (opts.child_call_hook) childTasks = tasks2; + } + else childTasks = tasks1; } - const [tasks1, tasks2] = await Promise.all(webhooks); - let tasks, childTasks; - if (opts.call_hook) { - tasks = tasks1; - if (opts.child_call_hook) childTasks = tasks2; + else if (opts.parent_call || opts.child_call) { + const {parent_call, child_call} = opts; + assert.ok(!parent_call || Array.isArray(parent_call), 'CallSession:_lccCallHook - parent_call must be an array'); + assert.ok(!child_call || Array.isArray(child_call), 'CallSession:_lccCallHook - child_call must be an array'); + tasks = parent_call; + childTasks = child_call; } - else childTasks = tasks1; if (childTasks) { const {parentLogger} = this.srf.locals; const childLogger = parentLogger.child({callId: this.callId, callSid: sd.callSid}); const t = normalizeJambones(childLogger, childTasks).map((tdata) => makeTask(childLogger, tdata)); childLogger.info({tasks: listTaskNames(t)}, 'CallSession:_lccCallHook new task list for child call'); + + // TODO: if using websockets api, we need a new websocket for the adulting session.. const cs = await sd.doAdulting({ logger: childLogger, application: this.application, @@ -604,6 +635,59 @@ class CallSession extends Emitter { this.taskIdx = 0; } + _onCommand({msgid, command, queueCommand, data}) { + this.logger.info({msgid, command, queueCommand, data}, 'CallSession:_onCommand - received command'); + switch (command) { + case 'redirect': + if (Array.isArray(data)) { + const t = normalizeJambones(this.logger, data).map((tdata) => makeTask(this.logger, tdata)); + if (!queueCommand) { + this.logger.info({tasks: listTaskNames(t)}, 'CallSession:_onCommand new task list'); + this.replaceApplication(t); + } + else { + this.logger.info({t, tasks: this.tasks}, 'CallSession:_onCommand - about to queue tasks'); + this.tasks.push(...t); + this.logger.debug({tasks: this.tasks}, 'CallSession:_onCommand - tasks have been queued'); + } + } + else this._lccCallHook(data); + break; + + case 'call:status': + this._lccCallStatus(data); + break; + + case 'mute:status': + this._lccMuteStatus(data); + break; + + case 'conf:mute-status': + this._lccConfMuteStatus(data); + break; + + case 'conf:hold-status': + this._lccConfHoldStatus(data); + break; + + case 'listen:status': + this._lccListenStatus(data); + break; + + case 'whisper': + this._lccWhisper(data); + break; + + default: + this.logger.info(`CallSession:_onCommand - invalid command ${command}`); + } + if (this.wakeupResolver) { + this.logger.info('CallSession:_onCommand - got commands, waking up..'); + this.wakeupResolver(); + this.wakeupResolver = null; + } + } + _evaluatePreconditions(task) { switch (task.preconditions) { case TaskPreconditions.None: @@ -740,6 +824,7 @@ class CallSession extends Emitter { }); } this.tmpFiles.clear(); + this.requestor && this.requestor.close(); } /** @@ -841,7 +926,7 @@ class CallSession extends Emitter { } else { this.logger.info({accountSid: this.accountSid, webhook: r[0]}, 'performQueueWebhook: webhook found'); - this.queueEventHookRequestor = new Requestor(this.logger, this.accountSid, + this.queueEventHookRequestor = new HttpRequestor(this.logger, this.accountSid, r[0], this.webhook_secret); this.queueEventHook = r[0]; } @@ -855,7 +940,7 @@ class CallSession extends Emitter { /* send webhook */ const params = {...obj, ...this.callInfo.toJSON()}; this.logger.info({accountSid: this.accountSid, params}, 'performQueueWebhook: sending webhook'); - this.queueEventHookRequestor.request(this.queueEventHook, params) + this.queueEventHookRequestor.request('queue:status', this.queueEventHook, params) .catch((err) => { this.logger.info({err, accountSid: this.accountSid, obj}, 'Error sending queue notification event'); }); @@ -944,6 +1029,10 @@ class CallSession extends Emitter { this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration}); this.logger.debug('CallSession: call terminated by jambones'); origDestroy(); + if (this.wakeupResolver) { + this.wakeupResolver(); + this.wakeupResolver = null; + } } }; } @@ -1002,7 +1091,7 @@ class CallSession extends Emitter { this.callInfo.updateCallStatus(callStatus, sipStatus); if (typeof duration === 'number') this.callInfo.duration = duration; try { - this.notifier.request(this.call_status_hook, this.callInfo.toJSON()); + this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON()); } catch (err) { this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`); } @@ -1012,6 +1101,14 @@ class CallSession extends Emitter { this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl) .catch((err) => this.logger.error(err, 'redis error')); } + + _awaitCommandsOrHangup() { + assert(!this.wakeupResolver); + return new Promise((resolve, reject) => { + this.logger.info('_awaitCommandsOrHangup - waiting...'); + this.wakeupResolver = resolve; + }); + } } module.exports = CallSession; diff --git a/lib/tasks/conference.js b/lib/tasks/conference.js index 40ab7ce9..60782265 100644 --- a/lib/tasks/conference.js +++ b/lib/tasks/conference.js @@ -529,7 +529,7 @@ class Conference extends Task { async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) { assert(!this._playSession); - const json = await cs.application.requestor.request(hook, cs.callInfo); + const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); @@ -586,7 +586,7 @@ class Conference extends Task { params.duration = (Date.now() - this.conferenceStartTime.getTime()) / 1000; if (!params.time) params.time = (new Date()).toISOString(); if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount; - cs.application.requestor.request(this.statusHook, Object.assign(params, this.statusParams)) + cs.application.requestor.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams)) .catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error')); } } diff --git a/lib/tasks/dial.js b/lib/tasks/dial.js index a40c3845..add36067 100644 --- a/lib/tasks/dial.js +++ b/lib/tasks/dial.js @@ -288,7 +288,7 @@ class TaskDial extends Task { const match = dtmfDetector.keyPress(key); if (match) { this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`); - requestor.request(this.dtmfHook, {dtmf: match, ...callInfo.toJSON()}) + requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON()}) .catch((err) => this.logger.info(err, 'Dial:_onDtmf - error')); } } diff --git a/lib/tasks/dialogflow/index.js b/lib/tasks/dialogflow/index.js index 1f91b599..a56b072b 100644 --- a/lib/tasks/dialogflow/index.js +++ b/lib/tasks/dialogflow/index.js @@ -453,7 +453,7 @@ class Dialogflow extends Task { } async _performHook(cs, hook, results = {}) { - const json = await this.cs.requestor.request(hook, {...results, ...cs.callInfo.toJSON()}); + const json = await this.cs.requestor.request('verb:hook', hook, {...results, ...cs.callInfo.toJSON()}); if (json && Array.isArray(json)) { const makeTask = require('../make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); diff --git a/lib/tasks/enqueue.js b/lib/tasks/enqueue.js index e1380ea6..e9d975d6 100644 --- a/lib/tasks/enqueue.js +++ b/lib/tasks/enqueue.js @@ -317,7 +317,7 @@ class TaskEnqueue extends Task { } catch (err) { this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`); } - const json = await cs.application.requestor.request(hook, params); + const json = await cs.application.requestor.request('verb:hook', hook, params); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index f80afe11..f08e02a4 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -264,7 +264,7 @@ class TaskGather extends Task { this.logger.debug(evt, 'TaskGather:_onTranscription'); if (evt.is_final) this._resolve('speech', evt); else if (this.partialResultHook) { - this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo)) + this.cs.requestor.request('verb:hook', this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo)) .catch((err) => this.logger.info(err, 'GatherTask:_onTranscription error')); } } diff --git a/lib/tasks/lex.js b/lib/tasks/lex.js index 58df5bce..c84bb058 100644 --- a/lib/tasks/lex.js +++ b/lib/tasks/lex.js @@ -289,7 +289,7 @@ class Lex extends Task { } async _performHook(cs, hook, results) { - const json = await this.cs.requestor.request(hook, results); + const json = await this.cs.requestor.request('verb:hook', hook, results); if (json && Array.isArray(json)) { const makeTask = require('./make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); diff --git a/lib/tasks/rest_dial.js b/lib/tasks/rest_dial.js index 9d3363ac..77fd6633 100644 --- a/lib/tasks/rest_dial.js +++ b/lib/tasks/rest_dial.js @@ -48,7 +48,7 @@ class TaskRestDial extends Task { cs.setDialog(dlg); try { - const tasks = await cs.requestor.request(this.call_hook, cs.callInfo); + const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo); if (tasks && Array.isArray(tasks)) { this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`); cs.replaceApplication(normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata))); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 3d100f1d..d19748ce 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -43,6 +43,7 @@ class TaskSay extends Task { // synthesize all of the text elements let lastUpdated = false; const filepath = (await Promise.all(this.text.map(async(text) => { + if (text.startsWith('silence_stream://')) return text; const {filePath, servedFromCache} = await synthAudio(stats, { text, vendor, diff --git a/lib/tasks/sip_refer.js b/lib/tasks/sip_refer.js index 9a42fc1b..3dd9fb9b 100644 --- a/lib/tasks/sip_refer.js +++ b/lib/tasks/sip_refer.js @@ -65,7 +65,7 @@ class TaskSipRefer extends Task { const status = arr[1]; this.logger.debug(`TaskSipRefer:_handleNotify: call got status ${status}`); if (this.eventHook) { - await cs.requestor.request(this.eventHook, {event: 'transfer-status', call_status: status}); + await cs.requestor.request('verb:hook', this.eventHook, {event: 'transfer-status', call_status: status}); } if (status >= 200) { await this.performAction({refer_status: 202, final_referred_call_status: status}); diff --git a/lib/tasks/task.js b/lib/tasks/task.js index 6c2434f9..f6509865 100644 --- a/lib/tasks/task.js +++ b/lib/tasks/task.js @@ -107,7 +107,7 @@ class Task extends Emitter { async performAction(results, expectResponse = true) { if (this.actionHook) { const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON(); - const json = await this.cs.requestor.request(this.actionHook, params); + const json = await this.cs.requestor.request('verb:hook', this.actionHook, params); if (expectResponse && json && Array.isArray(json)) { const makeTask = require('./make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); @@ -120,7 +120,7 @@ class Task extends Emitter { } async performHook(cs, hook, results) { - const json = await cs.requestor.request(hook, results); + const json = await cs.requestor.request('verb:hook', hook, results); if (json && Array.isArray(json)) { const makeTask = require('./make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index 6b58954e..953ab55b 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -246,7 +246,7 @@ class TaskTranscribe extends Task { } if (this.transcriptionHook) { - this.cs.requestor.request(this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo)) + this.cs.requestor.request('verb:hook', this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo)) .catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error')); } if (this.parentTask) { diff --git a/lib/utils/base-requestor.js b/lib/utils/base-requestor.js new file mode 100644 index 00000000..c2f1be95 --- /dev/null +++ b/lib/utils/base-requestor.js @@ -0,0 +1,75 @@ +const assert = require('assert'); +const Emitter = require('events'); +const crypto = require('crypto'); +const timeSeries = require('@jambonz/time-series'); +let alerter ; + +class BaseRequestor extends Emitter { + constructor(logger, account_sid, hook, secret) { + super(); + assert(typeof hook === 'object'); + + this.logger = logger; + this.url = hook.url; + + this.username = hook.username; + this.password = hook.password; + this.secret = secret; + this.account_sid = account_sid; + + const {stats} = require('../../').srf.locals; + this.stats = stats; + + if (!alerter) { + alerter = timeSeries(logger, { + host: process.env.JAMBONES_TIME_SERIES_HOST, + commitSize: 50, + commitInterval: 'test' === process.env.NODE_ENV ? 7 : 20 + }); + } + } + + get Alerter() { + return alerter; + } + + close() { + /* subclass responsibility */ + } + + _computeSignature(payload, timestamp, secret) { + assert(secret); + const data = `${timestamp}.${JSON.stringify(payload)}`; + return crypto + .createHmac('sha256', secret) + .update(data, 'utf8') + .digest('hex'); + } + + _generateSigHeader(payload, secret) { + const timestamp = Math.floor(Date.now() / 1000); + const signature = this._computeSignature(payload, timestamp, secret); + const scheme = 'v1'; + return { + 'Jambonz-Signature': `t=${timestamp},${scheme}=${signature}` + }; + } + + _isAbsoluteUrl(u) { + return typeof u === 'string' && + u.startsWith('https://') || u.startsWith('http://') || + u.startsWith('ws://') || u.startsWith('wss://'); + } + _isRelativeUrl(u) { + return typeof u === 'string' && u.startsWith('/'); + } + _roundTrip(startAt) { + const diff = process.hrtime(startAt); + const time = diff[0] * 1e3 + diff[1] * 1e-6; + return time.toFixed(0); + } + + +} + +module.exports = BaseRequestor; diff --git a/lib/utils/constants.json b/lib/utils/constants.json index 2ed63ee0..620bdef8 100644 --- a/lib/utils/constants.json +++ b/lib/utils/constants.json @@ -105,6 +105,15 @@ "Hangup": "hangup", "Replaced": "replaced" }, + "HookMsgTypes": [ + "session:new", + "session:reconnect", + "session:redirect", + "call:status", + "queue:status", + "verb:hook", + "jambonz:error" + ], "MAX_SIMRINGS": 10, "BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)", "FS_UUID_SET_NAME": "fsUUIDs" diff --git a/lib/utils/http-requestor.js b/lib/utils/http-requestor.js new file mode 100644 index 00000000..3ecbb103 --- /dev/null +++ b/lib/utils/http-requestor.js @@ -0,0 +1,105 @@ +const bent = require('bent'); +const parseUrl = require('parse-url'); +const assert = require('assert'); +const BaseRequestor = require('./base-requestor'); +const {HookMsgTypes} = require('./constants.json'); +const snakeCaseKeys = require('./snakecase-keys'); + +const toBase64 = (str) => Buffer.from(str || '', 'utf8').toString('base64'); + +function basicAuth(username, password) { + if (!username || !password) return {}; + const creds = `${username}:${password || ''}`; + const header = `Basic ${toBase64(creds)}`; + return {Authorization: header}; +} + + +class HttpRequestor extends BaseRequestor { + constructor(logger, account_sid, hook, secret) { + super(logger, account_sid, hook, secret); + + this.method = hook.method || 'POST'; + this.authHeader = basicAuth(hook.username, hook.password); + + const u = parseUrl(this.url); + const myPort = u.port ? `:${u.port}` : ''; + const baseUrl = this._baseUrl = `${u.protocol}://${u.resource}${myPort}`; + + this.get = bent(baseUrl, 'GET', 'buffer', 200, 201); + this.post = bent(baseUrl, 'POST', 'buffer', 200, 201); + + + assert(this._isAbsoluteUrl(this.url)); + assert(['GET', 'POST'].includes(this.method)); + } + + get baseUrl() { + return this._baseUrl; + } + + /** + * Make an HTTP request. + * All requests use json bodies. + * All requests expect a 200 statusCode on success + * @param {object|string} hook - may be a absolute or relative url, or an object + * @param {string} [hook.url] - an absolute or relative url + * @param {string} [hook.method] - 'GET' or 'POST' + * @param {string} [hook.username] - if basic auth is protecting the endpoint + * @param {string} [hook.password] - if basic auth is protecting the endpoint + * @param {object} [params] - request parameters + */ + async request(type, hook, params) { + assert(HookMsgTypes.includes(type)); + const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; + const url = hook.url || hook; + const method = hook.method || 'POST'; + + assert.ok(url, 'HttpRequestor:request url was not provided'); + assert.ok, (['GET', 'POST'].includes(method), `HttpRequestor:request method must be 'GET' or 'POST' not ${method}`); + const {url: urlInfo = hook, method: methodInfo = 'POST'} = hook; // mask user/pass + this.logger.debug({url: urlInfo, method: methodInfo, payload}, `HttpRequestor:request ${method} ${url}`); + const startAt = process.hrtime(); + + let buf; + try { + const sigHeader = this._generateSigHeader(payload, this.secret); + const headers = {...sigHeader, ...this.authHeader}; + //this.logger.info({url, headers}, 'send webhook'); + buf = this._isRelativeUrl(url) ? + await this.post(url, payload, headers) : + await bent(method, 'buffer', 200, 201, 202)(url, payload, headers); + } catch (err) { + this.logger.error({err, secret: this.secret, baseUrl: this.baseUrl, url, statusCode: err.statusCode}, + `web callback returned unexpected error code ${err.statusCode}`); + let opts = {account_sid: this.account_sid}; + if (err.code === 'ECONNREFUSED') { + opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_CONNECTION_FAILURE, url}; + } + else if (err.name === 'StatusError') { + opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_STATUS_FAILURE, url, status: err.statusCode}; + } + else { + opts = {...opts, alert_type: this.Alerter.AlertType.WEBHOOK_CONNECTION_FAILURE, url, detail: err.message}; + } + this.Alerter.writeAlerts(opts).catch((err) => this.logger.info({err, opts}, 'Error writing alert')); + + throw err; + } + const rtt = this._roundTrip(startAt); + if (buf) this.stats.histogram('app.hook.response_time', rtt, ['hook_type:app']); + + if (buf && buf.toString().length > 0) { + try { + const json = JSON.parse(buf.toString()); + this.logger.info({response: json}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`); + return json; + } + catch (err) { + //this.logger.debug({err, url, method}, `HttpRequestor:request returned non-JSON content: '${buf.toString()}'`); + } + } + } +} + +module.exports = HttpRequestor; diff --git a/lib/utils/place-outdial.js b/lib/utils/place-outdial.js index d2e1062d..4046245b 100644 --- a/lib/utils/place-outdial.js +++ b/lib/utils/place-outdial.js @@ -357,7 +357,7 @@ class SingleDialer extends Emitter { this.callInfo.updateCallStatus(callStatus, sipStatus); if (typeof duration === 'number') this.callInfo.duration = duration; try { - this.requestor.request(this.application.call_status_hook, this.callInfo.toJSON()); + this.requestor.request('call:status', this.application.call_status_hook, this.callInfo.toJSON()); } catch (err) { this.logger.info(err, `SingleDialer:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`); } diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js new file mode 100644 index 00000000..297b069a --- /dev/null +++ b/lib/utils/ws-requestor.js @@ -0,0 +1,249 @@ +const assert = require('assert'); +const BaseRequestor = require('./base-requestor'); +const short = require('short-uuid'); +const {HookMsgTypes} = require('./constants.json'); +const Websocket = require('ws'); +const snakeCaseKeys = require('./snakecase-keys'); +const HttpRequestor = require('./http-requestor'); +const MAX_RECONNECTS = 5; +const RESPONSE_TIMEOUT_MS = process.env.JAMBONES_WS_API_MSG_RESPONSE_TIMEOUT || 5000; + +class WsRequestor extends BaseRequestor { + constructor(logger, account_sid, hook, secret) { + super(logger, account_sid, hook, secret); + this.connections = 0; + this.messagesInFlight = new Map(); + this.maliciousClient = false; + + assert(this._isAbsoluteUrl(this.url)); + + this.on('socket-closed', this._onSocketClosed.bind(this)); + } + + /** + * Send a JSON payload over the websocket. If this is the first request, + * open the websocket. + * All requests expect an ack message in response + * @param {object|string} hook - may be a absolute or relative url, or an object + * @param {string} [hook.url] - an absolute or relative url + * @param {string} [hook.method] - 'GET' or 'POST' + * @param {string} [hook.username] - if basic auth is protecting the endpoint + * @param {string} [hook.password] - if basic auth is protecting the endpoint + * @param {object} [params] - request parameters + */ + async request(type, hook, params) { + assert(HookMsgTypes.includes(type)); + const url = hook.url || hook; + + if (this.maliciousClient) { + this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); + return; + } + + /* if we have an absolute url, and it is http then do a standard webhook */ + if (this._isAbsoluteUrl(url) && url.startsWith('http')) { + this.logger.debug({hook}, 'WsRequestor: sending a webhook'); + const requestor = new HttpRequestor(this.logger, this.account_sid, hook, this.secret); + return requestor.request(type, hook, params); + } + + /* connect if necessary */ + if (!this.ws) { + if (this.connections >= MAX_RECONNECTS) { + throw new Error(`max attempts connecting to ${this.url}`); + } + try { + const startAt = process.hrtime(); + await this._connect(); + const rtt = this._roundTrip(startAt); + this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']); + } catch (err) { + this.logger.error({err}, 'WsRequestor:request - failed connecting'); + throw err; + } + } + assert(this.ws); + + /* prepare and send message */ + const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; + assert.ok(url, 'WsRequestor:request url was not provided'); + + const msgid = short.generate(); + const obj = { + type, + msgid, + hook: type === 'verb:hook' ? url : undefined, + data: {...payload} + }; + + this.logger.debug({obj}, `WsRequestor:request ${url}`); + + /* simple notifications */ + if (['call:status', 'jambonz:error'].includes(type)) { + this.ws.send(JSON.stringify(obj)); + return; + } + + /* messages that require an ack */ + return new Promise((resolve, reject) => { + /* give the far end a reasonable amount of time to ack our message */ + const timer = setTimeout(() => { + const {failure} = this.messagesInFlight.get(msgid); + failure && failure(`timeout from far end for msgid ${msgid}`); + this.messagesInFlight.delete(msgid); + }, RESPONSE_TIMEOUT_MS); + + /* save the message info for reply */ + const startAt = process.hrtime(); + this.messagesInFlight.set(msgid, { + success: (response) => { + clearTimeout(timer); + const rtt = this._roundTrip(startAt); + this.logger.info({response}, `WsRequestor:request ${url} succeeded in ${rtt}ms`); + this.stats.histogram('app.hook.ws_response_time', rtt, ['hook_type:app']); + resolve(response); + }, + failure: (err) => { + clearTimeout(timer); + reject(err); + } + }); + + /* send the message */ + this.ws.send(JSON.stringify(obj)); + }); + } + + close() { + this.logger.info('WsRequestor: closing socket'); + if (this.ws) { + this.ws.close(); + this.ws.removeAllListeners(); + } + } + + _connect() { + assert(!this.ws); + return new Promise((resolve, reject) => { + let opts = { + followRedirects: true, + maxRedirects: 2, + handshakeTimeout: 1000, + maxPayload: 8096, + }; + if (this.username && this.password) opts = {...opts, auth: `${this.username}:${this.password}`}; + + this + .once('ready', (ws) => { + this.ws = ws; + this.removeAllListeners('not-ready'); + resolve(); + }) + .once('not-ready', () => { + this.removeAllListeners('ready'); + reject(); + }); + const ws = new Websocket(this.url, ['ws.jambonz.org'], opts); + this._setHandlers(ws); + }); + } + + _setHandlers(ws) { + ws + .on('open', this._onOpen.bind(this, ws)) + .on('close', this._onClose.bind(this)) + .on('message', this._onMessage.bind(this)) + .on('unexpected-response', this._onUnexpectedResponse.bind(this, ws)) + .on('error', this._onError.bind(this)); + } + + _onError(err) { + this.logger.info({url: this.url, err}, 'WsRequestor:_onError'); + if (this.connections > 0) this.emit('socket-closed'); + this.emit('not-ready'); + } + + _onOpen(ws) { + assert(!this.ws); + this.emit('ready', ws); + this.logger.info({url: this.url}, 'WsRequestor - successfully connected'); + } + + _onClose() { + this.logger.info({url: this.url}, 'WsRequestor - socket closed unexpectedly from remote side'); + this.emit('socket-closed'); + } + + _onUnexpectedResponse(ws, req, res) { + assert(!this.ws); + this.logger.info({ + headers: res.headers, + statusCode: res.statusCode, + statusMessage: res.statusMessage + }, 'WsRequestor - unexpected response'); + this.emit('connection-failure'); + this.emit('not-ready'); + } + + _onSocketClosed() { + this.ws = null; + if (this.connections > 0) { + if (++this.connections < MAX_RECONNECTS) { + setImmediate(this.connect.bind(this)); + } + } + } + + _onMessage(content, isBinary) { + if (this.isBinary) { + this.logger.info({url: this.url}, 'WsRequestor:_onMessage - discarding binary message'); + this.maliciousClient = true; + this.ws.close(); + return; + } + + /* messages must be JSON format */ + try { + const {type, msgid, command, queueCommand = false, data} = JSON.parse(content); + assert.ok(type, 'type property not supplied'); + + switch (type) { + case 'ack': + assert.ok(msgid, 'msgid not supplied'); + this._recvAck(msgid, data); + break; + + case 'command': + assert.ok(command, 'command property not supplied'); + assert.ok(data, 'data property not supplied'); + this._recvCommand(msgid, command, queueCommand, data); + break; + + default: + assert.ok(false, `invalid type property: ${type}`); + } + } catch (err) { + this.logger.info({err}, 'WsRequestor:_onMessage - invalid incoming message'); + } + } + + _recvAck(msgid, data) { + const obj = this.messagesInFlight.get(msgid); + if (!obj) { + this.logger.info({url: this.url}, `WsRequestor:_recvAck - ack to unknown msgid ${msgid}, discarding`); + return; + } + this.logger.debug({url: this.url}, `WsRequestor:_recvAck - received response to ${msgid}`); + this.messagesInFlight.delete(msgid); + const {success} = obj; + success && success(data); + } + + _recvCommand(msgid, command, queueCommand, data) { + // TODO: validate command + this.logger.info({msgid, command, queueCommand, data}, 'received command'); + this.emit('command', {msgid, command, queueCommand, data}); + } +} + +module.exports = WsRequestor; diff --git a/package-lock.json b/package-lock.json index cba40eab..d08c9630 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29,9 +29,11 @@ "moment": "^2.29.1", "parse-url": "^5.0.7", "pino": "^6.13.4", + "short-uuid": "^4.2.0", "to-snake-case": "^1.0.0", "uuid": "^8.3.2", "verify-aws-sns-signature": "^0.0.6", + "ws": "^8.5.0", "xml2js": "^0.4.23" }, "devDependencies": { @@ -44,6 +46,10 @@ }, "engines": { "node": ">= 10.16.0" + }, + "optionalDependencies": { + "bufferutil": "^4.0.6", + "utf-8-validate": "^5.0.8" } }, "node_modules/@babel/code-frame": { @@ -1099,6 +1105,19 @@ "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", "integrity": "sha1-+OcRMvf/5uAaXJaXpMbz5I1cyBk=" }, + "node_modules/bufferutil": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.6.tgz", + "integrity": "sha512-jduaYOYtnio4aIAyc6UbvPCVcgq7nYpVnucyxr6eCYg/Woad9Hf/oxxBRDnGGjPfjUm6j5O/uBWhIu4iLebFaw==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/bytes": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.1.tgz", @@ -3543,6 +3562,26 @@ "node": ">= 6.0.0" } }, + "node_modules/microsoft-cognitiveservices-speech-sdk/node_modules/ws": { + "version": "7.5.7", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.7.tgz", + "integrity": "sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/mime": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", @@ -3726,6 +3765,17 @@ "node": ">= 6.13.0" } }, + "node_modules/node-gyp-build": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz", + "integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==", + "optional": true, + "bin": { + "node-gyp-build": "bin.js", + "node-gyp-build-optional": "optional.js", + "node-gyp-build-test": "build-test.js" + } + }, "node_modules/node-noop": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/node-noop/-/node-noop-0.0.1.tgz", @@ -5127,6 +5177,19 @@ "resolved": "https://registry.npmjs.org/username-sync/-/username-sync-1.0.3.tgz", "integrity": "sha512-m/7/FSqjJNAzF2La448c/aEom0gJy7HY7Y509h6l0ePvEkFictAGptwWaj1msWJ38JbfEDOUoE8kqFee9EHKdA==" }, + "node_modules/utf-8-validate": { + "version": "5.0.8", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.8.tgz", + "integrity": "sha512-k4dW/Qja1BYDl2qD4tOMB9PFVha/UJtxTc1cXYOe3WwA/2m0Yn4qB7wLMpJyLJ/7DR0XnTut3HsCSzDT4ZvKgA==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "node-gyp-build": "^4.3.0" + }, + "engines": { + "node": ">=6.14.2" + } + }, "node_modules/util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -5343,11 +5406,11 @@ } }, "node_modules/ws": { - "version": "7.5.7", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.7.tgz", - "integrity": "sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==", + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.5.0.tgz", + "integrity": "sha512-BWX0SWVgLPzYwF8lTzEy1egjhS4S4OEAHfsO8o65WOVsrnSRGaSiUaa9e0ggGlkMTtBlmOpEXiie9RUcBO86qg==", "engines": { - "node": ">=8.3.0" + "node": ">=10.0.0" }, "peerDependencies": { "bufferutil": "^4.0.1", @@ -6334,6 +6397,15 @@ "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", "integrity": "sha1-+OcRMvf/5uAaXJaXpMbz5I1cyBk=" }, + "bufferutil": { + "version": "4.0.6", + "resolved": "https://registry.npmjs.org/bufferutil/-/bufferutil-4.0.6.tgz", + "integrity": "sha512-jduaYOYtnio4aIAyc6UbvPCVcgq7nYpVnucyxr6eCYg/Woad9Hf/oxxBRDnGGjPfjUm6j5O/uBWhIu4iLebFaw==", + "optional": true, + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "bytes": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/bytes/-/bytes-3.1.1.tgz", @@ -8243,6 +8315,12 @@ "integrity": "sha512-TMeqbNl2fMW0nMjTEPOwe3J/PRFP4vqeoNuQMG0HlMrtm5QxKqdvAkZ1pRBQ/ulIyDD5Yq0nJ7YbdD8ey0TO3g==" } } + }, + "ws": { + "version": "7.5.7", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.7.tgz", + "integrity": "sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==", + "requires": {} } } }, @@ -8383,6 +8461,12 @@ "resolved": "https://registry.npmjs.org/node-forge/-/node-forge-1.2.1.tgz", "integrity": "sha512-Fcvtbb+zBcZXbTTVwqGA5W+MKBj56UjVRevvchv5XrcyXbmNdesfZL37nlcWOfpgHhgmxApw3tQbTr4CqNmX4w==" }, + "node-gyp-build": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/node-gyp-build/-/node-gyp-build-4.3.0.tgz", + "integrity": "sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==", + "optional": true + }, "node-noop": { "version": "0.0.1", "resolved": "https://registry.npmjs.org/node-noop/-/node-noop-0.0.1.tgz", @@ -9489,6 +9573,15 @@ "resolved": "https://registry.npmjs.org/username-sync/-/username-sync-1.0.3.tgz", "integrity": "sha512-m/7/FSqjJNAzF2La448c/aEom0gJy7HY7Y509h6l0ePvEkFictAGptwWaj1msWJ38JbfEDOUoE8kqFee9EHKdA==" }, + "utf-8-validate": { + "version": "5.0.8", + "resolved": "https://registry.npmjs.org/utf-8-validate/-/utf-8-validate-5.0.8.tgz", + "integrity": "sha512-k4dW/Qja1BYDl2qD4tOMB9PFVha/UJtxTc1cXYOe3WwA/2m0Yn4qB7wLMpJyLJ/7DR0XnTut3HsCSzDT4ZvKgA==", + "optional": true, + "requires": { + "node-gyp-build": "^4.3.0" + } + }, "util-deprecate": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", @@ -9662,9 +9755,9 @@ } }, "ws": { - "version": "7.5.7", - "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.7.tgz", - "integrity": "sha512-KMvVuFzpKBuiIXW3E4u3mySRO2/mCHSyZDJQM5NQ9Q9KHWHWh0NHgfbRMLLrceUK5qAL4ytALJbpRMjixFZh8A==", + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.5.0.tgz", + "integrity": "sha512-BWX0SWVgLPzYwF8lTzEy1egjhS4S4OEAHfsO8o65WOVsrnSRGaSiUaa9e0ggGlkMTtBlmOpEXiie9RUcBO86qg==", "requires": {} }, "xml2js": { diff --git a/package.json b/package.json index 9ed38595..bf860eed 100644 --- a/package.json +++ b/package.json @@ -46,9 +46,11 @@ "moment": "^2.29.1", "parse-url": "^5.0.7", "pino": "^6.13.4", + "short-uuid": "^4.2.0", "to-snake-case": "^1.0.0", "uuid": "^8.3.2", "verify-aws-sns-signature": "^0.0.6", + "ws": "^8.5.0", "xml2js": "^0.4.23" }, "devDependencies": { @@ -58,5 +60,9 @@ "eslint-plugin-promise": "^4.3.1", "nyc": "^15.1.0", "tape": "^5.2.2" + }, + "optionalDependencies": { + "bufferutil": "^4.0.6", + "utf-8-validate": "^5.0.8" } }