add ws msgid to telemetry span if it's ws requestor (#1215)

* add ws msgid to telemetry span if it's ws requestor

* wip
This commit is contained in:
Hoan Luu Huu
2025-05-28 18:51:59 +07:00
committed by GitHub
parent 760394aa5e
commit 690a7fcd55
4 changed files with 12 additions and 8 deletions

View File

@@ -438,7 +438,7 @@ module.exports = function(srf, logger) {
span = obj.span;
const b3 = rootSpan.getTracingPropagation();
const httpHeaders = b3 && { b3 };
json = await app.requestor.request('session:new', app.call_hook, params, httpHeaders);
json = await app.requestor.request('session:new', app.call_hook, params, httpHeaders, span);
}
app.tasks = normalizeJambones(logger, json).map((tdata) => makeTask(logger, tdata));

View File

@@ -166,7 +166,7 @@ class Task extends Emitter {
span.setAttributes({'http.body': JSON.stringify(params)});
try {
if (this.id) params.verb_id = this.id;
const json = await this.cs.requestor.request(type, this.actionHook, params, httpHeaders);
const json = await this.cs.requestor.request(type, this.actionHook, params, httpHeaders, span);
span.setAttributes({'http.statusCode': 200});
const isWsConnection = this.cs.requestor instanceof WsRequestor;
if (!isWsConnection || (expectResponse && json && Array.isArray(json) && json.length)) {
@@ -210,7 +210,7 @@ class Task extends Emitter {
const httpHeaders = b3 && {b3};
span.setAttributes({'http.body': JSON.stringify(params)});
try {
const json = await cs.requestor.request('verb:hook', hook, params, httpHeaders);
const json = await cs.requestor.request('verb:hook', hook, params, httpHeaders, span);
span.setAttributes({'http.statusCode': 200});
span.end();
if (json && Array.isArray(json)) {

View File

@@ -102,7 +102,7 @@ class HttpRequestor extends BaseRequestor {
* @param {string} [hook.password] - if basic auth is protecting the endpoint
* @param {object} [params] - request parameters
*/
async request(type, hook, params, httpHeaders = {}) {
async request(type, hook, params, httpHeaders = {}, span) {
/* jambonz:error only sent over ws */
if (type === 'jambonz:error') return;
@@ -131,7 +131,7 @@ class HttpRequestor extends BaseRequestor {
this.close();
this.emit('handover', requestor);
}
return requestor.request('session:new', hook, params, httpHeaders);
return requestor.request('session:new', hook, params, httpHeaders, span);
}
let newClient;

View File

@@ -55,7 +55,7 @@ class WsRequestor extends BaseRequestor {
* @param {string} [hook.password] - if basic auth is protecting the endpoint
* @param {object} [params] - request parameters
*/
async request(type, hook, params, httpHeaders = {}) {
async request(type, hook, params, httpHeaders = {}, span) {
assert(HookMsgTypes.includes(type));
const url = hook.url || hook;
const wantsAck = !MTYPE_WANTS_ACK.includes(type);
@@ -87,7 +87,7 @@ class WsRequestor extends BaseRequestor {
this.close();
this.emit('handover', requestor);
}
return requestor.request(type, hook, params, httpHeaders);
return requestor.request(type, hook, params, httpHeaders, span);
}
/* connect if necessary */
@@ -152,13 +152,17 @@ class WsRequestor extends BaseRequestor {
data: {...payload},
...b3
};
// add msgid to span attributes if it exists
if (span) {
span.setAttributes({'msgid': msgid});
}
const sendQueuedMsgs = () => {
if (this.queuedMsg.length > 0) {
for (const {type, hook, params, httpHeaders, promise} of this.queuedMsg) {
this.logger.debug(`WsRequestor:request - preparing queued ${type} for sending`);
if (promise) {
this.request(type, hook, params, httpHeaders)
this.request(type, hook, params, httpHeaders, span)
.then((res) => promise.resolve(res))
.catch((err) => promise.reject(err));
}