Add try/catch to getUploader to catch init errors with invalid credentials (#229)

* add try/catch to getUploader to catch init errors with invalid credentials

* properly handle errors that occur while streaming

---------

Co-authored-by: Markus Frindt <m.frindt@cognigy.com>
Markus Frindt authored 2023-09-13 13:52:28 +02:00, committed by GitHub
parent 02c9a951d4, commit ad483ba0b7
2 changed files with 62 additions and 45 deletions


@@ -12,9 +12,9 @@ async function upload(logger, socket) {
         socket._recvInitialMetadata = true;
         logger.debug(`initial metadata: ${data}`);
         const obj = JSON.parse(data.toString());
-        logger.info({obj}, 'received JSON message from jambonz');
-        const {sampleRate, accountSid, callSid, direction, from, to,
-          callId, applicationSid, originatingSipIp, originatingSipTrunkName} = obj;
+        logger.info({ obj }, 'received JSON message from jambonz');
+        const { sampleRate, accountSid, callSid, direction, from, to,
+          callId, applicationSid, originatingSipIp, originatingSipTrunkName } = obj;
         const account = await Account.retrieve(accountSid);
         if (account && account.length && account[0].bucket_credential) {
           const obj = account[0].bucket_credential;
@@ -60,26 +60,39 @@ async function upload(logger, socket) {
               bitrate: 128
             });
           }
 
+          const handleError = (err, streamType) => {
+            logger.error(
+              { err },
+              `Error while streaming for vendor: ${obj.vendor}, pipe: ${streamType}: ${err.message}`
+            );
+          };
+
           /* start streaming data */
           const duplex = Websocket.createWebSocketStream(socket);
-          duplex.pipe(encoder).pipe(uploadStream);
+          duplex
+            .on('error', (err) => handleError(err, 'duplex'))
+            .pipe(encoder)
+            .on('error', (err) => handleError(err, 'encoder'))
+            .pipe(uploadStream)
+            .on('error', (err) => handleError(err, 'uploadStream'));
         } else {
           logger.info(`account ${accountSid} does not have any bucket credential, close the socket`);
           socket.close();
         }
       }
     } catch (err) {
-      logger.error({err, data}, 'error parsing message during connection');
+      logger.error({ err, data }, 'error parsing message during connection');
     }
   });
   socket.on('error', function(err) {
-    logger.error({err}, 'record upload: error');
+    logger.error({ err }, 'record upload: error');
   });
   socket.on('close', (data) => {
-    logger.info({data}, 'record upload: close');
+    logger.info({ data }, 'record upload: close');
   });
   socket.on('end', function(err) {
-    logger.error({err}, 'record upload: socket closed from jambonz');
+    logger.error({ err }, 'record upload: socket closed from jambonz');
   });
 }
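
Note on the streaming fix: in Node.js, pipe() forwards data but not errors, so a failure in any stage would otherwise surface as an unhandled 'error' event; attaching a listener to each stage, as above, also records which pipe failed. A minimal sketch of the same wiring using Node's built-in stream.pipeline (an alternative, not what this commit does) would be:

    // Sketch only: pipeline() destroys all streams and fires the callback
    // once if any stage errors, but loses the per-stage streamType label.
    const { pipeline } = require('stream');

    pipeline(duplex, encoder, uploadStream, (err) => {
      if (err) {
        logger.error({ err }, `streaming failed for vendor: ${obj.vendor}: ${err.message}`);
      }
    });

The commit instead keeps explicit per-stream handlers, which lets the log line name the failing stage (duplex, encoder, or uploadStream).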


@@ -8,43 +8,47 @@ const getUploader = (key, metadata, bucket_credential, logger) => {
     objectKey: key,
     metadata
   };
-  switch (bucket_credential.vendor) {
-    case 'aws_s3':
-      uploaderOpts.bucketCredential = {
-        credentials: {
-          accessKeyId: bucket_credential.access_key_id,
-          secretAccessKey: bucket_credential.secret_access_key,
-        },
-        region: bucket_credential.region || 'us-east-1'
-      };
-      return new S3MultipartUploadStream(logger, uploaderOpts);
-    case 's3_compatible':
-      uploaderOpts.bucketCredential = {
-        endpoint: bucket_credential.endpoint,
-        credentials: {
-          accessKeyId: bucket_credential.access_key_id,
-          secretAccessKey: bucket_credential.secret_access_key,
-        },
-        region: 'us-east-1',
-        forcePathStyle: true
-      };
-      return new S3MultipartUploadStream(logger, uploaderOpts);
-    case 'google':
-      const serviceKey = JSON.parse(bucket_credential.service_key);
-      uploaderOpts.bucketCredential = {
-        projectId: serviceKey.project_id,
-        credentials: {
-          client_email: serviceKey.client_email,
-          private_key: serviceKey.private_key
-        }
-      };
-      return new GoogleStorageUploadStream(logger, uploaderOpts);
-    case 'azure':
-      uploaderOpts.connection_string = bucket_credential.connection_string;
-      return new AzureStorageUploadStream(logger, uploaderOpts);
-    default:
-      logger.error(`unknown bucket vendor: ${bucket_credential.vendor}`);
-      break;
-  }
+  try {
+    switch (bucket_credential.vendor) {
+      case 'aws_s3':
+        uploaderOpts.bucketCredential = {
+          credentials: {
+            accessKeyId: bucket_credential.access_key_id,
+            secretAccessKey: bucket_credential.secret_access_key,
+          },
+          region: bucket_credential.region || 'us-east-1'
+        };
+        return new S3MultipartUploadStream(logger, uploaderOpts);
+      case 's3_compatible':
+        uploaderOpts.bucketCredential = {
+          endpoint: bucket_credential.endpoint,
+          credentials: {
+            accessKeyId: bucket_credential.access_key_id,
+            secretAccessKey: bucket_credential.secret_access_key,
+          },
+          region: 'us-east-1',
+          forcePathStyle: true
+        };
+        return new S3MultipartUploadStream(logger, uploaderOpts);
+      case 'google':
+        const serviceKey = JSON.parse(bucket_credential.service_key);
+        uploaderOpts.bucketCredential = {
+          projectId: serviceKey.project_id,
+          credentials: {
+            client_email: serviceKey.client_email,
+            private_key: serviceKey.private_key
+          }
+        };
+        return new GoogleStorageUploadStream(logger, uploaderOpts);
+      case 'azure':
+        uploaderOpts.connection_string = bucket_credential.connection_string;
+        return new AzureStorageUploadStream(logger, uploaderOpts);
+      default:
+        logger.error(`unknown bucket vendor: ${bucket_credential.vendor}`);
+        break;
+    }
+  } catch (err) {
+    logger.error(`Error creating uploader, vendor: ${bucket_credential.vendor}, reason: ${err.message}`);
+  }
   return null;
 };
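
With getUploader wrapped in try/catch, a credential that fails at construction time (for example, a google service_key that is not valid JSON, which makes JSON.parse throw) is now logged and surfaces as a null return instead of an unhandled exception. A hypothetical call site guarding against that null (the real caller is not shown in this diff) might look like:

    // Hypothetical caller: after this commit, getUploader() returns null
    // rather than throwing when a credential fails at init time.
    const uploadStream = getUploader(key, metadata, account[0].bucket_credential, logger);
    if (!uploadStream) {
      // mirror the no-credential branch in upload(): log and close the socket
      logger.info(`could not create uploader for account ${accountSid}, closing socket`);
      socket.close();
    }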