mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-19 04:17:44 +00:00
* fixed dial transcribe is not able to received final transcribe when close call. * wip * fix review comment * support call session delay detroy ep when current task is transcribe * wip * wip * fixed review comments * fixed review comments
158 lines
5.1 KiB
JavaScript
158 lines
5.1 KiB
JavaScript
const Task = require('./task');
|
|
const {TaskName, TaskPreconditions, DequeueResults, BONG_TONE} = require('../utils/constants');
|
|
const Emitter = require('events');
|
|
const bent = require('bent');
|
|
const assert = require('assert');
|
|
const { sleepFor } = require('../utils/helpers');
|
|
|
|
const getUrl = (cs) => `${cs.srf.locals.serviceUrl}/v1/dequeue/${cs.callSid}`;
|
|
|
|
class TaskDequeue extends Task {
|
|
constructor(logger, opts, parentTask) {
|
|
super(logger, opts);
|
|
this.preconditions = TaskPreconditions.Endpoint;
|
|
|
|
this.queueName = this.data.name;
|
|
this.timeout = this.data.timeout || 0;
|
|
this.beep = this.data.beep === true;
|
|
this.callSid = this.data.callSid;
|
|
|
|
this.emitter = new Emitter();
|
|
this.state = DequeueResults.Timeout;
|
|
}
|
|
|
|
get name() { return TaskName.Dequeue; }
|
|
|
|
async exec(cs, {ep}) {
|
|
await super.exec(cs);
|
|
this.ep = ep;
|
|
this.queueName = `queue:${cs.accountSid}:${this.queueName}`;
|
|
|
|
const url = await this._getMemberFromQueue(cs);
|
|
if (!url) this.performAction({dequeueResult: 'timeout'}).catch((err) => {});
|
|
else {
|
|
try {
|
|
await this._dequeueUrl(cs, ep, url);
|
|
this.performAction({dequeueResult: 'complete'}).catch((err) => {});
|
|
} catch (err) {
|
|
this.emitter.removeAllListeners();
|
|
this.performAction({dequeueResult: 'hangup'}).catch((err) => {});
|
|
}
|
|
}
|
|
}
|
|
|
|
async kill(cs) {
|
|
super.kill(cs);
|
|
if (this.state === DequeueResults.Bridged) {
|
|
this.logger.info(`TaskDequeue:kill - notifying partner we are going away ${this.partnerUrl}`);
|
|
bent('POST', 202)(this.partnerUrl, {event: 'hangup'}).catch((err) => {
|
|
this.logger.info(err, 'TaskDequeue:kill error notifying partner of hangup');
|
|
});
|
|
}
|
|
this.emitter.emit('kill');
|
|
}
|
|
|
|
_getMemberFromQueue(cs) {
|
|
const {retrieveFromSortedSet, retrieveByPatternSortedSet} = cs.srf.locals.dbHelpers;
|
|
|
|
return new Promise(async(resolve) => {
|
|
let timer;
|
|
let timedout = false, found = false;
|
|
if (this.timeout > 0) {
|
|
timer = setTimeout(() => {
|
|
this.logger.info(`TaskDequeue:_getMemberFromQueue timed out after ${this.timeout}s`);
|
|
timedout = true;
|
|
resolve();
|
|
}, this.timeout * 1000);
|
|
}
|
|
|
|
await sleepFor(1000); // to avoid clipping if we dial and immediately connect
|
|
|
|
do {
|
|
try {
|
|
let url;
|
|
if (this.callSid) {
|
|
const r = await retrieveByPatternSortedSet(this.queueName, `*${this.callSid}`);
|
|
url = r[0];
|
|
} else {
|
|
url = await retrieveFromSortedSet(this.queueName);
|
|
}
|
|
if (url) {
|
|
found = true;
|
|
clearTimeout(timer);
|
|
this.logger.info(`TaskDequeue:_getMemberFromQueue popped ${url} from queue ${this.queueName}`);
|
|
resolve(url);
|
|
}
|
|
} catch (err) {
|
|
this.logger.debug({err}, 'TaskDequeue:_getMemberFromQueue error Sorted Set');
|
|
}
|
|
await sleepFor(5000);
|
|
} while (!this.killed && !timedout && !found);
|
|
});
|
|
}
|
|
|
|
_dequeueUrl(cs, ep, url) {
|
|
this.partnerUrl = url;
|
|
|
|
return new Promise(async(resolve, reject) => {
|
|
let bridgeTimer;
|
|
this.emitter
|
|
.on('bridged', () => {
|
|
clearTimeout(bridgeTimer);
|
|
this.state = DequeueResults.Bridged;
|
|
})
|
|
.on('hangup', () => {
|
|
this.logger.info('TaskDequeue:_dequeueUrl hangup from partner');
|
|
resolve();
|
|
})
|
|
.on('kill', () => {
|
|
resolve();
|
|
});
|
|
|
|
// now notify partner to bridge to me
|
|
try {
|
|
// TODO: if we have a confirmHook, retrieve the app and pass it on
|
|
await bent('POST', 202)(url, {
|
|
event: 'dequeue',
|
|
dequeueSipAddress: cs.srf.locals.localSipAddress,
|
|
epUuid: ep.uuid,
|
|
notifyUrl: getUrl(cs),
|
|
dequeuer: cs.callInfo.toJSON()
|
|
});
|
|
this.logger.info(`TaskDequeue:_dequeueUrl successfully sent POST to ${url}`);
|
|
bridgeTimer = setTimeout(() => reject(new Error('bridge timeout')), 20000);
|
|
} catch (err) {
|
|
this.logger.info({err, url}, `TaskDequeue:_dequeueUrl error dequeueing from ${this.queueName}, try again`);
|
|
reject(new Error('bridge failure'));
|
|
}
|
|
});
|
|
}
|
|
|
|
async notifyQueueEvent(cs, opts) {
|
|
if (opts.event === 'ready') {
|
|
assert(opts.notifyUrl && opts.epUuid);
|
|
this.partnerUrl = opts.notifyUrl;
|
|
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: about to bridge member from ${this.queueName}`);
|
|
|
|
if (this.beep) {
|
|
this.logger.debug({opts}, `TaskDequeue:notifyDequeueEvent: playing beep tone ${this.queueName}`);
|
|
await this.ep.play(BONG_TONE).catch((err) => {
|
|
this.logger.error(err, 'TaskDequeue:notifyDequeueEvent error playing beep');
|
|
});
|
|
}
|
|
await this.ep.bridge(opts.epUuid);
|
|
this.emitter.emit('bridged');
|
|
this.logger.info({opts}, `TaskDequeue:notifyDequeueEvent: successfully bridged member from ${this.queueName}`);
|
|
}
|
|
else if (opts.event === 'hangup') {
|
|
this.emitter.emit('hangup');
|
|
}
|
|
else {
|
|
this.logger.error({opts}, 'TaskDequeue:notifyDequeueEvent - unsupported event/payload');
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
module.exports = TaskDequeue;
|