S3 compatible storage (#228)

* compatible credential test

* support s3 compatible storages

* fix typo

* change logging

* add missing option
This commit is contained in:
Anton Voylenko
2023-09-12 19:25:06 +03:00
committed by GitHub
parent d5f5e3a86f
commit 02c9a951d4
6 changed files with 47 additions and 25 deletions

View File

@@ -11,7 +11,7 @@ class S3MultipartUploadStream extends Writable {
super(opts);
this.logger = logger;
this.bucketName = opts.bucketName;
this.objectKey = opts.Key;
this.objectKey = opts.objectKey;
this.uploadId = null;
this.partNumber = 1;
this.multipartETags = [];

View File

@@ -5,7 +5,6 @@ const wav = require('wav');
const { getUploader } = require('./utils');
async function upload(logger, socket) {
socket._recvInitialMetadata = false;
socket.on('message', async function(data, isBinary) {
try {
@@ -39,11 +38,11 @@ async function upload(logger, socket) {
}
// create S3 path
const day = new Date();
let Key = `${day.getFullYear()}/${(day.getMonth() + 1).toString().padStart(2, '0')}`;
Key += `/${day.getDate().toString().padStart(2, '0')}/${callSid}.${account[0].record_format}`;
let key = `${day.getFullYear()}/${(day.getMonth() + 1).toString().padStart(2, '0')}`;
key += `/${day.getDate().toString().padStart(2, '0')}/${callSid}.${account[0].record_format}`;
// Uploader
const uploadStream = getUploader(Key, metadata, obj, logger);
const uploadStream = getUploader(key, metadata, obj, logger);
if (!uploadStream) {
logger.info('There is no available record uploader, close the socket.');
socket.close();
@@ -74,13 +73,13 @@ async function upload(logger, socket) {
}
});
socket.on('error', function(err) {
logger.error({err}, 'aws upload: error');
logger.error({err}, 'record upload: error');
});
socket.on('close', (data) => {
logger.info({data}, 'aws_s3: close');
logger.info({data}, 'record upload: close');
});
socket.on('end', function(err) {
logger.error({err}, 'aws upload: socket closed from jambonz');
logger.error({err}, 'record upload: socket closed from jambonz');
});
}

View File

@@ -2,10 +2,10 @@ const AzureStorageUploadStream = require('./azure-storage');
const GoogleStorageUploadStream = require('./google-storage');
const S3MultipartUploadStream = require('./s3-multipart-upload-stream');
const getUploader = (Key, metadata, bucket_credential, logger) => {
const getUploader = (key, metadata, bucket_credential, logger) => {
const uploaderOpts = {
bucketName: bucket_credential.name,
Key,
objectKey: key,
metadata
};
switch (bucket_credential.vendor) {
@@ -18,6 +18,17 @@ const getUploader = (Key, metadata, bucket_credential, logger) => {
region: bucket_credential.region || 'us-east-1'
};
return new S3MultipartUploadStream(logger, uploaderOpts);
case 's3_compatible':
uploaderOpts.bucketCredential = {
endpoint: bucket_credential.endpoint,
credentials: {
accessKeyId: bucket_credential.access_key_id,
secretAccessKey: bucket_credential.secret_access_key,
},
region: 'us-east-1',
forcePathStyle: true
};
return new S3MultipartUploadStream(logger, uploaderOpts);
case 'google':
const serviceKey = JSON.parse(bucket_credential.service_key);
uploaderOpts.bucketCredential = {
@@ -31,7 +42,6 @@ const getUploader = (Key, metadata, bucket_credential, logger) => {
case 'azure':
uploaderOpts.connection_string = bucket_credential.connection_string;
return new AzureStorageUploadStream(logger, uploaderOpts);
default:
logger.error(`unknown bucket vendor: ${bucket_credential.vendor}`);
break;

View File

@@ -24,7 +24,7 @@ const {
const short = require('short-uuid');
const VoipCarrier = require('../../models/voip-carrier');
const { encrypt } = require('../../utils/encrypt-decrypt');
const { testAwsS3, testGoogleStorage, testAzureStorage } = require('../../utils/storage-utils');
const { testS3Storage, testGoogleStorage, testAzureStorage } = require('../../utils/storage-utils');
const translator = short();
let idx = 0;
@@ -544,7 +544,8 @@ function encryptBucketCredential(obj) {
secret_access_key,
tags,
service_key,
connection_string
connection_string,
endpoint
} = obj.bucket_credential;
switch (vendor) {
@@ -557,6 +558,15 @@ function encryptBucketCredential(obj) {
secret_access_key, tags});
obj.bucket_credential = encrypt(awsData);
break;
case 's3_compatible':
assert(access_key_id, 'invalid aws S3 bucket credential: access_key_id is required');
assert(secret_access_key, 'invalid aws S3 bucket credential: secret_access_key is required');
assert(name, 'invalid aws bucket name: name is required');
assert(endpoint, 'invalid endpoint uri: endpoint is required');
const s3Data = JSON.stringify({vendor, endpoint, name, access_key_id,
secret_access_key, tags});
obj.bucket_credential = encrypt(s3Data);
break;
case 'google':
assert(service_key, 'invalid google cloud storage credential: service_key is required');
const googleData = JSON.stringify({vendor, name, service_key, tags});
@@ -572,7 +582,7 @@ function encryptBucketCredential(obj) {
obj.bucket_credential = null;
break;
default:
throw new DbErrorBadRequest(`unknow storage vendor: ${vendor}`);
throw new DbErrorBadRequest(`unknown storage vendor: ${vendor}`);
}
}
@@ -722,14 +732,18 @@ router.post('/:sid/BucketCredentialTest', async(req, res) => {
try {
const account_sid = parseAccountSid(req);
await validateRequest(req, account_sid);
const {vendor, name, region, access_key_id, secret_access_key, service_key, connection_string} = req.body;
const {vendor, name, region, access_key_id, secret_access_key, service_key, connection_string, endpoint} = req.body;
const ret = {
status: 'not tested'
};
switch (vendor) {
case 'aws_s3':
await testAwsS3(logger, {vendor, name, region, access_key_id, secret_access_key});
await testS3Storage(logger, {vendor, name, region, access_key_id, secret_access_key});
ret.status = 'ok';
break;
case 's3_compatible':
await testS3Storage(logger, {vendor, name, endpoint, access_key_id, secret_access_key});
ret.status = 'ok';
break;
case 'google':

View File

@@ -138,6 +138,7 @@ router.get('/:call_sid/record/:year/:month/:day/:format', async(req, res) => {
let stream;
switch (bucket_credential.vendor) {
case 'aws_s3':
case 's3_compatible':
stream = await getS3Object(logger, getOptions);
break;
case 'google':
@@ -181,6 +182,7 @@ router.delete('/:call_sid/record/:year/:month/:day/:format', async(req, res) =>
switch (bucket_credential.vendor) {
case 'aws_s3':
case 's3_compatible':
await deleteS3Object(logger, deleteOptions);
break;
case 'google':
@@ -193,11 +195,11 @@ router.delete('/:call_sid/record/:year/:month/:day/:format', async(req, res) =>
logger.error(`There is no handler for deleting record from ${bucket_credential.vendor}`);
return res.sendStatus(500);
}
res.sendStatus(204);
} catch (err) {
logger.error({err}, ` error deleting recording ${call_sid}`);
res.sendStatus(404);
}
});
module.exports = router;

View File

@@ -74,7 +74,7 @@ async function deleteGoogleStorageObject(logger, opts) {
await file.delete();
}
// AWS S3
// S3
function _initS3Client(opts) {
return new S3Client({
@@ -82,21 +82,19 @@ function _initS3Client(opts) {
accessKeyId: opts.access_key_id,
secretAccessKey: opts.secret_access_key,
},
region: opts.region || 'us-east-1'
region: opts.region || 'us-east-1',
...(opts.vendor === 's3_compatible' && { endpoint: opts.endpoint, forcePathStyle: true })
});
}
async function testAwsS3(logger, opts) {
async function testS3Storage(logger, opts) {
const s3 = _initS3Client(opts);
const input = {
'Body': 'Hello From Jambonz',
'Bucket': opts.name,
'Key': 'jambonz-sample.text'
};
const command = new PutObjectCommand(input);
await s3.send(command);
}
@@ -114,7 +112,6 @@ async function getS3Object(logger, opts) {
async function deleteS3Object(logger, opts) {
const s3 = _initS3Client(opts);
const command = new DeleteObjectCommand(
{
Bucket: opts.name,
@@ -125,7 +122,7 @@ async function deleteS3Object(logger, opts) {
}
module.exports = {
testAwsS3,
testS3Storage,
getS3Object,
deleteS3Object,
testGoogleStorage,