mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 08:40:38 +00:00
generate trace id before outdial so we can include it in custom header (#418)
* generate trace id before outdial so we can include it in custom header * logging * logging * fix #420 race condition on rest outdial when ws is used * revert unnecessary logging change
This commit is contained in:
@@ -47,6 +47,11 @@ router.post('/', async(req, res) => {
|
|||||||
const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null;
|
const application = req.body.application_sid ? await lookupAppBySid(req.body.application_sid) : null;
|
||||||
const record_all_calls = account.record_all_calls || (application && application.record_all_calls);
|
const record_all_calls = account.record_all_calls || (application && application.record_all_calls);
|
||||||
const recordOutputFormat = account.record_format || 'mp3';
|
const recordOutputFormat = account.record_format || 'mp3';
|
||||||
|
const rootSpan = new RootSpan('rest-call', {
|
||||||
|
callSid,
|
||||||
|
accountSid,
|
||||||
|
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid})
|
||||||
|
});
|
||||||
|
|
||||||
opts.headers = {
|
opts.headers = {
|
||||||
...opts.headers,
|
...opts.headers,
|
||||||
@@ -54,6 +59,7 @@ router.post('/', async(req, res) => {
|
|||||||
'X-Jambonz-FS-UUID': srf.locals.fsUUID,
|
'X-Jambonz-FS-UUID': srf.locals.fsUUID,
|
||||||
'X-Call-Sid': callSid,
|
'X-Call-Sid': callSid,
|
||||||
'X-Account-Sid': accountSid,
|
'X-Account-Sid': accountSid,
|
||||||
|
'X-Trace-ID': rootSpan.traceId,
|
||||||
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
|
...(req.body?.application_sid && {'X-Application-Sid': req.body.application_sid}),
|
||||||
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
|
...(restDial.fromHost && {'X-Preferred-From-Host': restDial.fromHost}),
|
||||||
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat})
|
...(record_all_calls && {'X-Record-All-Calls': recordOutputFormat})
|
||||||
@@ -194,7 +200,6 @@ router.post('/', async(req, res) => {
|
|||||||
/* ok our outbound INVITE is in flight */
|
/* ok our outbound INVITE is in flight */
|
||||||
|
|
||||||
const tasks = [restDial];
|
const tasks = [restDial];
|
||||||
const rootSpan = new RootSpan('rest-call', inviteReq);
|
|
||||||
sipLogger = logger.child({
|
sipLogger = logger.child({
|
||||||
callSid,
|
callSid,
|
||||||
callId: inviteReq.get('Call-ID'),
|
callId: inviteReq.get('Call-ID'),
|
||||||
|
|||||||
@@ -63,6 +63,7 @@ class TaskRestDial extends Task {
|
|||||||
this.canCancel = false;
|
this.canCancel = false;
|
||||||
const cs = this.callSession;
|
const cs = this.callSession;
|
||||||
cs.setDialog(dlg);
|
cs.setDialog(dlg);
|
||||||
|
this.logger.debug('TaskRestDial:_onConnect - call connected');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const b3 = this.getTracingPropagation();
|
const b3 = this.getTracingPropagation();
|
||||||
@@ -90,8 +91,10 @@ class TaskRestDial extends Task {
|
|||||||
}
|
}
|
||||||
let tasks;
|
let tasks;
|
||||||
if (this.app_json) {
|
if (this.app_json) {
|
||||||
|
this.logger.debug('TaskRestDial: using app_json from task data');
|
||||||
tasks = JSON.parse(this.app_json);
|
tasks = JSON.parse(this.app_json);
|
||||||
} else {
|
} else {
|
||||||
|
this.logger.debug({call_hook: this.call_hook}, 'TaskRestDial: retrieving application');
|
||||||
tasks = await cs.requestor.request('session:new', this.call_hook, params, httpHeaders);
|
tasks = await cs.requestor.request('session:new', this.call_hook, params, httpHeaders);
|
||||||
}
|
}
|
||||||
if (tasks && Array.isArray(tasks)) {
|
if (tasks && Array.isArray(tasks)) {
|
||||||
|
|||||||
@@ -2,17 +2,24 @@ const {context, trace} = require('@opentelemetry/api');
|
|||||||
const {Dialog} = require('drachtio-srf');
|
const {Dialog} = require('drachtio-srf');
|
||||||
class RootSpan {
|
class RootSpan {
|
||||||
constructor(callType, req) {
|
constructor(callType, req) {
|
||||||
let tracer, callSid, linkedSpanId;
|
const {srf} = require('../../');
|
||||||
|
const tracer = srf.locals.otel.tracer;
|
||||||
|
let callSid, accountSid, applicationSid, linkedSpanId;
|
||||||
|
|
||||||
if (req instanceof Dialog) {
|
if (req instanceof Dialog) {
|
||||||
const dlg = req;
|
const dlg = req;
|
||||||
tracer = dlg.srf.locals.otel.tracer;
|
|
||||||
callSid = dlg.callSid;
|
callSid = dlg.callSid;
|
||||||
linkedSpanId = dlg.linkedSpanId;
|
linkedSpanId = dlg.linkedSpanId;
|
||||||
}
|
}
|
||||||
else {
|
else if (req.srf) {
|
||||||
tracer = req.srf.locals.otel.tracer;
|
|
||||||
callSid = req.locals.callSid;
|
callSid = req.locals.callSid;
|
||||||
|
accountSid = req.get('X-Account-Sid'),
|
||||||
|
applicationSid = req.locals.application_sid;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
callSid = req.callSid;
|
||||||
|
accountSid = req.accountSid;
|
||||||
|
applicationSid = req.applicationSid;
|
||||||
}
|
}
|
||||||
this._span = tracer.startSpan(callType || 'incoming-call');
|
this._span = tracer.startSpan(callType || 'incoming-call');
|
||||||
if (req instanceof Dialog) {
|
if (req instanceof Dialog) {
|
||||||
@@ -22,13 +29,20 @@ class RootSpan {
|
|||||||
callId: dlg.sip.callId
|
callId: dlg.sip.callId
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
else if (req.srf) {
|
||||||
|
this._span.setAttributes({
|
||||||
|
callSid,
|
||||||
|
accountSid,
|
||||||
|
applicationSid,
|
||||||
|
callId: req.get('Call-ID'),
|
||||||
|
externalCallId: req.get('X-CID')
|
||||||
|
});
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
this._span.setAttributes({
|
this._span.setAttributes({
|
||||||
callSid,
|
callSid,
|
||||||
accountSid: req.get('X-Account-Sid'),
|
accountSid,
|
||||||
applicationSid: req.locals.application_sid,
|
applicationSid
|
||||||
callId: req.get('Call-ID'),
|
|
||||||
externalCallId: req.get('X-CID')
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ class WsRequestor extends BaseRequestor {
|
|||||||
async request(type, hook, params, httpHeaders = {}) {
|
async request(type, hook, params, httpHeaders = {}) {
|
||||||
assert(HookMsgTypes.includes(type));
|
assert(HookMsgTypes.includes(type));
|
||||||
const url = hook.url || hook;
|
const url = hook.url || hook;
|
||||||
|
const wantsAck = !['call:status', 'verb:status', 'jambonz:error'].includes(type);
|
||||||
|
|
||||||
if (this.maliciousClient) {
|
if (this.maliciousClient) {
|
||||||
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
|
this.logger.info({url: this.url}, 'WsRequestor:request - discarding msg to malicious client');
|
||||||
@@ -73,11 +74,19 @@ class WsRequestor extends BaseRequestor {
|
|||||||
if (this.connectInProgress) {
|
if (this.connectInProgress) {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
|
`WsRequestor:request(${this.id}) - queueing ${type} message since we are connecting`);
|
||||||
this.queuedMsg.push({type, hook, params, httpHeaders});
|
if (wantsAck) {
|
||||||
|
const p = new Promise((resolve, reject) => {
|
||||||
|
this.queuedMsg.push({type, hook, params, httpHeaders, promise: {resolve, reject}});
|
||||||
|
});
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.queuedMsg.push({type, hook, params, httpHeaders});
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.connectInProgress = true;
|
this.connectInProgress = true;
|
||||||
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection`);
|
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`);
|
||||||
if (this.connections >= MAX_RECONNECTS) {
|
if (this.connections >= MAX_RECONNECTS) {
|
||||||
return Promise.reject(`max attempts connecting to ${this.url}`);
|
return Promise.reject(`max attempts connecting to ${this.url}`);
|
||||||
}
|
}
|
||||||
@@ -116,9 +125,14 @@ class WsRequestor extends BaseRequestor {
|
|||||||
|
|
||||||
const sendQueuedMsgs = () => {
|
const sendQueuedMsgs = () => {
|
||||||
if (this.queuedMsg.length > 0) {
|
if (this.queuedMsg.length > 0) {
|
||||||
for (const {type, hook, params, httpHeaders} of this.queuedMsg) {
|
for (const {type, hook, params, httpHeaders, promise} of this.queuedMsg) {
|
||||||
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
|
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
|
||||||
setImmediate(this.request.bind(this, type, hook, params, httpHeaders));
|
if (promise) {
|
||||||
|
this.request(type, hook, params, httpHeaders)
|
||||||
|
.then((res) => promise.resolve(res))
|
||||||
|
.catch((err) => promise.reject(err));
|
||||||
|
}
|
||||||
|
else setImmediate(this.request.bind(this, type, hook, params, httpHeaders));
|
||||||
}
|
}
|
||||||
this.queuedMsg.length = 0;
|
this.queuedMsg.length = 0;
|
||||||
}
|
}
|
||||||
@@ -137,7 +151,7 @@ class WsRequestor extends BaseRequestor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* simple notifications */
|
/* simple notifications */
|
||||||
if (['call:status', 'verb:status', 'jambonz:error'].includes(type) || reconnectingWithoutAck) {
|
if (!wantsAck || reconnectingWithoutAck) {
|
||||||
this.ws?.send(JSON.stringify(obj), () => {
|
this.ws?.send(JSON.stringify(obj), () => {
|
||||||
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
|
this.logger.debug({obj}, `WsRequestor:request websocket: sent (${url})`);
|
||||||
sendQueuedMsgs();
|
sendQueuedMsgs();
|
||||||
|
|||||||
Reference in New Issue
Block a user