From c3438aabf74c3091a24b785d4380333c79e35fe7 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Thu, 28 May 2026 21:00:00 -0400 Subject: [PATCH] Add more loggings to debug --- .../beam_PostCommit_Python_Versions.json | 2 +- .../prism/internal/engine/elementmanager.go | 11 +++++--- .../runners/prism/internal/environments.go | 5 +++- .../beam/runners/prism/internal/execute.go | 22 +++++++++++++++ .../pkg/beam/runners/prism/internal/stage.go | 8 +++++- .../runners/prism/internal/worker/worker.go | 27 +++++++++++++++++++ .../runners/portability/prism_runner.py | 14 ++++++++-- 7 files changed, 80 insertions(+), 9 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json index 8b2c8c445c1f..86bf1193abd9 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 5 + "revision": 7 } diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 3949d1af3248..7fbdc8a6f533 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -240,8 +240,10 @@ type ElementManager struct { } func (em *ElementManager) addPending(v int) { + prev := em.livePending.Load() em.livePending.Add(int64(v)) em.pendingElements.Add(v) + slog.Info("em.addPending", "delta", v, "prev", prev, "current", em.livePending.Load()) } // LinkID represents a fully qualified input or output. @@ -533,7 +535,7 @@ func (em *ElementManager) DumpStages() string { stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n", em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents)) } else { - stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles)) + stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v livePending: %v\n", em.processingTimeNow(), em.processTimeEvents.events, em.injectedBundles, em.livePending.Load())) } sort.Strings(ids) for _, id := range ids { @@ -1094,9 +1096,8 @@ func (em *ElementManager) FailBundle(rb RunBundle) { em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil) } -// ReturnResiduals is called after a successful split, so the remaining work -// can be re-assigned to a new bundle. func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals) { + slog.Info("ElementManager.ReturnResiduals start", "bundle", rb, "firstRsIndex", firstRsIndex) stage := em.stages[rb.StageID] stage.splitBundle(rb, firstRsIndex, em) @@ -1106,6 +1107,7 @@ func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputI count := stage.AddPending(em, unprocessedElements) em.addPending(count) } + slog.Info("ElementManager.ReturnResiduals end", "bundle", rb, "unprocessedCount", len(unprocessedElements), "livePending", em.livePending.Load()) em.markStagesAsChanged(singleSet(rb.StageID)) } @@ -2190,7 +2192,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa defer ss.mu.Unlock() es := ss.inprogress[rb.BundleID] - slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual) + slog.Info("splitBundle start", "bundle", rb, "elem count", len(es.es), "firstResidual", firstResidual, "livePending", em.livePending.Load()) prim := es.es[:firstResidual] res := es.es[firstResidual:] @@ -2210,6 +2212,7 @@ func (ss *stageState) splitBundle(rb RunBundle, firstResidual int, em *ElementMa // we don't need to increment pending count in em, since it is already pending ss.kind.addPending(ss, em, res) ss.inprogress[rb.BundleID] = es + slog.Info("splitBundle completed", "bundle", rb, "primaryCount", len(prim), "residualCount", len(res), "livePending", em.livePending.Load()) } // minimumPendingTimestamp returns the minimum pending timestamp from all pending elements, diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go index 47f58f8f2532..343d2ba507bc 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/environments.go +++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go @@ -159,9 +159,12 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo // Previous context cancelled so we need a new one // for this request. - pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{ + _, err = pool.StopWorker(bgContext, &fnpb.StopWorkerRequest{ WorkerId: wk.ID, }) + if err != nil { + slog.Warn("StopWorker failed", "worker", wk, "error", err) + } wk.Stop() } diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index f6e148f9f3f6..ef2ea2e4ae0f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -22,7 +22,9 @@ import ( "io" "log/slog" "runtime/debug" + "runtime/pprof" "sort" + "strings" "sync/atomic" "time" @@ -391,6 +393,19 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic // Log a heartbeat every 60 seconds case <-ticker.C: j.Logger.Info("pipeline is running", slog.String("job", j.String())) + j.Logger.Info("pipeline stages state", slog.String("stages", em.DumpStages())) + var buf strings.Builder + goroutineDump(&buf) + j.Logger.Info("goroutines", slog.String("dump", buf.String())) + for envID, wk := range wks { + if wk != nil && wk.Connected() && !wk.Stopped() { + j.Logger.Info("worker status", + slog.String("workerID", wk.ID), + slog.String("envID", envID), + slog.Duration("uptime", wk.Uptime()), + slog.Any("active_bundles", wk.ActiveBundles())) + } + } } } } @@ -501,3 +516,10 @@ func buildTrigger(tpb *pipepb.Trigger) engine.Trigger { return &engine.TriggerDefault{} } } + +func goroutineDump(statusInfo *strings.Builder) { + profile := pprof.Lookup("goroutine") + if profile != nil { + profile.WriteTo(statusInfo, 1) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index b46c9c2fd5b1..8ebeb76f9e54 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -184,6 +184,8 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c panic(err) } + bundleStart := time.Now() + // Progress + split loop. previousIndex := int64(-2) previousTotalCount := int64(-2) // Total count of all pcollection elements. @@ -232,7 +234,11 @@ progress: md := wk.MonitoringMetadata(ctx, unknownIDs) j.AddMetricShortIDs(md) } - slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) + runningFor := time.Since(bundleStart) + slog.Debug("progress report", "bundle", rb, "runningFor", runningFor, "index", index, "prevIndex", previousIndex) + if runningFor > 5*time.Minute { + slog.Warn("Bundle has been running for a long time", "bundle", rb, "runningFor", runningFor, "worker", wk.ID) + } // Check if there has been any measurable progress by the input, or all output pcollections since last report. slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 33c8c3a7de5f..6a0ede344f5e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -70,6 +70,7 @@ type W struct { // These are the ID sources inst uint64 connected, stopped atomic.Bool + StartTime time.Time StoppedChan chan struct{} // Channel to Broadcast stopped state. InstReqs chan *fnpb.InstructionRequest @@ -292,11 +293,37 @@ func (wk *W) Stopped() bool { return wk.stopped.Load() } +// Uptime returns how long the worker has been connected. +func (wk *W) Uptime() time.Duration { + wk.mu.Lock() + defer wk.mu.Unlock() + if wk.StartTime.IsZero() { + return 0 + } + return time.Since(wk.StartTime) +} + +// ActiveBundles returns a list of active bundles currently processing on this worker. +func (wk *W) ActiveBundles() []string { + wk.mu.Lock() + defer wk.mu.Unlock() + var bundles []string + for id, responder := range wk.activeInstructions { + if b, ok := responder.(*B); ok { + bundles = append(bundles, fmt.Sprintf("%s (%s)", id, b.PBDID)) + } + } + return bundles +} + // Control relays instructions to SDKs and back again, coordinated via unique instructionIDs. // // Requests come from the runner, and are sent to the client in the SDK. func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { wk.connected.Store(true) + wk.mu.Lock() + wk.StartTime = time.Now() + wk.mu.Unlock() done := make(chan error, 1) go func() { for { diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index 105c27fcb8c9..8e3c67fdc19e 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -140,8 +140,18 @@ def filter(self, record): record.pathname = json_record["source"]["file"] record.filename = os.path.basename(record.pathname) record.lineno = json_record["source"]["line"] - record.created = datetime.datetime.fromisoformat( - json_record["time"]).timestamp() + time_str = json_record["time"] + match = re.match(r"^(.*?)(\.\d+)?(Z|[+-]\d{2}:?\d{2})?$", time_str) + if match: + base, frac, tz = match.groups() + if frac: + frac = (frac + "000000")[:7] + else: + frac = "" + if tz == 'Z': + tz = '+00:00' + time_str = base + frac + (tz or "") + record.created = datetime.datetime.fromisoformat(time_str).timestamp() extras = { k: v for k, v in json_record.items()