mirror of
https://github.com/jambonz/jambonz-feature-server.git
synced 2025-12-20 08:40:38 +00:00
add retry for http/ws requestor (#1210)
* add retry for http requestor * fix failing testcase * wip * update ws-requestor * wip * wip * wip
This commit is contained in:
@@ -79,7 +79,44 @@ class BaseRequestor extends Emitter {
|
||||
return time.toFixed(0);
|
||||
}
|
||||
|
||||
_parseHashParams(hash) {
|
||||
// Remove the leading # if present
|
||||
const hashString = hash.startsWith('#') ? hash.substring(1) : hash;
|
||||
// Use URLSearchParams for parsing
|
||||
const params = new URLSearchParams(hashString);
|
||||
// Convert to a regular object
|
||||
const result = {};
|
||||
for (const [key, value] of params.entries()) {
|
||||
result[key] = value;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the error should be retried based on retry policy
|
||||
* @param {Error} err - The error that occurred
|
||||
* @param {string[]} rpValues - Array of retry policy values
|
||||
* @returns {boolean} True if the error should be retried
|
||||
*/
|
||||
_shouldRetry(err, rpValues) {
|
||||
// ct = connection timeout (ECONNREFUSED, ETIMEDOUT, etc)
|
||||
const isCt = err.code === 'ECONNREFUSED' ||
|
||||
err.code === 'ETIMEDOUT' ||
|
||||
err.code === 'ECONNRESET' ||
|
||||
err.code === 'ECONNABORTED';
|
||||
// rt = request timeout
|
||||
const isRt = err.name === 'TimeoutError';
|
||||
// 4xx = client errors
|
||||
const is4xx = err.statusCode >= 400 && err.statusCode < 500;
|
||||
// 5xx = server errors
|
||||
const is5xx = err.statusCode >= 500 && err.statusCode < 600;
|
||||
// Check if error type is included in retry policy
|
||||
return rpValues.includes('all') ||
|
||||
(isCt && rpValues.includes('ct')) ||
|
||||
(isRt && rpValues.includes('rt')) ||
|
||||
(is4xx && rpValues.includes('4xx')) ||
|
||||
(is5xx && rpValues.includes('5xx'));
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = BaseRequestor;
|
||||
|
||||
@@ -43,6 +43,7 @@ class HttpRequestor extends BaseRequestor {
|
||||
|
||||
this.method = hook.method || 'POST';
|
||||
this.authHeader = basicAuth(hook.username, hook.password);
|
||||
this.backoffMs = 500;
|
||||
|
||||
assert(this._isAbsoluteUrl(this.url));
|
||||
assert(['GET', 'POST'].includes(this.method));
|
||||
@@ -136,25 +137,46 @@ class HttpRequestor extends BaseRequestor {
|
||||
|
||||
let newClient;
|
||||
try {
|
||||
this.backoffMs = 500;
|
||||
// Parse URL and extract hash parameters for retry configuration
|
||||
// Prepare request options - only do this once
|
||||
const absUrl = this._isRelativeUrl(url) ? `${this.baseUrl}${url}` : url;
|
||||
const parsedUrl = parseUrl(absUrl);
|
||||
const hash = parsedUrl.hash || '';
|
||||
const hashObj = hash ? this._parseHashParams(hash) : {};
|
||||
|
||||
// Retry policy: rp valid values: 4xx, 5xx, ct, rt, all, default is ct
|
||||
// Retry count: rc valid values: 1-5, default is 0
|
||||
// rc is the number of attempts we'll make AFTER the initial try
|
||||
const rc = hash ? Math.min(Math.abs(parseInt(hashObj.rc || '0')), 5) : 0;
|
||||
const rp = hashObj.rp || 'ct';
|
||||
const rpValues = rp.split(',').map((v) => v.trim());
|
||||
let retryCount = 0;
|
||||
|
||||
// Set up client, path and query parameters - only do this once
|
||||
let client, path, query;
|
||||
if (this._isRelativeUrl(url)) {
|
||||
client = this.client;
|
||||
path = url;
|
||||
}
|
||||
else {
|
||||
const u = parseUrl(url);
|
||||
if (u.resource === this._resource && u.port === this._port && u.protocol === this._protocol) {
|
||||
if (parsedUrl.resource === this._resource &&
|
||||
parsedUrl.port === this._port &&
|
||||
parsedUrl.protocol === this._protocol) {
|
||||
client = this.client;
|
||||
path = u.pathname;
|
||||
query = u.query;
|
||||
path = parsedUrl.pathname;
|
||||
query = parsedUrl.query;
|
||||
}
|
||||
else {
|
||||
if (u.port) client = newClient = new Client(`${u.protocol}://${u.resource}:${u.port}`);
|
||||
else client = newClient = new Client(`${u.protocol}://${u.resource}`);
|
||||
path = u.pathname;
|
||||
query = u.query;
|
||||
if (parsedUrl.port) {
|
||||
client = newClient = new Client(`${parsedUrl.protocol}://${parsedUrl.resource}:${parsedUrl.port}`);
|
||||
}
|
||||
else client = newClient = new Client(`${parsedUrl.protocol}://${parsedUrl.resource}`);
|
||||
path = parsedUrl.pathname;
|
||||
query = parsedUrl.query;
|
||||
}
|
||||
}
|
||||
|
||||
const sigHeader = this._generateSigHeader(payload, this.secret);
|
||||
const hdrs = {
|
||||
...sigHeader,
|
||||
@@ -162,20 +184,8 @@ class HttpRequestor extends BaseRequestor {
|
||||
...httpHeaders,
|
||||
...('POST' === method && {'Content-Type': 'application/json'})
|
||||
};
|
||||
const absUrl = this._isRelativeUrl(url) ? `${this.baseUrl}${url}` : url;
|
||||
this.logger.debug({url, absUrl, hdrs}, 'send webhook');
|
||||
const {statusCode, headers, body} = HTTP_PROXY_IP ? await request(
|
||||
this.baseUrl,
|
||||
{
|
||||
path,
|
||||
query,
|
||||
method,
|
||||
headers: hdrs,
|
||||
...('POST' === method && {body: JSON.stringify(payload)}),
|
||||
timeout: HTTP_TIMEOUT,
|
||||
followRedirects: false
|
||||
}
|
||||
) : await client.request({
|
||||
|
||||
const requestOptions = {
|
||||
path,
|
||||
query,
|
||||
method,
|
||||
@@ -183,14 +193,51 @@ class HttpRequestor extends BaseRequestor {
|
||||
...('POST' === method && {body: JSON.stringify(payload)}),
|
||||
timeout: HTTP_TIMEOUT,
|
||||
followRedirects: false
|
||||
});
|
||||
if (![200, 202, 204].includes(statusCode)) {
|
||||
const err = new HTTPResponseError(statusCode);
|
||||
throw err;
|
||||
}
|
||||
if (headers['content-type']?.includes('application/json')) {
|
||||
buf = await body.json();
|
||||
};
|
||||
|
||||
// Simplified makeRequest function that just executes the HTTP request
|
||||
const makeRequest = async() => {
|
||||
this.logger.debug({url, absUrl, hdrs, retryCount},
|
||||
`send webhook${retryCount > 0 ? ' (retry ' + retryCount + ')' : ''}`);
|
||||
|
||||
const {statusCode, headers, body} = HTTP_PROXY_IP ? await request(
|
||||
this.baseUrl,
|
||||
requestOptions
|
||||
) : await client.request(requestOptions);
|
||||
|
||||
if (![200, 202, 204].includes(statusCode)) {
|
||||
const err = new HTTPResponseError(statusCode);
|
||||
throw err;
|
||||
}
|
||||
|
||||
if (headers['content-type']?.includes('application/json')) {
|
||||
return await body.json();
|
||||
}
|
||||
return '';
|
||||
};
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
buf = await makeRequest();
|
||||
break; // Success, exit the retry loop
|
||||
} catch (err) {
|
||||
retryCount++;
|
||||
|
||||
// Check if we should retry
|
||||
if (retryCount <= rc && this._shouldRetry(err, rpValues)) {
|
||||
this.logger.info(
|
||||
{err, baseUrl: this.baseUrl, url, retryCount, maxRetries: rc},
|
||||
`Retrying request (${retryCount}/${rc})`
|
||||
);
|
||||
const delay = this.backoffMs;
|
||||
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
if (newClient) newClient.close();
|
||||
} catch (err) {
|
||||
if (err.statusCode) {
|
||||
@@ -221,8 +268,8 @@ class HttpRequestor extends BaseRequestor {
|
||||
|
||||
if (buf && (Array.isArray(buf) || type == 'llm:tool-call')) {
|
||||
this.logger.info({response: buf}, `HttpRequestor:request ${method} ${url} succeeded in ${rtt}ms`);
|
||||
return buf;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
const assert = require('assert');
|
||||
const BaseRequestor = require('./base-requestor');
|
||||
const short = require('short-uuid');
|
||||
const parseUrl = require('parse-url');
|
||||
const {HookMsgTypes, WS_CLOSE_CODES} = require('./constants.json');
|
||||
const Websocket = require('ws');
|
||||
const snakeCaseKeys = require('./snakecase-keys');
|
||||
@@ -41,6 +42,19 @@ class WsRequestor extends BaseRequestor {
|
||||
|
||||
assert(this._isAbsoluteUrl(this.url));
|
||||
|
||||
const parsedUrl = parseUrl(this.url);
|
||||
const hash = parsedUrl.hash || '';
|
||||
const hashObj = hash ? this._parseHashParams(hash) : {};
|
||||
|
||||
// remove hash
|
||||
this.cleanUrl = hash ? this.url.replace(`#${hash}`, '') : this.url;
|
||||
|
||||
// Retry policy: rp valid values: 4xx, 5xx, ct, rt, all, default is ct
|
||||
// Retry count: rc valid values: 1-5, default is 5 for websockets
|
||||
this.maxReconnects = Math.min(Math.abs(parseInt(hashObj.rc) || MAX_RECONNECTS), 5);
|
||||
this.retryPolicy = hashObj.rp || 'ct';
|
||||
this.retryPolicyValues = this.retryPolicy.split(',').map((v) => v.trim());
|
||||
|
||||
this.on('socket-closed', this._onSocketClosed.bind(this));
|
||||
}
|
||||
|
||||
@@ -111,16 +125,65 @@ class WsRequestor extends BaseRequestor {
|
||||
}
|
||||
this.connectInProgress = true;
|
||||
this.logger.debug(`WsRequestor:request(${this.id}) - connecting since we do not have a connection for ${type}`);
|
||||
if (this.connections >= MAX_RECONNECTS) {
|
||||
return Promise.reject(`max attempts connecting to ${this.url}`);
|
||||
}
|
||||
|
||||
try {
|
||||
const startAt = process.hrtime();
|
||||
await this._connect();
|
||||
const rtt = this._roundTrip(startAt);
|
||||
this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']);
|
||||
let retryCount = 0;
|
||||
let lastError = null;
|
||||
|
||||
while (retryCount <= this.maxReconnects) {
|
||||
try {
|
||||
this.logger.error({retryCount, maxReconnects: this.maxReconnects},
|
||||
'WsRequestor:request - attempting connection');
|
||||
|
||||
// Ensure clean state before each connection attempt
|
||||
if (this.ws) {
|
||||
this.ws.removeAllListeners();
|
||||
this.ws = null;
|
||||
}
|
||||
|
||||
this.logger.error({retryCount}, 'WsRequestor:request - calling _connect()');
|
||||
const startAt = process.hrtime();
|
||||
await this._connect();
|
||||
const rtt = this._roundTrip(startAt);
|
||||
this.stats.histogram('app.hook.connect_time', rtt, ['hook_type:app']);
|
||||
this.logger.error({retryCount}, 'WsRequestor:request - connection successful, exiting retry loop');
|
||||
lastError = null;
|
||||
break;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
retryCount++;
|
||||
this.logger.error({error: error.message, retryCount, maxReconnects: this.maxReconnects},
|
||||
'WsRequestor:request - connection attempt failed');
|
||||
|
||||
if (retryCount <= this.maxReconnects &&
|
||||
this.retryPolicyValues?.length &&
|
||||
this._shouldRetry(error, this.retryPolicyValues)) {
|
||||
|
||||
this.logger.error(
|
||||
{url, error, retryCount, maxRetries: this.maxReconnects},
|
||||
`WsRequestor:request - connection failed, retrying (${retryCount}/${this.maxReconnects})`
|
||||
);
|
||||
|
||||
const delay = this.backoffMs;
|
||||
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
|
||||
this.logger.error({delay}, 'WsRequestor:request - waiting before retry');
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
this.logger.error('WsRequestor:request - retry delay complete, attempting retry');
|
||||
continue;
|
||||
}
|
||||
this.logger.error({lastError: lastError.message, retryCount, maxReconnects: this.maxReconnects},
|
||||
'WsRequestor:request - throwing last error');
|
||||
throw lastError;
|
||||
}
|
||||
}
|
||||
|
||||
// If we exit the loop without success, throw the last error
|
||||
if (lastError) {
|
||||
throw lastError;
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.info({url, err}, 'WsRequestor:request - failed connecting');
|
||||
this.logger.info({url, err, retryPolicy: this.retryPolicy},
|
||||
'WsRequestor:request - all connection attempts failed');
|
||||
this.connectInProgress = false;
|
||||
return Promise.reject(err);
|
||||
}
|
||||
@@ -301,17 +364,23 @@ class WsRequestor extends BaseRequestor {
|
||||
};
|
||||
if (this.username && this.password) opts = {...opts, auth: `${this.username}:${this.password}`};
|
||||
|
||||
// Clean up any existing connection event listeners to prevent interference between retry attempts
|
||||
this.removeAllListeners('ready');
|
||||
this.removeAllListeners('not-ready');
|
||||
|
||||
this
|
||||
.once('ready', (ws) => {
|
||||
this.logger.error({retryCount: 'unknown'}, 'WsRequestor:_connect - ready event fired, resolving Promise');
|
||||
this.removeAllListeners('not-ready');
|
||||
if (this.connections > 1) this.request('session:reconnect', this.url);
|
||||
resolve();
|
||||
})
|
||||
.once('not-ready', (err) => {
|
||||
this.logger.error({err: err.message}, 'WsRequestor:_connect - not-ready event fired, rejecting Promise');
|
||||
this.removeAllListeners('ready');
|
||||
reject(err);
|
||||
});
|
||||
const ws = new Websocket(this.url, ['ws.jambonz.org'], opts);
|
||||
const ws = new Websocket(this.cleanUrl, ['ws.jambonz.org'], opts);
|
||||
this._setHandlers(ws);
|
||||
});
|
||||
}
|
||||
@@ -335,10 +404,13 @@ class WsRequestor extends BaseRequestor {
|
||||
}
|
||||
|
||||
_onError(err) {
|
||||
if (this.connections > 0) {
|
||||
this.logger.info({url: this.url, err}, 'WsRequestor:_onError');
|
||||
if (this.connectInProgress) {
|
||||
this.logger.info({url: this.url, err}, 'WsRequestor:_onError - emitting not-ready for connection attempt');
|
||||
this.emit('not-ready', err);
|
||||
}
|
||||
else if (this.connections === 0) {
|
||||
this.emit('not-ready', err);
|
||||
}
|
||||
else this.emit('not-ready', err);
|
||||
}
|
||||
|
||||
_onOpen(ws) {
|
||||
@@ -375,30 +447,44 @@ class WsRequestor extends BaseRequestor {
|
||||
statusMessage: res.statusMessage
|
||||
}, 'WsRequestor - unexpected response');
|
||||
this.emit('connection-failure');
|
||||
this.emit('not-ready', new Error(`${res.statusCode} ${res.statusMessage}`));
|
||||
this.connections++;
|
||||
|
||||
const error = new Error(`${res.statusCode} ${res.statusMessage}`);
|
||||
error.statusCode = res.statusCode;
|
||||
this.connectInProgress = false;
|
||||
|
||||
this.emit('not-ready', error);
|
||||
}
|
||||
|
||||
_onSocketClosed() {
|
||||
this.ws = null;
|
||||
this.emit('connection-dropped');
|
||||
this._stopPingTimer();
|
||||
if (this.connections > 0 && this.connections < MAX_RECONNECTS && !this.closedGracefully) {
|
||||
|
||||
if (this.connections > 0 && this.connections < this.maxReconnects && !this.closedGracefully) {
|
||||
if (!this._initMsgId) this._clearPendingMessages();
|
||||
this.logger.debug(`WsRequestor:_onSocketClosed waiting ${this.backoffMs} to reconnect`);
|
||||
setTimeout(() => {
|
||||
this._scheduleReconnect('_onSocketClosed');
|
||||
}
|
||||
}
|
||||
|
||||
_scheduleReconnect(source) {
|
||||
this.logger.debug(`WsRequestor:_scheduleReconnect waiting ${this.backoffMs} to reconnect (${source})`);
|
||||
setTimeout(() => {
|
||||
this.logger.debug(
|
||||
{haveWs: !!this.ws, connectInProgress: this.connectInProgress},
|
||||
`WsRequestor:_scheduleReconnect time to reconnect (${source})`);
|
||||
if (!this.ws && !this.connectInProgress) {
|
||||
this.connectInProgress = true;
|
||||
return this._connect()
|
||||
.catch((err) => this.logger.error(`WsRequestor:${source} There is error while reconnect`, err))
|
||||
.finally(() => this.connectInProgress = false);
|
||||
} else {
|
||||
this.logger.debug(
|
||||
{haveWs: !!this.ws, connectInProgress: this.connectInProgress},
|
||||
'WsRequestor:_onSocketClosed time to reconnect');
|
||||
if (!this.ws && !this.connectInProgress) {
|
||||
this.connectInProgress = true;
|
||||
return this._connect()
|
||||
.catch((err) => this.logger.error('WsRequestor:_onSocketClosed There is error while reconnect', err))
|
||||
.finally(() => this.connectInProgress = false);
|
||||
}
|
||||
}, this.backoffMs);
|
||||
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
|
||||
}
|
||||
`WsRequestor:_scheduleReconnect skipping reconnect attempt (${source}) - conditions not met`);
|
||||
}
|
||||
}, this.backoffMs);
|
||||
this.backoffMs = this.backoffMs < 2000 ? this.backoffMs * 2 : (this.backoffMs + 2000);
|
||||
}
|
||||
|
||||
_onMessage(content, isBinary) {
|
||||
|
||||
Reference in New Issue
Block a user