mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 08:40:38 +00:00
Feature/trace propagation (#96)
* add b3 header for trace propagation on initial webhook * logging * add tracing context to all webhooks * Add span parameter to Task.getTracingPropagation. Pass proper span to getTracingPropagation calls in Task methods to propagate the proper spanId (#91) * some tracing cleanup * bugfix: azure stt results need to be ordered by confidence level before processing (#92) * fix assertion * bugfix: vad was not enabled on config verb, restart STT on empty transcript in gather * gather: dont send webhook if call is gone * rest outdial: handle 302 redirect so we can later cancel request if needed (#95) * gather: restart if we get an empty transcript (looking at you, Azure) Co-authored-by: javibookline <98887695+javibookline@users.noreply.github.com>
This commit is contained in:
@@ -132,8 +132,14 @@ router.post('/', async(req, res) => {
|
|||||||
try {
|
try {
|
||||||
const dlg = await srf.createUAC(uri, {...opts, followRedirects: true, keepUriOnRedirect: true}, {
|
const dlg = await srf.createUAC(uri, {...opts, followRedirects: true, keepUriOnRedirect: true}, {
|
||||||
cbRequest: (err, inviteReq) => {
|
cbRequest: (err, inviteReq) => {
|
||||||
/* in case of 302 redirect, this gets called twice, ignore the second */
|
/* in case of 302 redirect, this gets called twice, ignore the second
|
||||||
if (res.headersSent) return;
|
except to update the req so that it can later be canceled if need be
|
||||||
|
*/
|
||||||
|
if (res.headersSent) {
|
||||||
|
logger.info(`create-call: got redirect, updating request to new call-id ${req.get('Call-ID')}`);
|
||||||
|
if (cs) cs.req = inviteReq;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.error(err, 'createCall Error creating call');
|
logger.error(err, 'createCall Error creating call');
|
||||||
|
|||||||
@@ -228,7 +228,6 @@ module.exports = function(srf, logger) {
|
|||||||
const {rootSpan, application:app} = req.locals;
|
const {rootSpan, application:app} = req.locals;
|
||||||
let span;
|
let span;
|
||||||
try {
|
try {
|
||||||
|
|
||||||
if (app.tasks) {
|
if (app.tasks) {
|
||||||
app.tasks = normalizeJambones(logger, app.tasks).map((tdata) => makeTask(logger, tdata));
|
app.tasks = normalizeJambones(logger, app.tasks).map((tdata) => makeTask(logger, tdata));
|
||||||
if (0 === app.tasks.length) throw new Error('no application provided');
|
if (0 === app.tasks.length) throw new Error('no application provided');
|
||||||
@@ -239,7 +238,9 @@ module.exports = function(srf, logger) {
|
|||||||
req.locals.callInfo);
|
req.locals.callInfo);
|
||||||
const obj = rootSpan.startChildSpan('performAppWebhook');
|
const obj = rootSpan.startChildSpan('performAppWebhook');
|
||||||
span = obj.span;
|
span = obj.span;
|
||||||
const json = await app.requestor.request('session:new', app.call_hook, params);
|
const b3 = rootSpan.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
const json = await app.requestor.request('session:new', app.call_hook, params, httpHeaders);
|
||||||
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));
|
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));
|
||||||
span.setAttributes({
|
span.setAttributes({
|
||||||
'http.statusCode': 200,
|
'http.statusCode': 200,
|
||||||
|
|||||||
@@ -230,6 +230,10 @@ class CallSession extends Emitter {
|
|||||||
return this.backgroundGatherTask;
|
return this.backgroundGatherTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get b3() {
|
||||||
|
return this.rootSpan?.getTracingPropagation();
|
||||||
|
}
|
||||||
|
|
||||||
async enableBotMode(gather, autoEnable) {
|
async enableBotMode(gather, autoEnable) {
|
||||||
try {
|
try {
|
||||||
const t = normalizeJambones(this.logger, [gather]);
|
const t = normalizeJambones(this.logger, [gather]);
|
||||||
@@ -512,17 +516,20 @@ class CallSession extends Emitter {
|
|||||||
async _lccCallHook(opts) {
|
async _lccCallHook(opts) {
|
||||||
const webhooks = [];
|
const webhooks = [];
|
||||||
let sd, tasks, childTasks;
|
let sd, tasks, childTasks;
|
||||||
|
const b3 = this.b3;
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
|
||||||
if (opts.call_hook || opts.child_call_hook) {
|
if (opts.call_hook || opts.child_call_hook) {
|
||||||
if (opts.call_hook) {
|
if (opts.call_hook) {
|
||||||
webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON()));
|
webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON(), httpHeaders));
|
||||||
}
|
}
|
||||||
if (opts.child_call_hook) {
|
if (opts.child_call_hook) {
|
||||||
/* child call hook only allowed from a connected Dial state */
|
/* child call hook only allowed from a connected Dial state */
|
||||||
const task = this.currentTask;
|
const task = this.currentTask;
|
||||||
sd = task.sd;
|
sd = task.sd;
|
||||||
if (task && TaskName.Dial === task.name && sd) {
|
if (task && TaskName.Dial === task.name && sd) {
|
||||||
webhooks.push(this.requestor.request('session:redirect', opts.child_call_hook, sd.callInfo.toJSON()));
|
webhooks.push(this.requestor.request(
|
||||||
|
'session:redirect', opts.child_call_hook, sd.callInfo.toJSON(), httpHeaders));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const [tasks1, tasks2] = await Promise.all(webhooks);
|
const [tasks1, tasks2] = await Promise.all(webhooks);
|
||||||
@@ -641,6 +648,8 @@ class CallSession extends Emitter {
|
|||||||
async _lccWhisper(opts, callSid) {
|
async _lccWhisper(opts, callSid) {
|
||||||
const {whisper} = opts;
|
const {whisper} = opts;
|
||||||
let tasks;
|
let tasks;
|
||||||
|
const b3 = this.b3;
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
|
||||||
// this whole thing requires us to be in a Dial verb
|
// this whole thing requires us to be in a Dial verb
|
||||||
const task = this.currentTask;
|
const task = this.currentTask;
|
||||||
@@ -651,7 +660,7 @@ class CallSession extends Emitter {
|
|||||||
// allow user to provide a url object, a url string, an array of tasks, or a single task
|
// allow user to provide a url object, a url string, an array of tasks, or a single task
|
||||||
if (typeof whisper === 'string' || (typeof whisper === 'object' && whisper.url)) {
|
if (typeof whisper === 'string' || (typeof whisper === 'object' && whisper.url)) {
|
||||||
// retrieve a url
|
// retrieve a url
|
||||||
const json = await this.requestor(opts.call_hook, this.callInfo.toJSON());
|
const json = await this.requestor(opts.call_hook, this.callInfo.toJSON(), httpHeaders);
|
||||||
tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
}
|
}
|
||||||
else if (Array.isArray(whisper)) {
|
else if (Array.isArray(whisper)) {
|
||||||
@@ -1264,7 +1273,9 @@ class CallSession extends Emitter {
|
|||||||
const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`);
|
const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`);
|
||||||
span.setAttributes(this.callInfo.toJSON());
|
span.setAttributes(this.callInfo.toJSON());
|
||||||
try {
|
try {
|
||||||
this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON());
|
const b3 = this.b3;
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON(), httpHeaders);
|
||||||
span.end();
|
span.end();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
span.end();
|
span.end();
|
||||||
|
|||||||
@@ -529,7 +529,9 @@ class Conference extends Task {
|
|||||||
|
|
||||||
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) {
|
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) {
|
||||||
assert(!this._playSession);
|
assert(!this._playSession);
|
||||||
const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo);
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo, httpHeaders);
|
||||||
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
|
|
||||||
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
|
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
|
||||||
@@ -582,11 +584,14 @@ class Conference extends Task {
|
|||||||
|
|
||||||
_notifyConferenceEvent(cs, eventName, params = {}) {
|
_notifyConferenceEvent(cs, eventName, params = {}) {
|
||||||
if (this.statusEvents.includes(eventName)) {
|
if (this.statusEvents.includes(eventName)) {
|
||||||
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
params.event = eventName;
|
params.event = eventName;
|
||||||
params.duration = (Date.now() - this.conferenceStartTime.getTime()) / 1000;
|
params.duration = (Date.now() - this.conferenceStartTime.getTime()) / 1000;
|
||||||
if (!params.time) params.time = (new Date()).toISOString();
|
if (!params.time) params.time = (new Date()).toISOString();
|
||||||
if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount;
|
if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount;
|
||||||
cs.application.requestor.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams))
|
cs.application.requestor
|
||||||
|
.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams, httpHeaders))
|
||||||
.catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error'));
|
.catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error'));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,6 +75,7 @@ class TaskConfig extends Task {
|
|||||||
cs.speechRecognizerLanguage = this.recognizer.language !== 'default'
|
cs.speechRecognizerLanguage = this.recognizer.language !== 'default'
|
||||||
? this.recognizer.language
|
? this.recognizer.language
|
||||||
: cs.speechRecognizerLanguage;
|
: cs.speechRecognizerLanguage;
|
||||||
|
this.gatherOpts.recognizer = this.recognizer;
|
||||||
this.logger.info({recognizer: this.recognizer}, 'Config: updated recognizer');
|
this.logger.info({recognizer: this.recognizer}, 'Config: updated recognizer');
|
||||||
}
|
}
|
||||||
if (this.hasBargeIn) {
|
if (this.hasBargeIn) {
|
||||||
|
|||||||
@@ -271,6 +271,9 @@ class TaskDial extends Task {
|
|||||||
const referring_call_sid = isChild ? callInfo.callSid : cs.callSid;
|
const referring_call_sid = isChild ? callInfo.callSid : cs.callSid;
|
||||||
const referred_call_sid = isChild ? callInfo.parentCallSid : this.sd.callSid;
|
const referred_call_sid = isChild ? callInfo.parentCallSid : this.sd.callSid;
|
||||||
|
|
||||||
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
|
||||||
const to = parseUri(req.getParsedHeader('Refer-To').uri);
|
const to = parseUri(req.getParsedHeader('Refer-To').uri);
|
||||||
const by = parseUri(req.getParsedHeader('Referred-By').uri);
|
const by = parseUri(req.getParsedHeader('Referred-By').uri);
|
||||||
this.logger.info({to}, 'refer to parsed');
|
this.logger.info({to}, 'refer to parsed');
|
||||||
@@ -285,7 +288,7 @@ class TaskDial extends Task {
|
|||||||
referring_call_sid,
|
referring_call_sid,
|
||||||
referred_call_sid
|
referred_call_sid
|
||||||
}
|
}
|
||||||
});
|
}, httpHeaders);
|
||||||
res.send(202);
|
res.send(202);
|
||||||
this.logger.info('DialTask:handleRefer - sent 202 Accepted');
|
this.logger.info('DialTask:handleRefer - sent 202 Accepted');
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@@ -345,8 +348,10 @@ class TaskDial extends Task {
|
|||||||
const key = arr[1];
|
const key = arr[1];
|
||||||
const match = dtmfDetector.keyPress(key);
|
const match = dtmfDetector.keyPress(key);
|
||||||
if (match) {
|
if (match) {
|
||||||
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`);
|
this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`);
|
||||||
requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON()})
|
requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON(), httpHeaders})
|
||||||
.catch((err) => this.logger.info(err, 'Dial:_onDtmf - error'));
|
.catch((err) => this.logger.info(err, 'Dial:_onDtmf - error'));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -453,7 +453,10 @@ class Dialogflow extends Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async _performHook(cs, hook, results = {}) {
|
async _performHook(cs, hook, results = {}) {
|
||||||
const json = await this.cs.requestor.request('verb:hook', hook, {...results, ...cs.callInfo.toJSON()});
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
const json = await this.cs.requestor.request('verb:hook', hook,
|
||||||
|
{...results, ...cs.callInfo.toJSON()}, httpHeaders);
|
||||||
if (json && Array.isArray(json)) {
|
if (json && Array.isArray(json)) {
|
||||||
const makeTask = require('../make_task');
|
const makeTask = require('../make_task');
|
||||||
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
|
|||||||
@@ -302,6 +302,8 @@ class TaskEnqueue extends Task {
|
|||||||
|
|
||||||
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) {
|
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) {
|
||||||
const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers;
|
const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers;
|
||||||
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
|
||||||
assert(!this._playSession);
|
assert(!this._playSession);
|
||||||
if (this.killed) return [];
|
if (this.killed) return [];
|
||||||
@@ -317,7 +319,7 @@ class TaskEnqueue extends Task {
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`);
|
this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`);
|
||||||
}
|
}
|
||||||
const json = await cs.application.requestor.request('verb:hook', hook, params);
|
const json = await cs.application.requestor.request('verb:hook', hook, params, httpHeaders);
|
||||||
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
|
|
||||||
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
|
const allowedTasks = tasks.filter((t) => allowed.includes(t.name));
|
||||||
|
|||||||
@@ -184,13 +184,13 @@ class TaskGather extends Task {
|
|||||||
this._killAudio(cs);
|
this._killAudio(cs);
|
||||||
this.ep.removeAllListeners('dtmf');
|
this.ep.removeAllListeners('dtmf');
|
||||||
clearTimeout(this.interDigitTimer);
|
clearTimeout(this.interDigitTimer);
|
||||||
this._resolve('killed');
|
|
||||||
this.playTask?.span.end();
|
this.playTask?.span.end();
|
||||||
this.sayTask?.span.end();
|
this.sayTask?.span.end();
|
||||||
|
this._resolve('killed');
|
||||||
}
|
}
|
||||||
|
|
||||||
updateTimeout(timeout) {
|
updateTimeout(timeout) {
|
||||||
this.logger.info(`TaskGather:updateTimout - updating timeout to ${timeout}`);
|
this.logger.info(`TaskGather:updateTimeout - updating timeout to ${timeout}`);
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
this._startTimer();
|
this._startTimer();
|
||||||
}
|
}
|
||||||
@@ -226,6 +226,7 @@ class TaskGather extends Task {
|
|||||||
if (this.vad?.enable) {
|
if (this.vad?.enable) {
|
||||||
opts.START_RECOGNIZING_ON_VAD = 1;
|
opts.START_RECOGNIZING_ON_VAD = 1;
|
||||||
if (this.vad.voiceMs) opts.RECOGNIZER_VAD_VOICE_MS = this.vad.voiceMs;
|
if (this.vad.voiceMs) opts.RECOGNIZER_VAD_VOICE_MS = this.vad.voiceMs;
|
||||||
|
else opts.RECOGNIZER_VAD_VOICE_MS = 125;
|
||||||
if (this.vad.mode >= 0 && this.vad.mode <= 3) opts.RECOGNIZER_VAD_MODE = this.vad.mode;
|
if (this.vad.mode >= 0 && this.vad.mode <= 3) opts.RECOGNIZER_VAD_MODE = this.vad.mode;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -283,6 +284,7 @@ class TaskGather extends Task {
|
|||||||
if (this.profanityOption && this.profanityOption !== 'raw') opts.AZURE_PROFANITY_OPTION = this.profanityOption;
|
if (this.profanityOption && this.profanityOption !== 'raw') opts.AZURE_PROFANITY_OPTION = this.profanityOption;
|
||||||
if (this.azureServiceEndpoint) opts.AZURE_SERVICE_ENDPOINT = this.azureServiceEndpoint;
|
if (this.azureServiceEndpoint) opts.AZURE_SERVICE_ENDPOINT = this.azureServiceEndpoint;
|
||||||
if (this.initialSpeechTimeoutMs > 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = this.initialSpeechTimeoutMs;
|
if (this.initialSpeechTimeoutMs > 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = this.initialSpeechTimeoutMs;
|
||||||
|
else if (this.timeout === 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = 120000; // lengthy
|
||||||
opts.AZURE_USE_OUTPUT_FORMAT_DETAILED = 1;
|
opts.AZURE_USE_OUTPUT_FORMAT_DETAILED = 1;
|
||||||
|
|
||||||
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
|
ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep));
|
||||||
@@ -317,6 +319,10 @@ class TaskGather extends Task {
|
|||||||
|
|
||||||
_startTimer() {
|
_startTimer() {
|
||||||
if (0 === this.timeout) return;
|
if (0 === this.timeout) return;
|
||||||
|
if (this._timeoutTimer) {
|
||||||
|
clearTimeout(this._timeoutTimer);
|
||||||
|
this._timeoutTimer = null;
|
||||||
|
}
|
||||||
assert(!this._timeoutTimer);
|
assert(!this._timeoutTimer);
|
||||||
this.logger.debug(`Gather:_startTimer: timeout ${this.timeout}`);
|
this.logger.debug(`Gather:_startTimer: timeout ${this.timeout}`);
|
||||||
this._timeoutTimer = setTimeout(() => this._resolve('timeout'), this.timeout);
|
this._timeoutTimer = setTimeout(() => this._resolve('timeout'), this.timeout);
|
||||||
@@ -356,7 +362,7 @@ class TaskGather extends Task {
|
|||||||
if ('microsoft' === this.vendor) {
|
if ('microsoft' === this.vendor) {
|
||||||
const final = evt.RecognitionStatus === 'Success';
|
const final = evt.RecognitionStatus === 'Success';
|
||||||
if (final) {
|
if (final) {
|
||||||
const nbest = evt.NBest;
|
const nbest = evt.NBest.sort((a, b) => b.Confidence - a.Confidence);
|
||||||
evt = {
|
evt = {
|
||||||
is_final: true,
|
is_final: true,
|
||||||
alternatives: [
|
alternatives: [
|
||||||
@@ -378,7 +384,13 @@ class TaskGather extends Task {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (evt.is_final) this._resolve('speech', evt);
|
if (evt.is_final) {
|
||||||
|
if (evt.alternatives[0].transcript === '') {
|
||||||
|
this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, listen again');
|
||||||
|
return this._startTranscribing(ep);
|
||||||
|
}
|
||||||
|
this._resolve('speech', evt);
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
/* google has a measure of stability:
|
/* google has a measure of stability:
|
||||||
https://cloud.google.com/speech-to-text/docs/basics#streaming_responses
|
https://cloud.google.com/speech-to-text/docs/basics#streaming_responses
|
||||||
@@ -394,7 +406,10 @@ class TaskGather extends Task {
|
|||||||
this._killAudio(cs);
|
this._killAudio(cs);
|
||||||
}
|
}
|
||||||
if (this.partialResultHook) {
|
if (this.partialResultHook) {
|
||||||
this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo));
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt},
|
||||||
|
this.cs.callInfo, httpHeaders));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -423,6 +438,8 @@ class TaskGather extends Task {
|
|||||||
|
|
||||||
async _resolve(reason, evt) {
|
async _resolve(reason, evt) {
|
||||||
if (this.resolved) return;
|
if (this.resolved) return;
|
||||||
|
if (this.callSession && this.callSession.callGone) return;
|
||||||
|
|
||||||
this.resolved = true;
|
this.resolved = true;
|
||||||
this.logger.info(`TaskGather:resolve with reason ${reason}`);
|
this.logger.info(`TaskGather:resolve with reason ${reason}`);
|
||||||
clearTimeout(this.interDigitTimer);
|
clearTimeout(this.interDigitTimer);
|
||||||
|
|||||||
@@ -289,7 +289,9 @@ class Lex extends Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async _performHook(cs, hook, results) {
|
async _performHook(cs, hook, results) {
|
||||||
const json = await this.cs.requestor.request('verb:hook', hook, results);
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
const json = await this.cs.requestor.request('verb:hook', hook, results, httpHeaders);
|
||||||
if (json && Array.isArray(json)) {
|
if (json && Array.isArray(json)) {
|
||||||
const makeTask = require('./make_task');
|
const makeTask = require('./make_task');
|
||||||
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata));
|
||||||
|
|||||||
@@ -48,7 +48,9 @@ class TaskRestDial extends Task {
|
|||||||
cs.setDialog(dlg);
|
cs.setDialog(dlg);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo);
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo, httpHeaders);
|
||||||
if (tasks && Array.isArray(tasks)) {
|
if (tasks && Array.isArray(tasks)) {
|
||||||
this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`);
|
this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`);
|
||||||
cs.replaceApplication(normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata)));
|
cs.replaceApplication(normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata)));
|
||||||
|
|||||||
@@ -44,7 +44,6 @@ class TaskSay extends Task {
|
|||||||
|
|
||||||
this.logger.info({vendor, language, voice}, 'TaskSay:exec');
|
this.logger.info({vendor, language, voice}, 'TaskSay:exec');
|
||||||
this.ep = ep;
|
this.ep = ep;
|
||||||
let span;
|
|
||||||
try {
|
try {
|
||||||
if (!credentials) {
|
if (!credentials) {
|
||||||
writeAlerts({
|
writeAlerts({
|
||||||
@@ -58,11 +57,12 @@ class TaskSay extends Task {
|
|||||||
let lastUpdated = false;
|
let lastUpdated = false;
|
||||||
|
|
||||||
/* otel: trace time for tts */
|
/* otel: trace time for tts */
|
||||||
span = this.startSpan('tts-generation', {
|
const {span} = this.startChildSpan('tts-generation', {
|
||||||
'tts.vendor': vendor,
|
'tts.vendor': vendor,
|
||||||
'tts.language': language,
|
'tts.language': language,
|
||||||
'tts.voice': voice
|
'tts.voice': voice
|
||||||
});
|
});
|
||||||
|
this.ttsSpan = span;
|
||||||
|
|
||||||
const filepath = (await Promise.all(this.text.map(async(text) => {
|
const filepath = (await Promise.all(this.text.map(async(text) => {
|
||||||
if (this.killed) return;
|
if (this.killed) return;
|
||||||
@@ -91,10 +91,10 @@ class TaskSay extends Task {
|
|||||||
updateSpeechCredentialLastUsed(credentials.speech_credential_sid)
|
updateSpeechCredentialLastUsed(credentials.speech_credential_sid)
|
||||||
.catch(() => {/*already logged error */});
|
.catch(() => {/*already logged error */});
|
||||||
}
|
}
|
||||||
span.setAttributes({'tts.cached': servedFromCache});
|
this.ttsSpan.setAttributes({'tts.cached': servedFromCache});
|
||||||
return filePath;
|
return filePath;
|
||||||
}))).filter((fp) => fp && fp.length);
|
}))).filter((fp) => fp && fp.length);
|
||||||
span?.end();
|
this.ttsSpan?.end();
|
||||||
this.logger.debug({filepath}, 'synthesized files for tts');
|
this.logger.debug({filepath}, 'synthesized files for tts');
|
||||||
|
|
||||||
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep?.connected) {
|
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep?.connected) {
|
||||||
@@ -112,7 +112,7 @@ class TaskSay extends Task {
|
|||||||
} while (!this.killed && ++segment < filepath.length);
|
} while (!this.killed && ++segment < filepath.length);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
span?.end();
|
this.ttsSpan?.end();
|
||||||
this.logger.info(err, 'TaskSay:exec error');
|
this.logger.info(err, 'TaskSay:exec error');
|
||||||
}
|
}
|
||||||
this.emit('playDone');
|
this.emit('playDone');
|
||||||
|
|||||||
@@ -76,7 +76,10 @@ class TaskSipRefer extends Task {
|
|||||||
const status = arr[1];
|
const status = arr[1];
|
||||||
this.logger.debug(`TaskSipRefer:_handleNotify: call got status ${status}`);
|
this.logger.debug(`TaskSipRefer:_handleNotify: call got status ${status}`);
|
||||||
if (this.eventHook) {
|
if (this.eventHook) {
|
||||||
await cs.requestor.request('verb:hook', this.eventHook, {event: 'transfer-status', call_status: status});
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
await cs.requestor.request('verb:hook', this.eventHook,
|
||||||
|
{event: 'transfer-status', call_status: status}, httpHeaders);
|
||||||
}
|
}
|
||||||
if (status >= 200) {
|
if (status >= 200) {
|
||||||
this.referSpan.setAttributes({'refer.finalNotify': status});
|
this.referSpan.setAttributes({'refer.finalNotify': status});
|
||||||
|
|||||||
@@ -90,6 +90,16 @@ class Task extends Emitter {
|
|||||||
return {span, ctx};
|
return {span, ctx};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getTracingPropagation(encoding, span) {
|
||||||
|
// TODO: support encodings beyond b3 https://github.com/openzipkin/b3-propagation
|
||||||
|
if (span) {
|
||||||
|
return `${span.spanContext().traceId}-${span.spanContext().spanId}-1`;
|
||||||
|
}
|
||||||
|
if (this.span) {
|
||||||
|
return `${this.span.spanContext().traceId}-${this.span.spanContext().spanId}-1`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* when a subclass Task has completed its work, it should call this method
|
* when a subclass Task has completed its work, it should call this method
|
||||||
*/
|
*/
|
||||||
@@ -131,9 +141,11 @@ class Task extends Emitter {
|
|||||||
if (this.actionHook) {
|
if (this.actionHook) {
|
||||||
const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON();
|
const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON();
|
||||||
const span = this.startSpan('verb:hook', {'hook.url': this.actionHook});
|
const span = this.startSpan('verb:hook', {'hook.url': this.actionHook});
|
||||||
|
const b3 = this.getTracingPropagation('b3', span);
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
span.setAttributes({'http.body': JSON.stringify(params)});
|
span.setAttributes({'http.body': JSON.stringify(params)});
|
||||||
try {
|
try {
|
||||||
const json = await this.cs.requestor.request('verb:hook', this.actionHook, params);
|
const json = await this.cs.requestor.request('verb:hook', this.actionHook, params, httpHeaders);
|
||||||
span.setAttributes({'http.statusCode': 200});
|
span.setAttributes({'http.statusCode': 200});
|
||||||
span.end();
|
span.end();
|
||||||
if (expectResponse && json && Array.isArray(json)) {
|
if (expectResponse && json && Array.isArray(json)) {
|
||||||
@@ -154,9 +166,11 @@ class Task extends Emitter {
|
|||||||
|
|
||||||
async performHook(cs, hook, results) {
|
async performHook(cs, hook, results) {
|
||||||
const span = this.startSpan('verb:hook', {'hook.url': hook});
|
const span = this.startSpan('verb:hook', {'hook.url': hook});
|
||||||
|
const b3 = this.getTracingPropagation('b3', span);
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
span.setAttributes({'http.body': JSON.stringify(results)});
|
span.setAttributes({'http.body': JSON.stringify(results)});
|
||||||
try {
|
try {
|
||||||
const json = await cs.requestor.request('verb:hook', hook, results);
|
const json = await cs.requestor.request('verb:hook', hook, results, httpHeaders);
|
||||||
span.setAttributes({'http.statusCode': 200});
|
span.setAttributes({'http.statusCode': 200});
|
||||||
span.end();
|
span.end();
|
||||||
if (json && Array.isArray(json)) {
|
if (json && Array.isArray(json)) {
|
||||||
|
|||||||
@@ -254,7 +254,10 @@ class TaskTranscribe extends Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (this.transcriptionHook) {
|
if (this.transcriptionHook) {
|
||||||
this.cs.requestor.request('verb:hook', this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo))
|
const b3 = this.getTracingPropagation();
|
||||||
|
const httpHeaders = b3 && {b3};
|
||||||
|
this.cs.requestor.request('verb:hook', this.transcriptionHook,
|
||||||
|
Object.assign({speech: evt}, this.cs.callInfo), httpHeaders)
|
||||||
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
|
.catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error'));
|
||||||
}
|
}
|
||||||
if (this.parentTask) {
|
if (this.parentTask) {
|
||||||
|
|||||||
@@ -44,6 +44,21 @@ class RootSpan {
|
|||||||
return this._span.spanContext().traceId;
|
return this._span.spanContext().traceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get spanId() {
|
||||||
|
return this._span.spanContext().spanId;
|
||||||
|
}
|
||||||
|
|
||||||
|
get traceFlags() {
|
||||||
|
return this._span.spanContext().traceFlags;
|
||||||
|
}
|
||||||
|
|
||||||
|
getTracingPropagation(encoding) {
|
||||||
|
// TODO: support encodings beyond b3 https://github.com/openzipkin/b3-propagation
|
||||||
|
if (this._span && this.traceId !== '00000000000000000000000000000000') {
|
||||||
|
return `${this.traceId}-${this.spanId}-1`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setAttributes(attrs) {
|
setAttributes(attrs) {
|
||||||
this._span.setAttributes(attrs);
|
this._span.setAttributes(attrs);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ class HttpRequestor extends BaseRequestor {
|
|||||||
* @param {string} [hook.password] - if basic auth is protecting the endpoint
|
* @param {string} [hook.password] - if basic auth is protecting the endpoint
|
||||||
* @param {object} [params] - request parameters
|
* @param {object} [params] - request parameters
|
||||||
*/
|
*/
|
||||||
async request(type, hook, params) {
|
async request(type, hook, params, httpHeaders = {}) {
|
||||||
assert(HookMsgTypes.includes(type));
|
assert(HookMsgTypes.includes(type));
|
||||||
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
|
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
|
||||||
const url = hook.url || hook;
|
const url = hook.url || hook;
|
||||||
@@ -64,8 +64,8 @@ class HttpRequestor extends BaseRequestor {
|
|||||||
let buf;
|
let buf;
|
||||||
try {
|
try {
|
||||||
const sigHeader = this._generateSigHeader(payload, this.secret);
|
const sigHeader = this._generateSigHeader(payload, this.secret);
|
||||||
const headers = {...sigHeader, ...this.authHeader};
|
const headers = {...sigHeader, ...this.authHeader, ...httpHeaders};
|
||||||
//this.logger.info({url, headers}, 'send webhook');
|
this.logger.debug({url, headers}, 'send webhook');
|
||||||
buf = this._isRelativeUrl(url) ?
|
buf = this._isRelativeUrl(url) ?
|
||||||
await this.post(url, payload, headers) :
|
await this.post(url, payload, headers) :
|
||||||
await bent(method, 'buffer', 200, 201, 202)(url, payload, headers);
|
await bent(method, 'buffer', 200, 201, 202)(url, payload, headers);
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ class WsRequestor extends BaseRequestor {
|
|||||||
* @param {string} [hook.password] - if basic auth is protecting the endpoint
|
* @param {string} [hook.password] - if basic auth is protecting the endpoint
|
||||||
* @param {object} [params] - request parameters
|
* @param {object} [params] - request parameters
|
||||||
*/
|
*/
|
||||||
async request(type, hook, params) {
|
async request(type, hook, params, httpHeaders = {}) {
|
||||||
assert(HookMsgTypes.includes(type));
|
assert(HookMsgTypes.includes(type));
|
||||||
const url = hook.url || hook;
|
const url = hook.url || hook;
|
||||||
|
|
||||||
@@ -48,7 +48,7 @@ class WsRequestor extends BaseRequestor {
|
|||||||
if (this._isAbsoluteUrl(url) && url.startsWith('http')) {
|
if (this._isAbsoluteUrl(url) && url.startsWith('http')) {
|
||||||
this.logger.debug({hook}, 'WsRequestor: sending a webhook (HTTP)');
|
this.logger.debug({hook}, 'WsRequestor: sending a webhook (HTTP)');
|
||||||
const requestor = new HttpRequestor(this.logger, this.account_sid, hook, this.secret);
|
const requestor = new HttpRequestor(this.logger, this.account_sid, hook, this.secret);
|
||||||
return requestor.request(type, hook, params);
|
return requestor.request(type, hook, params, httpHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* connect if necessary */
|
/* connect if necessary */
|
||||||
@@ -73,12 +73,14 @@ class WsRequestor extends BaseRequestor {
|
|||||||
assert.ok(url, 'WsRequestor:request url was not provided');
|
assert.ok(url, 'WsRequestor:request url was not provided');
|
||||||
|
|
||||||
const msgid = short.generate();
|
const msgid = short.generate();
|
||||||
|
const b3 = httpHeaders?.b3 ? {b3: httpHeaders.b3} : {};
|
||||||
const obj = {
|
const obj = {
|
||||||
type,
|
type,
|
||||||
msgid,
|
msgid,
|
||||||
call_sid: this.call_sid,
|
call_sid: this.call_sid,
|
||||||
hook: type === 'verb:hook' ? url : undefined,
|
hook: type === 'verb:hook' ? url : undefined,
|
||||||
data: {...payload}
|
data: {...payload},
|
||||||
|
...b3
|
||||||
};
|
};
|
||||||
|
|
||||||
//this.logger.debug({obj}, `websocket: sending (${url})`);
|
//this.logger.debug({obj}, `websocket: sending (${url})`);
|
||||||
|
|||||||
Reference in New Issue
Block a user