Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ require('werelogs').stderrUtils.catchAndTimestampStderr(
require('cluster').isPrimary ? 1 : null,
);

// Start tracing before requiring anything that hooks into HTTP, MongoDB,
// or ioredis — instrumentation patches modules on require, so anything
// loaded earlier than init() would run unpatched.
require('./lib/tracing').init();

require('./lib/server.js')();
325 changes: 173 additions & 152 deletions lib/api/api.js

Large diffs are not rendered by default.

92 changes: 92 additions & 0 deletions lib/instrumentation/simple.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
'use strict';

const tracing = require('../tracing');

let tracer = null;
function getTracer() {
if (tracer) {
return tracer;
}
const { trace } = require('@opentelemetry/api');
const { version } = require('../../package.json');
tracer = trace.getTracer('cloudserver-api', version);
return tracer;
}

/**
 * Await `promise` and close the span once it settles.
 *
 * On fulfilment `endSpan` is called with no argument and the fulfilled
 * value is returned. On rejection `endSpan` receives the rejection reason,
 * which is then re-thrown so the caller still observes the failure.
 *
 * @param {Promise|*} promise - value or thenable to await
 * @param {function(Error=): void} endSpan - span finaliser
 * @returns {Promise<*>} resolves with the awaited value
 * @throws rethrows whatever the awaited promise (or `endSpan`) throws
 */
async function endSpanWhenSettled(promise, endSpan) {
    let settledValue;
    try {
        settledValue = await promise;
        endSpan();
    } catch (failure) {
        endSpan(failure);
        throw failure;
    }
    return settledValue;
}

/**
 * Wrap an API handler so that each invocation is recorded as a span named
 * `api.<methodName>`.
 *
 * When `tracing.isEnabled()` is false the handler is returned unchanged.
 * Otherwise the returned wrapper:
 *  - starts an INTERNAL span and runs the handler with that span set on
 *    the active context;
 *  - treats the last function-typed argument (if any) as the completion
 *    callback and wraps it so the span ends when the callback fires;
 *  - when there is no callback, ends the span when a returned thenable
 *    settles, or immediately for any other return value;
 *  - ends the span with the error and rethrows if the handler throws
 *    synchronously.
 *
 * @param {function} apiMethod - handler to instrument
 * @param {string} methodName - name used to build the span name
 * @returns {function} `apiMethod` itself when tracing is disabled,
 *   otherwise the instrumented wrapper
 */
function instrumentApiMethod(apiMethod, methodName) {
if (!tracing.isEnabled()) {
return apiMethod;
}

const api = require('@opentelemetry/api');
const spanName = `api.${methodName}`;

return function instrumented(...args) {
// The last function-typed argument is taken to be the callback.
const callbackIndex = args.findLastIndex(a => typeof a === 'function');
const span = getTracer().startSpan(spanName, { kind: api.SpanKind.INTERNAL });

// End-once guard. Multiple termination paths can race: the
// wrapped callback may fire and then the handler may also throw
// synchronously, or a callback-and-Promise hybrid handler may
// resolve after firing the callback.
let spanEnded = false;
const endSpan = err => {
if (spanEnded) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we will not have the following trace if we have a double-ending ? Why did you introduce what ? Did you encounter that ? If we remove that, what is the impact ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — encountered during initial unit-test development. The case: a callback-style handler fires the callback (which ends the span via the wrapped-callback closure) and then throws synchronously afterwards (e.g. the callback body itself threw). The outer try/catch then calls endSpan(err) and we get two .end() calls. OTEL warns on double-end and the second status overwrites the first, so the trace would record ERROR when the operation actually succeeded. The guard makes the first ending win. The ends span exactly once when callback fires then handler throws test in instrumentationSimple.spec.js exercises exactly this path.

return;
}
spanEnded = true;
if (err) {
span.recordException(err);
span.setStatus({ code: api.SpanStatusCode.ERROR });
// Only errors that carry a `code` contribute the error_code attribute.
if (err.code) {
span.setAttribute('cloudserver.error_code', err.code);
}
} else {
span.setStatus({ code: api.SpanStatusCode.OK });
}
span.end();
};

// Copy the arguments so the caller's array is never mutated.
const wrappedArgs = [...args];
if (callbackIndex !== -1) {
const originalCallback = args[callbackIndex];
wrappedArgs[callbackIndex] = function wrappedCallback(err, ...results) {
// End the span first, then forward to the original callback
// with the same `this` and arguments.
endSpan(err);
return originalCallback.call(this, err, ...results);
};
}

// Run the handler with the new span set on the active context.
const ctx = api.trace.setSpan(api.context.active(), span);
try {
const result = api.context.with(ctx, () => apiMethod.apply(this, wrappedArgs));
if (callbackIndex === -1) {
if (result && typeof result.then === 'function') {
return endSpanWhenSettled(result, endSpan);
}
endSpan();
}
// Callback-style handler: the wrapped callback drives the
// span lifecycle. If the handler also returns a thenable
// (hybrid migration shape), pass it through untouched —
// attaching a second .then() chain would surface as an
// unhandled rejection in callback-only callers.
return result;
} catch (error) {
Comment thread
delthas marked this conversation as resolved.
endSpan(error);
throw error;
}
};
}

