feat: advanced queues (#362)

* feat: advanced queues

* feat: advanced queues

* feat: advanced queues

* feat: advanced queues

* update verb specification

* add testcase

* add testcase

* add testcase

* updte testcase

* fixed

* update queue

* fix: fix waithook params

* fix: fix waithook params

* fix: performQueueWebhook with correct members length

* fix merge conflict

* debug log

* debug listen test

* debug listen test

* debug listen test

* debug listen test

* debug listen test

* debug listen issue

* feat: add tts on account level

---------

Co-authored-by: Dave Horton <daveh@beachdognet.com>
This commit is contained in:
Hoan Luu Huu
2023-06-03 19:16:05 +07:00
committed by GitHub
parent 01260ad054
commit b7070121ee
22 changed files with 987 additions and 166 deletions

View File

@@ -18,6 +18,7 @@ class TaskEnqueue extends Task {
this.preconditions = TaskPreconditions.Endpoint;
this.queueName = this.data.name;
this.priority = this.data.priority;
this.waitHook = this.data.waitHook;
this.emitter = new Emitter();
@@ -70,12 +71,22 @@ class TaskEnqueue extends Task {
}
async _addToQueue(cs, dlg) {
const {pushBack} = cs.srf.locals.dbHelpers;
const {addToSortedSet, sortedSetLength} = cs.srf.locals.dbHelpers;
const url = getUrl(cs);
this.waitStartTime = Date.now();
this.logger.debug({queue: this.queueName, url}, 'pushing url onto queue');
const members = await pushBack(this.queueName, url);
this.logger.info(`TaskEnqueue:_addToQueue: added to queue, length now ${members}`);
if (this.priority < 0) {
this.logger.warn(`priority ${this.priority} is invalid, need to be non-negative integer,
999 will be used for priority`);
}
let members = await addToSortedSet(this.queueName, url, this.priority);
if (members === 1) {
this.logger.info('TaskEnqueue:_addToQueue: added to queue');
} else {
this.logger.info('TaskEnqueue:_addToQueue: failed to add to queue');
}
members = await sortedSetLength(this.queueName);
this.notifyUrl = url;
/* invoke account-level webhook for queue event notifications */
@@ -90,9 +101,9 @@ class TaskEnqueue extends Task {
}
async _removeFromQueue(cs) {
const {removeFromList, lengthOfList} = cs.srf.locals.dbHelpers;
await removeFromList(this.queueName, getUrl(cs));
return await lengthOfList(this.queueName);
const {retrieveByPatternSortedSet, sortedSetLength} = cs.srf.locals.dbHelpers;
await retrieveByPatternSortedSet(this.queueName, `*${getUrl(cs)}`);
return await sortedSetLength(this.queueName);
}
async performAction() {
@@ -279,8 +290,8 @@ class TaskEnqueue extends Task {
this.emitter.emit('dequeue', opts);
try {
const {lengthOfList} = cs.srf.locals.dbHelpers;
const members = await lengthOfList(this.queueName);
const {sortedSetLength} = cs.srf.locals.dbHelpers;
const members = await sortedSetLength(this.queueName);
this.dequeued = true;
cs.performQueueWebhook({
event: 'leave',
@@ -301,7 +312,7 @@ class TaskEnqueue extends Task {
}
async _playHook(cs, dlg, hook, allowed = [TaskName.Play, TaskName.Say, TaskName.Pause, TaskName.Leave]) {
const {lengthOfList, getListPosition} = cs.srf.locals.dbHelpers;
const {sortedSetLength, sortedSetPositionByPattern} = cs.srf.locals.dbHelpers;
const b3 = this.getTracingPropagation();
const httpHeaders = b3 && {b3};
@@ -313,9 +324,14 @@ class TaskEnqueue extends Task {
queueTime: getElapsedTime(this.waitStartTime)
};
try {
const queueSize = await lengthOfList(this.queueName);
const queuePosition = await getListPosition(this.queueName, this.notifyUrl);
Object.assign(params, {queueSize, queuePosition});
const queueSize = await sortedSetLength(this.queueName);
const queuePosition = await sortedSetPositionByPattern(this.queueName, `*${this.notifyUrl}`);
Object.assign(params, {
queueSize,
queuePosition: queuePosition.length ? queuePosition[0] : 0,
callSid: this.cs.callSid,
callId: this.cs.callId,
});
} catch (err) {
this.logger.error({err}, `TaskEnqueue:_playHook error retrieving list info for queue ${this.queueName}`);
}