initial support for siprec recording (#36)

* initial support for siprec recording

* handle pause/resume siprec recording
This commit is contained in:
Dave Horton
2022-06-23 16:23:09 -04:00
committed by GitHub
parent c9401ab3c8
commit d2b5597571
5 changed files with 5720 additions and 17 deletions

View File

@@ -1,4 +1,4 @@
FROM node:slim
FROM node:lts-slim
WORKDIR /opt/app/
COPY package.json package-lock.json ./
RUN npm ci

View File

@@ -1,4 +1,5 @@
const Emitter = require('events');
const SrsClient = require('./srs-client');
const {makeRtpEngineOpts, SdpWantsSrtp, makeCallCountKey} = require('./utils');
const {forwardInDialogRequests} = require('drachtio-fn-b2b-sugar');
const {parseUri, stringifyUri, SipError} = require('drachtio-srf');
@@ -62,7 +63,10 @@ class CallSession extends Emitter {
blockDTMF,
unblockDTMF,
subscribeDTMF,
unsubscribeDTMF
unsubscribeDTMF,
subscribeRequest,
subscribeAnswer,
unsubscribe
} = engine;
this.offer = offer;
this.answer = answer;
@@ -73,6 +77,9 @@ class CallSession extends Emitter {
this.unblockDTMF = unblockDTMF;
this.subscribeDTMF = subscribeDTMF;
this.unsubscribeDTMF = unsubscribeDTMF;
this.subscribeRequest = subscribeRequest;
this.subscribeAnswer = subscribeAnswer;
this.unsubscribe = unsubscribe;
const featureServer = await this.getFeatureServer();
if (!featureServer) {
@@ -224,6 +231,12 @@ class CallSession extends Emitter {
this.rtpEngineResource.destroy().catch((err) => {});
this.activeCallIds.delete(callId);
if (dlg.other && dlg.other.connected) dlg.other.destroy().catch((e) => {});
if (this.srsClient) {
this.srsClient.stop();
this.srsClient = null;
}
this.srf.endSession(this.req);
});
@@ -288,6 +301,11 @@ class CallSession extends Emitter {
dlg.other = null;
other.other = null;
if (this.srsClient) {
this.srsClient.stop();
this.srsClient = null;
}
this.logger.info(`call ended with normal termination, there are ${this.activeCallIds.size} active`);
this.srf.endSession(this.req);
});
@@ -449,6 +467,7 @@ Duration=${payload.duration} `
async _onInfo(dlg, req, res) {
const fromTag = dlg.type === 'uas' ? this.rtpEngineOpts.uas.tag : this.rtpEngineOpts.uac.tag;
const toTag = dlg.type === 'uas' ? this.rtpEngineOpts.uac.tag : this.rtpEngineOpts.uas.tag;
try {
if (dlg.type === 'uac' && req.has('X-Reason')) {
const reason = req.get('X-Reason');
@@ -458,16 +477,100 @@ Duration=${payload.duration} `
'from-tag': fromTag
};
this.logger.info(`_onInfo: got request ${reason}`);
res.send(200);
if (reason.startsWith('mute')) {
const response = Promise.all([this.blockMedia(opts), this.blockDTMF(opts)]);
res.send(200);
this.logger.info({response}, `_onInfo: response to rtpengine command for ${reason}`);
}
else if (reason.startsWith('unmute')) {
const response = Promise.all([this.unblockMedia(opts), this.unblockDTMF(opts)]);
res.send(200);
this.logger.info({response}, `_onInfo: response to rtpengine command for ${reason}`);
}
else if (reason.includes('CallRecording')) {
let succeeded = false;
if (reason === 'startCallRecording') {
const from = this.req.getParsedHeader('From');
const to = this.req.getParsedHeader('To');
const aorFrom = from.uri;
const aorTo = to.uri;
this.logger.info({to, from}, 'startCallRecording request for a call');
const srsUrl = req.get('X-Srs-Url');
const srsRecordingId = req.get('X-Srs-Recording-ID');
const callSid = req.get('X-Call-Sid');
const accountSid = req.get('X-Account-Sid');
const applicationSid = req.get('X-Application-Sid');
if (this.srsClient) {
res.send(400);
this.logger.info('discarding duplicate startCallRecording request for a call');
return;
}
if (!srsUrl) {
this.logger.info('startCallRecording request is missing X-Srs-Url header');
res.send(400);
return;
}
this.srsClient = new SrsClient(this.logger, {
srf: dlg.srf,
originalInvite: this.req,
callingNumber: this.req.callingNumber,
calledNumber: this.req.calledNumber,
srsUrl,
srsRecordingId,
callSid,
accountSid,
applicationSid,
rtpEngineOpts: this.rtpEngineOpts,
fromTag,
toTag,
aorFrom,
aorTo,
subscribeRequest: this.subscribeRequest,
subscribeAnswer: this.subscribeAnswer,
del: this.del,
blockMedia: this.blockMedia,
unblockMedia: this.unblockMedia,
unsubscribe: this.unsubscribe
});
try {
succeeded = await this.srsClient.start();
} catch (err) {
this.logger.error({err}, 'Error starting SipRec call recording');
}
}
else if (reason === 'stopCallRecording') {
if (!this.srsClient) {
res.send(400);
this.logger.info('discarding stopCallRecording request because we are not recording');
return;
}
try {
succeeded = await this.srsClient.stop();
} catch (err) {
this.logger.error({err}, 'Error stopping SipRec call recording');
}
this.srsClient = null;
}
else if (reason === 'pauseCallRecording') {
if (!this.srsClient || this.srsClient.paused) {
this.logger.info('discarding invalid pauseCallRecording request');
res.send(400);
return;
}
succeeded = await this.srsClient.pause();
}
else if (reason === 'resumeCallRecording') {
if (!this.srsClient || !this.srsClient.paused) {
res.send(400);
this.logger.info('discarding invalid resumeCallRecording request');
return;
}
succeeded = await this.srsClient.resume();
}
res.send(succeeded ? 200 : 503);
}
}
else {
const immutableHdrs = ['via', 'from', 'to', 'call-id', 'cseq', 'max-forwards', 'content-length'];
@@ -483,6 +586,10 @@ Duration=${payload.duration} `
res.send(response.status, {headers: responseHeaders, body: response.body});
}
} catch (err) {
if (this.srsClient) {
this.srsClient = null;
}
res.send(500);
this.logger.info({err}, `Error handing INFO request on ${dlg.type} leg`);
}
}

249
lib/srs-client.js Normal file
View File

@@ -0,0 +1,249 @@
const Emitter = require('events');
const assert = require('assert');
const transform = require('sdp-transform');
const { v4: uuidv4 } = require('uuid');
const createMultipartSdp = (sdp, {
originalInvite,
srsRecordingId,
callSid,
accountSid,
applicationSid,
sipCallId,
aorFrom,
aorTo,
callingNumber,
calledNumber
}) => {
const sessionId = uuidv4();
const uuidStream1 = uuidv4();
const uuidStream2 = uuidv4();
const participant1 = uuidv4();
const participant2 = uuidv4();
const sipSessionId = originalInvite.get('Call-ID');
const {originator = 'unknown', carrier = 'unknown'} = originalInvite.locals;
const x = `--uniqueBoundary
Content-Disposition: session;handling=required
Content-Type: application/sdp
--sdp-placeholder--
--uniqueBoundary
Content-Disposition: recording-session
Content-Type: application/rs-metadata+xml
<?xml version="1.0" encoding="UTF-8"?>
<recording xmlns="urn:ietf:params:xml:ns:recording:1">
<datamode>complete</datamode>
<session session_id="${sessionId}">
<sipSessionID>${sipSessionId}</sipSessionID>
</session>
<extensiondata xmlns:jb="http://jambonz.org/siprec">
<jb:callsid>${callSid}</jb:callsid>
<jb:accountsid>${accountSid}</jb:accountsid>
<jb:applicationsid>${applicationSid}</jb:applicationsid>
<jb:recordingid>${srsRecordingId}</jb:recordingid>
<jb:originationsource>${originator}</jb:originationsource>
<jb:carrier>${carrier}</jb:carrier>
</extensiondata>
<participant participant_id="${participant1}">
<nameID aor="${aorFrom}">
<name>${callingNumber}</name>
</nameID>
</participant>
<participantsessionassoc participant_id="${participant1}" session_id="${sessionId}">
</participantsessionassoc>
<stream stream_id="${uuidStream1}" session_id="${sessionId}">
<label>1</label>
</stream>
<participant participant_id="${participant2}">
<nameID aor="${aorTo}">
<name>${calledNumber}</name>
</nameID>
</participant>
<participantsessionassoc participant_id="${participant2}" session_id="${sessionId}">
</participantsessionassoc>
<stream stream_id="${uuidStream2}" session_id="${sessionId}">
<label>2</label>
</stream>
<participantstreamassoc participant_id="${participant1}">
<send>${uuidStream1}</send>
<recv>${uuidStream2}</recv>
</participantstreamassoc>
<participantstreamassoc participant_id="${participant2}">
<send>${uuidStream2}</send>
<recv>${uuidStream1}</recv>
</participantstreamassoc>
</recording>`
.replace(/\n/g, '\r\n')
.replace('--sdp-placeholder--', sdp);
return `${x}\r\n`;
};
class SrsClient extends Emitter {
constructor(logger, opts) {
super();
const {
srf,
originalInvite,
calledNumber,
callingNumber,
srsUrl,
srsRecordingId,
callSid,
accountSid,
applicationSid,
srsDestUserName,
rtpEngineOpts,
//fromTag,
toTag,
aorFrom,
aorTo,
subscribeRequest,
subscribeAnswer,
del,
blockMedia,
unblockMedia,
unsubscribe
} = opts;
this.logger = logger;
this.srf = srf;
this.originalInvite = originalInvite;
this.callingNumber = callingNumber;
this.calledNumber = calledNumber;
this.subscribeRequest = subscribeRequest;
this.subscribeAnswer = subscribeAnswer;
this.del = del;
this.blockMedia = blockMedia;
this.unblockMedia = unblockMedia;
this.unsubscribe = unsubscribe;
this.srsUrl = srsUrl;
this.srsRecordingId = srsRecordingId;
this.callSid = callSid;
this.accountSid = accountSid;
this.applicationSid = applicationSid;
this.srsDestUserName = srsDestUserName;
this.rtpEngineOpts = rtpEngineOpts;
this.sipRecFromTag = toTag;
this.aorFrom = aorFrom;
this.aorTo = aorTo;
/* state */
this.activated = false;
this.paused = false;
}
async start() {
assert(!this.activated);
const opts = {
'call-id': this.rtpEngineOpts.common['call-id'],
'from-tag': this.sipRecFromTag
};
let response = await this.subscribeRequest({...opts, label: '1', flags: ['all'], interface: 'public'});
if (response.result !== 'ok') {
this.logger.error({response}, 'SrsClient:start error calling subscribe request');
throw new Error('error calling subscribe request');
}
this.siprecToTag = response['to-tag'];
const parsed = transform.parse(response.sdp);
parsed.name = 'jambonz SRS';
parsed.media[0].label = '1';
parsed.media[1].label = '2';
this.sdpOffer = transform.write(parsed);
const sdp = createMultipartSdp(this.sdpOffer, {
originalInvite: this.originalInvite,
srsRecordingId: this.srsRecordingId,
callSid: this.callSid,
accountSid: this.accountSid,
applicationSid: this.applicationSid,
calledNumber: this.calledNumber,
callingNumber: this.callingNumber,
aorFrom: this.aorFrom,
aorTo: this.aorTo
});
this.logger.info({response}, `SrsClient: sending SDP ${sdp}`);
/* */
try {
this.uac = await this.srf.createUAC(this.srsUrl, {
headers: {
'Content-Type': 'multipart/mixed;boundary=uniqueBoundary',
},
localSdp: sdp
});
} catch (err) {
this.logger.info({err}, `Error sending SIPREC INVITE to ${this.srsUrl}`);
throw err;
}
this.logger.info({sdp: this.uac.remote.sdp}, `SrsClient:start - successfully connected to SRS ${this.srsUrl}`);
response = await this.subscribeAnswer({
...opts,
sdp: this.uac.remote.sdp,
'to-tag': response['to-tag'],
label: '2'
});
if (response.result !== 'ok') {
this.logger.error({response}, 'SrsClient:start error calling subscribe answer');
throw new Error('error calling subscribe answer');
}
this.activated = true;
this.logger.info('successfully established siprec connection');
return true;
}
async stop() {
assert(this.activated);
const opts = {
'call-id': this.rtpEngineOpts.common['call-id'],
'from-tag': this.sipRecFromTag
};
this.del(opts)
//.then((response) => this.logger.debug({response}, 'Successfully stopped siprec media'))
.catch((err) => this.logger.info({err}, 'Error deleting siprec media session'));
this.uac.destroy().catch(() => {});
this.activated = false;
return true;
}
async pause() {
assert(!this.paused);
const opts = {
'call-id': this.rtpEngineOpts.common['call-id'],
'from-tag': this.sipRecFromTag
};
try {
await this.blockMedia(opts);
await this.uac.modify(this.sdpOffer.replace(/sendonly/g, 'inactive'));
this.paused = true;
return true;
} catch (err) {
this.logger.info({err}, 'Error pausing siprec media session');
}
return false;
}
async resume() {
assert(this.paused);
const opts = {
'call-id': this.rtpEngineOpts.common['call-id'],
'from-tag': this.sipRecFromTag
};
try {
await this.blockMedia(opts);
await this.uac.modify(this.sdpOffer);
} catch (err) {
this.logger.info({err}, 'Error resuming siprec media session');
}
return true;
}
}
module.exports = SrsClient;

5373
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -40,6 +40,8 @@
"drachtio-srf": "^4.5.0",
"express": "^4.18.1",
"pino": "^7.11.0",
"sdp-transform": "^2.14.1",
"uuid": "^8.3.2",
"verify-aws-sns-signature": "^0.0.6",
"xml2js": "^0.4.23"
},