-
Notifications
You must be signed in to change notification settings - Fork 255
feat(CLDSRV-884): Add OpenTelemetry tracing instrumentation #6140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development/9.3
Are you sure you want to change the base?
Changes from all commits
0e2111a
84d79b1
035b5da
b87f219
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| 'use strict'; | ||
|
|
||
| const tracing = require('../tracing'); | ||
|
|
||
| /* Module-level cache: holds the tracer once getTracer() has created it. */ let tracer = null; | ||
| /* Returns the cached tracer, creating it on the first call with the name cloudserver-api and the version read from package.json. */ function getTracer() { | ||
| if (tracer) { | ||
| return tracer; | ||
| } | ||
| /* Both requires sit inside the function, so they only run when no tracer is cached yet. */ const { trace } = require('@opentelemetry/api'); | ||
| const { version } = require('../../package.json'); | ||
| tracer = trace.getTracer('cloudserver-api', version); | ||
| return tracer; | ||
| } | ||
|
|
||
| /* Awaits promise, then calls endSpan() with no argument on fulfilment or endSpan(err) on rejection; the fulfilled value is returned and the rejection is rethrown unchanged. */ async function endSpanWhenSettled(promise, endSpan) { | ||
| try { | ||
| const value = await promise; | ||
| endSpan(); | ||
| return value; | ||
| } catch (err) { | ||
| endSpan(err); | ||
| throw err; | ||
| } | ||
| } | ||
|
|
||
| /* Wraps apiMethod so that every call runs inside a span named api.<methodName>; returns apiMethod itself, unwrapped, when tracing.isEnabled() is false. */ function instrumentApiMethod(apiMethod, methodName) { | ||
| if (!tracing.isEnabled()) { | ||
| return apiMethod; | ||
| } | ||
|
|
||
| const api = require('@opentelemetry/api'); | ||
| const spanName = `api.${methodName}`; | ||
|
|
||
| return function instrumented(...args) { | ||
| /* The last function-typed argument, if any, is treated as the completion callback. */ const callbackIndex = args.findLastIndex(a => typeof a === 'function'); | ||
| const span = getTracer().startSpan(spanName, { kind: api.SpanKind.INTERNAL }); | ||
|
|
||
| // End-once guard. Multiple termination paths can race: the | ||
| // wrapped callback may fire and then the handler may also throw | ||
| // synchronously, or a callback-and-Promise hybrid handler may | ||
| // resolve after firing the callback. | ||
| let spanEnded = false; | ||
| /* Ends the span at most once: ERROR status plus recorded exception when err is truthy, OK status otherwise. */ const endSpan = err => { | ||
| if (spanEnded) { | ||
| return; | ||
| } | ||
| spanEnded = true; | ||
| if (err) { | ||
| span.recordException(err); | ||
| span.setStatus({ code: api.SpanStatusCode.ERROR }); | ||
| if (err.code) { | ||
| span.setAttribute('cloudserver.error_code', err.code); | ||
| } | ||
| } else { | ||
| span.setStatus({ code: api.SpanStatusCode.OK }); | ||
| } | ||
| span.end(); | ||
| }; | ||
|
|
||
| /* Work on a copy of args; only the callback slot is replaced, with a wrapper that ends the span before delegating. */ const wrappedArgs = [...args]; | ||
| if (callbackIndex !== -1) { | ||
| const originalCallback = args[callbackIndex]; | ||
| wrappedArgs[callbackIndex] = function wrappedCallback(err, ...results) { | ||
| endSpan(err); | ||
| return originalCallback.call(this, err, ...results); | ||
| }; | ||
| } | ||
|
|
||
| /* Build a context carrying the new span and invoke the handler with that context active. */ const ctx = api.trace.setSpan(api.context.active(), span); | ||
| try { | ||
| const result = api.context.with(ctx, () => apiMethod.apply(this, wrappedArgs)); | ||
| /* No callback argument: end on settlement when a thenable is returned, otherwise end immediately. */ if (callbackIndex === -1) { | ||
| if (result && typeof result.then === 'function') { | ||
| return endSpanWhenSettled(result, endSpan); | ||
| } | ||
| endSpan(); | ||
| } | ||
| // Callback-style handler: the wrapped callback drives the | ||
| // span lifecycle. If the handler also returns a thenable | ||
| // (hybrid migration shape), pass it through untouched — | ||
| // attaching a second .then() chain would surface as an | ||
| // unhandled rejection in callback-only callers. | ||
| return result; | ||
| } catch (error) { | ||
|
delthas marked this conversation as resolved.
|
||
| /* Synchronous throw from the handler: end the span with the error (a no-op if it already ended), then rethrow. */ endSpan(error); | ||
| throw error; | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| module.exports = { instrumentApiMethod }; | ||
|
delthas marked this conversation as resolved.
delthas marked this conversation as resolved.
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| const { setServerHeader } = arsenal.s3routes.routesUtils; | ||
| const { RedisClient, StatsClient } = arsenal.metrics; | ||
| const monitoringClient = require('./utilities/monitoringHandler'); | ||
| const tracing = require('./tracing'); | ||
|
|
||
| const logger = require('./utilities/logger'); | ||
| const { internalHandlers } = require('./utilities/internalHandlers'); | ||
|
|
@@ -15,15 +16,11 @@ | |
| const api = require('./api/api'); | ||
| const dataWrapper = require('./data/wrapper'); | ||
| const kms = require('./kms/wrapper'); | ||
| const locationStorageCheck = | ||
| require('./api/apiUtils/object/locationStorageCheck'); | ||
| const locationStorageCheck = require('./api/apiUtils/object/locationStorageCheck'); | ||
| const vault = require('./auth/vault'); | ||
| const metadata = require('./metadata/wrapper'); | ||
| const { initManagement } = require('./management'); | ||
| const { | ||
| initManagementClient, | ||
| isManagementAgentUsed, | ||
| } = require('./management/agentClient'); | ||
| const { initManagementClient, isManagementAgentUsed } = require('./management/agentClient'); | ||
| const { startCleanupJob } = require('./api/apiUtils/rateLimit/cleanup'); | ||
| const { startRefillJob, stopRefillJob } = require('./api/apiUtils/rateLimit/refillJob'); | ||
|
|
||
|
|
@@ -46,8 +43,7 @@ | |
| _config.on('location-constraints-update', () => { | ||
| if (implName === 'multipleBackends') { | ||
| const clients = parseLC(_config, vault); | ||
| client = new MultipleBackendGateway( | ||
| clients, metadata, locationStorageCheck); | ||
| client = new MultipleBackendGateway(clients, metadata, locationStorageCheck); | ||
| } | ||
| }); | ||
|
|
||
|
|
@@ -59,8 +55,7 @@ | |
| // stats client | ||
| const STATS_INTERVAL = 5; // 5 seconds | ||
| const STATS_EXPIRY = 30; // 30 seconds | ||
| const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, | ||
| STATS_EXPIRY); | ||
| const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, STATS_EXPIRY); | ||
| const enableRemoteManagement = true; | ||
|
|
||
| class S3Server { | ||
|
|
@@ -84,7 +79,7 @@ | |
| process.on('SIGHUP', this.cleanUp.bind(this)); | ||
| process.on('SIGQUIT', this.cleanUp.bind(this)); | ||
| process.on('SIGTERM', this.cleanUp.bind(this)); | ||
| process.on('SIGPIPE', () => { }); | ||
| process.on('SIGPIPE', () => {}); | ||
| // This will pick up exceptions up the stack | ||
| process.on('uncaughtException', err => { | ||
| // If just send the error object results in empty | ||
|
|
@@ -130,9 +125,10 @@ | |
| const requestStartTime = process.hrtime.bigint(); | ||
|
|
||
| // Skip server access logs for heartbeat. | ||
| const isLoggingEnabled = _config.serverAccessLogs | ||
| && (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY | ||
| || _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED); | ||
| const isLoggingEnabled = | ||
| _config.serverAccessLogs && | ||
| (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY || | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED); | ||
| const isInternalRoute = req.url.startsWith('/_'); | ||
| const isBackbeatRoute = req.url.startsWith('/_/backbeat/'); | ||
| if (isLoggingEnabled && (!isInternalRoute || isBackbeatRoute)) { | ||
|
|
@@ -176,9 +172,7 @@ | |
| labels.action = req.apiMethod; | ||
| } | ||
| monitoringClient.httpRequestsTotal.labels(labels).inc(); | ||
| monitoringClient.httpRequestDurationSeconds | ||
| .labels(labels) | ||
| .observe(responseTimeInNs / 1e9); | ||
| monitoringClient.httpRequestDurationSeconds.labels(labels).observe(responseTimeInNs / 1e9); | ||
| monitoringClient.httpActiveRequests.dec(); | ||
| }; | ||
| res.on('close', monitorEndOfRequest); | ||
|
|
@@ -206,6 +200,7 @@ | |
| vault, | ||
| }, | ||
| }; | ||
|
|
||
| arsenal.s3routes.routes(req, res, params, logger, this.config); | ||
| } | ||
|
|
||
|
|
@@ -231,14 +226,13 @@ | |
| }; | ||
|
|
||
| let reqUids = req.headers['x-scal-request-uids']; | ||
| if (reqUids !== undefined && !/*isValidReqUids*/(reqUids.length < 128)) { | ||
| if (reqUids !== undefined && !(/*isValidReqUids*/ (reqUids.length < 128))) { | ||
| // simply ignore invalid id (any user can provide an | ||
| // invalid request ID through a crafted header) | ||
| reqUids = undefined; | ||
| } | ||
| const log = (reqUids !== undefined ? | ||
| logger.newRequestLoggerFromSerializedUids(reqUids) : | ||
| logger.newRequestLogger()); | ||
| const log = | ||
| reqUids !== undefined ? logger.newRequestLoggerFromSerializedUids(reqUids) : logger.newRequestLogger(); | ||
| log.end().addDefaultFields(clientInfo); | ||
|
|
||
| log.debug('received admin request', clientInfo); | ||
|
|
@@ -292,8 +286,7 @@ | |
| server.requestTimeout = 0; // disabling request timeout | ||
|
|
||
| server.on('connection', socket => { | ||
| socket.on('error', err => logger.info('request rejected', | ||
| { error: err })); | ||
| socket.on('error', err => logger.info('request rejected', { error: err })); | ||
| }); | ||
|
|
||
| // https://nodejs.org/dist/latest-v6.x/ | ||
|
|
@@ -309,8 +302,11 @@ | |
| }; | ||
| const { address } = addr; | ||
| logger.info('server started', { | ||
| address, port, | ||
| pid: process.pid, serverIP: address, serverPort: port | ||
| address, | ||
| port, | ||
| pid: process.pid, | ||
| serverIP: address, | ||
| serverPort: port, | ||
| }); | ||
| }); | ||
|
|
||
|
|
@@ -323,32 +319,41 @@ | |
| this.servers.push(server); | ||
| } | ||
|
|
||
| /* | ||
| * This exits the running process properly. | ||
| */ | ||
| cleanUp() { | ||
| async cleanUp() { | ||
| logger.info('server shutting down'); | ||
| // Stop token refill job if running | ||
| if (this.config.rateLimiting?.enabled) { | ||
| stopRefillJob(logger); | ||
| } | ||
| Promise.all(this.servers.map(server => | ||
| new Promise(resolve => server.close(resolve)) | ||
| )).then(() => process.exit(0)); | ||
| try { | ||
| await Promise.all(this.servers.map(server => new Promise(resolve => server.close(resolve)))); | ||
| await tracing.close(); | ||
| } finally { | ||
| process.exit(0); | ||
| } | ||
| } | ||
|
|
||
| caughtExceptionShutdown() { | ||
| async caughtExceptionShutdown() { | ||
| if (!this.cluster) { | ||
| process.exit(1); | ||
| try { | ||
| await tracing.close(); | ||
| } finally { | ||
| process.exit(1); | ||
| } | ||
| return; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dead code: |
||
| } | ||
| logger.error('shutdown of worker due to exception', { | ||
| workerId: this.worker ? this.worker.id : undefined, | ||
| workerPid: this.worker ? this.worker.process.pid : undefined, | ||
| }); | ||
| // Will close all servers, cause disconnect event on primary and kill | ||
| // worker process with 'SIGTERM'. | ||
| // worker.kill() is graceful (closes servers, disconnects IPC) but | ||
| // does not fire our SIGTERM handler, so the BatchSpanProcessor | ||
| // would lose buffered spans without an explicit flush here. | ||
| if (this.worker) { | ||
| this.worker.kill(); | ||
| try { | ||
| await tracing.close(); | ||
| } finally { | ||
| this.worker.kill(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -363,10 +368,7 @@ | |
| } | ||
|
|
||
| initiateStartup(log) { | ||
| series([ | ||
| next => metadata.setup(next), | ||
| next => clientCheck(true, log, next), | ||
| ], (err, results) => { | ||
| series([next => metadata.setup(next), next => clientCheck(true, log, next)], (err, results) => { | ||
Check notice: Code scanning / CodeQL — Callback-style function (async migration) (Note)
This function uses a callback parameter ('next'). Refactor to async/await.
Check notice: Code scanning / CodeQL — Callback-style function (async migration) (Note)
This function uses a callback parameter ('next'). Refactor to async/await.
|
||
|
|
||
| if (err) { | ||
| log.warn('initial health check failed, delaying startup', { | ||
| error: err, | ||
|
|
@@ -417,8 +419,10 @@ | |
|
|
||
| try { | ||
| logger.info('ServerAccessLogger config', { config: _config.serverAccessLogs }); | ||
| if (_config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY | ||
| || _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED) { | ||
| if ( | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.LOG_ONLY || | ||
| _config.serverAccessLogs.mode === serverAccessLogsModes.ENABLED | ||
| ) { | ||
| var serverAccessLogger = new ServerAccessLogger( | ||
| _config.serverAccessLogs.outputFile, | ||
| _config.serverAccessLogs.highWaterMarkBytes, | ||
|
|
@@ -434,7 +438,6 @@ | |
| logger.error('ServerAccessLogger creation error', error); | ||
| } | ||
|
|
||
|
|
||
| this.started = true; | ||
| }); | ||
| } | ||
|
|
@@ -490,8 +493,7 @@ | |
| }); | ||
|
|
||
| const metricServer = new S3Server(_config); | ||
| metricServer.startServer(_config.metricsListenOn, | ||
| _config.metricsPort, metricServer.routeAdminRequest); | ||
| metricServer.startServer(_config.metricsListenOn, _config.metricsPort, metricServer.routeAdminRequest); | ||
| } | ||
| if (_config.isCluster && cluster.isWorker) { | ||
| const server = new S3Server(_config, cluster.worker); | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we sure about that? We usually trace them, as they can have an impact (as you said, they are high-frequency). Shouldn't the filtering be done in the tool used to explore traces rather than here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both options exist; we deliberately filter at ingest. Three reasons:
The filter is |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| 'use strict'; | ||
|
|
||
| // Probe + scrape paths that should never produce a span. Filtered at | ||
| // ingest (not at the trace backend) because probe rate × pod count × | ||
| // always-on sampling overwhelms the exporter and storage with traffic | ||
| // nobody queries. | ||
| /* Exact request paths that isHealthPath() reports as probe or scrape traffic. */ const HEALTH_PATHS = new Set(['/live', '/ready', '/_/healthcheck', '/_/healthcheck/deep', '/metrics']); | ||
|
|
||
| /* Returns true when url, with any query string removed, is exactly one of HEALTH_PATHS; non-string or empty input yields false. */ function isHealthPath(url) { | ||
| if (typeof url !== 'string' || url.length === 0) { | ||
| return false; | ||
| } | ||
| /* Drop everything from the first question mark onward so only the path is looked up. */ const qIdx = url.indexOf('?'); | ||
| const path = qIdx === -1 ? url : url.slice(0, qIdx); | ||
| return HEALTH_PATHS.has(path); | ||
| } | ||
|
|
||
| module.exports = { isHealthPath }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then we will not have the following trace if we have a double-ending? Why did you introduce that? Did you encounter it? If we remove it, what is the impact?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes — encountered during initial unit-test development. The case: a callback-style handler fires the callback (which ends the span via the wrapped-callback closure) and then throws synchronously afterwards (e.g. the callback body itself threw). The outer
`try/catch` then calls `endSpan(err)` and we get two `.end()` calls. OTEL warns on double-end and the second status overwrites the first, so the trace would record ERROR when the operation actually succeeded. The guard makes the first ending win. The `ends span exactly once when callback fires then handler throws` test in `instrumentationSimple.spec.js` exercises exactly this path.