From 5c8237b382356826d3c5ad7dcf24f5267925a835 Mon Sep 17 00:00:00 2001 From: rammohan-y <37395033+rammohan-y@users.noreply.github.com> Date: Mon, 12 May 2025 05:55:48 +0530 Subject: [PATCH] Feat 1179 race issue with play verb (#1183) * Fixed race issue between queueCommand false and queueCommand true when play task is involved https://github.com/jambonz/jambonz-feature-server/issues/1179 * removed unnecessary emitter * added destroy mechanism for stickyEventEmitter * clearing stickyEventEmitter * memory leak fix --- lib/session/call-session.js | 18 +++++++- lib/tasks/play.js | 24 +++++++++-- lib/tasks/task.js | 1 + lib/utils/sticky-event-emitter.js | 70 +++++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 5 deletions(-) create mode 100644 lib/utils/sticky-event-emitter.js diff --git a/lib/session/call-session.js b/lib/session/call-session.js index f847ac5f..8ca3d41a 100644 --- a/lib/session/call-session.js +++ b/lib/session/call-session.js @@ -23,6 +23,7 @@ const HttpRequestor = require('../utils/http-requestor'); const WsRequestor = require('../utils/ws-requestor'); const ActionHookDelayProcessor = require('../utils/action-hook-delay'); const TtsStreamingBuffer = require('../utils/tts-streaming-buffer'); +const StickyEventEmitter = require('../utils/sticky-event-emitter'); const {parseUri} = require('drachtio-srf'); const { JAMBONES_INJECT_CONTENT, @@ -79,6 +80,10 @@ class CallSession extends Emitter { this.callGone = false; this.notifiedComplete = false; this.rootSpan = rootSpan; + this.stickyEventEmitter = new StickyEventEmitter(); + this.stickyEventEmitter.onSuccess =() => { + this.taskInProgress = null; + }; this.backgroundTaskManager = new BackgroundTaskManager({ cs: this, logger, @@ -1180,7 +1185,9 @@ class CallSession extends Emitter { const taskNum = ++this.taskIdx; const stackNum = this.stackIdx; const task = this.tasks.shift(); - this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name}`); + this.isCurTaskPlay = TaskName.Play === task.name; + this.taskInProgress = task; + this.logger.info(`CallSession:exec starting task #${stackNum}:${taskNum}: ${task.name} : {task.taskId}`); this._notifyTaskStatus(task, {event: 'starting'}); // Register verbhook span wait for end task.on('VerbHookSpanWaitForEnd', ({span}) => { @@ -1919,6 +1926,8 @@ Duration=${duration} ` this.logger.debug({tasks: listTaskNames(tasks)}, `CallSession:replaceApplication reset with ${tasks.length} new tasks, stack depth is ${this.stackIdx}`); if (this.currentTask) { + this.logger.debug('CallSession:replaceApplication - killing current task ' + + this.currentTask?.name + ', taskId: ' + this.currentTask.taskId); this.currentTask.kill(this, KillReason.Replaced); this.currentTask = null; } @@ -1927,6 +1936,10 @@ Duration=${duration} ` this.wakeupResolver({reason: 'new tasks'}); this.wakeupResolver = null; } + if ((!this.currentTask || this.currentTask === undefined) && this.isCurTaskPlay) { + this.logger.debug(`CallSession:replaceApplication - emitting uuid_break, taskId: ${this.taskInProgress?.taskId}`); + this.stickyEventEmitter.emit('uuid_break', this.taskInProgress); + } } kill(onBackgroundGatherBargein = false) { @@ -2387,6 +2400,9 @@ Duration=${duration} ` * Hang up the call and free the media endpoint */ async _clearResources() { + this.stickyEventEmitter.destroy(); + this.stickyEventEmitter = null; + this.taskInProgress = null; for (const resource of [this.dlg, this.ep, this.ep2]) { try { if (resource && resource.connected) await resource.destroy(); diff --git a/lib/tasks/play.js b/lib/tasks/play.js index 8e2ee956..ae220c10 100644 --- a/lib/tasks/play.js +++ b/lib/tasks/play.js @@ -1,7 +1,6 @@ const Task = require('./task'); const {TaskName, TaskPreconditions} = require('../utils/constants'); const { PlayFileNotFoundError } = require('../utils/error'); - class TaskPlay extends Task { constructor(logger, opts) { super(logger, opts); @@ -27,6 +26,7 @@ class TaskPlay extends Task { let playbackSeconds = 0; let playbackMilliseconds = 0; let completed = !(this.timeoutSecs > 0 || this.loop); + cs.playingAudio = true; if (this.timeoutSecs > 0) { timeout = setTimeout(async() => { completed = true; @@ -40,6 +40,22 @@ class TaskPlay extends Task { try { this.notifyStatus({event: 'start-playback'}); while (!this.killed && (this.loop === 'forever' || this.loop--) && this.ep.connected) { + /* Listen for playback-start event and set up a one-time listener for uuid_break + * that will kill the audio playback if the taskIds match. This ensures that + * we only kill the currently playing audio and not audio from other tasks. + * As we are using stickyEventEmitter, even if the event is emitted before the listener is registered, + * the listener will receive the most recent event. + */ + ep.once('playback-start', (evt) => { + this.logger.debug({evt}, 'Play got playback-start'); + this.cs.stickyEventEmitter.once('uuid_break', (t) => { + if (t?.taskId === this.taskId) { + this.logger.debug(`Play got kill-playback, executing uuid_break, taskId: ${t?.taskId}`); + this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio')); + this.notifyStatus({event: 'kill-playback'}); + } + }); + }); if (cs.isInConference) { const {memberId, confName, confUuid} = cs; if (Array.isArray(this.url)) { @@ -87,15 +103,15 @@ class TaskPlay extends Task { async kill(cs) { super.kill(cs); - if (this.ep.connected && !this.playComplete) { + if (this.ep?.connected && !this.playComplete) { this.logger.debug('TaskPlay:kill - killing audio'); if (cs.isInConference) { const {memberId, confName} = cs; this.killPlayToConfMember(this.ep, memberId, confName); } else { - this.notifyStatus({event: 'kill-playback'}); - this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio')); + //this.ep.api('uuid_break', this.ep.uuid).catch((err) => this.logger.info(err, 'Error killing audio')); + cs.stickyEventEmitter.emit('uuid_break', this); } } } diff --git a/lib/tasks/task.js b/lib/tasks/task.js index 25605b8d..ddd255a6 100644 --- a/lib/tasks/task.js +++ b/lib/tasks/task.js @@ -19,6 +19,7 @@ class Task extends Emitter { this.data = data; this.actionHook = this.data.actionHook; this.id = data.id; + this.taskId = this.name + '-' + uuidv4(); this._killInProgress = false; this._completionPromise = new Promise((resolve) => this._completionResolver = resolve); diff --git a/lib/utils/sticky-event-emitter.js b/lib/utils/sticky-event-emitter.js new file mode 100644 index 00000000..caea154d --- /dev/null +++ b/lib/utils/sticky-event-emitter.js @@ -0,0 +1,70 @@ +const EventEmitter = require('events'); + +/** + * A specialized EventEmitter that caches the most recent event emissions. + * When new listeners are added, they immediately receive the most recent + * event if it was previously emitted. This is useful for handling state + * changes where late subscribers need to know the current state. + * + * Features: + * - Caches the most recent emission for each event type + * - New listeners immediately receive the cached event if available + * - Supports both regular (on) and one-time (once) listeners + * - Maintains compatibility with Node's EventEmitter interface + */ +class StickyEventEmitter extends EventEmitter { + constructor() { + super(); + this._eventCache = new Map(); + this._onceListeners = new Map(); // For storing once listeners if needed + } + destroy() { + this._eventCache.clear(); + this._onceListeners.clear(); + this.removeAllListeners(); + } + emit(event, ...args) { + // Store the event and its args + this._eventCache.set(event, args); + + // If there are any 'once' listeners waiting, call them + if (this._onceListeners.has(event)) { + const listeners = this._onceListeners.get(event); + for (const listener of listeners) { + listener(...args); + } + if (this.onSuccess) { + this.onSuccess(); + } + this._onceListeners.delete(event); + } + + return super.emit(event, ...args); + } + + on(event, listener) { + if (this._eventCache.has(event)) { + listener(...this._eventCache.get(event)); + } + return super.on(event, listener); + } + + once(event, listener) { + if (this._eventCache.has(event)) { + listener(...this._eventCache.get(event)); + if (this.onSuccess) { + this.onSuccess(); + } + } else { + // Store listener in case emit comes before + if (!this._onceListeners.has(event)) { + this._onceListeners.set(event, []); + } + this._onceListeners.get(event).push(listener); + super.once(event, listener); // Also attach to native once + } + return this; + } +} + +module.exports = StickyEventEmitter;