Feature/fast http client (#132)

* initial changes to use undici for http client and connection pooling

* use body.json() mixin

* logging

* add pipelining env var

* implement socket close
This commit is contained in:
Dave Horton
2022-07-18 15:32:03 +02:00
committed by GitHub
parent bb9c3a8df0
commit 6979affb86
4 changed files with 96 additions and 31 deletions

View File

@@ -1,9 +1,11 @@
const bent = require('bent');
const {Client, Pool} = require('undici');
const parseUrl = require('parse-url');
const assert = require('assert');
const BaseRequestor = require('./base-requestor');
const {HookMsgTypes} = require('./constants.json');
const snakeCaseKeys = require('./snakecase-keys');
const pools = new Map();
const HTTP_TIMEOUT = 10000;
const toBase64 = (str) => Buffer.from(str || '', 'utf8').toString('base64');
@@ -22,21 +24,41 @@ class HttpRequestor extends BaseRequestor {
this.method = hook.method || 'POST';
this.authHeader = basicAuth(hook.username, hook.password);
const u = parseUrl(this.url);
//const myPort = u.port ? `:${u.port}` : '';
const baseUrl = this._baseUrl = `${u.protocol}://${u.resource}`;
this.get = bent(baseUrl, 'GET', 'buffer', 200, 201);
this.post = bent(baseUrl, 'POST', 'buffer', 200, 201);
assert(this._isAbsoluteUrl(this.url));
assert(['GET', 'POST'].includes(this.method));
const u = this._parsedUrl = parseUrl(this.url);
this._baseUrl = `${u.protocol}://${u.resource}`;
this._resource = u.resource;
this._protocol = u.protocol;
this._usePools = process.env.HTTP_POOL && parseInt(process.env.HTTP_POOL);
if (this._usePools) {
if (pools.has(this._baseUrl)) {
this.client = pools.get(this._baseUrl);
}
else {
const connections = process.env.HTTP_POOLSIZE ? parseInt(process.env.HTTP_POOLSIZE) : 10;
const pipelining = process.env.HTTP_PIPELINING ? parseInt(process.env.HTTP_PIPELINING) : 1;
const pool = this.client = new Pool(this._baseUrl, {
connections,
pipelining
});
pools.set(this._baseUrl, pool);
this.logger.debug(`HttpRequestor:created pool for ${this._baseUrl}`);
}
}
else this.client = new Client(`${u.protocol}://${u.resource}`);
}
get baseUrl() {
return this._baseUrl;
}
close() {
if (!this._usePools && !this.client?.closed) this.client.close();
}
/**
* Make an HTTP request.
* All requests use json bodies.
@@ -57,6 +79,7 @@ class HttpRequestor extends BaseRequestor {
const payload = params ? snakeCaseKeys(params, ['customerData', 'sip']) : null;
const url = hook.url || hook;
const method = hook.method || 'POST';
let buf = '';
assert.ok(url, 'HttpRequestor:request url was not provided');
assert.ok, (['GET', 'POST'].includes(method), `HttpRequestor:request method must be 'GET' or 'POST' not ${method}`);
@@ -64,14 +87,46 @@ class HttpRequestor extends BaseRequestor {
this.logger.debug({url: urlInfo, method: methodInfo, payload}, `HttpRequestor:request ${method} ${url}`);
const startAt = process.hrtime();
let buf;
let newClient;
try {
let client, path;
if (this._isRelativeUrl(url)) {
client = this.client;
path = url;
}
else {
const u = parseUrl(url);
if (u.resource === this._resource && u.protocol === this._protocol) {
client = this.client;
path = u.pathname;
}
else {
client = newClient = new Client(`${u.protocol}://${u.resource}`);
path = u.pathname;
}
}
const sigHeader = this._generateSigHeader(payload, this.secret);
const headers = {...sigHeader, ...this.authHeader, ...httpHeaders};
this.logger.debug({url, headers}, 'send webhook');
buf = this._isRelativeUrl(url) ?
await this.post(url, payload, headers) :
await bent(method, 'buffer', 200, 201, 202)(url, payload, headers);
const hdrs = {
...sigHeader,
...this.authHeader,
...httpHeaders,
...('POST' === method && {'Content-Type': 'application/json'})
};
const absUrl = this._isRelativeUrl(url) ? `${this.baseUrl}${url}` : url;
this.logger.debug({url, absUrl, hdrs}, 'send webhook');
const {statusCode, headers, body} = await client.request({
path,
method,
headers: hdrs,
...('POST' === method && {body: JSON.stringify(payload)}),
timeout: HTTP_TIMEOUT,
followRedirects: false
});
if (![200, 202, 204].includes(statusCode)) throw new Error({statusCode});
if (headers['content-type'].includes('application/json')) {
buf = await body.json();
}
if (newClient) newClient.close();
} catch (err) {
if (err.statusCode) {
this.logger.info({baseUrl: this.baseUrl, url},
@@ -93,20 +148,15 @@ class HttpRequestor extends BaseRequestor {
}
this.Alerter.writeAlerts(opts).catch((err) => this.logger.info({err, opts}, 'Error writing alert'));
if (newClient) newClient.close();
throw err;
}
const rtt = this._roundTrip(startAt);
if (buf) this.stats.histogram('app.hook.response_time', rtt, ['hook_type:app']);
if (buf && buf.toString().length > 0) {
try {
const json = JSON.parse(buf.toString());
this.logger.info({response: json}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`);
return json;
}
catch (err) {
//this.logger.debug({err, url, method}, `HttpRequestor:request returned non-JSON content: '${buf.toString()}'`);
}
if (buf && Array.isArray(buf)) {
this.logger.info({response: buf}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`);
return buf;
}
}
}

View File

@@ -158,7 +158,7 @@ class WsRequestor extends BaseRequestor {
close() {
this.closedGracefully = true;
this.logger.info('WsRequestor:close closing socket');
this.logger.debug('WsRequestor:close closing socket');
try {
if (this.ws) {
this.ws.close();