mirror of
https://github.com/jambonz/jambonz-api-server.git
synced 2025-12-19 05:47:46 +00:00
- Replaced Buffer.concat with chunk accumulation to reduce time complexity during writes.
- Introduced bufferedBytes to track total size of accumulated chunks.
- Updated upload logic to handle parts more efficiently, minimizing memory overhead.
- Enhanced logging in upload function to include selected encoder format for better traceability.
(cherry picked from commit ce8bba2f18d807d4872b168e451e4501b1acb824)
102 lines
3.5 KiB
JavaScript
102 lines
3.5 KiB
JavaScript
const Account = require('../models/account');
|
|
const Websocket = require('ws');
|
|
const PCMToMP3Encoder = require('./encoder');
|
|
const wav = require('wav');
|
|
const { getUploader } = require('./utils');
|
|
const { pipeline } = require('stream');
|
|
|
|
/**
 * Wire up a websocket so that a call recording streamed over it is encoded
 * and forwarded to the account's configured storage uploader.
 *
 * The first non-binary message on the socket is treated as JSON metadata
 * describing the call. It is used to look up the account, build the storage
 * key and metadata, pick an encoder (wav or mp3) and start a stream pipeline
 * websocket -> encoder -> uploader. Later messages are ignored by this
 * handler; the audio itself is consumed through the websocket stream.
 *
 * The socket is closed when the account has no bucket credential, when no
 * uploader is available, or when the pipeline reports an error.
 *
 * @param {object} logger - logger exposing `debug`, `info` and `error`.
 * @param {object} socket - websocket connection (must support `on` and `close`).
 * @returns {Promise<void>} resolves once the event handlers are registered.
 */
async function upload(logger, socket) {
  // Flag stored on the socket so the metadata message is only handled once.
  socket._recvInitialMetadata = false;

  socket.on('message', async function(data, isBinary) {
    try {
      if (!isBinary && !socket._recvInitialMetadata) {
        socket._recvInitialMetadata = true;
        logger.debug(`initial metadata: ${data}`);
        const obj = JSON.parse(data.toString());
        logger.info({ obj }, 'received JSON message from jambonz');
        const { sampleRate, accountSid, callSid, direction, from, to,
          callId, applicationSid, originatingSipIp, originatingSipTrunkName } = obj;

        const account = await Account.retrieve(accountSid);
        if (account && account.length && account[0].bucket_credential) {
          const bucketCredential = account[0].bucket_credential;

          // add tags to metadata
          const metadata = {
            accountSid,
            callSid,
            direction,
            from,
            to,
            callId,
            applicationSid,
            originatingSipIp,
            originatingSipTrunkName,
            // stored as a string rather than a number
            sampleRate: `${sampleRate}`
          };
          if (bucketCredential.tags && bucketCredential.tags.length) {
            bucketCredential.tags.forEach((tag) => {
              metadata[tag.Key] = tag.Value;
            });
          }

          // create S3 path: YYYY/MM/DD/<callSid>.<record_format> (local date)
          // NOTE(review): the extension comes straight from record_format, while
          // the encoder below falls back to mp3 for any value other than 'wav'.
          const day = new Date();
          let key = `${day.getFullYear()}/${(day.getMonth() + 1).toString().padStart(2, '0')}`;
          key += `/${day.getDate().toString().padStart(2, '0')}/${callSid}.${account[0].record_format}`;

          // Uploader
          const uploadStream = getUploader(key, metadata, bucketCredential, logger);
          if (!uploadStream) {
            logger.info('There is no available record uploader, close the socket.');
            socket.close();
            // Stop here: without an uploader there is no destination stream, so
            // building an encoder and starting the pipeline would only fail.
            return;
          }

          /**encoder */
          let encoder;
          let recordFormat;
          if (account[0].record_format === 'wav') {
            encoder = new wav.Writer({ channels: 2, sampleRate, bitDepth: 16 });
            recordFormat = 'wav';
          } else {
            // default is mp3
            encoder = new PCMToMP3Encoder({
              channels: 2,
              sampleRate: sampleRate,
              bitrate: 128
            }, logger);
            recordFormat = 'mp3';
          }
          logger.info({ record_format: recordFormat, channels: 2, sampleRate }, 'record upload: selected encoder');

          /* start streaming data */
          pipeline(
            Websocket.createWebSocketStream(socket),
            encoder,
            uploadStream,
            (error) => {
              if (error) {
                logger.error({ error }, 'pipeline error, cannot upload data to storage');
                socket.close();
              }
            }
          );
        } else {
          logger.info(`account ${accountSid} does not have any bucket credential, close the socket`);
          socket.close();
        }
      }
    } catch (err) {
      // Covers JSON parse failures as well as errors thrown by the account
      // lookup or while setting up the pipeline.
      logger.error({ err, data }, 'error parsing message during connection');
    }
  });

  socket.on('error', function(err) {
    logger.error({ err }, 'record upload: error');
  });

  socket.on('close', (data) => {
    logger.info({ data }, 'record upload: close');
  });

  socket.on('end', function(err) {
    logger.error({ err }, 'record upload: socket closed from jambonz');
  });
}
|
|
|
|
module.exports = upload;
|