@@ -10,16 +10,20 @@ import * as workerStore from './worker-store-factory';
 import { deleteJobProgress } from './rtdb';
 import { cancelRecoveryCheck } from './cloud-tasks';
 import * as Sentry from '@sentry/nextjs';
+import { createLogger } from './logger';
+
+const log = createLogger('JobStore');
+const recoveryLog = createLogger('Recovery');
 
 const USE_FIRESTORE = typeof process.env.GOOGLE_CLOUD_PROJECT === 'string' && process.env.GOOGLE_CLOUD_PROJECT.length > 0;
 
 // Log mode detection at startup
-console.log(`[Job Store] Running in ${USE_FIRESTORE ? 'GCP' : 'LOCAL'} mode`);
+log.info('Running in mode', { mode: USE_FIRESTORE ? 'GCP' : 'LOCAL' });
 if (USE_FIRESTORE) {
-  console.log(`[Job Store] Project: ${process.env.GOOGLE_CLOUD_PROJECT}`);
-  console.log(`[Job Store] Using: Firestore + Cloud Storage + Pub/Sub`);
+  log.info('Project', { project: process.env.GOOGLE_CLOUD_PROJECT });
+  log.info('Using Firestore + Cloud Storage + Pub/Sub');
 } else {
-  console.log(`[Job Store] Using: SQLite + local filesystem`);
+  log.info('Using SQLite + local filesystem');
 }
 
 export function isGcpMode(): boolean {
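
The `log.info(message, fields)` / `warn` / `error` shape used throughout this change assumes a small structured-logger factory in `./logger`, which isn't shown in the diff. A minimal sketch consistent with these call sites (the JSON-line output and severity names here are assumptions, not the actual module) could look like:

```ts
// Hypothetical sketch of ./logger — illustrates the interface these call sites assume,
// not the project's actual implementation.
type LogFields = Record<string, unknown>;

export interface Logger {
  info(message: string, fields?: LogFields): void;
  warn(message: string, fields?: LogFields): void;
  error(message: string, fields?: LogFields): void;
}

export function createLogger(component: string): Logger {
  // Emit one JSON line per entry so a log collector (e.g. Cloud Logging) can index the fields.
  const emit = (severity: 'INFO' | 'WARNING' | 'ERROR', message: string, fields?: LogFields): void => {
    const line = JSON.stringify({ severity, component, message, ...fields });
    if (severity === 'ERROR') console.error(line);
    else if (severity === 'WARNING') console.warn(line);
    else console.log(line);
  };
  return {
    info: (message, fields) => emit('INFO', message, fields),
    warn: (message, fields) => emit('WARNING', message, fields),
    error: (message, fields) => emit('ERROR', message, fields),
  };
}
```

With a factory along these lines, `createLogger('JobStore')` tags every entry with its component, so the old `[Job Store]` / `[Recovery]` prefixes become a queryable field instead of part of the message string.
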
@@ -312,7 +316,7 @@ async function recoverStaleSimulations(
     const jobStartedMs = job.startedAt ? job.startedAt.getTime() : job.createdAt.getTime();
     const pendingForMs = now - jobStartedMs;
     if (pendingForMs > STALE_PENDING_THRESHOLD_MS) {
-      console.log(`[Recovery] Job ${jobId} sim ${sim.simId} stuck PENDING for ${Math.round(pendingForMs / 1000)}s, republishing`);
+      recoveryLog.info('Sim stuck PENDING, republishing', { jobId, simId: sim.simId, pendingSec: Math.round(pendingForMs / 1000) });
       simsToRepublish.push(sim);
       recovered = true;
     }
@@ -322,7 +326,7 @@ async function recoverStaleSimulations(
     if (sim.state === 'RUNNING' && sim.startedAt) {
       const runningForMs = now - new Date(sim.startedAt).getTime();
       if (runningForMs > STALE_RUNNING_THRESHOLD_MS) {
-        console.log(`[Recovery] Job ${jobId} sim ${sim.simId} stuck RUNNING for ${Math.round(runningForMs / 60000)} min, marking FAILED for retry`);
+        recoveryLog.info('Sim stuck RUNNING, marking FAILED for retry', { jobId, simId: sim.simId, runningMin: Math.round(runningForMs / 60000) });
         const updated = await conditionalUpdateSimulationStatus(jobId, sim.simId, ['RUNNING'], {
           state: 'FAILED',
           errorMessage: `Simulation timed out after ${Math.round(runningForMs / 60000)} minutes`,
@@ -337,7 +341,7 @@ async function recoverStaleSimulations(
 
     // Case 3: RUNNING sim whose specific worker is dead
     if (sim.state === 'RUNNING' && sim.workerId && !activeWorkerIds.has(sim.workerId)) {
-      console.log(`[Recovery] Job ${jobId} sim ${sim.simId} worker ${sim.workerId} is dead, marking FAILED for retry`);
+      recoveryLog.info('Sim worker is dead, marking FAILED for retry', { jobId, simId: sim.simId, workerId: sim.workerId });
       const updated = await conditionalUpdateSimulationStatus(jobId, sim.simId, ['RUNNING'], {
         state: 'FAILED',
         errorMessage: 'Worker lost connection',
@@ -351,7 +355,7 @@ async function recoverStaleSimulations(
 
     // Case 4: FAILED sim — retry by resetting to PENDING + republish
     if (sim.state === 'FAILED' && activeWorkers.length > 0) {
-      console.log(`[Recovery] Job ${jobId} sim ${sim.simId} is FAILED, retrying`);
+      recoveryLog.info('Sim is FAILED, retrying', { jobId, simId: sim.simId });
       simsToRepublish.push(sim);
       recovered = true;
     }
@@ -377,9 +381,9 @@ async function recoverStaleSimulations(
         });
       });
       await Promise.all(promises);
-      console.log(`[Recovery] Republished ${simsToRepublish.length} simulation messages for job ${jobId}`);
+      recoveryLog.info('Republished simulation messages', { jobId, count: simsToRepublish.length });
     } catch (err) {
-      console.warn(`[Recovery] Failed to republish sims for job ${jobId}:`, err);
+      recoveryLog.warn('Failed to republish sims', { jobId, error: err instanceof Error ? err.message : String(err) });
     }
   }
 
@@ -394,7 +398,7 @@ async function recoverStaleSimulations(
   const needsRetrigger = job.status === 'RUNNING' || job.needsAggregation === true;
   if (needsRetrigger) {
     aggregateJobResults(jobId).catch((err) => {
-      console.error(`[Recovery] Aggregation failed for job ${jobId}:`, err);
+      recoveryLog.error('Aggregation failed', { jobId, error: err instanceof Error ? err.message : String(err) });
       Sentry.captureException(err, { tags: { component: 'recovery-aggregation', jobId } });
     });
   }
@@ -427,7 +431,7 @@ async function recoverStaleQueuedJob(jobId: string, job: Job): Promise<boolean>
   const activeWorkers = await workerStore.getActiveWorkers();
   if (activeWorkers.length === 0) return false;
 
-  console.log(`[Recovery] Job ${jobId} stuck in QUEUED for ${Math.round(queuedForMs / 1000)}s, re-publishing to Pub/Sub`);
+  recoveryLog.info('Job stuck in QUEUED, re-publishing to Pub/Sub', { jobId, queuedSec: Math.round(queuedForMs / 1000) });
   requeueCooldowns.set(jobId, Date.now());
 
   try {
@@ -456,12 +460,12 @@ async function recoverStaleQueuedJob(jobId: string, job: Job): Promise<boolean>
           return topic.publishMessage({ json: msg });
         });
         await Promise.all(promises);
-        console.log(`[Recovery] Re-published ${pendingSims.length} pending simulation messages for job ${jobId}`);
+        recoveryLog.info('Re-published pending simulation messages', { jobId, count: pendingSims.length });
       }
     }
     return true;
   } catch (err) {
-    console.warn(`[Recovery] Failed to re-publish queued job ${jobId}:`, err);
+    recoveryLog.warn('Failed to re-publish queued job', { jobId, error: err instanceof Error ? err.message : String(err) });
     return false;
   }
 }
@@ -563,22 +567,22 @@ export async function aggregateJobResults(jobId: string): Promise<void> {
   if (Array.isArray(job.deckIds) && job.deckIds.length === 4 && structuredData?.games?.length) {
     const { processJobForRatings } = await import('./trueskill-service');
     processJobForRatings(jobId, job.deckIds, structuredData.games).catch((err) => {
-      console.error(`[TrueSkill] Rating update failed for job ${jobId} (non-fatal):`, err);
+      log.error('TrueSkill rating update failed (non-fatal)', { jobId, error: err instanceof Error ? err.message : String(err) });
       Sentry.captureException(err, { tags: { component: 'trueskill', jobId } });
     });
   }
 
   // Don't overwrite CANCELLED status — logs are ingested above, but status stays CANCELLED
   if (job.status === 'CANCELLED') {
     await setNeedsAggregation(jobId, false);
-    deleteJobProgress(jobId).catch(err => console.warn('[Cleanup] fire-and-forget failed:', err instanceof Error ? err.message : err));
+    deleteJobProgress(jobId).catch(err => log.warn('Cleanup fire-and-forget failed', { jobId, error: err instanceof Error ? err.message : err }));
     return;
   }
 
   const allCancelled = sims.every(s => s.state === 'CANCELLED');
   if (allCancelled) {
     await setNeedsAggregation(jobId, false);
-    deleteJobProgress(jobId).catch(err => console.warn('[Cleanup] fire-and-forget failed:', err instanceof Error ? err.message : err));
+    deleteJobProgress(jobId).catch(err => log.warn('Cleanup fire-and-forget failed', { jobId, error: err instanceof Error ? err.message : err }));
     return; // Already handled by cancel flow
   }
 
@@ -588,7 +592,7 @@ export async function aggregateJobResults(jobId: string): Promise<void> {
   await setNeedsAggregation(jobId, false);
 
   // Clean up RTDB ephemeral data and cancel recovery task
-  cancelRecoveryCheck(jobId).catch(err => console.warn('[Cleanup] fire-and-forget failed:', err instanceof Error ? err.message : err));
-  deleteJobProgress(jobId).catch(err => console.warn('[Cleanup] fire-and-forget failed:', err instanceof Error ? err.message : err));
+  cancelRecoveryCheck(jobId).catch(err => log.warn('Cleanup fire-and-forget failed', { jobId, error: err instanceof Error ? err.message : err }));
+  deleteJobProgress(jobId).catch(err => log.warn('Cleanup fire-and-forget failed', { jobId, error: err instanceof Error ? err.message : err }));
 }
 
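All of the new `error` fields normalize the caught value inline, either as `err instanceof Error ? err.message : String(err)` or the shorter `: err` variant in the cleanup handlers. If that pattern keeps spreading, a tiny helper (not part of this change, purely a suggestion) would keep the call sites uniform:

```ts
// Hypothetical helper — not in this diff; shown only to illustrate factoring out the repeated pattern.
export function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

// Example call site, matching the shape used in the recovery code above:
// recoveryLog.warn('Failed to republish sims', { jobId, error: errorMessage(err) });
```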