diff --git a/lib/http-routes/api/create-call.js b/lib/http-routes/api/create-call.js index 8300c3ee..a30ed5ac 100644 --- a/lib/http-routes/api/create-call.js +++ b/lib/http-routes/api/create-call.js @@ -132,8 +132,14 @@ router.post('/', async(req, res) => { try { const dlg = await srf.createUAC(uri, {...opts, followRedirects: true, keepUriOnRedirect: true}, { cbRequest: (err, inviteReq) => { - /* in case of 302 redirect, this gets called twice, ignore the second */ - if (res.headersSent) return; + /* in case of 302 redirect, this gets called twice, ignore the second + except to update the req so that it can later be canceled if need be + */ + if (res.headersSent) { + logger.info(`create-call: got redirect, updating request to new call-id ${req.get('Call-ID')}`); + if (cs) cs.req = inviteReq; + return; + } if (err) { logger.error(err, 'createCall Error creating call'); diff --git a/lib/middleware.js b/lib/middleware.js index f04d6055..6c7f2644 100644 --- a/lib/middleware.js +++ b/lib/middleware.js @@ -228,7 +228,6 @@ module.exports = function(srf, logger) { const {rootSpan, application:app} = req.locals; let span; try { - if (app.tasks) { app.tasks = normalizeJambones(logger, app.tasks).map((tdata) => makeTask(logger, tdata)); if (0 === app.tasks.length) throw new Error('no application provided'); @@ -239,7 +238,9 @@ module.exports = function(srf, logger) { req.locals.callInfo); const obj = rootSpan.startChildSpan('performAppWebhook'); span = obj.span; - const json = await app.requestor.request('session:new', app.call_hook, params); + const b3 = rootSpan.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const json = await app.requestor.request('session:new', app.call_hook, params, httpHeaders); app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata)); span.setAttributes({ 'http.statusCode': 200, diff --git a/lib/session/call-session.js b/lib/session/call-session.js index 014cef35..b679590f 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -230,6 +230,10 @@ class CallSession extends Emitter { return this.backgroundGatherTask; } + get b3() { + return this.rootSpan?.getTracingPropagation(); + } + async enableBotMode(gather, autoEnable) { try { const t = normalizeJambones(this.logger, [gather]); @@ -512,17 +516,20 @@ class CallSession extends Emitter { async _lccCallHook(opts) { const webhooks = []; let sd, tasks, childTasks; + const b3 = this.b3; + const httpHeaders = b3 && {b3}; if (opts.call_hook || opts.child_call_hook) { if (opts.call_hook) { - webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON())); + webhooks.push(this.requestor.request('session:redirect', opts.call_hook, this.callInfo.toJSON(), httpHeaders)); } if (opts.child_call_hook) { /* child call hook only allowed from a connected Dial state */ const task = this.currentTask; sd = task.sd; if (task && TaskName.Dial === task.name && sd) { - webhooks.push(this.requestor.request('session:redirect', opts.child_call_hook, sd.callInfo.toJSON())); + webhooks.push(this.requestor.request( + 'session:redirect', opts.child_call_hook, sd.callInfo.toJSON(), httpHeaders)); } } const [tasks1, tasks2] = await Promise.all(webhooks); @@ -641,6 +648,8 @@ class CallSession extends Emitter { async _lccWhisper(opts, callSid) { const {whisper} = opts; let tasks; + const b3 = this.b3; + const httpHeaders = b3 && {b3}; // this whole thing requires us to be in a Dial verb const task = this.currentTask; @@ -651,7 +660,7 @@ class CallSession extends Emitter { // allow user to provide a url object, a url string, an array of tasks, or a single task if (typeof whisper === 'string' || (typeof whisper === 'object' && whisper.url)) { // retrieve a url - const json = await this.requestor(opts.call_hook, this.callInfo.toJSON()); + const json = await this.requestor(opts.call_hook, this.callInfo.toJSON(), httpHeaders); tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); } else if (Array.isArray(whisper)) { @@ -1264,7 +1273,9 @@ class CallSession extends Emitter { const {span} = this.rootSpan.startChildSpan(`call-status:${this.callInfo.callStatus}`); span.setAttributes(this.callInfo.toJSON()); try { - this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON()); + const b3 = this.b3; + const httpHeaders = b3 && {b3}; + this.notifier.request('call:status', this.call_status_hook, this.callInfo.toJSON(), httpHeaders); span.end(); } catch (err) { span.end(); diff --git a/lib/tasks/conference.js b/lib/tasks/conference.js index 60782265..b7cf933d 100644 --- a/lib/tasks/conference.js +++ b/lib/tasks/conference.js @@ -529,7 +529,9 @@ class Conference extends Task { async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause]) { assert(!this._playSession); - const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const json = await cs.application.requestor.request('verb:hook', hook, cs.callInfo, httpHeaders); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); @@ -582,11 +584,14 @@ class Conference extends Task { _notifyConferenceEvent(cs, eventName, params = {}) { if (this.statusEvents.includes(eventName)) { + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; params.event = eventName; params.duration = (Date.now() - this.conferenceStartTime.getTime()) / 1000; if (!params.time) params.time = (new Date()).toISOString(); if (!params.members && typeof this.participantCount === 'number') params.members = this.participantCount; - cs.application.requestor.request('verb:hook', this.statusHook, Object.assign(params, this.statusParams)) + cs.application.requestor + .request('verb:hook', this.statusHook, Object.assign(params, this.statusParams, httpHeaders)) .catch((err) => this.logger.info(err, 'Conference:notifyConferenceEvent - error')); } } diff --git a/lib/tasks/config.js b/lib/tasks/config.js index cdf18f3f..79becf9e 100644 --- a/lib/tasks/config.js +++ b/lib/tasks/config.js @@ -75,6 +75,7 @@ class TaskConfig extends Task { cs.speechRecognizerLanguage = this.recognizer.language !== 'default' ? this.recognizer.language : cs.speechRecognizerLanguage; + this.gatherOpts.recognizer = this.recognizer; this.logger.info({recognizer: this.recognizer}, 'Config: updated recognizer'); } if (this.hasBargeIn) { diff --git a/lib/tasks/dial.js b/lib/tasks/dial.js index 94df3a98..36952b38 100644 --- a/lib/tasks/dial.js +++ b/lib/tasks/dial.js @@ -271,6 +271,9 @@ class TaskDial extends Task { const referring_call_sid = isChild ? callInfo.callSid : cs.callSid; const referred_call_sid = isChild ? callInfo.parentCallSid : this.sd.callSid; + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const to = parseUri(req.getParsedHeader('Refer-To').uri); const by = parseUri(req.getParsedHeader('Referred-By').uri); this.logger.info({to}, 'refer to parsed'); @@ -285,7 +288,7 @@ class TaskDial extends Task { referring_call_sid, referred_call_sid } - }); + }, httpHeaders); res.send(202); this.logger.info('DialTask:handleRefer - sent 202 Accepted'); } catch (err) { @@ -345,8 +348,10 @@ class TaskDial extends Task { const key = arr[1]; const match = dtmfDetector.keyPress(key); if (match) { + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; this.logger.info({callSid}, `Dial:_onInfo triggered dtmf match: ${match}`); - requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON()}) + requestor.request('verb:hook', this.dtmfHook, {dtmf: match, ...callInfo.toJSON(), httpHeaders}) .catch((err) => this.logger.info(err, 'Dial:_onDtmf - error')); } } diff --git a/lib/tasks/dialogflow/index.js b/lib/tasks/dialogflow/index.js index a56b072b..638c59a3 100644 --- a/lib/tasks/dialogflow/index.js +++ b/lib/tasks/dialogflow/index.js @@ -453,7 +453,10 @@ class Dialogflow extends Task { } async _performHook(cs, hook, results = {}) { - const json = await this.cs.requestor.request('verb:hook', hook, {...results, ...cs.callInfo.toJSON()}); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const json = await this.cs.requestor.request('verb:hook', hook, + {...results, ...cs.callInfo.toJSON()}, httpHeaders); if (json && Array.isArray(json)) { const makeTask = require('../make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); diff --git a/lib/tasks/enqueue.js b/lib/tasks/enqueue.js index e9d975d6..65eaacb3 100644 --- a/lib/tasks/enqueue.js +++ b/lib/tasks/enqueue.js @@ -302,6 +302,8 @@ class TaskEnqueue extends Task { async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) { const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers; + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; assert(!this._playSession); if (this.killed) return []; @@ -317,7 +319,7 @@ class TaskEnqueue extends Task { } catch (err) { this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`); } - const json = await cs.application.requestor.request('verb:hook', hook, params); + const json = await cs.application.requestor.request('verb:hook', hook, params, httpHeaders); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); const allowedTasks = tasks.filter((t) => allowed.includes(t.name)); diff --git a/lib/tasks/gather.js b/lib/tasks/gather.js index c3833813..3a932dcc 100644 --- a/lib/tasks/gather.js +++ b/lib/tasks/gather.js @@ -184,13 +184,13 @@ class TaskGather extends Task { this._killAudio(cs); this.ep.removeAllListeners('dtmf'); clearTimeout(this.interDigitTimer); - this._resolve('killed'); this.playTask?.span.end(); this.sayTask?.span.end(); + this._resolve('killed'); } updateTimeout(timeout) { - this.logger.info(`TaskGather:updateTimout - updating timeout to ${timeout}`); + this.logger.info(`TaskGather:updateTimeout - updating timeout to ${timeout}`); this.timeout = timeout; this._startTimer(); } @@ -226,6 +226,7 @@ class TaskGather extends Task { if (this.vad?.enable) { opts.START_RECOGNIZING_ON_VAD = 1; if (this.vad.voiceMs) opts.RECOGNIZER_VAD_VOICE_MS = this.vad.voiceMs; + else opts.RECOGNIZER_VAD_VOICE_MS = 125; if (this.vad.mode >= 0 && this.vad.mode <= 3) opts.RECOGNIZER_VAD_MODE = this.vad.mode; } @@ -283,6 +284,7 @@ class TaskGather extends Task { if (this.profanityOption && this.profanityOption !== 'raw') opts.AZURE_PROFANITY_OPTION = this.profanityOption; if (this.azureServiceEndpoint) opts.AZURE_SERVICE_ENDPOINT = this.azureServiceEndpoint; if (this.initialSpeechTimeoutMs > 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = this.initialSpeechTimeoutMs; + else if (this.timeout === 0) opts.AZURE_INITIAL_SPEECH_TIMEOUT_MS = 120000; // lengthy opts.AZURE_USE_OUTPUT_FORMAT_DETAILED = 1; ep.addCustomEventListener(AzureTranscriptionEvents.Transcription, this._onTranscription.bind(this, cs, ep)); @@ -317,6 +319,10 @@ class TaskGather extends Task { _startTimer() { if (0 === this.timeout) return; + if (this._timeoutTimer) { + clearTimeout(this._timeoutTimer); + this._timeoutTimer = null; + } assert(!this._timeoutTimer); this.logger.debug(`Gather:_startTimer: timeout ${this.timeout}`); this._timeoutTimer = setTimeout(() => this._resolve('timeout'), this.timeout); @@ -356,7 +362,7 @@ class TaskGather extends Task { if ('microsoft' === this.vendor) { const final = evt.RecognitionStatus === 'Success'; if (final) { - const nbest = evt.NBest; + const nbest = evt.NBest.sort((a, b) => b.Confidence - a.Confidence); evt = { is_final: true, alternatives: [ @@ -378,7 +384,13 @@ class TaskGather extends Task { }; } } - if (evt.is_final) this._resolve('speech', evt); + if (evt.is_final) { + if (evt.alternatives[0].transcript === '') { + this.logger.info({evt}, 'TaskGather:_onTranscription - got empty transcript, listen again'); + return this._startTranscribing(ep); + } + this._resolve('speech', evt); + } else { /* google has a measure of stability: https://cloud.google.com/speech-to-text/docs/basics#streaming_responses @@ -394,7 +406,10 @@ class TaskGather extends Task { this._killAudio(cs); } if (this.partialResultHook) { - this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, this.cs.callInfo)); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + this.cs.requestor.request(this.partialResultHook, Object.assign({speech: evt}, + this.cs.callInfo, httpHeaders)); } } } @@ -423,6 +438,8 @@ class TaskGather extends Task { async _resolve(reason, evt) { if (this.resolved) return; + if (this.callSession && this.callSession.callGone) return; + this.resolved = true; this.logger.info(`TaskGather:resolve with reason ${reason}`); clearTimeout(this.interDigitTimer); diff --git a/lib/tasks/lex.js b/lib/tasks/lex.js index c84bb058..7d0a3a65 100644 --- a/lib/tasks/lex.js +++ b/lib/tasks/lex.js @@ -289,7 +289,9 @@ class Lex extends Task { } async _performHook(cs, hook, results) { - const json = await this.cs.requestor.request('verb:hook', hook, results); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const json = await this.cs.requestor.request('verb:hook', hook, results, httpHeaders); if (json && Array.isArray(json)) { const makeTask = require('./make_task'); const tasks = normalizeJambones(this.logger, json).map((tdata) => makeTask(this.logger, tdata)); diff --git a/lib/tasks/rest_dial.js b/lib/tasks/rest_dial.js index 77fd6633..03362452 100644 --- a/lib/tasks/rest_dial.js +++ b/lib/tasks/rest_dial.js @@ -48,7 +48,9 @@ class TaskRestDial extends Task { cs.setDialog(dlg); try { - const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + const tasks = await cs.requestor.request('verb:hook', this.call_hook, cs.callInfo, httpHeaders); if (tasks && Array.isArray(tasks)) { this.logger.debug({tasks: tasks}, `TaskRestDial: replacing application with ${tasks.length} tasks`); cs.replaceApplication(normalizeJambones(this.logger, tasks).map((tdata) => makeTask(this.logger, tdata))); diff --git a/lib/tasks/say.js b/lib/tasks/say.js index 195f9f95..a6f09eb5 100644 --- a/lib/tasks/say.js +++ b/lib/tasks/say.js @@ -44,7 +44,6 @@ class TaskSay extends Task { this.logger.info({vendor, language, voice}, 'TaskSay:exec'); this.ep = ep; - let span; try { if (!credentials) { writeAlerts({ @@ -58,11 +57,12 @@ class TaskSay extends Task { let lastUpdated = false; /* otel: trace time for tts */ - span = this.startSpan('tts-generation', { + const {span} = this.startChildSpan('tts-generation', { 'tts.vendor': vendor, 'tts.language': language, 'tts.voice': voice }); + this.ttsSpan = span; const filepath = (await Promise.all(this.text.map(async(text) => { if (this.killed) return; @@ -91,10 +91,10 @@ class TaskSay extends Task { updateSpeechCredentialLastUsed(credentials.speech_credential_sid) .catch(() => {/*already logged error */}); } - span.setAttributes({'tts.cached': servedFromCache}); + this.ttsSpan.setAttributes({'tts.cached': servedFromCache}); return filePath; }))).filter((fp) => fp && fp.length); - span?.end(); + this.ttsSpan?.end(); this.logger.debug({filepath}, 'synthesized files for tts'); while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep?.connected) { @@ -112,7 +112,7 @@ class TaskSay extends Task { } while (!this.killed && ++segment < filepath.length); } } catch (err) { - span?.end(); + this.ttsSpan?.end(); this.logger.info(err, 'TaskSay:exec error'); } this.emit('playDone'); diff --git a/lib/tasks/sip_refer.js b/lib/tasks/sip_refer.js index c1fc1bbd..c36ad700 100644 --- a/lib/tasks/sip_refer.js +++ b/lib/tasks/sip_refer.js @@ -76,7 +76,10 @@ class TaskSipRefer extends Task { const status = arr[1]; this.logger.debug(`TaskSipRefer:_handleNotify: call got status ${status}`); if (this.eventHook) { - await cs.requestor.request('verb:hook', this.eventHook, {event: 'transfer-status', call_status: status}); + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + await cs.requestor.request('verb:hook', this.eventHook, + {event: 'transfer-status', call_status: status}, httpHeaders); } if (status >= 200) { this.referSpan.setAttributes({'refer.finalNotify': status}); diff --git a/lib/tasks/task.js b/lib/tasks/task.js index d5e9c7ff..ebe8d087 100644 --- a/lib/tasks/task.js +++ b/lib/tasks/task.js @@ -90,6 +90,16 @@ class Task extends Emitter { return {span, ctx}; } + getTracingPropagation(encoding, span) { + // TODO: support encodings beyond b3 https://github.com/openzipkin/b3-propagation + if (span) { + return `${span.spanContext().traceId}-${span.spanContext().spanId}-1`; + } + if (this.span) { + return `${this.span.spanContext().traceId}-${this.span.spanContext().spanId}-1`; + } + } + /** * when a subclass Task has completed its work, it should call this method */ @@ -131,9 +141,11 @@ class Task extends Emitter { if (this.actionHook) { const params = results ? Object.assign(results, this.cs.callInfo.toJSON()) : this.cs.callInfo.toJSON(); const span = this.startSpan('verb:hook', {'hook.url': this.actionHook}); + const b3 = this.getTracingPropagation('b3', span); + const httpHeaders = b3 && {b3}; span.setAttributes({'http.body': JSON.stringify(params)}); try { - const json = await this.cs.requestor.request('verb:hook', this.actionHook, params); + const json = await this.cs.requestor.request('verb:hook', this.actionHook, params, httpHeaders); span.setAttributes({'http.statusCode': 200}); span.end(); if (expectResponse && json && Array.isArray(json)) { @@ -154,9 +166,11 @@ class Task extends Emitter { async performHook(cs, hook, results) { const span = this.startSpan('verb:hook', {'hook.url': hook}); + const b3 = this.getTracingPropagation('b3', span); + const httpHeaders = b3 && {b3}; span.setAttributes({'http.body': JSON.stringify(results)}); try { - const json = await cs.requestor.request('verb:hook', hook, results); + const json = await cs.requestor.request('verb:hook', hook, results, httpHeaders); span.setAttributes({'http.statusCode': 200}); span.end(); if (json && Array.isArray(json)) { diff --git a/lib/tasks/transcribe.js b/lib/tasks/transcribe.js index 5094df89..afcbe085 100644 --- a/lib/tasks/transcribe.js +++ b/lib/tasks/transcribe.js @@ -254,7 +254,10 @@ class TaskTranscribe extends Task { } if (this.transcriptionHook) { - this.cs.requestor.request('verb:hook', this.transcriptionHook, Object.assign({speech: evt}, this.cs.callInfo)) + const b3 = this.getTracingPropagation(); + const httpHeaders = b3 && {b3}; + this.cs.requestor.request('verb:hook', this.transcriptionHook, + Object.assign({speech: evt}, this.cs.callInfo), httpHeaders) .catch((err) => this.logger.info(err, 'TranscribeTask:_onTranscription error')); } if (this.parentTask) { diff --git a/lib/utils/call-tracer.js b/lib/utils/call-tracer.js index 75586cd0..b6b229e8 100644 --- a/lib/utils/call-tracer.js +++ b/lib/utils/call-tracer.js @@ -44,6 +44,21 @@ class RootSpan { return this._span.spanContext().traceId; } + get spanId() { + return this._span.spanContext().spanId; + } + + get traceFlags() { + return this._span.spanContext().traceFlags; + } + + getTracingPropagation(encoding) { + // TODO: support encodings beyond b3 https://github.com/openzipkin/b3-propagation + if (this._span && this.traceId !== '00000000000000000000000000000000') { + return `${this.traceId}-${this.spanId}-1`; + } + } + setAttributes(attrs) { this._span.setAttributes(attrs); } diff --git a/lib/utils/http-requestor.js b/lib/utils/http-requestor.js index 922162be..50ab80d4 100644 --- a/lib/utils/http-requestor.js +++ b/lib/utils/http-requestor.js @@ -49,7 +49,7 @@ class HttpRequestor extends BaseRequestor { * @param {string} [hook.password] - if basic auth is protecting the endpoint * @param {object} [params] - request parameters */ - async request(type, hook, params) { + async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null; const url = hook.url || hook; @@ -64,8 +64,8 @@ class HttpRequestor extends BaseRequestor { let buf; try { const sigHeader = this._generateSigHeader(payload, this.secret); - const headers = {...sigHeader, ...this.authHeader}; - //this.logger.info({url, headers}, 'send webhook'); + const headers = {...sigHeader, ...this.authHeader, ...httpHeaders}; + this.logger.debug({url, headers}, 'send webhook'); buf = this._isRelativeUrl(url) ? await this.post(url, payload, headers) : await bent(method, 'buffer', 200, 201, 202)(url, payload, headers); diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index aa41b90e..9e5bbd60 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -33,7 +33,7 @@ class WsRequestor extends BaseRequestor { * @param {string} [hook.password] - if basic auth is protecting the endpoint * @param {object} [params] - request parameters */ - async request(type, hook, params) { + async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const url = hook.url || hook; @@ -48,7 +48,7 @@ class WsRequestor extends BaseRequestor { if (this._isAbsoluteUrl(url) && url.startsWith('http')) { this.logger.debug({hook}, 'WsRequestor: sending a webhook (HTTP)'); const requestor = new HttpRequestor(this.logger, this.account_sid, hook, this.secret); - return requestor.request(type, hook, params); + return requestor.request(type, hook, params, httpHeaders); } /* connect if necessary */ @@ -73,12 +73,14 @@ class WsRequestor extends BaseRequestor { assert.ok(url, 'WsRequestor:request url was not provided'); const msgid = short.generate(); + const b3 = httpHeaders?.b3 ? {b3: httpHeaders.b3} : {}; const obj = { type, msgid, call_sid: this.call_sid, hook: type === 'verb:hook' ? url : undefined, - data: {...payload} + data: {...payload}, + ...b3 }; //this.logger.debug({obj}, `websocket: sending (${url})`);