From ad722a55ee7cfa004b883f5f8ac3b47de4ffa8bf Mon Sep 17 00:00:00 2001 From: Dave Horton Date: Tue, 8 Aug 2023 13:00:34 -0400 Subject: [PATCH] generate trace id before outdial so we can include it in custom header (#418) * generate trace id before outdial so we can include it in custom header * logging * logging * fix #420 race condition on rest outdial when ws is used * revert unnecessary logging change --- lib/http-routes/api/create-call.js | 7 ++++++- lib/tasks/rest_dial.js | 3 +++ lib/utils/call-tracer.js | 30 ++++++++++++++++++++++-------- lib/utils/ws-requestor.js | 24 +++++++++++++++++++----- 4 files changed, 50 insertions(+), 14 deletions(-) diff --git a/lib/http-routes/api/create-call.js b/lib/http-routes/api/create-call.js index fc604c7f..2a167918 100644 --- a/lib/http-routes/api/create-call.js +++ b/lib/http-routes/api/create-call.js @@ -47,6 +47,11 @@ router.post('/', async(req, res) => { const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null; const record_all_calls = account.record_all_calls || (application && application.record_all_calls); const recordOutputFormat = account.record_format || 'mp3'; + const rootSpan = new RootSpan('rest-call', { + callSid, + accountSid, + ...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}) + }); opts.headers = { ...opts.headers, @@ -54,6 +59,7 @@ router.post('/', async(req, res) => { 'X-Jambonz-FS-UUID': srf.locals.fsUUID, 'X-Call-Sid': callSid, 'X-Account-Sid': accountSid, + 'X-Trace-ID': rootSpan.traceId, ...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}), ...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}), ...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat}) @@ -194,7 +200,6 @@ router.post('/', async(req, res) => { /* ok our outbound INVITE is in flight */ const tasks = [restDial]; - const rootSpan = new RootSpan('rest-call', inviteReq); sipLogger = logger.child({ callSid, callId: inviteReq.get('Call-ID'), diff --git a/lib/tasks/rest_dial.js b/lib/tasks/rest_dial.js index c4726ba4..f5642a84 100644 --- a/lib/tasks/rest_dial.js +++ b/lib/tasks/rest_dial.js @@ -63,6 +63,7 @@ class TaskRestDial extends Task { this.canCancel = false; const cs = this.callSession; cs.setDialog(dlg); + this.logger.debug('TaskRestDial:_onConnect - call connected'); try { const b3 = this.getTracingPropagation(); @@ -90,8 +91,10 @@ class TaskRestDial extends Task { } let tasks; if (this.app_json) { + this.logger.debug('TaskRestDial: using app_json from task data'); tasks = JSON.parse(this.app_json); } else { + this.logger.debug({call_hook: this.call_hook}, 'TaskRestDial: retrieving application'); tasks = await cs.requestor.request('session:new', this.call_hook, params, httpHeaders); } if (tasks && Array.isArray(tasks)) { diff --git a/lib/utils/call-tracer.js b/lib/utils/call-tracer.js index b6b229e8..63179f72 100644 --- a/lib/utils/call-tracer.js +++ b/lib/utils/call-tracer.js @@ -2,17 +2,24 @@ const {context, trace} = require('@opentelemetry/api'); const {Dialog} = require('drachtio-srf'); class RootSpan { constructor(callType, req) { - let tracer, callSid, linkedSpanId; + const {srf} = require('../../'); + const tracer = srf.locals.otel.tracer; + let callSid, accountSid, applicationSid, linkedSpanId; if (req instanceof Dialog) { const dlg = req; - tracer = dlg.srf.locals.otel.tracer; callSid = dlg.callSid; linkedSpanId = dlg.linkedSpanId; } - else { - tracer = req.srf.locals.otel.tracer; + else if (req.srf) { callSid = req.locals.callSid; + accountSid = req.get('X-Account-Sid'), + applicationSid = req.locals.application_sid; + } + else { + callSid = req.callSid; + accountSid = req.accountSid; + applicationSid = req.applicationSid; } this._span = tracer.startSpan(callType || 'incoming-call'); if (req instanceof Dialog) { @@ -22,13 +29,20 @@ class RootSpan { callId: dlg.sip.callId }); } + else if (req.srf) { + this._span.setAttributes({ + callSid, + accountSid, + applicationSid, + callId: req.get('Call-ID'), + externalCallId: req.get('X-CID') + }); + } else { this._span.setAttributes({ callSid, - accountSid: req.get('X-Account-Sid'), - applicationSid: req.locals.application_sid, - callId: req.get('Call-ID'), - externalCallId: req.get('X-CID') + accountSid, + applicationSid }); } diff --git a/lib/utils/ws-requestor.js b/lib/utils/ws-requestor.js index 6243ce9a..2355331f 100644 --- a/lib/utils/ws-requestor.js +++ b/lib/utils/ws-requestor.js @@ -43,6 +43,7 @@ class WsRequestor extends BaseRequestor { async request(type, hook, params, httpHeaders = {}) { assert(HookMsgTypes.includes(type)); const url = hook.url || hook; + const wantsAck = !['call:status', 'verb:status', 'jambonz:error'].includes(type); if (this.maliciousClient) { this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client'); @@ -73,11 +74,19 @@ class WsRequestor extends BaseRequestor { if (this.connectInProgress) { this.logger.debug( `WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`); - this.queuedMsg.push({type, hook, params, httpHeaders}); + if (wantsAck) { + const p = new Promise((resolve, reject) => { + this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}}); + }); + return p; + } + else { + this.queuedMsg.push({type, hook, params, httpHeaders}); + } return; } this.connectInProgress = true; - this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection`); + this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`); if (this.connections >= MAX_RECONNECTS) { return Promise.reject(`max attempts connecting to ${this.url}`); } @@ -116,9 +125,14 @@ class WsRequestor extends BaseRequestor { const sendQueuedMsgs = () => { if (this.queuedMsg.length > 0) { - for (const {type, hook, params, httpHeaders} of this.queuedMsg) { + for (const {type, hook, params, httpHeaders, promise} of this.queuedMsg) { this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`); - setImmediate(this.request.bind(this, type, hook, params, httpHeaders)); + if (promise) { + this.request(type, hook, params, httpHeaders) + .then((res) => promise.resolve(res)) + .catch((err) => promise.reject(err)); + } + else setImmediate(this.request.bind(this, type, hook, params, httpHeaders)); } this.queuedMsg.length = 0; } @@ -137,7 +151,7 @@ class WsRequestor extends BaseRequestor { } /* simple notifications */ - if (['call:status', 'verb:status', 'jambonz:error'].includes(type) || reconnectingWithoutAck) { + if (!wantsAck || reconnectingWithoutAck) { this.ws?.send(JSON.stringify(obj), () => { this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`); sendQueuedMsgs();