more queue fixes

This commit is contained in:
Dave Horton
2020-05-07 13:28:41 -04:00
parent a0508a2494
commit 1d9658905f
7 changed files with 72 additions and 25 deletions

View File

@@ -686,7 +686,13 @@ class CallSession extends Emitter {
'X-Retain-Call-Sid': this.callSid
}
});
return [200, 202].includes(res.status);
if ([200, 202].includes(res.status)) {
this.tasks = [];
this.taskIdx = 0;
this.callMoved = true;
return true;
}
return false;
}
getRemainingTaskData() {
@@ -732,6 +738,8 @@ class CallSession extends Emitter {
* @param {number} [duration] - duration of a completed call, in seconds
*/
_notifyCallStatusChange({callStatus, sipStatus, duration}) {
if (this.callMoved) return;
assert((typeof duration === 'number' && callStatus === CallStatus.Completed) ||
(!duration && callStatus !== CallStatus.Completed),
'duration MUST be supplied when call completed AND ONLY when call completed');

View File

@@ -1,7 +1,7 @@
const Task = require('./task');
const Emitter = require('events');
const ConfirmCallSession = require('../session/confirm-call-session');
const {TaskName, TaskPreconditions} = require('../utils/constants');
const {TaskName, TaskPreconditions, BONG_TONE} = require('../utils/constants');
const normalizeJambones = require('../utils/normalize-jambones');
const makeTask = require('./make_task');
const bent = require('bent');
@@ -58,6 +58,11 @@ class Conference extends Task {
this.emitter = new Emitter();
this.results = {};
// transferred from another server in order to bridge to a local caller?
if (this.data._ && this.data._.connectTime) {
this.connectTime = this.data._.connectTime;
}
}
get name() { return TaskName.Conference; }
@@ -66,6 +71,11 @@ class Conference extends Task {
await super.exec(cs);
this.ep = ep;
const dlg = cs.dlg;
// reset answer time if we were transferred from another feature server
if (this.this.connectTime) dlg.connectTime = this.connectTime;
this.ep.on('destroy', this._kicked.bind(this, cs, dlg));
try {
@@ -238,7 +248,9 @@ class Conference extends Task {
localServer: cs.srf.locals.localSipAddress,
confServer: this.joinDetails.conferenceSipAddress
}, `Conference:_doJoin: conference ${this.confName} is hosted elsewhere`);
const success = await this.transferCallToFeatureServer(cs, this.joinDetails.conferenceSipAddress);
const success = await this.transferCallToFeatureServer(cs, this.joinDetails.conferenceSipAddress, {
connectTime: dlg.connectTime.valueOf()
});
/**
* If the REFER succeeded, we will get a BYE from the SBC
@@ -334,7 +346,7 @@ class Conference extends Task {
// optionally play beep to conference on entry
if (this.beep === true) {
this.ep.api('conference',
[this.confName, 'play', 'tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)'])
[this.confName, 'play', BONG_TONE])
.catch((err) => {});
}

View File

@@ -1,5 +1,5 @@
const Task = require('./task');
const {TaskName, TaskPreconditions, DequeueResults} = require('../utils/constants');
const {TaskName, TaskPreconditions, DequeueResults, BONG_TONE} = require('../utils/constants');
const Emitter = require('events');
const bent = require('bent');
const assert = require('assert');
@@ -15,6 +15,7 @@ class TaskDequeue extends Task {
this.queueName = this.data.name;
this.timeout = this.data.timeout || 0;
this.beep = this.data.beep === true;
this.emitter = new Emitter();
this.state = DequeueResults.Timeout;
@@ -24,6 +25,7 @@ class TaskDequeue extends Task {
async exec(cs, ep) {
await super.exec(cs);
this.ep = ep;
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;
const url = await this._getMemberFromQueue(cs);
@@ -64,12 +66,15 @@ class TaskDequeue extends Task {
}, this.timeout * 1000);
}
await sleepFor(1000); // to avoid clipping if we dial and immediately connect
do {
try {
const url = await popFront(this.queueName);
if (url) {
found = true;
clearTimeout(timer);
this.logger.info(`TaskDequeue:_getMemberFromQueue popped ${url} from queue ${this.queueName}`);
resolve(url);
}
} catch (err) {
@@ -100,12 +105,14 @@ class TaskDequeue extends Task {
// now notify partner to bridge to me
try {
// TODO: if we have a confirmHook, retrieve the app and pass it on
await bent('POST', 202)(url, {
event: 'dequeue',
dequeueSipAddress: cs.srf.locals.localSipAddress,
epUuid: ep.uuid,
notifyUrl: getUrl(cs)
});
this.logger.info(`TaskDequeue:_dequeueUrl successfully sent POST to ${url}`);
bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000);
} catch (err) {
this.logger.info({err, url}, `TaskDequeue:_dequeueUrl error dequeueing from ${this.queueName}, try again`);
@@ -114,12 +121,21 @@ class TaskDequeue extends Task {
});
}
notifyQueueEvent(cs, opts) {
if (opts.event === 'bridged') {
assert(opts.notifyUrl);
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: successfully bridged to member from ${this.queueName}`);
async notifyQueueEvent(cs, opts) {
if (opts.event === 'ready') {
assert(opts.notifyUrl && opts.epUuid);
this.partnerUrl = opts.notifyUrl;
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: about to bridge member from ${this.queueName}`);
if (this.beep) {
this.logger.debug({opts}, `TaskDequeue:notifyDequeueEvent: playing beep tone ${this.queueName}`);
await this.ep.play(BONG_TONE).catch((err) => {
this.logger.error(err, 'TaskDequeue:notifyDequeueEvent error playing beep');
});
}
await this.ep.bridge(opts.epUuid);
this.emitter.emit('bridged');
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: successfully bridged member from ${this.queueName}`);
}
else if (opts.event === 'hangup') {
this.emitter.emit('hangup');

View File

@@ -31,6 +31,7 @@ class TaskEnqueue extends Task {
notifyUrl: this.data._.notifyUrl
};
this.waitStartTime = this.data._.waitStartTime;
this.connectTime = this.data._.connectTime;
}
}
@@ -47,9 +48,12 @@ class TaskEnqueue extends Task {
await this._doWait(cs, dlg, ep);
}
else {
// update dialog's answer time to when it was answered on the previous server, not now
dlg.connectTime = this.connectTime;
await this._doBridge(cs, dlg, ep);
}
if (!this.callMoved) await this.performAction();
await this.awaitTaskDone();
this.logger.debug(`TaskEnqueue:exec - task done queue ${this.queueName}`);
} catch (err) {
@@ -61,6 +65,7 @@ class TaskEnqueue extends Task {
super.kill(cs);
this.logger.info(`TaskEnqueue:kill ${this.queueName}`);
this.emitter.emit('kill');
this.notifyTaskDone();
}
async _addToQueue(cs, dlg) {
@@ -153,7 +158,8 @@ class TaskEnqueue extends Task {
const success = await this.transferCallToFeatureServer(cs, this.bridgeDetails.dequeueSipAddress, {
waitStartTime: this.waitStartTime,
epUuid: this.bridgeDetails.epUuid,
notifyUrl: this.bridgeDetails.notifyUrl
notifyUrl: this.bridgeDetails.notifyUrl,
connectTime: dlg.connectTime.valueOf()
});
/**
@@ -169,30 +175,29 @@ class TaskEnqueue extends Task {
return;
}
this.state = QueueResults.Error;
this.notifyTaskDone();
return;
}
this.logger.info(`TaskEnqueue:_doBridge: queue ${this.queueName} is hosted locally`);
await this._bridgeLocal(cs, dlg, ep);
this.notifyTaskDone();
}
_bridgeLocal(cs, dlg, ep) {
assert(this.bridgeDetails.epUuid && this.bridgeDetails.notifyUrl);
assert(this.bridgeDetails.notifyUrl);
return new Promise(async(resolve, reject) => {
try {
this.other = {epUuid: this.bridgeDetails.epUuid};
// bridge to the dequeuing endpoint
this.logger.debug(`TaskEnqueue:_doBridge: attempting to bridge call to ${this.other.epUuid}`);
await ep.bridge(this.other.epUuid);
this.state = QueueResults.Bridged;
this.logger.info(`TaskEnqueue:_doBridge: successfully bridged to ${this.other.epUuid}`);
// notify partner we are on - give him our possibly new url
// notify partner we are ready to be bridged - giving him our possibly new url and endpoint
const notifyUrl = getUrl(cs);
const url = this.bridgeDetails.notifyUrl;
this.logger.debug('TaskEnqueue:_doBridge: ready to be bridged');
bent('POST', 202)(url, {
event: 'bridged',
notifyUrl: getUrl(cs)
event: 'ready',
epUuid: ep.uuid,
notifyUrl
}).catch((err) => {
this.logger.info({err, url}, 'TaskEnqueue:_bridgeLocal error sending bridged event');
/**
@@ -220,9 +225,10 @@ class TaskEnqueue extends Task {
});
resolve();
});
} catch (err) {
this.state = QueueResults.Error;
this.logger.error(err, `Failed to bridge to ep ${this.other.epUuid}`);
this.logger.error(err, 'TaskEnqueue:_bridgeLocal error');
reject(err);
}
});

View File

@@ -13,7 +13,8 @@
"properties": {
"name": "string",
"actionHook": "object|string",
"timeout": "number"
"timeout": "number",
"beep": "boolean"
},
"required": [
"name"

View File

@@ -106,7 +106,10 @@ class Task extends Emitter {
delete obj.requestor;
delete obj.notifier;
obj.tasks = cs.getRemainingTaskData();
if (opts && obj.tasks.length > 1) obj.tasks[0]._ = opts;
if (opts && obj.tasks.length > 1) {
const key = Object.keys(obj.tasks[0])[0];
Object.assign(obj.tasks[0][key], {_: opts});
}
this.logger.debug({obj}, 'Task:_doRefer');

View File

@@ -81,5 +81,6 @@
"Hangup": "hangup",
"Timeout": "timeout"
},
"MAX_SIMRINGS": 10
"MAX_SIMRINGS": 10,
"BONG_TONE": "tone_stream://v=-7;%(100,0,941.0,1477.0);v=-7;>=2;+=.1;%(1400,0,350,440)"
}