Feat 1179 race issue with play verb (#1183)

* Fixed race issue between queueCommand false and queueCommand true when play task is involved

https://github.com/jambonz/jambonz-feature-server/issues/1179

* removed unnecessary emitter

* added destroy mechanism for stickyEventEmitter

* clearing stickyEventEmitter

* memory leak fix
This commit is contained in:
rammohan-y
2025-05-12 05:55:48 +05:30
committed by GitHub
parent 4ff5c845de
commit 5c8237b382
4 changed files with 108 additions and 5 deletions

View File

@@ -23,6 +23,7 @@ const HttpRequestor = require('../utils/http-requestor');
const WsRequestor = require('../utils/ws-requestor');
const ActionHookDelayProcessor = require('../utils/action-hook-delay');
const TtsStreamingBuffer = require('../utils/tts-streaming-buffer');
const StickyEventEmitter = require('../utils/sticky-event-emitter');
const {parseUri} = require('drachtio-srf');
const {
JAMBONES_INJECT_CONTENT,
@@ -79,6 +80,10 @@ class CallSession extends Emitter {
this.callGone = false;
this.notifiedComplete = false;
this.rootSpan = rootSpan;
this.stickyEventEmitter = new StickyEventEmitter();
this.stickyEventEmitter.onSuccess =() => {
this.taskInProgress = null;
};
this.backgroundTaskManager = new BackgroundTaskManager({
cs: this,
logger,
@@ -1180,7 +1185,9 @@ class CallSession extends Emitter {
const taskNum = ++this.taskIdx;
const stackNum = this.stackIdx;
const task = this.tasks.shift();
this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name}`);
this.isCurTaskPlay = TaskName.Play === task.name;
this.taskInProgress = task;
this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name} : {task.taskId}`);
this._notifyTaskStatus(task, {event: 'starting'});
// Register verbhook span wait for end
task.on('VerbHookSpanWaitForEnd', ({span}) => {
@@ -1919,6 +1926,8 @@ Duration=${duration} `
this.logger.debug({tasks: listTaskNames(tasks)},
`CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`);
if (this.currentTask) {
this.logger.debug('CallSession:replaceApplication - killing current task ' +
this.currentTask?.name + ', taskId: ' + this.currentTask.taskId);
this.currentTask.kill(this, KillReason.Replaced);
this.currentTask = null;
}
@@ -1927,6 +1936,10 @@ Duration=${duration} `
this.wakeupResolver({reason: 'new tasks'});
this.wakeupResolver = null;
}
if ((!this.currentTask || this.currentTask === undefined) && this.isCurTaskPlay) {
this.logger.debug(`CallSession:replaceApplication - emitting uuid_break, taskId: ${this.taskInProgress?.taskId}`);
this.stickyEventEmitter.emit('uuid_break', this.taskInProgress);
}
}
kill(onBackgroundGatherBargein = false) {
@@ -2387,6 +2400,9 @@ Duration=${duration} `
* Hang up the call and free the media endpoint
*/
async _clearResources() {
this.stickyEventEmitter.destroy();
this.stickyEventEmitter = null;
this.taskInProgress = null;
for (const resource of [this.dlg, this.ep, this.ep2]) {
try {
if (resource && resource.connected) await resource.destroy();

View File

@@ -1,7 +1,6 @@
const Task = require('./task');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const { PlayFileNotFoundError } = require('../utils/error');
class TaskPlay extends Task {
constructor(logger, opts) {
super(logger, opts);
@@ -27,6 +26,7 @@ class TaskPlay extends Task {
let playbackSeconds = 0;
let playbackMilliseconds = 0;
let completed = !(this.timeoutSecs > 0 || this.loop);
cs.playingAudio = true;
if (this.timeoutSecs > 0) {
timeout = setTimeout(async() => {
completed = true;
@@ -40,6 +40,22 @@ class TaskPlay extends Task {
try {
this.notifyStatus({event: 'start-playback'});
while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep.connected) {
/* Listen for playback-start event and set up a one-time listener for uuid_break
* that will kill the audio playback if the taskIds match. This ensures that
* we only kill the currently playing audio and not audio from other tasks.
* As we are using stickyEventEmitter, even if the event is emitted before the listener is registered,
* the listener will receive the most recent event.
*/
ep.once('playback-start', (evt) => {
this.logger.debug({evt}, 'Play got playback-start');
this.cs.stickyEventEmitter.once('uuid_break', (t) => {
if (t?.taskId === this.taskId) {
this.logger.debug(`Play got kill-playback, executing uuid_break, taskId: ${t?.taskId}`);
this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
this.notifyStatus({event: 'kill-playback'});
}
});
});
if (cs.isInConference) {
const {memberId, confName, confUuid} = cs;
if (Array.isArray(this.url)) {
@@ -87,15 +103,15 @@ class TaskPlay extends Task {
async kill(cs) {
super.kill(cs);
if (this.ep.connected && !this.playComplete) {
if (this.ep?.connected && !this.playComplete) {
this.logger.debug('TaskPlay:kill - killing audio');
if (cs.isInConference) {
const {memberId, confName} = cs;
this.killPlayToConfMember(this.ep, memberId, confName);
}
else {
this.notifyStatus({event: 'kill-playback'});
this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
//this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio'));
cs.stickyEventEmitter.emit('uuid_break', this);
}
}
}

View File

@@ -19,6 +19,7 @@ class Task extends Emitter {
this.data = data;
this.actionHook = this.data.actionHook;
this.id = data.id;
this.taskId = this.name + '-' + uuidv4();
this._killInProgress = false;
this._completionPromise = new Promise((resolve) => this._completionResolver = resolve);

View File

@@ -0,0 +1,70 @@
const EventEmitter = require('events');
/**
* A specialized EventEmitter that caches the most recent event emissions.
* When new listeners are added, they immediately receive the most recent
* event if it was previously emitted. This is useful for handling state
* changes where late subscribers need to know the current state.
*
* Features:
* - Caches the most recent emission for each event type
* - New listeners immediately receive the cached event if available
* - Supports both regular (on) and one-time (once) listeners
* - Maintains compatibility with Node's EventEmitter interface
*/
class StickyEventEmitter extends EventEmitter {
constructor() {
super();
this._eventCache = new Map();
this._onceListeners = new Map(); // For storing once listeners if needed
}
destroy() {
this._eventCache.clear();
this._onceListeners.clear();
this.removeAllListeners();
}
emit(event, ...args) {
// Store the event and its args
this._eventCache.set(event, args);
// If there are any 'once' listeners waiting, call them
if (this._onceListeners.has(event)) {
const listeners = this._onceListeners.get(event);
for (const listener of listeners) {
listener(...args);
}
if (this.onSuccess) {
this.onSuccess();
}
this._onceListeners.delete(event);
}
return super.emit(event, ...args);
}
on(event, listener) {
if (this._eventCache.has(event)) {
listener(...this._eventCache.get(event));
}
return super.on(event, listener);
}
once(event, listener) {
if (this._eventCache.has(event)) {
listener(...this._eventCache.get(event));
if (this.onSuccess) {
this.onSuccess();
}
} else {
// Store listener in case emit comes before
if (!this._onceListeners.has(event)) {
this._onceListeners.set(event, []);
}
this._onceListeners.get(event).push(listener);
super.once(event, listener); // Also attach to native once
}
return this;
}
}
module.exports = StickyEventEmitter;