module.exports = { instrumentApiMethod };
Comment thread
delthas marked this conversation as resolved.
Comment thread
delthas marked this conversation as resolved.
96 changes: 49 additions & 47 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
const { setServerHeader } = arsenal.s3routes.routesUtils;
const { RedisClient, StatsClient } = arsenal.metrics;
const monitoringClient = require('./utilities/monitoringHandler');
const tracing = require('./tracing');

const logger = require('./utilities/logger');
const { internalHandlers } = require('./utilities/internalHandlers');
Expand All @@ -15,15 +16,11 @@
const api = require('./api/api');
const dataWrapper = require('./data/wrapper');
const kms = require('./kms/wrapper');
const locationStorageCheck =
require('./api/apiUtils/object/locationStorageCheck');
const locationStorageCheck = require('./api/apiUtils/object/locationStorageCheck');
const vault = require('./auth/vault');
const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management');
const {
initManagementClient,
isManagementAgentUsed,
} = require('./management/agentClient');
const { initManagementClient, isManagementAgentUsed } = require('./management/agentClient');
const { startCleanupJob } = require('./api/apiUtils/rateLimit/cleanup');
const { startRefillJob, stopRefillJob } = require('./api/apiUtils/rateLimit/refillJob');

Expand All @@ -46,8 +43,7 @@
_config.on('location-constraints-update', () => {
if (implName === 'multipleBackends') {
const clients = parseLC(_config, vault);
client = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
client = new MultipleBackendGateway(clients, metadata, locationStorageCheck);
}
});

Expand All @@ -59,8 +55,7 @@
// stats client
const STATS_INTERVAL = 5; // 5 seconds
const STATS_EXPIRY = 30; // 30 seconds
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
STATS_EXPIRY);
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, STATS_EXPIRY);
const enableRemoteManagement = true;

class S3Server {
Expand All @@ -84,7 +79,7 @@
process.on('SIGHUP', this.cleanUp.bind(this));
process.on('SIGQUIT', this.cleanUp.bind(this));
process.on('SIGTERM', this.cleanUp.bind(this));
process.on('SIGPIPE', () => { });
process.on('SIGPIPE', () => {});
// This will pick up exceptions up the stack
process.on('uncaughtException', err => {
// If just send the error object results in empty
Expand Down Expand Up @@ -130,9 +125,10 @@
const requestStartTime = process.hrtime.bigint();

// Skip server access logs for heartbeat.
const isLoggingEnabled = _config.serverAccessLogs
&& (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY
|| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED);
const isLoggingEnabled =
_config.serverAccessLogs &&
(_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY ||
_config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED);
const isInternalRoute = req.url.startsWith('/_');
const isBackbeatRoute = req.url.startsWith('/_/backbeat/');
if (isLoggingEnabled && (!isInternalRoute || isBackbeatRoute)) {
Expand Down Expand Up @@ -176,9 +172,7 @@
labels.action = req.apiMethod;
}
monitoringClient.httpRequestsTotal.labels(labels).inc();
monitoringClient.httpRequestDurationSeconds
.labels(labels)
.observe(responseTimeInNs / 1e9);
monitoringClient.httpRequestDurationSeconds.labels(labels).observe(responseTimeInNs / 1e9);
monitoringClient.httpActiveRequests.dec();
};
res.on('close', monitorEndOfRequest);
Expand Down Expand Up @@ -206,6 +200,7 @@
vault,
},
};

arsenal.s3routes.routes(req, res, params, logger, this.config);
}

Expand All @@ -231,14 +226,13 @@
};

let reqUids = req.headers['x-scal-request-uids'];
if (reqUids !== undefined && !/*isValidReqUids*/(reqUids.length < 128)) {
if (reqUids !== undefined && !(/*isValidReqUids*/ (reqUids.length < 128))) {
// simply ignore invalid id (any user can provide an
// invalid request ID through a crafted header)
reqUids = undefined;
}
const log = (reqUids !== undefined ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger());
const log =
reqUids !== undefined ? logger.newRequestLoggerFromSerializedUids(reqUids) : logger.newRequestLogger();
log.end().addDefaultFields(clientInfo);

log.debug('received admin request', clientInfo);
Expand Down Expand Up @@ -292,8 +286,7 @@
server.requestTimeout = 0; // disabling request timeout

server.on('connection', socket => {
socket.on('error', err => logger.info('request rejected',
{ error: err }));
socket.on('error', err => logger.info('request rejected', { error: err }));
});

// https://nodejs.org/dist/latest-v6.x/
Expand All @@ -309,8 +302,11 @@
};
const { address } = addr;
logger.info('server started', {
address, port,
pid: process.pid, serverIP: address, serverPort: port
address,
port,
pid: process.pid,
serverIP: address,
serverPort: port,
});
});

Expand All @@ -323,32 +319,41 @@
this.servers.push(server);
}

