Skip to content
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
local telemetry.
- Local telemetry now records `http.requests` rollups for per-route HTTP
health without emitting one event per request.
- Connection lifecycle now records local telemetry: SSH process
discovery/loss/recovery with sampled network info, and reconnecting
WebSocket open, drop, reconnect, and state transitions, so connection
stability is captured alongside other local telemetry.

### Fixed

Expand Down
29 changes: 18 additions & 11 deletions src/api/coderApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
import {
ConnectionState,
ReconnectingWebSocket,
type ReconnectingWebSocketOptions,
type SocketFactory,
} from "../websocket/reconnectingWebSocket";
import { SseConnection } from "../websocket/sseConnection";
Expand Down Expand Up @@ -95,6 +96,7 @@ export class CoderApi extends Api implements vscode.Disposable {

private constructor(
private readonly output: Logger,
private readonly telemetry: TelemetryReporter,
private readonly httpRequestsTelemetry: HttpRequestsTelemetry,
) {
super();
Expand All @@ -104,7 +106,9 @@ export class CoderApi extends Api implements vscode.Disposable {
/**
* Create a new CoderApi instance with the provided configuration.
* Automatically sets up logging interceptors, certificate handling,
* and HTTP request telemetry that emits via the given reporter.
* HTTP request telemetry, and WebSocket connection telemetry. All
* telemetry routes through the single reporter passed in (defaults to
* NOOP_TELEMETRY_REPORTER for throwaway clients).
*/
static create(
baseUrl: string,
Expand All @@ -113,7 +117,7 @@ export class CoderApi extends Api implements vscode.Disposable {
telemetry: TelemetryReporter = NOOP_TELEMETRY_REPORTER,
): CoderApi {
const httpRequestsTelemetry = new HttpRequestsTelemetry(telemetry);
const client = new CoderApi(output, httpRequestsTelemetry);
const client = new CoderApi(output, telemetry, httpRequestsTelemetry);
client.setCredentials(baseUrl, token);

setupInterceptors(client, output, httpRequestsTelemetry);
Expand Down Expand Up @@ -463,18 +467,21 @@ export class CoderApi extends Api implements vscode.Disposable {
private async createReconnectingSocket<TData>(
socketFactory: SocketFactory<TData>,
): Promise<ReconnectingWebSocket<TData>> {
const options: ReconnectingWebSocketOptions = {
onCertificateRefreshNeeded: async () => {
const refreshCommand = getRefreshCommand();
if (!refreshCommand) {
return false;
}
return refreshCertificates(refreshCommand, this.output);
},
telemetry: this.telemetry,
};

const reconnectingSocket = await ReconnectingWebSocket.create<TData>(
socketFactory,
this.output,
{
onCertificateRefreshNeeded: async () => {
const refreshCommand = getRefreshCommand();
if (!refreshCommand) {
return false;
}
return refreshCertificates(refreshCommand, this.output);
},
},
options,
() => this.reconnectingSockets.delete(reconnectingSocket),
);

Expand Down
180 changes: 180 additions & 0 deletions src/instrumentation/ssh.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import type { NetworkInfo } from "../remote/sshProcess";
import type { TelemetryReporter } from "../telemetry/reporter";

const NETWORK_SAMPLE_INTERVAL_MS = 60_000;
const NETWORK_LATENCY_CHANGE_RATIO = 0.2;
const NETWORK_LATENCY_MIN_ABSOLUTE_CHANGE_MS = 25;

export type ProcessLossCause = "stale_network_info" | "missing_network_info";

interface NetworkSample {
readonly emittedAtMs: number;
readonly p2p: boolean;
readonly preferredDerp: string;
readonly latencyMs: number;
}

interface ProcessDiscoveryResult {
readonly pid: number | undefined;
readonly attempts: number;
}

export class SshTelemetry {
readonly #telemetry: TelemetryReporter;
#processStartedAtMs: number | undefined;
#processLostAtMs: number | undefined;
#lastNetworkSample: NetworkSample | undefined;

public constructor(telemetry: TelemetryReporter) {
this.#telemetry = telemetry;
}

public traceProcessDiscovery(
fn: () => Promise<ProcessDiscoveryResult>,
): Promise<number | undefined> {
return this.#telemetry.trace("ssh.process.discovered", async (span) => {
Comment thread
EhabY marked this conversation as resolved.
const { pid, attempts } = await fn();
span.setProperty("found", String(pid !== undefined));
span.setMeasurement("attempts", attempts);
return pid;
});
}

public processStarted(): void {
this.#processStartedAtMs = performance.now();
this.#processLostAtMs = undefined;
}

public processLost(cause: ProcessLossCause): void {
if (
this.#processStartedAtMs === undefined ||
this.#processLostAtMs !== undefined
) {
return;
}
const now = performance.now();
this.#processLostAtMs = now;
this.#telemetry.log(
"ssh.process.lost",
{ cause },
{ uptimeMs: now - this.#processStartedAtMs },
);
}

public processRecovered(): void {
if (this.#processLostAtMs === undefined) {
return;
}
this.#telemetry.log(
"ssh.process.recovered",
{},
{ recoveryDurationMs: performance.now() - this.#processLostAtMs },
);
this.#processLostAtMs = undefined;
}

/** Handover to a different SSH process. Always emits `ssh.process.replaced`,
* even when the prior process was already lost (replacement is operationally
* distinct from recovery). */
public processReplaced(): void {
const now = performance.now();
if (this.#processStartedAtMs !== undefined) {
const wasLost = this.#processLostAtMs !== undefined;
const measurements: Record<string, number> = {
previousUptimeMs: now - this.#processStartedAtMs,
};
if (this.#processLostAtMs !== undefined) {
measurements.lostDurationMs = now - this.#processLostAtMs;
}
this.#telemetry.log(
"ssh.process.replaced",
{ wasLost: String(wasLost) },
measurements,
);
}
this.#processStartedAtMs = now;
this.#processLostAtMs = undefined;
this.#lastNetworkSample = undefined;
}

/** Terminal teardown signal. Emits regardless of prior lost state so
* consumers always see a session-ending event. */
public disposed(): void {
if (this.#processStartedAtMs === undefined) {
return;
}
const now = performance.now();
const wasLost = this.#processLostAtMs !== undefined;
this.#telemetry.log(
"ssh.process.disposed",
{ wasLost: String(wasLost) },
{ uptimeMs: now - this.#processStartedAtMs },
);
this.#processStartedAtMs = undefined;
this.#processLostAtMs = undefined;
this.#lastNetworkSample = undefined;
}

public networkSampled(network: NetworkInfo): void {
const now = performance.now();
const previous = this.#lastNetworkSample;
if (previous && !shouldEmitSample(previous, network, now)) {
return;
}

this.#lastNetworkSample = {
emittedAtMs: now,
p2p: network.p2p,
preferredDerp: network.preferred_derp,
latencyMs: network.latency,
};
this.#telemetry.log(
"ssh.network.sampled",
{
p2p: String(network.p2p),
preferredDerp: network.preferred_derp,
},
{
latencyMs: network.latency,
downloadMbits: bytesPerSecondToMbits(network.download_bytes_sec),
uploadMbits: bytesPerSecondToMbits(network.upload_bytes_sec),
},
);
}
}

/** Emit on p2p flip, DERP change, large latency swing, or heartbeat interval. */
function shouldEmitSample(
previous: NetworkSample,
current: NetworkInfo,
now: number,
): boolean {
if (now - previous.emittedAtMs >= NETWORK_SAMPLE_INTERVAL_MS) {
return true;
}
if (current.p2p !== previous.p2p) {
return true;
}
if (current.preferred_derp !== previous.preferredDerp) {
return true;
}
return hasMeaningfulLatencyChange(current.latency, previous.latencyMs);
}

function hasMeaningfulLatencyChange(
current: number,
previous: number,
): boolean {
if (previous === 0) {
return current !== 0;
}
const absoluteChange = Math.abs(current - previous);
return (
absoluteChange >= NETWORK_LATENCY_MIN_ABSOLUTE_CHANGE_MS ||
absoluteChange / Math.abs(previous) >= NETWORK_LATENCY_CHANGE_RATIO
);
}

function bytesPerSecondToMbits(bytesPerSecond: number): number {
return (bytesPerSecond * 8) / 1_000_000;
}
Loading
Loading