Compare commits

..

22 Commits

Author SHA1 Message Date
Dave Horton
ad722a55ee generate trace id before outdial so we can include it in custom header (#418)
* generate trace id before outdial so we can include it in custom header

* logging

* logging

* fix #420 race condition on rest outdial when ws is used

* revert unnecessary logging change
2023-08-08 13:00:34 -04:00
Hoan Luu Huu
82939214a2 update stats-collector version (#421) 2023-08-07 21:22:10 -04:00
Dave Horton
043a171f41 remove log message 2023-08-07 15:22:03 -04:00
Dave Horton
c8e9b34b53 fix typo that caused record to fail on rest calls 2023-08-07 14:46:51 -04:00
Hoan Luu Huu
d7dcdb1d0c Continuos ASR for transcribe (#398)
* asrTimeout

* fix jslint

* change log

* fix interrim
2023-08-03 09:49:44 -04:00
Dave Horton
fbd0782258 #388 - support custom speech vendor in transcribe verb (#414)
Co-authored-by: Hoan Luu Huu <110280845+xquanluu@users.noreply.github.com>
2023-08-02 19:06:31 -04:00
Fábio Gomes
38f9329b12 When recordings are enabled, disable bidirectional audio on jambonz-session-record (#415) 2023-08-02 14:21:59 -04:00
Dave Horton
d4bfdf0916 #412 - dont delay sending call status when stopping background listen (#413) 2023-08-02 12:50:13 -04:00
Dave Horton
9203deef0f fix bug in prev commit 2023-08-02 10:27:50 -04:00
Dave Horton
48b182c891 Fix/rest outdial failure session hangs (#411)
* fix #410

* on rest outdial failure, if remote end closed gracefully don't wait for a reconnection
2023-08-01 12:59:30 -04:00
Dave Horton
e8e987cb9d Fix/snake case customer data issue 406 (#409)
* revert recent change on silence trimming

* fix issue with incorrectly snake-casing customer data (#406)
2023-07-27 22:31:43 -04:00
Dave Horton
38ea9e7411 update to speech-utils@0.0.18 which ignores trimming of silence on azure ssml audio 2023-07-25 07:51:46 -04:00
Hoan Luu Huu
7b11a56a53 feat siprec custom header (#400)
* feat siprec custom header

* wip

* update verb specification

* add newline to info siprec body

* add newline to info siprec body
2023-07-20 09:10:41 -04:00
Dave Horton
66305b5aea feature: optionally trim silence from azure tts (#399) 2023-07-19 10:36:24 -04:00
Dave Horton
6793bbf330 fix exception that appears in logs if session ends before last call status update 2023-07-18 13:20:53 -04:00
Hoan Luu Huu
d8543f73f2 execute status callback async (#394)
* execute status callback async

* fix review comment

* revert fix review comment
2023-07-18 12:40:57 -04:00
Hoan Luu Huu
e1dad569dc Fix/background listen tag (#391)
* fix background listen send customerData to api server

* test listen

* fix review comment
2023-07-11 16:03:32 +01:00
Hoan Luu Huu
643bee48c5 feat multi srs (#381) 2023-07-05 08:16:59 +01:00
Dave Horton
487bfd90d9 0.8.4 2023-06-28 09:23:40 +01:00
Hoan Luu Huu
810f6eb695 fix aws-sdk v3 (#387)
* fix aws-sdk v3

* fix jslint

* fix jslint

* fix aws response parser
2023-06-28 09:20:43 +01:00
Hoan Luu Huu
62bc6b4bac feat: add fs service url to sbc ping option (#383)
* feat multi srs

* add fs service URL to SBC ping option
2023-06-23 11:13:08 +01:00
two56
91fe3ceb06 Clear conference details in both Jambonz and FreeSWITCH (#350)
Co-authored-by: Matt Preskett <matt.preskett@netcall.com>
2023-06-14 15:35:04 -04:00
17 changed files with 4705 additions and 1485 deletions

5
app.js
View File

@@ -120,10 +120,15 @@ function handle(signal) {
srf.locals.disabled = true; srf.locals.disabled = true;
logger.info(`got signal ${signal}`); logger.info(`got signal ${signal}`);
const setName = `${(JAMBONES_CLUSTER_ID || 'default')}:active-fs`; const setName = `${(JAMBONES_CLUSTER_ID || 'default')}:active-fs`;
const fsServiceUrlSetName = `${(JAMBONES_CLUSTER_ID || 'default')}:fs-service-url`;
if (setName && srf.locals.localSipAddress) { if (setName && srf.locals.localSipAddress) {
logger.info(`got signal ${signal}, removing ${srf.locals.localSipAddress} from set ${setName}`); logger.info(`got signal ${signal}, removing ${srf.locals.localSipAddress} from set ${setName}`);
removeFromSet(setName, srf.locals.localSipAddress); removeFromSet(setName, srf.locals.localSipAddress);
} }
if (fsServiceUrlSetName && srf.locals.serviceUrl) {
logger.info(`got signal ${signal}, removing ${srf.locals.serviceUrl} from set ${fsServiceUrlSetName}`);
removeFromSet(fsServiceUrlSetName, srf.locals.serviceUrl);
}
removeFromSet(FS_UUID_SET_NAME, srf.locals.fsUUID); removeFromSet(FS_UUID_SET_NAME, srf.locals.fsUUID);
if (K8S) { if (K8S) {
srf.locals.lifecycleEmitter.operationalState = LifeCycleEvents.ScaleIn; srf.locals.lifecycleEmitter.operationalState = LifeCycleEvents.ScaleIn;

View File

@@ -47,6 +47,11 @@ router.post('/', async(req, res) => {
const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null; const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null;
const record_all_calls = account.record_all_calls || (application && application.record_all_calls); const record_all_calls = account.record_all_calls || (application && application.record_all_calls);
const recordOutputFormat = account.record_format || 'mp3'; const recordOutputFormat = account.record_format || 'mp3';
const rootSpan = new RootSpan('rest-call', {
callSid,
accountSid,
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid})
});
opts.headers = { opts.headers = {
...opts.headers, ...opts.headers,
@@ -54,6 +59,7 @@ router.post('/', async(req, res) => {
'X-Jambonz-FS-UUID': srf.locals.fsUUID, 'X-Jambonz-FS-UUID': srf.locals.fsUUID,
'X-Call-Sid': callSid, 'X-Call-Sid': callSid,
'X-Account-Sid': accountSid, 'X-Account-Sid': accountSid,
'X-Trace-ID': rootSpan.traceId,
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}), ...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}), ...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat}) ...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat})
@@ -194,7 +200,6 @@ router.post('/', async(req, res) => {
/* ok our outbound INVITE is in flight */ /* ok our outbound INVITE is in flight */
const tasks = [restDial]; const tasks = [restDial];
const rootSpan = new RootSpan('rest-call', inviteReq);
sipLogger = logger.child({ sipLogger = logger.child({
callSid, callSid,
callId: inviteReq.get('Call-ID'), callId: inviteReq.get('Call-ID'),
@@ -258,6 +263,7 @@ router.post('/', async(req, res) => {
sipStatus: err.status, sipStatus: err.status,
sipReason: err.reason sipReason: err.reason
}); });
cs.callGone = true;
} }
else { else {
if (cs) cs.emit('callStatusChange', { if (cs) cs.emit('callStatusChange', {

View File

@@ -423,7 +423,10 @@ class CallSession extends Emitter {
'X-Call-Sid': this.callSid, 'X-Call-Sid': this.callSid,
'X-Account-Sid': this.accountSid, 'X-Account-Sid': this.accountSid,
'X-Application-Sid': this.applicationSid, 'X-Application-Sid': this.applicationSid,
} ...(this.recordOptions.headers && {'Content-Type': 'application/json'})
},
// Siprect Client is initiated from startCallRecording, so just need to pass custom headers in startRecording
...(this.recordOptions.headers && {body: JSON.stringify(this.recordOptions.headers) + '\n'})
}); });
if (res.status === 200) { if (res.status === 200) {
this._recordState = RecordState.RecordingOn; this._recordState = RecordState.RecordingOn;
@@ -444,7 +447,7 @@ class CallSession extends Emitter {
const res = await this.dlg.request({ const res = await this.dlg.request({
method: 'INFO', method: 'INFO',
headers: { headers: {
'X-Reason': 'stopCallRecording', 'X-Reason': 'stopCallRecording'
} }
}); });
if (res.status === 200) { if (res.status === 200) {
@@ -466,7 +469,7 @@ class CallSession extends Emitter {
const res = await this.dlg.request({ const res = await this.dlg.request({
method: 'INFO', method: 'INFO',
headers: { headers: {
'X-Reason': 'pauseCallRecording', 'X-Reason': 'pauseCallRecording'
} }
}); });
if (res.status === 200) { if (res.status === 200) {
@@ -488,7 +491,7 @@ class CallSession extends Emitter {
const res = await this.dlg.request({ const res = await this.dlg.request({
method: 'INFO', method: 'INFO',
headers: { headers: {
'X-Reason': 'resumeCallRecording', 'X-Reason': 'resumeCallRecording'
} }
}); });
if (res.status === 200) { if (res.status === 200) {
@@ -513,6 +516,8 @@ class CallSession extends Emitter {
const t = normalizeJambones(this.logger, [opts]); const t = normalizeJambones(this.logger, [opts]);
this.backgroundListenTask = makeTask(this.logger, t[0]); this.backgroundListenTask = makeTask(this.logger, t[0]);
this.backgroundListenTask.bugname = bugname; this.backgroundListenTask.bugname = bugname;
// Remove unneeded customer data to be sent to api server.
this.backgroundListenTask.ignoreCustomerData = true;
const resources = await this._evaluatePreconditions(this.backgroundListenTask); const resources = await this._evaluatePreconditions(this.backgroundListenTask);
const {span, ctx} = this.rootSpan.startChildSpan(`background-listen:${this.backgroundListenTask.summary}`); const {span, ctx} = this.rootSpan.startChildSpan(`background-listen:${this.backgroundListenTask.summary}`);
this.backgroundListenTask.span = span; this.backgroundListenTask.span = span;
@@ -791,23 +796,15 @@ class CallSession extends Emitter {
} }
} }
if (0 === this.tasks.length && this.requestor instanceof WsRequestor && !this.callGone) { if (0 === this.tasks.length &&
//let span; this.requestor instanceof WsRequestor &&
!this.requestor.closedGracefully &&
!this.callGone
) {
try { try {
//const {span} = this.rootSpan.startChildSpan('waiting for commands');
//const {reason, queue, command} = await this._awaitCommandsOrHangup();
/*
span.setAttributes({
'completion.reason': reason,
'async.request.queue': queue,
'async.request.command': command
});
span.end();
*/
await this._awaitCommandsOrHangup(); await this._awaitCommandsOrHangup();
if (this.callGone) break; if (this.callGone) break;
} catch (err) { } catch (err) {
//span.end();
this.logger.info(err, 'CallSession:exec - error waiting for new commands'); this.logger.info(err, 'CallSession:exec - error waiting for new commands');
break; break;
} }
@@ -1764,7 +1761,8 @@ class CallSession extends Emitter {
// nice, call is in progress, good time to enable record // nice, call is in progress, good time to enable record
await this.enableRecordAllCall(); await this.enableRecordAllCall();
} else if (callStatus == CallStatus.Completed && this.isBackGroundListen) { } else if (callStatus == CallStatus.Completed && this.isBackGroundListen) {
await this.stopBackgroundListen(); this.stopBackgroundListen().catch((err) => this.logger.error(
{err}, 'CallSession:_notifyCallStatusChange - error stopping background listen'));
} }
/* race condition: we hang up at the same time as the caller */ /* race condition: we hang up at the same time as the caller */
@@ -1779,6 +1777,15 @@ class CallSession extends Emitter {
this.callInfo.updateCallStatus(callStatus, sipStatus, sipReason); this.callInfo.updateCallStatus(callStatus, sipStatus, sipReason);
if (typeof duration === 'number') this.callInfo.duration = duration; if (typeof duration === 'number') this.callInfo.duration = duration;
this.executeStatusCallback(callStatus, sipStatus);
// update calls db
//this.logger.debug(`updating redis with ${JSON.stringify(this.callInfo)}`);
this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl)
.catch((err) => this.logger.error(err, 'redis error'));
}
async executeStatusCallback(callStatus, sipStatus) {
const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`); const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`);
span.setAttributes(this.callInfo.toJSON()); span.setAttributes(this.callInfo.toJSON());
try { try {
@@ -1790,11 +1797,6 @@ class CallSession extends Emitter {
span.end(); span.end();
this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`); this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
} }
// update calls db
//this.logger.debug(`updating redis with ${JSON.stringify(this.callInfo)}`);
this.updateCallStatus(Object.assign({}, this.callInfo.toJSON()), this.serviceUrl)
.catch((err) => this.logger.error(err, 'redis error'));
} }
async enableRecordAllCall() { async enableRecordAllCall() {
@@ -1805,6 +1807,7 @@ class CallSession extends Emitter {
username: JAMBONZ_RECORD_WS_USERNAME, username: JAMBONZ_RECORD_WS_USERNAME,
password: JAMBONZ_RECORD_WS_PASSWORD password: JAMBONZ_RECORD_WS_PASSWORD
}, },
disableBidirectionalAudio: true,
mixType : 'stereo', mixType : 'stereo',
passDtmf: true passDtmf: true
}; };

View File

@@ -49,7 +49,6 @@ class RestCallSession extends CallSession {
*/ */
_callerHungup() { _callerHungup() {
if (this.restDialTask) { if (this.restDialTask) {
this.logger.info('RestCallSession: releasing AMD');
this.restDialTask.turnOffAmd(); this.restDialTask.turnOffAmd();
} }
this.callInfo.callTerminationBy = 'caller'; this.callInfo.callTerminationBy = 'caller';

View File

@@ -114,7 +114,12 @@ class Conference extends Task {
} }
this.emitter.emit('kill'); this.emitter.emit('kill');
await this._doFinalMemberCheck(cs); await this._doFinalMemberCheck(cs);
if (this.ep && this.ep.connected) this.ep.conn.removeAllListeners('esl::event::CUSTOM::*') ; if (this.ep && this.ep.connected) {
this.ep.conn.removeAllListeners('esl::event::CUSTOM::*');
this.ep.api(`conference ${this.confName} kick ${this.memberId}`)
.catch((err) => this.logger.info({err}, 'Error kicking participant'));
}
cs.clearConferenceDetails();
this.notifyTaskDone(); this.notifyTaskDone();
} }

View File

@@ -8,6 +8,7 @@ const DTMF_SPAN_NAME = 'dtmf';
class TaskListen extends Task { class TaskListen extends Task {
constructor(logger, opts, parentTask) { constructor(logger, opts, parentTask) {
super(logger, opts); super(logger, opts);
this.disableBidirectionalAudio = opts.disableBidirectionalAudio;
this.preconditions = TaskPreconditions.Endpoint; this.preconditions = TaskPreconditions.Endpoint;
[ [
@@ -32,6 +33,8 @@ class TaskListen extends Task {
set bugname(name) { this._bugname = name; } set bugname(name) { this._bugname = name; }
set ignoreCustomerData(val) { this._ignoreCustomerData = val; }
async exec(cs, {ep}) { async exec(cs, {ep}) {
await super.exec(cs); await super.exec(cs);
this.ep = ep; this.ep = ep;
@@ -111,9 +114,13 @@ class TaskListen extends Task {
async _startListening(cs, ep) { async _startListening(cs, ep) {
this._initListeners(ep); this._initListeners(ep);
const ci = this.nested ? this.parentTask.sd.callInfo : cs.callInfo.toJSON();
if (this._ignoreCustomerData) {
delete ci.customerData;
}
const metadata = Object.assign( const metadata = Object.assign(
{sampleRate: this.sampleRate, mixType: this.mixType}, {sampleRate: this.sampleRate, mixType: this.mixType},
this.nested ? this.parentTask.sd.callInfo : cs.callInfo.toJSON(), ci,
this.metadata); this.metadata);
if (this.hook.auth) { if (this.hook.auth) {
this.logger.debug({username: this.hook.auth.username, password: this.hook.auth.password}, this.logger.debug({username: this.hook.auth.username, password: this.hook.auth.password},
@@ -148,7 +155,7 @@ class TaskListen extends Task {
} }
/* support bi-directional audio */ /* support bi-directional audio */
if (!this.disableBiDirectionalAudio) { if (!this.disableBidirectionalAudio) {
ep.addCustomEventListener(ListenEvents.PlayAudio, this._onPlayAudio.bind(this, ep)); ep.addCustomEventListener(ListenEvents.PlayAudio, this._onPlayAudio.bind(this, ep));
} }
ep.addCustomEventListener(ListenEvents.KillAudio, this._onKillAudio.bind(this, ep)); ep.addCustomEventListener(ListenEvents.KillAudio, this._onKillAudio.bind(this, ep));

View File

@@ -63,12 +63,13 @@ class TaskRestDial extends Task {
this.canCancel = false; this.canCancel = false;
const cs = this.callSession; const cs = this.callSession;
cs.setDialog(dlg); cs.setDialog(dlg);
this.logger.debug('TaskRestDial:_onConnect - call connected');
try { try {
const b3 = this.getTracingPropagation(); const b3 = this.getTracingPropagation();
const httpHeaders = b3 && {b3}; const httpHeaders = b3 && {b3};
const params = { const params = {
...cs.callInfo, ...(cs.callInfo.toJSON()),
defaults: { defaults: {
synthesizer: { synthesizer: {
vendor: cs.speechSynthesisVendor, vendor: cs.speechSynthesisVendor,
@@ -90,8 +91,10 @@ class TaskRestDial extends Task {
} }
let tasks; let tasks;
if (this.app_json) { if (this.app_json) {
this.logger.debug('TaskRestDial: using app_json from task data');
tasks = JSON.parse(this.app_json); tasks = JSON.parse(this.app_json);
} else { } else {
this.logger.debug({call_hook: this.call_hook}, 'TaskRestDial: retrieving application');
tasks = await cs.requestor.request('session:new', this.call_hook, params, httpHeaders); tasks = await cs.requestor.request('session:new', this.call_hook, params, httpHeaders);
} }
if (tasks && Array.isArray(tasks)) { if (tasks && Array.isArray(tasks)) {

View File

@@ -1,4 +1,5 @@
const Task = require('./task'); const Task = require('./task');
const assert = require('assert');
const { const {
TaskName, TaskName,
TaskPreconditions, TaskPreconditions,
@@ -56,6 +57,12 @@ class TaskTranscribe extends Task {
this._sonioxTranscripts = []; this._sonioxTranscripts = [];
this.childSpan = [null, null]; this.childSpan = [null, null];
// Continuos asr timeout
this.asrTimeout = typeof this.data.recognizer.asrTimeout === 'number' ? this.data.recognizer.asrTimeout * 1000 : 0;
this.isContinuousAsr = this.asrTimeout > 0;
/* buffer speech for continuous asr */
this._bufferedTranscripts = [];
} }
get name() { return TaskName.Transcribe; } get name() { return TaskName.Transcribe; }
@@ -234,7 +241,19 @@ class TaskTranscribe extends Task {
this._onVadDetected.bind(this, cs, ep)); this._onVadDetected.bind(this, cs, ep));
break; break;
default: default:
throw new Error(`Invalid vendor ${this.vendor}`); if (this.vendor.startsWith('custom:')) {
this.bugname = `${this.vendor}_transcribe`;
ep.addCustomEventListener(JambonzTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.Connect, this._onJambonzConnect.bind(this, cs, ep));
ep.addCustomEventListener(JambonzTranscriptionEvents.ConnectFailure,
this._onJambonzConnectFailure.bind(this, cs, ep));
break;
}
else {
this.notifyError({ msg: 'ASR error', details:`Invalid vendor ${this.vendor}`});
this.notifyTaskDone();
throw new Error(`Invalid vendor ${this.vendor}`);
}
} }
/* common handler for all stt engine errors */ /* common handler for all stt engine errors */
@@ -296,6 +315,26 @@ class TaskTranscribe extends Task {
} }
} }
if (this.isContinuousAsr && evt.is_final) {
this._bufferedTranscripts.push(evt);
this._startAsrTimer(channel);
} else {
await this._resolve(channel, evt);
}
}
_compileTranscripts() {
assert(this._bufferedTranscripts.length);
const evt = this._bufferedTranscripts[0];
let t = '';
for (const a of this._bufferedTranscripts) {
t += ` ${a.alternatives[0].transcript}`;
}
evt.alternatives[0].transcript = t.trim();
return evt;
}
async _resolve(channel, evt) {
/* we've got a transcript, so end the otel child span for this channel */ /* we've got a transcript, so end the otel child span for this channel */
if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) { if (this.childSpan[channel - 1] && this.childSpan[channel - 1].span) {
this.childSpan[channel - 1].span.setAttributes({ this.childSpan[channel - 1].span.setAttributes({
@@ -408,6 +447,24 @@ class TaskTranscribe extends Task {
this.notifyTaskDone(); this.notifyTaskDone();
} }
_onJambonzConnect(_cs, _ep) {
this.logger.debug('TaskTranscribe:_onJambonzConnect');
}
_onJambonzConnectFailure(cs, _ep, evt) {
const {reason} = evt;
const {writeAlerts, AlertType} = cs.srf.locals;
this.logger.info({evt}, 'TaskTranscribe:_onJambonzConnectFailure');
writeAlerts({
account_sid: cs.accountSid,
alert_type: AlertType.STT_FAILURE,
message: `Failed connecting to ${this.vendor} speech recognizer: ${reason}`,
vendor: this.vendor,
}).catch((err) => this.logger.info({err}, 'Error generating alert for jambonz custom connection failure'));
this.notifyError({msg: 'ASR error', details:`Failed connecting to speech vendor ${this.vendor}: ${reason}`});
this.notifyTaskDone();
}
_onIbmConnect(_cs, _ep) { _onIbmConnect(_cs, _ep) {
this.logger.debug('TaskTranscribe:_onIbmConnect'); this.logger.debug('TaskTranscribe:_onIbmConnect');
} }
@@ -455,7 +512,22 @@ class TaskTranscribe extends Task {
this.notifyError({msg: 'ASR error', details:`Custom speech vendor ${this.vendor} error: ${evt.error}`}); this.notifyError({msg: 'ASR error', details:`Custom speech vendor ${this.vendor} error: ${evt.error}`});
} }
_startAsrTimer(channel) {
assert(this.isContinuousAsr);
this._clearAsrTimer(channel);
this._asrTimer = setTimeout(() => {
this.logger.debug(`TaskTranscribe:_startAsrTimer - asr timer went off for channel: ${channel}`);
const evt = this._compileTranscripts();
this._bufferedTranscripts = [];
this._resolve(channel, evt);
}, this.asrTimeout);
this.logger.debug(`TaskTranscribe:_startAsrTimer: set for ${this.asrTimeout}ms for channel ${channel}`);
}
_clearAsrTimer(channel) {
if (this._asrTimer) clearTimeout(this._asrTimer);
this._asrTimer = null;
}
} }
module.exports = TaskTranscribe; module.exports = TaskTranscribe;

View File

@@ -11,15 +11,20 @@ const {LifeCycleEvents} = require('./constants');
const express = require('express'); const express = require('express');
const app = express(); const app = express();
const getString = bent('string'); const getString = bent('string');
const AWS = require('aws-sdk'); const {
const sns = new AWS.SNS({apiVersion: '2010-03-31'}); SNSClient,
const autoscaling = new AWS.AutoScaling({apiVersion: '2011-01-01'}); SubscribeCommand,
UnsubscribeCommand } = require('@aws-sdk/client-sns');
const snsClient = new SNSClient({ region: AWS_REGION, apiVersion: '2010-03-31' });
const {
AutoScalingClient,
DescribeAutoScalingGroupsCommand,
CompleteLifecycleActionCommand } = require('@aws-sdk/client-auto-scaling');
const autoScalingClient = new AutoScalingClient({ region: AWS_REGION, apiVersion: '2011-01-01' });
const {Parser} = require('xml2js'); const {Parser} = require('xml2js');
const parser = new Parser(); const parser = new Parser();
const {validatePayload} = require('verify-aws-sns-signature'); const {validatePayload} = require('verify-aws-sns-signature');
AWS.config.update({region: AWS_REGION});
class SnsNotifier extends Emitter { class SnsNotifier extends Emitter {
constructor(logger) { constructor(logger) {
super(); super();
@@ -69,7 +74,7 @@ class SnsNotifier extends Emitter {
subscriptionRequestId: this.subscriptionRequestId subscriptionRequestId: this.subscriptionRequestId
}, 'response from SNS SubscribeURL'); }, 'response from SNS SubscribeURL');
const data = await this.describeInstance(); const data = await this.describeInstance();
this.lifecycleState = data.AutoScalingInstances[0].LifecycleState; this.lifecycleState = data.AutoScalingGroups[0].Instances[0].LifecycleState;
this.emit('SubscriptionConfirmation', {publicIp: this.publicIp}); this.emit('SubscriptionConfirmation', {publicIp: this.publicIp});
break; break;
@@ -135,11 +140,12 @@ class SnsNotifier extends Emitter {
async subscribe() { async subscribe() {
try { try {
const response = await sns.subscribe({ const params = {
Protocol: 'http', Protocol: 'http',
TopicArn: AWS_SNS_TOPIC_ARM, TopicArn: AWS_SNS_TOPIC_ARM,
Endpoint: this.snsEndpoint Endpoint: this.snsEndpoint
}).promise(); };
const response = await snsClient.send(new SubscribeCommand(params));
this.logger.info({response}, `response to SNS subscribe to ${AWS_SNS_TOPIC_ARM}`); this.logger.info({response}, `response to SNS subscribe to ${AWS_SNS_TOPIC_ARM}`);
} catch (err) { } catch (err) {
this.logger.error({err}, `Error subscribing to SNS topic arn ${AWS_SNS_TOPIC_ARM}`); this.logger.error({err}, `Error subscribing to SNS topic arn ${AWS_SNS_TOPIC_ARM}`);
@@ -149,9 +155,10 @@ class SnsNotifier extends Emitter {
async unsubscribe() { async unsubscribe() {
if (!this.subscriptionArn) throw new Error('SnsNotifier#unsubscribe called without an active subscription'); if (!this.subscriptionArn) throw new Error('SnsNotifier#unsubscribe called without an active subscription');
try { try {
const response = await sns.unsubscribe({ const params = {
SubscriptionArn: this.subscriptionArn SubscriptionArn: this.subscriptionArn
}).promise(); };
const response = await snsClient.send(new UnsubscribeCommand(params));
this.logger.info({response}, `response to SNS unsubscribe to ${AWS_SNS_TOPIC_ARM}`); this.logger.info({response}, `response to SNS unsubscribe to ${AWS_SNS_TOPIC_ARM}`);
} catch (err) { } catch (err) {
this.logger.error({err}, `Error unsubscribing to SNS topic arn ${AWS_SNS_TOPIC_ARM}`); this.logger.error({err}, `Error unsubscribing to SNS topic arn ${AWS_SNS_TOPIC_ARM}`);
@@ -160,26 +167,29 @@ class SnsNotifier extends Emitter {
completeScaleIn() { completeScaleIn() {
assert(this.scaleInParams); assert(this.scaleInParams);
autoscaling.completeLifecycleAction(this.scaleInParams, (err, response) => { autoScalingClient.send(new CompleteLifecycleActionCommand(this.scaleInParams))
if (err) return this.logger.error({err}, 'Error completing scale-in'); .then((data) => {
this.logger.info({response}, 'Successfully completed scale-in action'); return this.logger.info({data}, 'Successfully completed scale-in action');
}); })
.catch((err) => {
this.logger.error({err}, 'Error completing scale-in');
});
} }
describeInstance() { describeInstance() {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this.instanceId) return reject('instance-id unknown'); if (!this.instanceId) return reject('instance-id unknown');
autoscaling.describeAutoScalingInstances({ autoScalingClient.send(new DescribeAutoScalingGroupsCommand({
InstanceIds: [this.instanceId] InstanceIds: [this.instanceId]
}, (err, data) => { }))
if (err) { .then((data) => {
this.logger.info({data}, 'SnsNotifier: describeInstance');
return resolve(data);
})
.catch((err) => {
this.logger.error({err}, 'Error describing instances'); this.logger.error({err}, 'Error describing instances');
reject(err); reject(err);
} else { });
this.logger.info({data}, 'SnsNotifier: describeInstance');
resolve(data);
}
});
}); });
} }
@@ -193,7 +203,7 @@ module.exports = async function(logger) {
process.on('SIGHUP', async() => { process.on('SIGHUP', async() => {
try { try {
const data = await notifier.describeInstance(); const data = await notifier.describeInstance();
const state = data.AutoScalingInstances[0].LifecycleState; const state = data.AutoScalingGroups[0].Instances[0].LifecycleState;
if (state !== notifier.lifecycleState) { if (state !== notifier.lifecycleState) {
notifier.lifecycleState = state; notifier.lifecycleState = state;
switch (state) { switch (state) {

View File

@@ -2,17 +2,24 @@ const {context, trace} = require('@opentelemetry/api');
const {Dialog} = require('drachtio-srf'); const {Dialog} = require('drachtio-srf');
class RootSpan { class RootSpan {
constructor(callType, req) { constructor(callType, req) {
let tracer, callSid, linkedSpanId; const {srf} = require('../../');
const tracer = srf.locals.otel.tracer;
let callSid, accountSid, applicationSid, linkedSpanId;
if (req instanceof Dialog) { if (req instanceof Dialog) {
const dlg = req; const dlg = req;
tracer = dlg.srf.locals.otel.tracer;
callSid = dlg.callSid; callSid = dlg.callSid;
linkedSpanId = dlg.linkedSpanId; linkedSpanId = dlg.linkedSpanId;
} }
else { else if (req.srf) {
tracer = req.srf.locals.otel.tracer;
callSid = req.locals.callSid; callSid = req.locals.callSid;
accountSid = req.get('X-Account-Sid'),
applicationSid = req.locals.application_sid;
}
else {
callSid = req.callSid;
accountSid = req.accountSid;
applicationSid = req.applicationSid;
} }
this._span = tracer.startSpan(callType || 'incoming-call'); this._span = tracer.startSpan(callType || 'incoming-call');
if (req instanceof Dialog) { if (req instanceof Dialog) {
@@ -22,13 +29,20 @@ class RootSpan {
callId: dlg.sip.callId callId: dlg.sip.callId
}); });
} }
else if (req.srf) {
this._span.setAttributes({
callSid,
accountSid,
applicationSid,
callId: req.get('Call-ID'),
externalCallId: req.get('X-CID')
});
}
else { else {
this._span.setAttributes({ this._span.setAttributes({
callSid, callSid,
accountSid: req.get('X-Account-Sid'), accountSid,
applicationSid: req.locals.application_sid, applicationSid
callId: req.get('Call-ID'),
externalCallId: req.get('X-CID')
}); });
} }

View File

@@ -101,7 +101,8 @@ module.exports = (logger) => {
method: 'OPTIONS', method: 'OPTIONS',
headers: { headers: {
'X-FS-Status': ms && !dryUpCalls ? 'open' : 'closed', 'X-FS-Status': ms && !dryUpCalls ? 'open' : 'closed',
'X-FS-Calls': srf.locals.sessionTracker.count 'X-FS-Calls': srf.locals.sessionTracker.count,
'X-FS-ServiceUrl': srf.locals.serviceUrl
} }
}); });
req.on('response', (res) => { req.on('response', (res) => {

View File

@@ -43,6 +43,7 @@ class WsRequestor extends BaseRequestor {
async request(type, hook, params, httpHeaders = {}) { async request(type, hook, params, httpHeaders = {}) {
assert(HookMsgTypes.includes(type)); assert(HookMsgTypes.includes(type));
const url = hook.url || hook; const url = hook.url || hook;
const wantsAck = !['call:status', 'verb:status', 'jambonz:error'].includes(type);
if (this.maliciousClient) { if (this.maliciousClient) {
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
@@ -73,11 +74,19 @@ class WsRequestor extends BaseRequestor {
if (this.connectInProgress) { if (this.connectInProgress) {
this.logger.debug( this.logger.debug(
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`); `WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
this.queuedMsg.push({type, hook, params, httpHeaders}); if (wantsAck) {
const p = new Promise((resolve, reject) => {
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
});
return p;
}
else {
this.queuedMsg.push({type, hook, params, httpHeaders});
}
return; return;
} }
this.connectInProgress = true; this.connectInProgress = true;
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection`); this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`);
if (this.connections >= MAX_RECONNECTS) { if (this.connections >= MAX_RECONNECTS) {
return Promise.reject(`max attempts connecting to ${this.url}`); return Promise.reject(`max attempts connecting to ${this.url}`);
} }
@@ -116,9 +125,14 @@ class WsRequestor extends BaseRequestor {
const sendQueuedMsgs = () => { const sendQueuedMsgs = () => {
if (this.queuedMsg.length > 0) { if (this.queuedMsg.length > 0) {
for (const {type, hook, params, httpHeaders} of this.queuedMsg) { for (const {type, hook, params, httpHeaders, promise} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`); this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
setImmediate(this.request.bind(this, type, hook, params, httpHeaders)); if (promise) {
this.request(type, hook, params, httpHeaders)
.then((res) => promise.resolve(res))
.catch((err) => promise.reject(err));
}
else setImmediate(this.request.bind(this, type, hook, params, httpHeaders));
} }
this.queuedMsg.length = 0; this.queuedMsg.length = 0;
} }
@@ -137,8 +151,8 @@ class WsRequestor extends BaseRequestor {
} }
/* simple notifications */ /* simple notifications */
if (['call:status', 'verb:status', 'jambonz:error'].includes(type) || reconnectingWithoutAck) { if (!wantsAck || reconnectingWithoutAck) {
this.ws.send(JSON.stringify(obj), () => { this.ws?.send(JSON.stringify(obj), () => {
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
sendQueuedMsgs(); sendQueuedMsgs();
}); });

5851
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
{ {
"name": "jambonz-feature-server", "name": "jambonz-feature-server",
"version": "0.8.3", "version": "0.8.4",
"main": "app.js", "main": "app.js",
"engines": { "engines": {
"node": ">= 10.16.0" "node": ">= 10.16.0"
@@ -19,19 +19,19 @@
"bugs": {}, "bugs": {},
"scripts": { "scripts": {
"start": "node app", "start": "node app",
"test": "NODE_ENV=test JAMBONES_HOSTING=1 HTTP_POOL=1 ENCRYPTION_SECRET=foobar DRACHTIO_HOST=127.0.0.1 DRACHTIO_PORT=9060 DRACHTIO_SECRET=cymru JAMBONES_MYSQL_HOST=127.0.0.1 JAMBONES_MYSQL_PORT=3360 JAMBONES_MYSQL_USER=jambones_test JAMBONES_MYSQL_PASSWORD=jambones_test JAMBONES_MYSQL_DATABASE=jambones_test JAMBONES_REDIS_HOST=127.0.0.1 JAMBONES_REDIS_PORT=16379 JAMBONES_LOGLEVEL=error ENABLE_METRICS=0 HTTP_PORT=3000 JAMBONES_SBCS=172.38.0.10 JAMBONES_FREESWITCH=127.0.0.1:8022:JambonzR0ck$:docker-host JAMBONES_TIME_SERIES_HOST=127.0.0.1 JAMBONES_NETWORK_CIDR=172.38.0.0/16 node test/ ", "test": "NODE_ENV=test JAMBONES_HOSTING=1 HTTP_POOL=1 JAMBONES_TTS_TRIM_SILENCE=1 ENCRYPTION_SECRET=foobar DRACHTIO_HOST=127.0.0.1 DRACHTIO_PORT=9060 DRACHTIO_SECRET=cymru JAMBONES_MYSQL_HOST=127.0.0.1 JAMBONES_MYSQL_PORT=3360 JAMBONES_MYSQL_USER=jambones_test JAMBONES_MYSQL_PASSWORD=jambones_test JAMBONES_MYSQL_DATABASE=jambones_test JAMBONES_REDIS_HOST=127.0.0.1 JAMBONES_REDIS_PORT=16379 JAMBONES_LOGLEVEL=error ENABLE_METRICS=0 HTTP_PORT=3000 JAMBONES_SBCS=172.38.0.10 JAMBONES_FREESWITCH=127.0.0.1:8022:JambonzR0ck$:docker-host JAMBONES_TIME_SERIES_HOST=127.0.0.1 JAMBONES_NETWORK_CIDR=172.38.0.0/16 node test/ ",
"coverage": "./node_modules/.bin/nyc --reporter html --report-dir ./coverage npm run test", "coverage": "./node_modules/.bin/nyc --reporter html --report-dir ./coverage npm run test",
"jslint": "eslint app.js tracer.js lib", "jslint": "eslint app.js tracer.js lib",
"jslint:fix": "eslint app.js tracer.js lib --fix" "jslint:fix": "eslint app.js tracer.js lib --fix"
}, },
"dependencies": { "dependencies": {
"@jambonz/db-helpers": "^0.9.0", "@jambonz/db-helpers": "^0.9.1",
"@jambonz/http-health-check": "^0.0.1", "@jambonz/http-health-check": "^0.0.1",
"@jambonz/realtimedb-helpers": "^0.8.6", "@jambonz/realtimedb-helpers": "^0.8.6",
"@jambonz/speech-utils": "^0.0.15", "@jambonz/speech-utils": "^0.0.19",
"@jambonz/stats-collector": "^0.1.8", "@jambonz/stats-collector": "^0.1.9",
"@jambonz/time-series": "^0.2.7", "@jambonz/time-series": "^0.2.8",
"@jambonz/verb-specifications": "^0.0.24", "@jambonz/verb-specifications": "^0.0.26",
"@opentelemetry/api": "^1.4.0", "@opentelemetry/api": "^1.4.0",
"@opentelemetry/exporter-jaeger": "^1.9.0", "@opentelemetry/exporter-jaeger": "^1.9.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.35.0", "@opentelemetry/exporter-trace-otlp-http": "^0.35.0",
@@ -41,7 +41,8 @@
"@opentelemetry/sdk-trace-base": "^1.9.0", "@opentelemetry/sdk-trace-base": "^1.9.0",
"@opentelemetry/sdk-trace-node": "^1.9.0", "@opentelemetry/sdk-trace-node": "^1.9.0",
"@opentelemetry/semantic-conventions": "^1.9.0", "@opentelemetry/semantic-conventions": "^1.9.0",
"aws-sdk": "^2.1313.0", "@aws-sdk/client-sns": "^3.360.0",
"@aws-sdk/client-auto-scaling": "^3.360.0",
"bent": "^7.3.12", "bent": "^7.3.12",
"debug": "^4.3.4", "debug": "^4.3.4",
"deepcopy": "^2.1.0", "deepcopy": "^2.1.0",

View File

@@ -5,6 +5,8 @@ const getJSON = bent('json')
const clearModule = require('clear-module'); const clearModule = require('clear-module');
const {provisionCallHook} = require('./utils') const {provisionCallHook} = require('./utils')
const sleepFor = (ms) => new Promise((r) => setTimeout(r, ms));
process.on('unhandledRejection', (reason, p) => { process.on('unhandledRejection', (reason, p) => {
console.log('Unhandled Rejection at: Promise', p, 'reason:', reason); console.log('Unhandled Rejection at: Promise', p, 'reason:', reason);
}); });
@@ -47,6 +49,7 @@ test('\'dial-phone\'', async(t) => {
// THEN // THEN
const p = sippUac('uas-dial.xml', '172.38.0.10', undefined, undefined, 2); const p = sippUac('uas-dial.xml', '172.38.0.10', undefined, undefined, 2);
await sleepFor(1000);
let account_sid = '622f62e4-303a-49f2-bbe0-eb1e1714e37a'; let account_sid = '622f62e4-303a-49f2-bbe0-eb1e1714e37a';
let post = bent('http://127.0.0.1:3000/', 'POST', 'json', 201); let post = bent('http://127.0.0.1:3000/', 'POST', 'json', 201);
@@ -84,7 +87,7 @@ test('\'dial-sip\'', async(t) => {
try { try {
await connect(srf); await connect(srf);
// wait for fs connected to drachtio server. // wait for fs connected to drachtio server.
await new Promise(r => setTimeout(r, 1000)); await sleepFor(1000);
// GIVEN // GIVEN
const from = "dial_sip"; const from = "dial_sip";
let verbs = [ let verbs = [

View File

@@ -42,7 +42,7 @@ services:
ipv4_address: 172.38.0.7 ipv4_address: 172.38.0.7
drachtio: drachtio:
image: drachtio/drachtio-server:latest image: drachtio/drachtio-server:0.8.22
restart: always restart: always
command: drachtio --contact "sip:*;transport=udp" --mtu 4096 --address 0.0.0.0 --port 9022 command: drachtio --contact "sip:*;transport=udp" --mtu 4096 --address 0.0.0.0 --port 9022
ports: ports:
@@ -57,7 +57,7 @@ services:
condition: service_healthy condition: service_healthy
freeswitch: freeswitch:
image: drachtio/drachtio-freeswitch-mrf:0.4.18 image: drachtio/drachtio-freeswitch-mrf:0.4.33
restart: always restart: always
command: freeswitch --rtp-range-start 20000 --rtp-range-end 20100 command: freeswitch --rtp-range-start 20000 --rtp-range-end 20100
environment: environment:

View File

@@ -210,6 +210,44 @@ test('\'transcribe\' test - soniox', async(t) => {
t.ok(obj.body.speech.alternatives[0].transcript.toLowerCase().startsWith('i\'d like to speak to customer support'), t.ok(obj.body.speech.alternatives[0].transcript.toLowerCase().startsWith('i\'d like to speak to customer support'),
'transcribe: succeeds when using soniox credentials'); 'transcribe: succeeds when using soniox credentials');
disconnect();
} catch (err) {
console.log(`error received: ${err}`);
disconnect();
t.error(err);
}
});
test('\'transcribe\' test - google with asrTimeout', async(t) => {
if (!GCP_JSON_KEY) {
t.pass('skipping google tests');
return t.end();
}
clearModule.all();
const {srf, disconnect} = require('../app');
try {
await connect(srf);
// GIVEN
let verbs = [
{
"verb": "transcribe",
"recognizer": {
"vendor": "google",
"hints": ["customer support", "sales", "human resources", "HR"],
"asrTimeout": 4
},
"transcriptionHook": "/transcriptionHook"
}
];
let from = "gather_success";
await provisionCallHook(from, verbs);
// THEN
await sippUac('uac-gather-account-creds-success.xml', '172.38.0.10', from);
let obj = await getJSON(`http://127.0.0.1:3100/lastRequest/${from}_actionHook`);
t.ok(obj.body.speech.alternatives[0].transcript.toLowerCase().startsWith('i\'d like to speak to customer support'),
'transcribe: succeeds when using google credentials');
disconnect(); disconnect();
} catch (err) { } catch (err) {
console.log(`error received: ${err}`); console.log(`error received: ${err}`);