Refactor S3MultipartUploadStream to optimize buffer handling and improve upload efficiency (#494)

- Replaced per-write Buffer.concat with chunk accumulation, avoiding O(n^2) byte copying on the write path.
- Introduced bufferedBytes to track the total size of the accumulated chunks.
- Updated the part upload logic to concatenate chunks once per part and reset the accumulators before awaiting each upload, reducing memory overhead.
- Enhanced logging in the upload function to include the selected encoder format for better traceability.

(cherry picked from commit ce8bba2f18d807d4872b168e451e4501b1acb824)
Author: RJ Burnham
Date:   2025-09-04 12:34:19 +01:00 (committed by GitHub)
Parent: 2e0ea56925
Commit: a297d2038f

2 changed files with 23 additions and 9 deletions
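
The key change: rebuilding the buffer with Buffer.concat on every _write copies all previously buffered bytes each time, so accumulating n bytes costs O(n^2) copies overall, while pushing chunks into an array and concatenating once per part copies each byte only once. A minimal standalone sketch of the pattern (not the actual class; names mirror the diff):

    // Amortized O(n) accumulation: one copy per flushed part.
    const chunks = [];
    let bufferedBytes = 0;

    function accumulate(chunk) {
      chunks.push(chunk);            // O(1) append per write
      bufferedBytes += chunk.length;
    }

    function flushPart() {
      // Passing the total length lets Buffer.concat pre-allocate the target
      // buffer instead of summing the chunk lengths itself.
      const part = Buffer.concat(chunks, bufferedBytes);
      chunks.length = 0;
      bufferedBytes = 0;
      return part;
    }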


@@ -15,7 +15,9 @@ class S3MultipartUploadStream extends Writable {
     this.uploadId = null;
     this.partNumber = 1;
     this.multipartETags = [];
-    this.buffer = Buffer.alloc(0);
+    // accumulate incoming chunks to avoid O(n^2) Buffer.concat on every write
+    this.chunks = [];
+    this.bufferedBytes = 0;
     this.minPartSize = 5 * 1024 * 1024; // 5 MB
     this.s3 = new S3Client(opts.bucketCredential);
     this.metadata = opts.metadata;
@@ -31,13 +33,13 @@ class S3MultipartUploadStream extends Writable {
     return response.UploadId;
   }
 
-  async _uploadBuffer() {
+  async _uploadPart(bodyBuffer) {
     const uploadPartCommand = new UploadPartCommand({
      Bucket: this.bucketName,
      Key: this.objectKey,
      PartNumber: this.partNumber,
      UploadId: this.uploadId,
-     Body: this.buffer,
+     Body: bodyBuffer,
    });
 
    const uploadPartResponse = await this.s3.send(uploadPartCommand);
@@ -54,11 +56,16 @@
      this.uploadId = await this._initMultipartUpload();
    }
 
-   this.buffer = Buffer.concat([this.buffer, chunk]);
+   // accumulate without concatenating on every write
+   this.chunks.push(chunk);
+   this.bufferedBytes += chunk.length;
 
-   if (this.buffer.length >= this.minPartSize) {
-     await this._uploadBuffer();
-     this.buffer = Buffer.alloc(0);
+   if (this.bufferedBytes >= this.minPartSize) {
+     const partBuffer = Buffer.concat(this.chunks, this.bufferedBytes);
+     // reset accumulators before awaiting upload to allow GC
+     this.chunks = [];
+     this.bufferedBytes = 0;
+     await this._uploadPart(partBuffer);
    }
 
    callback(null);
@@ -69,8 +76,11 @@
  async _finalize(err) {
    try {
-     if (this.buffer.length > 0) {
-       await this._uploadBuffer();
+     if (this.bufferedBytes > 0) {
+       const finalBuffer = Buffer.concat(this.chunks, this.bufferedBytes);
+       this.chunks = [];
+       this.bufferedBytes = 0;
+       await this._uploadPart(finalBuffer);
      }
 
      const completeMultipartUploadCommand = new CompleteMultipartUploadCommand({
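
Two details worth noting in the class above: S3 rejects multipart parts smaller than 5 MB except the final part, which is why parts are flushed only once bufferedBytes reaches minPartSize and why _finalize uploads whatever remains; and because _write awaits _uploadPart before invoking its callback, the Writable applies backpressure and uploads parts sequentially. A hypothetical usage sketch follows; the option values shown, and the constructor's handling of bucket and key, are assumptions rather than details taken from the diff:

    const fs = require('node:fs');
    const { pipeline } = require('node:stream/promises');

    // Assumed option shape; only bucketCredential and metadata appear in the diff.
    const uploadStream = new S3MultipartUploadStream({
      bucketCredential: { region: 'eu-west-1' }, // illustrative value
      metadata: { source: 'call-recording' },    // illustrative value
    });

    // Stream a local file into the multipart upload; parts of >= 5 MB are
    // uploaded as they fill, and the remainder is flushed when the stream ends.
    await pipeline(fs.createReadStream('recording.raw'), uploadStream);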


@@ -51,8 +51,10 @@ async function upload(logger, socket) {
 
   /**encoder */
   let encoder;
+  let recordFormat;
   if (account[0].record_format === 'wav') {
     encoder = new wav.Writer({ channels: 2, sampleRate, bitDepth: 16 });
+    recordFormat = 'wav';
   } else {
     // default is mp3
     encoder = new PCMToMP3Encoder({
@@ -60,7 +62,9 @@ async function upload(logger, socket) {
       sampleRate: sampleRate,
       bitrate: 128
     }, logger);
+    recordFormat = 'mp3';
   }
 
+  logger.info({ record_format: recordFormat, channels: 2, sampleRate }, 'record upload: selected encoder');
   /* start streaming data */
   pipeline(
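
The new log call uses the (mergeObject, message) argument order, consistent with a pino-style structured logger, so record_format, channels, and sampleRate land as queryable JSON fields on the record rather than being interpolated into the message text. Assuming a pino logger, the emitted line would look roughly like this (field values illustrative):

    // Sketch of the structured output, assuming a pino-style logger.
    logger.info(
      { record_format: recordFormat, channels: 2, sampleRate },
      'record upload: selected encoder'
    );
    // => {"level":30,"time":1725449659000,"record_format":"mp3","channels":2,
    //     "sampleRate":8000,"msg":"record upload: selected encoder"}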