Feat/record upload buffer (#285)

* uploader with buffer for google and azure

* wip

* wip

* wip
This commit is contained in:
Hoan Luu Huu
2024-01-15 21:51:10 +07:00
committed by GitHub
parent 556717a9a4
commit 8b2a2e196e
5 changed files with 76 additions and 10 deletions

View File

@@ -1,28 +1,58 @@
const { Writable } = require('stream');
const { BlobServiceClient } = require('@azure/storage-blob');
const { v4: uuidv4 } = require('uuid');
const streamBuffers = require('stream-buffers');
class AzureStorageUploadStream extends Writable {
constructor(logger, opts) {
super(opts);
const blobServiceClient = BlobServiceClient.fromConnectionString(opts.connection_string);
this.blockBlobClient = blobServiceClient.getContainerClient(opts.bucketName).getBlockBlobClient(opts.Key);
this.metadata = opts.metadata;
this.blocks = [];
this.bufferSize = 2 * 1024 * 1024; // Buffer size set to 2MB
this.buffer = new streamBuffers.WritableStreamBuffer({
initialSize: this.bufferSize,
incrementAmount: this.bufferSize
});
}
async _write(chunk, encoding, callback) {
const blockID = uuidv4().replace(/-/g, '');
this.blocks.push(blockID);
try {
await this.blockBlobClient.stageBlock(blockID, chunk, chunk.length);
this.buffer.write(chunk, encoding);
if (this.buffer.size() >= this.bufferSize) {
const blockID = uuidv4().replace(/-/g, '');
this.blocks.push(blockID);
try {
const dataToWrite = this.buffer.getContents();
await this.blockBlobClient.stageBlock(blockID, dataToWrite, dataToWrite.length);
callback();
} catch (error) {
callback(error);
}
} else {
callback();
} catch (error) {
callback(error);
}
}
async _final(callback) {
// Write any remaining data in buffer
if (this.buffer.size() > 0) {
const remainingData = this.buffer.getContents();
const blockID = uuidv4().replace(/-/g, '');
this.blocks.push(blockID);
try {
await this.blockBlobClient.stageBlock(blockID, remainingData, remainingData.length);
} catch (error) {
callback(error);
return;
}
}
try {
await this.blockBlobClient.commitBlockList(this.blocks);
// remove all null/undefined props

View File

@@ -1,5 +1,6 @@
const { Storage } = require('@google-cloud/storage');
const { Writable } = require('stream');
const streamBuffers = require('stream-buffers');
class GoogleStorageUploadStream extends Writable {
@@ -12,18 +13,38 @@ class GoogleStorageUploadStream extends Writable {
this.gcsFile = storage.bucket(opts.bucketName).file(opts.Key);
this.writeStream = this.gcsFile.createWriteStream();
this.bufferSize = 2 * 1024 * 1024; // Buffer size set to 2MB
this.buffer = new streamBuffers.WritableStreamBuffer({
initialSize: this.bufferSize,
incrementAmount: this.bufferSize
});
this.writeStream.on('error', (err) => this.logger.error(err));
this.writeStream.on('finish', () => {
this.logger.info('google storage Upload completed.');
this.logger.info('Google storage Upload completed.');
this._addMetadata();
});
}
_write(chunk, encoding, callback) {
this.writeStream.write(chunk, encoding, callback);
this.buffer.write(chunk, encoding);
// Write to GCS when buffer reaches desired size
if (this.buffer.size() >= this.bufferSize) {
const dataToWrite = this.buffer.getContents();
this.writeStream.write(dataToWrite, callback);
} else {
callback();
}
}
_final(callback) {
// Write any remaining data in the buffer to GCS
if (this.buffer.size() > 0) {
const remainingData = this.buffer.getContents();
this.writeStream.write(remainingData);
}
this.writeStream.end();
this.writeStream.once('finish', callback);
}
@@ -33,7 +54,7 @@ class GoogleStorageUploadStream extends Writable {
await this.gcsFile.setMetadata({metadata: this.metadata});
this.logger.info('Google storage Upload and metadata setting completed.');
} catch (err) {
this.logger.error(err, 'Google storage An error occurred while setting metadata');
this.logger.error(err, 'Google storage An error occurred while setting metadata');
}
}
}

View File

@@ -16,7 +16,7 @@ class S3MultipartUploadStream extends Writable {
this.partNumber = 1;
this.multipartETags = [];
this.buffer = Buffer.alloc(0);
this.minPartSize = 5 * 1024 * 1024; // 5 MB
this.minPartSize = 2 * 1024 * 1024; // 5 MB
this.s3 = new S3Client(opts.bucketCredential);
this.metadata = opts.metadata;
}

14
package-lock.json generated
View File

@@ -42,6 +42,7 @@
"passport-http-bearer": "^1.0.1",
"pino": "^5.17.0",
"short-uuid": "^4.1.0",
"stream-buffers": "^3.0.2",
"stripe": "^8.222.0",
"swagger-ui-express": "^4.4.0",
"uuid": "^8.3.2",
@@ -10138,6 +10139,14 @@
"node": ">= 0.4"
}
},
"node_modules/stream-buffers": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/stream-buffers/-/stream-buffers-3.0.2.tgz",
"integrity": "sha512-DQi1h8VEBA/lURbSwFtEHnSTb9s2/pwLEaFuNhXwy1Dx3Sa0lOuYT2yNUr4/j2fs8oCAMANtrZ5OrPZtyVs3MQ==",
"engines": {
"node": ">= 0.10.0"
}
},
"node_modules/stream-events": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/stream-events/-/stream-events-1.0.5.tgz",
@@ -19029,6 +19038,11 @@
"internal-slot": "^1.0.4"
}
},
"stream-buffers": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/stream-buffers/-/stream-buffers-3.0.2.tgz",
"integrity": "sha512-DQi1h8VEBA/lURbSwFtEHnSTb9s2/pwLEaFuNhXwy1Dx3Sa0lOuYT2yNUr4/j2fs8oCAMANtrZ5OrPZtyVs3MQ=="
},
"stream-events": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/stream-events/-/stream-events-1.0.5.tgz",

View File

@@ -52,6 +52,7 @@
"passport-http-bearer": "^1.0.1",
"pino": "^5.17.0",
"short-uuid": "^4.1.0",
"stream-buffers": "^3.0.2",
"stripe": "^8.222.0",
"swagger-ui-express": "^4.4.0",
"uuid": "^8.3.2",