Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 178 additions & 16 deletions src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import {
type SessionCache,
cleanupOrphanedTempFiles,
computeEnvFingerprint,
DURABLE_PROVIDER_NAMES,
fingerprintFile,
loadCache,
reconcileFile,
saveCache,
} from './session-cache.js'
import type { ParsedProviderCall } from './providers/types.js'
import type { ParsedProviderCall, SessionSource } from './providers/types.js'
import type {
AssistantMessageContent,
ClassifiedTurn,
Expand Down Expand Up @@ -1324,6 +1325,10 @@ function buildSessionSummary(
totalCacheWrite += call.usage.cacheCreationInputTokens
apiCalls++

if (process.env['DEBUG_OTEL'] && (call.usage.cacheReadInputTokens > 0 || call.usage.cacheCreationInputTokens > 0)) {
console.warn(`[Aggregate] Model=${call.model}, cache_read=${call.usage.cacheReadInputTokens}, cache_write=${call.usage.cacheCreationInputTokens}`)
}

const modelKey = getShortModelName(call.model)
if (!modelBreakdown[modelKey]) {
modelBreakdown[modelKey] = {
Expand Down Expand Up @@ -1946,7 +1951,7 @@ function warnProviderParseFailure(providerName: string, sourcePath: string, err:

async function parseProviderSources(
providerName: string,
sources: Array<{ path: string; project: string }>,
sources: SessionSource[],
seenKeys: Set<string>,
diskCache: SessionCache,
dateRange?: DateRange,
Expand All @@ -1957,8 +1962,8 @@ async function parseProviderSources(
const section = getOrCreateProviderSection(diskCache, providerName)
const allDiscoveredFiles = new Set<string>()

type SourceInfo = { source: { path: string; project: string }; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
const unchangedSources: Array<{ source: { path: string; project: string }; cached: CachedFile }> = []
type SourceInfo = { source: SessionSource; fp: NonNullable<Awaited<ReturnType<typeof fingerprintFile>>> }
const unchangedSources: Array<{ source: SessionSource; cached: CachedFile }> = []
const changedSources: SourceInfo[] = []

for (const source of sources) {
Expand Down Expand Up @@ -2001,29 +2006,83 @@ async function parseProviderSources(

// Parse changed files, update cache
let didParse = false
// Track which paths have already been cleared this pass so that subsequent
// sources sharing the same path (e.g. multiple OTel conversations from one
// agent-traces.db) can accumulate via the merge logic below rather than
// being wiped on every iteration.
const clearedPaths = new Set<string>()
try {
for (const { source, fp } of changedSources) {
if (dateRange) {
if (fp.mtimeMs < dateRange.start.getTime()) continue
}

// Clear stale entry before parse — if parse fails, file is excluded
delete section.files[source.path]
// Clear stale entry before parse — but only once per path so that
// multiple sources mapping to the same file path can merge their turns.
// Durable providers (e.g. copilot OTel) never clear existing entries so
// that pruned-away data is preserved for monotonic monthly totals.
if (!provider.durableSources && !clearedPaths.has(source.path)) {
delete section.files[source.path]
clearedPaths.add(source.path)
}

const parser = provider.createSessionParser(
{ path: source.path, project: source.project, provider: providerName },
parserDedup,
dateRange,
)
const parser = provider.createSessionParser(source, parserDedup, dateRange)

try {
const providerCalls: ParsedProviderCall[] = []
for await (const call of parser.parse()) {
providerCalls.push(call)
}
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] File ${source.path.substring(0, 40)}: Collected ${providerCalls.length} provider calls, ${providerCalls.filter(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0).length} with cache tokens`)
}
if (process.env['DEBUG_OTEL'] && providerCalls.some(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0)) {
console.warn(`[Parse] After conversion: turns to be created...`)
}
const canonicalCalls = await Promise.all(providerCalls.map(canonicalizeProviderCallProject))
const turns = providerCallsToCachedTurns(canonicalCalls)
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
if (process.env['DEBUG_OTEL'] && providerCalls.some(c => c.cacheReadInputTokens > 0 || c.cacheCreationInputTokens > 0)) {
console.warn(`[Parse] After conversion: ${turns.length} turns, calls with cache: ${turns.flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0).length}`)
}

// Store/merge parsed turns into the cache.
// Durable providers use a union-by-deduplicationKey merge: existing turns
// are NEVER deleted (preserves data for spans pruned from the DB), and
// only turns whose dedup keys are not already cached are appended.
// Non-durable providers keep the original overwrite-or-append behaviour.
if (provider.durableSources) {
const existingEntry = section.files[source.path]
if (existingEntry) {
const existingKeys = new Set(
existingEntry.turns.flatMap(t => t.calls.map(c => c.deduplicationKey))
)
const newTurns = turns.filter(t =>
t.calls.every(c => !existingKeys.has(c.deduplicationKey))
)
existingEntry.turns = [...existingEntry.turns, ...newTurns]
existingEntry.fingerprint = fp
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] Durable union-merge for ${source.path.substring(0, 40)}: kept ${existingEntry.turns.length - newTurns.length} existing, added ${newTurns.length} new turns`)
}
} else {
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
}
} else {
// Non-durable: overwrite (clearedPaths already deleted stale entry above)
// or append when multiple sources map to the same path.
const existingCacheEntry = section.files[source.path]
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] Cache entry exists for path: ${!!existingCacheEntry}, turns to merge: ${turns.length}`)
}
if (existingCacheEntry) {
existingCacheEntry.turns = [...existingCacheEntry.turns, ...turns]
if (process.env['DEBUG_OTEL']) {
console.warn(`[Parse] Merged with existing cache entry for ${source.path.substring(0, 40)}, now has ${existingCacheEntry.turns.length} turns total`)
}
} else {
section.files[source.path] = { fingerprint: fp, mcpInventory: [], turns }
}
}
didParse = true
;(diskCache as { _dirty?: boolean })._dirty = true
} catch (err) {
Expand Down Expand Up @@ -2051,7 +2110,14 @@ async function parseProviderSources(
}
}

if (sources.length > 0) {
// Stamp the durable flag into the cache section so the orphan-bootstrap in
// parseAllSessions can fast-check without a getProvider() round-trip.
if (provider.durableSources && !section.durable) {
section.durable = true
;(diskCache as { _dirty?: boolean })._dirty = true
}

if (sources.length > 0 && !provider.durableSources) {
for (const cachedPath of Object.keys(section.files)) {
if (!allDiscoveredFiles.has(cachedPath)) {
delete section.files[cachedPath]
Expand All @@ -2060,17 +2126,52 @@ async function parseProviderSources(
}
}

// 90-day age-out for durable providers: remove entries whose newest call is
// older than 90 days so the cache doesn't grow unboundedly over time.
if (provider.durableSources) {
const cutoffMs = Date.now() - 90 * 24 * 60 * 60 * 1000
for (const [cachedPath, cachedFile] of Object.entries(section.files)) {
const newestTs = cachedFile.turns
.flatMap(t => t.calls)
.map(c => new Date(c.timestamp).getTime())
.filter(ts => !isNaN(ts))
.reduce((max, ts) => Math.max(max, ts), 0)
if (newestTs > 0 && newestTs < cutoffMs) {
delete section.files[cachedPath]
;(diskCache as { _dirty?: boolean })._dirty = true
}
}
}

// Query-time: derive SessionSummary from all cached turns.
// Uses seenKeys (shared across providers) for cross-provider dedup.
const sessionMap = new Map<string, { project: string; projectPath?: string; turns: ClassifiedTurn[] }>()

if (process.env['DEBUG_OTEL']) {
const totalCacheCalls = sources.flatMap(s => section.files[s.path]?.turns ?? []).flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)
console.warn(`[SessionMap] Starting with ${sources.length} sources, ${totalCacheCalls.length} calls with cache tokens`)

const filesInfo = Object.entries(section.files).map(([path, file]) => ({
path: path.substring(0, 50),
turns: file.turns.length,
totalCalls: file.turns.flatMap(t => t.calls).length,
cacheTokenCalls: file.turns.flatMap(t => t.calls).filter(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0).length
}))
console.warn(`[SessionMap] Cache files: ${JSON.stringify(filesInfo)}`)
}

for (const source of sources) {
const cachedFile = section.files[source.path]
if (!cachedFile) continue

for (const turn of cachedFile.turns) {
const hasDup = turn.calls.some(c => seenKeys.has(c.deduplicationKey))
if (hasDup) continue
if (hasDup) {
if (process.env['DEBUG_OTEL'] && turn.calls.some(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)) {
console.warn(`[SessionMap] Skipping turn with cache tokens due to dedup: ${turn.calls.map(c => c.deduplicationKey).join(',')}`)
}
continue
}

for (const c of turn.calls) seenKeys.add(c.deduplicationKey)

Expand All @@ -2085,6 +2186,10 @@ async function parseProviderSources(
const project = turn.calls[0]?.project ?? source.project
const key = `${providerName}:${turn.sessionId}:${project}`

if (process.env['DEBUG_OTEL'] && turn.calls.some(c => c.usage.cacheReadInputTokens > 0 || c.usage.cacheCreationInputTokens > 0)) {
console.warn(`[SessionMap] Adding turn with cache tokens to sessionMap key=${key}`)
}

const existing = sessionMap.get(key)
if (existing) {
existing.turns.push(classified)
Expand All @@ -2097,6 +2202,43 @@ async function parseProviderSources(
}
}

// Second pass: durable orphans — cache entries for paths that are no longer
// discovered (e.g. OTel conversations pruned from the DB). Their turns are
// counted here so the monthly total never drops.
if (provider.durableSources) {
for (const [cachedPath, cachedFile] of Object.entries(section.files)) {
if (allDiscoveredFiles.has(cachedPath)) continue // already counted above

for (const turn of cachedFile.turns) {
const hasDup = turn.calls.some(c => seenKeys.has(c.deduplicationKey))
if (hasDup) continue

for (const c of turn.calls) seenKeys.add(c.deduplicationKey)

if (dateRange) {
const callTs = turn.calls[0]?.timestamp
if (!callTs) continue
const ts = new Date(callTs)
if (ts < dateRange.start || ts > dateRange.end) continue
}

const classified = cachedTurnToClassified(turn)
const project = turn.calls[0]?.project ?? providerName
const key = `${providerName}:${turn.sessionId}:${project}`

const existingEntry = sessionMap.get(key)
if (existingEntry) {
existingEntry.turns.push(classified)
if (!existingEntry.projectPath && turn.calls[0]?.projectPath) {
existingEntry.projectPath = turn.calls[0]!.projectPath
}
} else {
sessionMap.set(key, { project, projectPath: turn.calls[0]?.projectPath, turns: [classified] })
}
}
}
}

const projectMap = new Map<string, { projectPath?: string; sessions: SessionSummary[] }>()
for (const [key, { project, projectPath, turns }] of sessionMap) {
const sessionId = key.split(':')[1] ?? key
Expand Down Expand Up @@ -2246,10 +2388,10 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
const claudeDirs = claudeSources.map(s => ({ path: s.path, name: s.project }))
const claudeProjects = await scanProjectDirs(claudeDirs, seenMsgIds, diskCache, dateRange)

const providerGroups = new Map<string, Array<{ path: string; project: string }>>()
const providerGroups = new Map<string, SessionSource[]>()
for (const source of nonClaudeSources) {
const existing = providerGroups.get(source.provider) ?? []
existing.push({ path: source.path, project: source.project })
existing.push(source)
providerGroups.set(source.provider, existing)
}

Expand All @@ -2259,6 +2401,26 @@ export async function parseAllSessions(dateRange?: DateRange, providerFilter?: s
otherProjects.push(...projects)
}

// Durable providers with cached data but NO discovered sources (all files pruned
// by VS Code / the external tool) still need their orphan pass to run so the
// monthly total never drops. Call parseProviderSources with empty sources for
// any such provider found in the disk cache.
const processedProviders = new Set(providerGroups.keys())
for (const providerName of Object.keys(diskCache.providers)) {
if (processedProviders.has(providerName)) continue
// Skip if filtered to a different provider
if (providerFilter && providerFilter !== 'all' && providerFilter !== providerName) continue
const section = diskCache.providers[providerName]
if (!section || Object.keys(section.files).length === 0) continue
// Use the persisted durable flag (set by parseProviderSources when it first
// processes a durableSources provider) OR the static DURABLE_PROVIDER_NAMES
// constant — both checks are O(1) and avoid a getProvider() dynamic-import
// round-trip for every unprocessed provider in the disk cache.
if (!section.durable && !DURABLE_PROVIDER_NAMES.has(providerName)) continue
const projects = await parseProviderSources(providerName, [], seenKeys, diskCache, dateRange)
otherProjects.push(...projects)
}

if ((diskCache as { _dirty?: boolean })._dirty) {
try { await saveCache(diskCache) } catch {}
}
Expand Down
Loading