/*
* This exits the running process properly.
*/
cleanUp() {
async cleanUp() {
logger.info('server shutting down');
// Stop token refill job if running
if (this.config.rateLimiting?.enabled) {
stopRefillJob(logger);
}
Promise.all(this.servers.map(server =>
new Promise(resolve => server.close(resolve))
)).then(() => process.exit(0));
try {
await Promise.all(this.servers.map(server => new Promise(resolve => server.close(resolve))));
await tracing.close();
} finally {
process.exit(0);
}
}

caughtExceptionShutdown() {
async caughtExceptionShutdown() {
if (!this.cluster) {
process.exit(1);
try {
await tracing.close();
} finally {
process.exit(1);
}
return;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dead code: process.exit(1) in the finally block above terminates the process synchronously, so this return is unreachable. Remove it to avoid misleading readers into thinking there is a non-exit path.

— Claude Code

}
logger.error('shutdown of worker due to exception', {
workerId: this.worker ? this.worker.id : undefined,
workerPid: this.worker ? this.worker.process.pid : undefined,
});
// Will close all servers, cause disconnect event on primary and kill
// worker process with 'SIGTERM'.
// worker.kill() is graceful (closes servers, disconnects IPC) but
// does not fire our SIGTERM handler, so the BatchSpanProcessor
// would lose buffered spans without an explicit flush here.
if (this.worker) {
this.worker.kill();
try {
await tracing.close();
} finally {
this.worker.kill();
}
}
}

Expand All @@ -363,10 +368,7 @@
}

initiateStartup(log) {
series([
next => metadata.setup(next),
next => clientCheck(true, log, next),
], (err, results) => {
series([next => metadata.setup(next), next => clientCheck(true, log, next)], (err, results) => {

Check notice

Code scanning / CodeQL

Callback-style function (async migration) Note

This function uses a callback parameter ('next'). Refactor to async/await.

Check notice

Code scanning / CodeQL

Callback-style function (async migration) Note

This function uses a callback parameter ('next'). Refactor to async/await.
if (err) {
log.warn('initial health check failed, delaying startup', {
error: err,
Expand Down Expand Up @@ -417,8 +419,10 @@

try {
logger.info('ServerAccessLogger config', { config: _config.serverAccessLogs });
if (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY
|| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED) {
if (
_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY ||
_config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED
) {
var serverAccessLogger = new ServerAccessLogger(
_config.serverAccessLogs.outputFile,
_config.serverAccessLogs.highWaterMarkBytes,
Expand All @@ -434,7 +438,6 @@
logger.error('ServerAccessLogger creation error', error);
}


this.started = true;
});
}
Expand Down Expand Up @@ -490,8 +493,7 @@
});

const metricServer = new S3Server(_config);
metricServer.startServer(_config.metricsListenOn,
_config.metricsPort, metricServer.routeAdminRequest);
metricServer.startServer(_config.metricsListenOn, _config.metricsPort, metricServer.routeAdminRequest);
}
if (_config.isCluster && cluster.isWorker) {
const server = new S3Server(_config, cluster.worker);
Expand Down
18 changes: 18 additions & 0 deletions lib/tracing/healthPaths.js
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure about that? We usually trace them, as they can have an impact (as you said, they are high-frequency). Isn't the filtering usually done in the tool used to explore traces rather than here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both options exist; we deliberately filter at ingest. Three reasons:

  1. Volume: k8s liveness/readiness probes hit at 1–10 Hz per pod, plus Prometheus scrape on /metrics at 15s. With parentBasedSampler honoring upstream decisions (which we want, to follow NGINX/Beyla traces correctly), an upstream traceparent with sampled=01 would force every probe to be sampled. That's bytes on the wire to the collector, OTLP exporter pressure, and storage cost paid for traces nobody queries.
  2. Signal/noise in the trace UI: even with backend filters, autocomplete and "recent operations" lists fill up with probe entries.
  3. Convention: most production OTEL deployment guides drop probes at the SDK.

The filter is isHealthPath in lib/tracing/healthPaths.js — five explicit paths. Easy to remove or invert later if the cost/benefit changes.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';

// Probe + scrape paths that should never produce a span. Filtered at
// ingest (not at the trace backend) because probe rate × pod count ×
// always-on sampling overwhelms the exporter and storage with traffic
// nobody queries.
const HEALTH_PATHS = new Set([
    '/live',
    '/ready',
    '/_/healthcheck',
    '/_/healthcheck/deep',
    '/metrics',
]);

/**
 * Tell whether a request URL targets one of the probe/scrape endpoints
 * listed in HEALTH_PATHS.
 *
 * Only the part of the URL before the first '?' is compared, and the match
 * is exact (no prefix or trailing-slash matching).
 *
 * @param {*} url - request URL; anything other than a non-empty string
 *   yields false
 * @returns {boolean} true when the path is a known health/metrics path
 */
function isHealthPath(url) {
    const isUsable = typeof url === 'string' && url.length > 0;
    if (!isUsable) {
        return false;
    }
    // split with a limit of 1 keeps only the text before the first '?'.
    const [pathname] = url.split('?', 1);
    return HEALTH_PATHS.has(pathname);
}

module.exports = { isHealthPath };
Loading
Loading