Feature/opentelemetry (#89)

* initial adds for otel tracing

* initial basic testing

* basic tracing for incoming calls

* linting

* add traceId to the webhook params

* trace webhook calls

* tracing: add new commands as tags when receiving async commands over websocket

* tracing new commands

* add summary for config verb

* trace async commands

* bugfix: undefined ref

* tracing: give time for final webhooks before closing root span

* tracing bugfix: span for background gather was not ended

* tracing - minor tag changes

* tracing - add span atttribute for reason call ended

* trace call status webhooks, add app version to trace output

* config: add support for automatically re-enabling

* env var to customize service name in tracing UI

* config: change to use 'sticky' attribute to re-enable bargein automatically

* fix warnings

* when adulting create a new root span

* when background gather triggers bargein via vad clear queue of tasks

* additional trace attributes for dial and refer

* fix dial tracing

* add better summary for dial

* fix prev commit

* add exponential backoff to WsRequestor reconnection logic

* add calling number to log metadata, as this will be frequently the key data given for troubleshooting

* add accountSid to log metadata

* make handshake timeout for ws connections configurable with default 1.5 secs

* rename env var

* fix bug prev checkin

* logging fixes

* consistent env naming
This commit is contained in:
Dave Horton
2022-03-28 15:38:28 -04:00
committed by GitHub
parent f1f83598ca
commit 6abfdafe05
24 changed files with 1852 additions and 87 deletions

View File

@@ -8,6 +8,7 @@ const SipError = require('drachtio-srf').SipError;
const sysError = require('./error');
const HttpRequestor = require('../../utils/http-requestor');
const WsRequestor = require('../../utils/ws-requestor');
const RootSpan = require('../../utils/call-tracer');
const dbUtils = require('../../utils/db-utils');
router.post('/', async(req, res) => {
@@ -143,6 +144,7 @@ router.post('/', async(req, res) => {
/* ok our outbound INVITE is in flight */
const tasks = [restDial];
const rootSpan = new RootSpan('rest-call', req);
const callInfo = new CallInfo({
direction: CallDirection.Outbound,
req: inviteReq,
@@ -150,9 +152,20 @@ router.post('/', async(req, res) => {
tag: app.tag,
callSid,
accountSid: req.body.account_sid,
applicationSid: app.application_sid
applicationSid: app.application_sid,
traceId: this.rootSpan.traceId
});
cs = new RestCallSession({
logger,
application: app,
srf,
req: inviteReq,
ep,
tasks,
callInfo,
accountInfo,
rootSpan
});
cs = new RestCallSession({logger, application: app, srf, req: inviteReq, ep, tasks, callInfo, accountInfo});
cs.exec(req);
res.status(201).json({sid: cs.callSid});

View File

@@ -7,6 +7,8 @@ const makeTask = require('./tasks/make_task');
const parseUri = require('drachtio-srf').parseUri;
const normalizeJambones = require('./utils/normalize-jambones');
const dbUtils = require('./utils/db-utils');
const RootSpan = require('./utils/call-tracer');
const listTaskNames = require('./utils/summarize-tasks');
module.exports = function(srf, logger) {
const {
@@ -17,15 +19,18 @@ module.exports = function(srf, logger) {
lookupAppByTeamsTenant
} = srf.locals.dbHelpers;
const {lookupAccountDetails} = dbUtils(logger, srf);
function initLocals(req, res, next) {
if (!req.has('X-Account-Sid')) {
logger.info('getAccountDetails - rejecting call due to missing X-Account-Sid header');
return res.send(500);
}
const callSid = req.has('X-Retain-Call-Sid') ? req.get('X-Retain-Call-Sid') : uuidv4();
req.locals = {
callSid,
logger: logger.child({callId: req.get('Call-ID'), callSid})
};
const account_sid = req.get('X-Account-Sid');
req.locals = {callSid, account_sid};
if (req.has('X-Application-Sid')) {
const application_sid = req.get('X-Application-Sid');
req.locals.logger.debug(`got application from X-Application-Sid header: ${application_sid}`);
logger.debug(`got application from X-Application-Sid header: ${application_sid}`);
req.locals.application_sid = application_sid;
}
if (req.has('X-Authenticated-User')) req.locals.originatingUser = req.get('X-Authenticated-User');
@@ -34,19 +39,50 @@ module.exports = function(srf, logger) {
next();
}
function createRootSpan(req, res, next) {
const {callSid, account_sid} = req.locals;
const rootSpan = new RootSpan('incoming-call', req);
const traceId = rootSpan.traceId;
req.locals = {
...req.locals,
traceId,
logger: logger.child({
callId: req.get('Call-ID'),
callSid,
accountSid: account_sid,
callingNumber: req.callingNumber,
calledNumber: req.calledNumber,
traceId}),
rootSpan
};
/**
* end the span on final failure or cancel from caller;
* otherwise it will be closed when sip dialog is destroyed
*/
req.once('cancel', () => {
rootSpan.setAttributes({finalStatus: 487});
rootSpan.end();
});
res.once('finish', () => {
rootSpan.setAttributes({finalStatus: res.statusCode});
res.statusCode >= 300 && rootSpan.end();
});
next();
}
/**
* retrieve account information for the incoming call
*/
async function getAccountDetails(req, res, next) {
const {rootSpan, account_sid} = req.locals;
if (!req.has('X-Account-Sid')) {
logger.info('getAccountDetails - rejecting call due to missing X-Account-Sid header');
return res.send(500);
}
const account_sid = req.locals.account_sid = req.get('X-Account-Sid');
const {span} = rootSpan.startChildSpan('lookupAccountDetails');
try {
req.locals.accountInfo = await lookupAccountDetails(account_sid);
span.end();
if (!req.locals.accountInfo.account.is_active) {
logger.info(`Account is inactive or suspended ${account_sid}`);
// TODO: alert
@@ -55,6 +91,7 @@ module.exports = function(srf, logger) {
logger.debug({accountInfo: req.locals?.accountInfo?.account}, `retrieved account info for ${account_sid}`);
next();
} catch (err) {
span.end();
logger.info({err}, `Error retrieving account details for account ${account_sid}`);
res.send(503, {headers: {'X-Reason': `No Account exists for sid ${account_sid}`}});
}
@@ -86,7 +123,8 @@ module.exports = function(srf, logger) {
*/
async function retrieveApplication(req, res, next) {
const logger = req.locals.logger;
const {accountInfo, account_sid} = req.locals;
const {accountInfo, account_sid, rootSpan} = req.locals;
const {span} = rootSpan.startChildSpan('lookupApplication');
try {
let app;
if (req.locals.application_sid) app = await lookupAppBySid(req.locals.application_sid);
@@ -130,6 +168,11 @@ module.exports = function(srf, logger) {
}
}
span.setAttributes({
'app.hook': app?.call_hook?.url,
'application_sid': req.locals.application_sid
});
span.end();
if (!app || !app.call_hook || !app.call_hook.url) {
logger.info(`rejecting call to ${req.locals.calledNumber}: no application or webhook url`);
return res.send(480, {
@@ -163,9 +206,15 @@ module.exports = function(srf, logger) {
// eslint-disable-next-line no-unused-vars
const {call_hook, call_status_hook, ...appInfo} = obj; // mask sensitive data like user/pass on webhook
logger.info({app: appInfo}, `retrieved application for incoming call to ${req.locals.calledNumber}`);
req.locals.callInfo = new CallInfo({req, app, direction: CallDirection.Inbound});
req.locals.callInfo = new CallInfo({
req,
app,
direction: CallDirection.Inbound,
traceId: rootSpan.traceId
});
next();
} catch (err) {
span.end();
logger.error(err, `${req.get('Call-ID')} Error looking up application for ${req.calledNumber}`);
res.send(500);
}
@@ -176,7 +225,8 @@ module.exports = function(srf, logger) {
*/
async function invokeWebCallback(req, res, next) {
const logger = req.locals.logger;
const app = req.locals.application;
const {rootSpan, application:app} = req.locals;
let span;
try {
if (app.tasks) {
@@ -187,11 +237,20 @@ module.exports = function(srf, logger) {
/* retrieve the application to execute for this inbound call */
const params = Object.assign(['POST', 'WS'].includes(app.call_hook.method) ? {sip: req.msg} : {},
req.locals.callInfo);
const obj = rootSpan.startChildSpan('performAppWebhook');
span = obj.span;
const json = await app.requestor.request('session:new', app.call_hook, params);
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));
span.setAttributes({
'http.statusCode': 200,
'app.tasks': listTaskNames(app.tasks)
});
span.end();
if (0 === app.tasks.length) throw new Error('no application provided');
next();
} catch (err) {
span?.setAttributes({webhookStatus: err.statusCode});
span?.end();
logger.info({err}, `Error retrieving or parsing application: ${err?.message}`);
res.send(480, {headers: {'X-Reason': err?.message || 'unknown'}});
app.requestor.close();
@@ -200,6 +259,7 @@ module.exports = function(srf, logger) {
return {
initLocals,
createRootSpan,
getAccountDetails,
normalizeNumbers,
retrieveApplication,

View File

@@ -10,6 +10,7 @@ class CallInfo {
let from ;
let srf;
this.direction = opts.direction;
this.traceId = opts.traceId;
if (opts.req) {
const u = opts.req.getParsedHeader('from');
const uri = parseUri(u.uri);
@@ -114,6 +115,7 @@ class CallInfo {
callStatus: this.callStatus,
callerId: this.callerId,
accountSid: this.accountSid,
traceId: this.traceId,
applicationSid: this.applicationSid,
fsSipAddress: this.localSipAddress
};

View File

@@ -35,7 +35,7 @@ class CallSession extends Emitter {
* @param {array} opts.tasks - tasks we are to execute
* @param {callInfo} opts.callInfo - information about the call
*/
constructor({logger, application, srf, tasks, callInfo, accountInfo, memberId, confName, confUuid}) {
constructor({logger, application, srf, tasks, callInfo, accountInfo, rootSpan, memberId, confName, confUuid}) {
super();
this.logger = logger;
this.application = application;
@@ -50,6 +50,9 @@ class CallSession extends Emitter {
this.stackIdx = 0;
this.callGone = false;
this.notifiedComplete = false;
this.rootSpan = rootSpan;
assert(rootSpan);
this.tmpFiles = new Set();
@@ -65,6 +68,7 @@ class CallSession extends Emitter {
this._pool = srf.locals.dbHelpers.pool;
this.requestor.on('command', this._onCommand.bind(this));
this.requestor.on('connection-dropped', this._onWsConnectionDropped.bind(this));
}
/**
@@ -226,26 +230,36 @@ class CallSession extends Emitter {
return this.backgroundGatherTask;
}
async enableBotMode(gather) {
async enableBotMode(gather, autoEnable) {
try {
const t = normalizeJambones(this.logger, [gather]);
this.backgroundGatherTask = makeTask(this.logger, t[0]);
this.backgroundGatherTask
.on('dtmf', this._clearTasks.bind(this))
.on('vad', this._clearTasks.bind(this))
.on('transcription', this._clearTasks.bind(this))
.on('timeout', this._clearTasks.bind(this));
this.logger.info({gather}, 'CallSession:enableBotMode - starting background gather');
const resources = await this._evaluatePreconditions(this.backgroundGatherTask);
const {span, ctx} = this.rootSpan.startChildSpan(`background-gather:${this.backgroundGatherTask.summary}`);
this.backgroundGatherTask.span = span;
this.backgroundGatherTask.ctx = ctx;
this.backgroundGatherTask.exec(this, resources)
.then(() => {
this.logger.info('CallSession:enableBotMode: gather completed');
this.backgroundGatherTask && this.backgroundGatherTask.removeAllListeners();
this.backgroundGatherTask && this.backgroundGatherTask.span.end();
this.backgroundGatherTask = null;
if (autoEnable && !this.callGone) {
this.logger.info('CallSession:enableBotMode: restarting background gather');
setImmediate(() => this.enableBotMode(gather, true));
}
return;
})
.catch((err) => {
this.logger.info({err}, 'CallSession:enableBotMode: gather threw error');
this.backgroundGatherTask && this.backgroundGatherTask.removeAllListeners();
this.backgroundGatherTask && this.backgroundGatherTask.span.end();
this.backgroundGatherTask = null;
});
} catch (err) {
@@ -362,11 +376,16 @@ class CallSession extends Emitter {
this.backgroundGatherTask.updateTimeout(timeout);
}
else {
const {span, ctx} = this.rootSpan.startChildSpan(`verb:${task.summary}`);
task.span = span;
task.ctx = ctx;
await task.exec(this, resources);
task.span.end();
}
this.currentTask = null;
this.logger.info(`CallSession:exec completed task #${stackNum}:${taskNum}: ${task.name}`);
} catch (err) {
task.span?.end();
this.currentTask = null;
if (err.message?.includes(BADPRECONDITIONS)) {
this.logger.info(`CallSession:exec task #${stackNum}:${taskNum}: ${task.name}: ${err.message}`);
@@ -378,10 +397,19 @@ class CallSession extends Emitter {
}
if (0 === this.tasks.length && this.hasStableDialog && this.requestor instanceof WsRequestor) {
let span;
try {
await this._awaitCommandsOrHangup();
const {span} = this.rootSpan.startChildSpan('waiting for commands');
const {reason, queue, command} = await this._awaitCommandsOrHangup();
span.setAttributes({
'completion.reason': reason,
'async.request.queue': queue,
'async.request.command': command
});
span.end();
if (!this.hasStableDialog || this.callGone) break;
} catch (err) {
span.end();
this.logger.info(err, 'CallSession:exec - error waiting for new commands');
break;
}
@@ -438,17 +466,15 @@ class CallSession extends Emitter {
* this is called to clean up when the call is released from one side or another
*/
_callReleased() {
this.logger.debug('CallSession:_callReleased - caller hung up');
this.callGone = true;
if (this.currentTask) {
this.currentTask.kill(this);
this.currentTask = null;
}
if (this.wakeupResolver) {
this.wakeupResolver();
this.wakeupResolver({reason: 'session ended'});
this.wakeupResolver = null;
}
this.requestor && this.requestor.close();
}
/**
@@ -728,6 +754,7 @@ class CallSession extends Emitter {
_onCommand({msgid, command, call_sid, queueCommand, data}) {
this.logger.info({msgid, command, queueCommand}, 'CallSession:_onCommand - received command');
const resolution = {reason: 'received command', queue: queueCommand, command};
switch (command) {
case 'redirect':
if (Array.isArray(data)) {
@@ -742,6 +769,7 @@ class CallSession extends Emitter {
this.tasks.push(...t);
this.logger.debug({tasks: listTaskNames(this.tasks)}, 'CallSession:_onCommand - updated task list');
}
resolution.command = listTaskNames(t);
}
else this._lccCallHook(data);
break;
@@ -781,10 +809,24 @@ class CallSession extends Emitter {
this.logger.info(`CallSession:_onCommand - invalid command ${command}`);
}
if (this.wakeupResolver) {
this.logger.info('CallSession:_onCommand - got commands, waking up..');
this.wakeupResolver();
this.logger.debug({resolution}, 'CallSession:_onCommand - got commands, waking up..');
this.wakeupResolver(resolution);
this.wakeupResolver = null;
}
else {
const {span} = this.rootSpan.startChildSpan('async command');
const {queue, command} = resolution;
span.setAttributes({
'async.request.queue': queue,
'async.request.command': command
});
span.end();
}
}
_onWsConnectionDropped() {
const {stats} = this.srf.locals;
stats.increment('app.hook.remote_close');
}
_evaluatePreconditions(task) {
@@ -928,6 +970,8 @@ class CallSession extends Emitter {
}
this.tmpFiles.clear();
this.requestor && this.requestor.close();
this.rootSpan && this.rootSpan.end();
}
/**
@@ -960,7 +1004,13 @@ class CallSession extends Emitter {
async propagateAnswer() {
if (!this.dlg) {
assert(this.ep);
this.dlg = await this.srf.createUAS(this.req, this.res, {localSdp: this.ep.local.sdp});
this.dlg = await this.srf.createUAS(this.req, this.res, {
headers: {
'X-Trace-ID': this.req.locals.traceId,
'X-Call-Sid': this.req.locals.callSid
},
localSdp: this.ep.local.sdp
});
this.logger.debug('answered call');
this.dlg.on('destroy', this._callerHungup.bind(this));
this.wrapDialog(this.dlg);
@@ -1148,9 +1198,10 @@ class CallSession extends Emitter {
const duration = moment().diff(this.dlg.connectTime, 'seconds');
this.emit('callStatusChange', {callStatus: CallStatus.Completed, duration});
this.logger.debug('CallSession: call terminated by jambones');
this.rootSpan.setAttributes({'call.termination': 'hangup by jambonz'});
origDestroy();
if (this.wakeupResolver) {
this.wakeupResolver();
this.wakeupResolver({reason: 'session ended'});
this.wakeupResolver = null;
}
}
@@ -1210,9 +1261,13 @@ class CallSession extends Emitter {
this.callInfo.updateCallStatus(callStatus, sipStatus, sipReason);
if (typeof duration === 'number') this.callInfo.duration = duration;
const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`);
span.setAttributes(this.callInfo.toJSON());
try {
this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON());
span.end();
} catch (err) {
span.end();
this.logger.info(err, `CallSession:_notifyCallStatusChange error sending ${callStatus} ${sipStatus}`);
}

View File

@@ -8,7 +8,7 @@ const CallSession = require('./call-session');
*/
class ConfirmCallSession extends CallSession {
constructor({logger, application, dlg, ep, tasks, callInfo, accountInfo, memberId, confName}) {
constructor({logger, application, dlg, ep, tasks, callInfo, accountInfo, memberId, confName, rootSpan}) {
super({
logger,
application,
@@ -18,7 +18,8 @@ class ConfirmCallSession extends CallSession {
callInfo,
accountInfo,
memberId,
confName
confName,
rootSpan
});
this.dlg = dlg;
this.ep = ep;

View File

@@ -16,7 +16,8 @@ class InboundCallSession extends CallSession {
application: req.locals.application,
callInfo: req.locals.callInfo,
accountInfo: req.locals.accountInfo,
tasks: req.locals.application.tasks
tasks: req.locals.application.tasks,
rootSpan: req.locals.rootSpan
});
this.req = req;
this.res = res;
@@ -32,6 +33,7 @@ class InboundCallSession extends CallSession {
}
_onCancel() {
this.rootSpan.setAttributes({'call.termination': 'caller abandoned'});
this._notifyCallStatusChange({
callStatus: CallStatus.NoAnswer,
sipStatus: 487,
@@ -43,6 +45,7 @@ class InboundCallSession extends CallSession {
_onTasksDone() {
if (!this.res.finalResponseSent) {
if (this._mediaServerFailure) {
this.rootSpan.setAttributes({'call.termination': 'media server failure'});
this.logger.info('InboundCallSession:_onTasksDone generating 480 due to media server failure');
this.res.send(480, {
headers: {
@@ -51,6 +54,7 @@ class InboundCallSession extends CallSession {
});
}
else {
this.rootSpan.setAttributes({'call.termination': 'tasks completed without answering call'});
this.logger.info('InboundCallSession:_onTasksDone auto-generating non-success response to invite');
this.res.send(603);
}
@@ -64,11 +68,12 @@ class InboundCallSession extends CallSession {
_callerHungup() {
assert(this.dlg.connectTime);
const duration = moment().diff(this.dlg.connectTime, 'seconds');
this.rootSpan.setAttributes({'call.termination': 'hangup by caller'});
this.emit('callStatusChange', {
callStatus: CallStatus.Completed,
duration
});
this.logger.debug('InboundCallSession: caller hung up');
this.logger.info('InboundCallSession: caller hung up');
this._callReleased();
this.req.removeAllListeners('cancel');
}

View File

@@ -8,7 +8,7 @@ const moment = require('moment');
* @extends CallSession
*/
class RestCallSession extends CallSession {
constructor({logger, application, srf, req, ep, tasks, callInfo, accountInfo}) {
constructor({logger, application, srf, req, ep, tasks, callInfo, accountInfo, rootSpan}) {
super({
logger,
application,
@@ -16,7 +16,8 @@ class RestCallSession extends CallSession {
callSid: callInfo.callSid,
tasks,
callInfo,
accountInfo
accountInfo,
rootSpan
});
this.req = req;
this.ep = ep;

View File

@@ -24,6 +24,7 @@ class TaskConfig extends Task {
].forEach((k) => {
if (this.bargeIn[k]) this.gatherOpts[k] = this.bargeIn[k];
});
if (this.bargeIn.sticky) this.autoEnable = true;
}
this.preconditions = this.hasBargeIn ? TaskPreconditions.Endpoint : TaskPreconditions.None;
}
@@ -36,6 +37,22 @@ class TaskConfig extends Task {
get hasBargeIn() { return Object.keys(this.bargeIn).length; }
get summary() {
const phrase = [];
if (this.hasBargeIn) phrase.push('enable barge-in');
if (this.hasSynthesizer) {
const {vendor:v, language:l, voice} = this.synthesizer;
const s = `{${v},${l},${voice}}`;
phrase.push(`set synthesizer${s}`);
}
if (this.hasRecognizer) {
const {vendor:v, language:l} = this.recognizer;
const s = `{${v},${l}}`;
phrase.push(`set recognizer${s}`);
}
return `${this.name}{${phrase.join(',')}`;
}
async exec(cs) {
await super.exec(cs);
@@ -63,7 +80,7 @@ class TaskConfig extends Task {
if (this.hasBargeIn) {
if (this.gatherOpts) {
this.logger.debug({opts: this.gatherOpts}, 'Config: enabling bargeIn');
cs.enableBotMode(this.gatherOpts);
cs.enableBotMode(this.gatherOpts, this.autoEnable);
}
else {
this.logger.debug('Config: disabling bargeIn');

View File

@@ -138,7 +138,20 @@ class TaskDial extends Task {
}
get summary() {
if (this.target.length === 1) return `${this.name}{type=${this.target[0].type}}`;
if (this.target.length === 1) {
const target = this.target[0];
switch (target.type) {
case 'phone':
case 'teams':
return `${this.name}{type=${target.type},number=${target.number}}`;
case 'user':
return `${this.name}{type=${target.type},name=${target.name}}`;
case 'sip':
return `${this.name}{type=${target.type},sipUri=${target.sipUri}}`;
default:
return `${this.name}`;
}
}
else return `${this.name}{${this.target.length} targets}`;
}
@@ -384,6 +397,7 @@ class TaskDial extends Task {
this._killOutdials();
}, this.timeout * 1000);
this.span.setAttributes('dial.target', JSON.stringify(this.target));
this.target.forEach(async(t) => {
try {
t.url = t.url || this.confirmUrl;
@@ -420,7 +434,9 @@ class TaskDial extends Task {
target: t,
opts,
callInfo: cs.callInfo,
accountInfo: cs.accountInfo
accountInfo: cs.accountInfo,
rootSpan: cs.rootSpan,
startSpan: this.startSpan.bind(this)
});
this.dials.set(sd.callSid, sd);

View File

@@ -397,6 +397,7 @@ class TaskGather extends Task {
if (this.bargein && this.minBargeinWordCount === 0) {
this.logger.debug('TaskGather:_onVadDetected');
this._killAudio(cs);
this.emit('vad');
}
}
@@ -407,9 +408,10 @@ class TaskGather extends Task {
async _resolve(reason, evt) {
if (this.resolved) return;
this.resolved = true;
this.logger.debug(`TaskGather:resolve with reason ${reason}`);
this.logger.info(`TaskGather:resolve with reason ${reason}`);
clearTimeout(this.interDigitTimer);
this.span.setAttributes({'stt.resolve': reason, 'stt.result': JSON.stringify(evt)});
if (this.ep && this.ep.connected) {
this.ep.stopTranscription({vendor: this.vendor})
.catch((err) => this.logger.error({err}, 'Error stopping transcription'));

View File

@@ -13,6 +13,10 @@ class TaskPlay extends Task {
get name() { return TaskName.Play; }
get summary() {
return `${this.name}:{url=${this.url}}`;
}
async exec(cs, ep) {
await super.exec(cs);
this.ep = ep;

View File

@@ -19,7 +19,7 @@ class TaskSay extends Task {
if (this.text[i].startsWith('silence_stream')) continue;
return `${this.name}{text=${this.text[i].slice(0, 15)}${this.text[i].length > 15 ? '...' : ''}}`;
}
return this.text[0];
return `${this.name}{${this.text[0]}}`;
}
async exec(cs, ep) {
@@ -44,6 +44,7 @@ class TaskSay extends Task {
this.logger.info({vendor, language, voice}, 'TaskSay:exec');
this.ep = ep;
let span;
try {
if (!credentials) {
writeAlerts({
@@ -55,6 +56,14 @@ class TaskSay extends Task {
}
// synthesize all of the text elements
let lastUpdated = false;
/* otel: trace time for tts */
span = this.startSpan('tts-generation', {
'tts.vendor': vendor,
'tts.language': language,
'tts.voice': voice
});
const filepath = (await Promise.all(this.text.map(async(text) => {
if (this.killed) return;
if (text.startsWith('silence_stream://')) return text;
@@ -82,9 +91,10 @@ class TaskSay extends Task {
updateSpeechCredentialLastUsed(credentials.speech_credential_sid)
.catch(() => {/*already logged error */});
}
span.setAttributes({'tts.cached': servedFromCache});
return filePath;
}))).filter((fp) => fp && fp.length);
span?.end();
this.logger.debug({filepath}, 'synthesized files for tts');
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep?.connected) {
@@ -102,6 +112,7 @@ class TaskSay extends Task {
} while (!this.killed && ++segment < filepath.length);
}
} catch (err) {
span?.end();
this.logger.info(err, 'TaskSay:exec error');
}
this.emit('playDone');

View File

@@ -26,6 +26,12 @@ class TaskSipRefer extends Task {
try {
this.notifyHandler = this._handleNotify.bind(this, cs, dlg);
dlg.on('notify', this.notifyHandler);
/* otel: trace time for tts */
this.referSpan = this.startSpan('send-refer', {
'refer.refer_to': referTo,
'refer.referred_by': referredBy
});
const response = await dlg.request({
method: 'REFER',
headers: {
@@ -35,16 +41,20 @@ class TaskSipRefer extends Task {
}
});
this.referStatus = response.status;
this.referSpan.setAttributes({'refer.status_code': response.status});
this.logger.info(`TaskSipRefer:exec - received ${this.referStatus} to REFER`);
/* if we fail, fall through to next verb. If success, we should get BYE from far end */
if (this.referStatus === 202) {
await this.awaitTaskDone();
}
else await this.performAction({refer_status: this.referStatus});
else {
await this.performAction({refer_status: this.referStatus});
}
} catch (err) {
this.logger.info({err}, 'TaskSipRefer:exec - error sending REFER');
}
this.referSpan?.end();
}
async kill(cs) {
@@ -69,6 +79,7 @@ class TaskSipRefer extends Task {
await cs.requestor.request('verb:hook', this.eventHook, {event: 'transfer-status', call_status: status});
}
if (status >= 200) {
this.referSpan.setAttributes({'refer.finalNotify': status});
await this.performAction({refer_status: 202, final_referred_call_status: status});
this.notifyTaskDone();
}

View File

@@ -32,6 +32,7 @@
"bargeIn": {
"properties": {
"enable": "boolean",
"sticky": "boolean",
"actionHook": "object|string",
"input": "array",
"finishOnKey": "string",

View File

@@ -4,6 +4,7 @@ const debug = require('debug')('jambonz:feature-server');
const assert = require('assert');
const {TaskPreconditions} = require('../utils/constants');
const normalizeJambones = require('../utils/normalize-jambones');
const {trace} = require('@opentelemetry/api');
const specs = new Map();
const _specData = require('./specs');
for (const key in _specData) {specs.set(key, _specData[key]);}
@@ -71,6 +72,15 @@ class Task extends Emitter {
setImmediate(() => this.parentTask = null);
}
startSpan(name, attributes) {
const {srf} = require('../..');
const {tracer} = srf.locals.otel;
const span = tracer.startSpan(name, undefined, this.ctx);
if (attributes) span.setAttributes(attributes);
trace.setSpan(this.ctx, span);
return span;
}
/**
* when a subclass Task has completed its work, it should call this method
*/
@@ -111,29 +121,49 @@ class Task extends Emitter {
async performAction(results, expectResponse = true) {
if (this.actionHook) {
const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON();
const json = await this.cs.requestor.request('verb:hook', this.actionHook, params);
if (expectResponse && json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.callSession.replaceApplication(tasks);
const span = this.startSpan('verb:hook', {'hook.url': this.actionHook});
span.setAttributes({'http.body': JSON.stringify(params)});
try {
const json = await this.cs.requestor.request('verb:hook', this.actionHook, params);
span.setAttributes({'http.statusCode': 200});
span.end();
if (expectResponse && json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.logger.info({tasks: tasks}, `${this.name} replacing application with ${tasks.length} tasks`);
this.callSession.replaceApplication(tasks);
}
}
} catch (err) {
span.setAttributes({'http.statusCode': err.statusCode});
span.end();
throw err;
}
}
}
async performHook(cs, hook, results) {
const json = await cs.requestor.request('verb:hook', hook, results);
if (json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.redirect(cs, tasks);
return true;
const span = this.startSpan('verb:hook', {'hook.url': hook});
span.setAttributes({'http.body': JSON.stringify(results)});
try {
const json = await cs.requestor.request('verb:hook', hook, results);
span.setAttributes({'http.statusCode': 200});
span.end();
if (json && Array.isArray(json)) {
const makeTask = require('./make_task');
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
if (tasks && tasks.length > 0) {
this.redirect(cs, tasks);
return true;
}
}
return false;
} catch (err) {
span.setAttributes({'http.statusCode': err.statusCode});
span.end();
throw err;
}
return false;
}
redirect(cs, tasks) {

63
lib/utils/call-tracer.js Normal file
View File

@@ -0,0 +1,63 @@
const {context, trace} = require('@opentelemetry/api');
const {Dialog} = require('drachtio-srf');
class RootSpan {
constructor(callType, req) {
let tracer, callSid, linkedSpanId;
if (req instanceof Dialog) {
const dlg = req;
tracer = dlg.srf.locals.otel.tracer;
callSid = dlg.callSid;
linkedSpanId = dlg.linkedSpanId;
}
else {
tracer = req.srf.locals.otel.tracer;
callSid = req.locals.callSid;
}
this._span = tracer.startSpan(callType || 'incoming-call');
if (req instanceof Dialog) {
const dlg = req;
this._span.setAttributes({
linkedSpanId,
callId: dlg.sip.callId
});
}
else {
this._span.setAttributes({
callSid,
accountSid: req.get('X-Account-Sid'),
applicationSid: req.locals.application_sid,
callId: req.get('Call-ID'),
externalCallId: req.get('X-CID')
});
}
this._ctx = trace.setSpan(context.active(), this._span);
this.tracer = tracer;
}
get context() {
return this._ctx;
}
get traceId() {
return this._span.spanContext().traceId;
}
setAttributes(attrs) {
this._span.setAttributes(attrs);
}
end() {
this._span.end();
}
startChildSpan(name, attributes) {
const span = this.tracer.startSpan(name, attributes, this._ctx);
const ctx = trace.setSpan(context.active(), span);
return {span, ctx};
}
}
module.exports = RootSpan;

View File

@@ -48,6 +48,7 @@ module.exports = (logger, srf) => {
const pp = pool.promise();
const lookupAccountDetails = async(account_sid) => {
const [r] = await pp.query({sql: sqlAccountDetails, nestTables: true}, account_sid);
if (0 === r.length) throw new Error(`invalid accountSid: ${account_sid}`);
const [r2] = await pp.query(sqlSpeechCredentials, account_sid);

View File

@@ -32,6 +32,7 @@ function initMS(logger, wrapper, ms) {
function installSrfLocals(srf, logger) {
logger.debug('installing srf locals');
assert(!srf.locals.dbHelpers);
const {tracer} = srf.locals.otel;
const {getSBC, lifecycleEmitter} = require('./sbc-pinger')(logger);
const StatsCollector = require('@jambonz/stats-collector');
const stats = srf.locals.stats = new StatsCollector(logger);
@@ -127,7 +128,7 @@ function installSrfLocals(srf, logger) {
password: process.env.JAMBONES_MYSQL_PASSWORD,
database: process.env.JAMBONES_MYSQL_DATABASE,
connectionLimit: process.env.JAMBONES_MYSQL_CONNECTION_LIMIT || 10
}, logger);
}, logger, tracer);
const {
client,
updateCallStatus,
@@ -152,7 +153,7 @@ function installSrfLocals(srf, logger) {
} = require('@jambonz/realtimedb-helpers')({
host: process.env.JAMBONES_REDIS_HOST,
port: process.env.JAMBONES_REDIS_PORT || 6379
}, logger);
}, logger, tracer);
const {
writeAlerts,
AlertType

View File

@@ -9,10 +9,11 @@ const AdultingCallSession = require('../session/adulting-call-session');
const deepcopy = require('deepcopy');
const moment = require('moment');
const stripCodecs = require('./strip-ancillary-codecs');
const RootSpan = require('./call-tracer');
const { v4: uuidv4 } = require('uuid');
class SingleDialer extends Emitter {
constructor({logger, sbcAddress, target, opts, application, callInfo, accountInfo}) {
constructor({logger, sbcAddress, target, opts, application, callInfo, accountInfo, rootSpan, startSpan}) {
super();
assert(target.type);
@@ -22,6 +23,8 @@ class SingleDialer extends Emitter {
this.opts = opts;
this.application = application;
this.confirmHook = target.confirmHook;
this.rootSpan = rootSpan;
this.startSpan = startSpan;
this.bindings = logger.bindings();
@@ -71,7 +74,7 @@ class SingleDialer extends Emitter {
};
}
this.ms = ms;
let uri, to;
let uri, to, inviteSpan;
try {
switch (this.target.type) {
case 'phone':
@@ -137,13 +140,24 @@ class SingleDialer extends Emitter {
localSdp: this.ep.local.sdp
});
if (this.target.auth) opts.auth = this.target.auth;
inviteSpan = this.startSpan('invite', {
'invite.uri': uri,
'invite.dest_type': this.target.type
});
this.dlg = await srf.createUAC(uri, {...opts, followRedirects: true, keepUriOnRedirect: true}, {
cbRequest: (err, req) => {
if (err) {
this.logger.error(err, 'SingleDialer:exec Error creating call');
this.emit('callCreateFail', err);
inviteSpan.setAttributes({
'invite.status_code': 500,
'invite.err': err.message
});
inviteSpan.end();
return;
}
inviteSpan.setAttributes({'invite.call_id': req.get('Call-ID')});
/**
* INVITE has been sent out
@@ -156,7 +170,8 @@ class SingleDialer extends Emitter {
parentCallInfo: this.parentCallInfo,
req,
to,
callSid: this.callSid
callSid: this.callSid,
traceId: this.rootSpan.traceId
});
this.logger = srf.locals.parentLogger.child({
callSid: this.callSid,
@@ -193,6 +208,9 @@ class SingleDialer extends Emitter {
});
this.logger.debug(`SingleDialer:exec call connected: ${this.callSid}`);
const connectTime = this.dlg.connectTime = moment();
inviteSpan.setAttributes({'invite.status_code': 200});
inviteSpan.end();
/* race condition: we were killed just as call was answered */
if (this.killed) {
@@ -246,10 +264,17 @@ class SingleDialer extends Emitter {
if (err.status === 487) status.callStatus = CallStatus.NoAnswer;
else if ([486, 600].includes(err.status)) status.callStatus = CallStatus.Busy;
this.logger.info(`SingleDialer:exec outdial failure ${err.status}`);
inviteSpan.setAttributes({'invite.status_code': err.status});
inviteSpan.end();
}
else {
this.logger.error(err, 'SingleDialer:exec');
status.sipStatus = 500;
inviteSpan.setAttributes({
'invite.status_code': 500,
'invite.err': err.message
});
inviteSpan.end();
}
this.emit('callStatusChange', status);
if (this.ep) this.ep.destroy();
@@ -305,7 +330,8 @@ class SingleDialer extends Emitter {
dlg: this.dlg,
ep: this.ep,
callInfo: this.callInfo,
tasks
tasks,
rootSpan: this.rootSpan
});
await cs.exec();
@@ -330,13 +356,18 @@ class SingleDialer extends Emitter {
else {
await this.reAnchorMedia();
}
this.dlg.callSid = this.callSid;
this.dlg.linkedSpanId = this.rootSpan.traceId;
const rootSpan = new RootSpan('outbound-call', this.dlg);
const cs = new AdultingCallSession({
logger: this.logger,
singleDialer: this,
application,
callInfo: this.callInfo,
accountInfo: this.accountInfo,
tasks
tasks,
rootSpan
});
cs.exec();
return cs;
@@ -387,9 +418,13 @@ class SingleDialer extends Emitter {
}
}
function placeOutdial({logger, srf, ms, sbcAddress, target, opts, application, callInfo, accountInfo}) {
function placeOutdial({
logger, srf, ms, sbcAddress, target, opts, application, callInfo, accountInfo, rootSpan, startSpan
}) {
const myOpts = deepcopy(opts);
const sd = new SingleDialer({logger, sbcAddress, target, myOpts, application, callInfo, accountInfo});
const sd = new SingleDialer({
logger, sbcAddress, target, myOpts, application, callInfo, accountInfo, rootSpan, startSpan
});
sd.exec(srf, ms, myOpts);
return sd;
}

View File

@@ -15,6 +15,7 @@ class WsRequestor extends BaseRequestor {
this.messagesInFlight = new Map();
this.maliciousClient = false;
this.closedByUs = false;
this.backoffMs = 500;
assert(this._isAbsoluteUrl(this.url));
@@ -123,10 +124,10 @@ class WsRequestor extends BaseRequestor {
}
close() {
this.logger.info('WsRequestor:close closing socket');
this.closedByUs = true;
try {
if (this.ws) {
this.logger.info('WsRequestor:close closing socket');
this.ws.close();
this.ws.removeAllListeners();
}
@@ -144,10 +145,13 @@ class WsRequestor extends BaseRequestor {
_connect() {
assert(!this.ws);
return new Promise((resolve, reject) => {
const handshakeTimeout = process.env.JAMBONES_WS_HANDSHAKE_TIMEOUT_MS ?
parseInt(process.env.JAMBONES_WS_HANDSHAKE_TIMEOUT_MS) :
1500;
let opts = {
followRedirects: true,
maxRedirects: 2,
handshakeTimeout: 1000,
handshakeTimeout,
maxPayload: 8096,
};
if (this.username && this.password) opts = {...opts, auth: `${this.username}:${this.password}`};
@@ -156,7 +160,7 @@ class WsRequestor extends BaseRequestor {
.once('ready', (ws) => {
this.ws = ws;
this.removeAllListeners('not-ready');
if (this.connections++ > 0) this.request('session:reconnect', this.url);
if (this.connections > 0) this.request('session:reconnect', this.url);
resolve();
})
.once('not-ready', (err) => {
@@ -213,8 +217,10 @@ class WsRequestor extends BaseRequestor {
_onSocketClosed() {
this.ws = null;
if (this.connections > 0 && this.connections < MAX_RECONNECTS && !this.closedByUs) {
setTimeout(this._connect.bind(this), 500);
this.emit('connection-dropped');
if (this.connections++ > 0 && this.connections < MAX_RECONNECTS && !this.closedByUs) {
setTimeout(this._connect.bind(this), this.backoffMs);
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
}
}