From 19ffa9a16d337a1a9940045b984cba7126bf5af5 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 22:57:01 -0700 Subject: [PATCH 1/7] fix(frontend): clean up websocket state when returning to the dashboard (#3120) WorkflowWebsocketService, ComputingUnitStatusService, and the execution-session services (WorkflowStatusService, ExecuteWorkflowService, WorkflowConsoleService, WorkflowResultService) are all root-scoped singletons, so their state outlived the workspace. WorkspaceComponent.ngOnDestroy cleared the workflow graph but never tore down the websocket session, leaving the socket open and the derived state (connection tracking, worker count, execution status, console output, result caches) populated. Re-entering a workflow then reused the stale session; for the same wid (the wid -> null -> wid pattern) the reconnect guard even short-circuited, the case PR #3093's in-canvas guard did not cover. Tear down every piece of websocket-derived state on workspace destroy: ComputingUnitStatusService.disconnect() (close socket, clear status, stop polling, reset connection tracking and selected unit), WorkflowWebsocketService.closeWebsocket() (reset numWorkers), ExecuteWorkflowService.resetState() (execution state + worker assignments), WorkflowConsoleService.clearConsoleMessages(), and WorkflowResultService.clearResults() (result caches + table-stats ReplaySubject). WorkspaceComponent.ngOnDestroy invokes all of them so re-entering any workflow (same wid included) starts from a clean session. --- .../computing-unit-status.service.spec.ts | 100 ++++++++++++++++++ .../computing-unit-status.service.ts | 16 +++ .../component/workspace.component.spec.ts | 26 ++++- .../component/workspace.component.ts | 16 ++- .../execute-workflow.service.spec.ts | 10 ++ .../execute-workflow.service.ts | 10 ++ .../workflow-console.service.spec.ts | 13 +++ .../workflow-console.service.ts | 10 ++ .../workflow-result.service.spec.ts | 20 ++++ .../workflow-result.service.ts | 12 +++ .../workflow-websocket.service.spec.ts | 8 ++ .../workflow-websocket.service.ts | 2 + 12 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts new file mode 100644 index 00000000000..6e74fdd36c7 --- /dev/null +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TestBed } from "@angular/core/testing"; +import { HttpClientTestingModule } from "@angular/common/http/testing"; +import { of } from "rxjs"; +import { ComputingUnitStatusService } from "./computing-unit-status.service"; +import { WorkflowComputingUnitManagingService } from "../workflow-computing-unit/workflow-computing-unit-managing.service"; +import { WorkflowWebsocketService } from "../../../../workspace/service/workflow-websocket/workflow-websocket.service"; +import { WorkflowStatusService } from "../../../../workspace/service/workflow-status/workflow-status.service"; +import { UserService } from "../../user/user.service"; +import { StubUserService } from "../../user/stub-user.service"; +import { AuthService } from "../../user/auth.service"; +import { StubAuthService } from "../../user/stub-auth.service"; +import { DashboardWorkflowComputingUnit } from "../../../type/workflow-computing-unit"; +import { commonTestProviders } from "../../../testing/test-utils"; + +describe("ComputingUnitStatusService", () => { + let service: ComputingUnitStatusService; + let websocketService: WorkflowWebsocketService; + + const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown as DashboardWorkflowComputingUnit; + + beforeEach(() => { + const managingStub = { + listComputingUnits: () => of([]), + getComputingUnit: (cuid: number) => of(mockUnit(cuid)), + terminateComputingUnit: () => of(undefined), + }; + + TestBed.configureTestingModule({ + imports: [HttpClientTestingModule], + providers: [ + ComputingUnitStatusService, + WorkflowWebsocketService, + WorkflowStatusService, + { provide: WorkflowComputingUnitManagingService, useValue: managingStub }, + { provide: UserService, useClass: StubUserService }, + { provide: AuthService, useClass: StubAuthService }, + ...commonTestProviders, + ], + }); + + service = TestBed.inject(ComputingUnitStatusService); + websocketService = TestBed.inject(WorkflowWebsocketService); + }); + + it("should be created", () => { + expect(service).toBeTruthy(); + }); + + it("reconnects when re-selecting the same workflow after disconnect (regression #3120)", () => { + const openSpy = vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + const closeSpy = vi.spyOn(websocketService, "closeWebsocket"); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + + // Enter workflow 5 on computing unit 7 → opens the websocket once. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(1); + + // User returns to the dashboard. + service.disconnect(); + expect(closeSpy).toHaveBeenCalled(); + + // Re-enter the SAME workflow (the `wid -> null -> wid` pattern). Without + // cleanup the retained currentConnectedWid/Cuid would suppress the reconnect + // and the stale socket would be reused; after disconnect it must reconnect. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(2); + }); + + it("disconnect() clears the selected computing unit", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + service.selectComputingUnit(5, 7); + + let latest: DashboardWorkflowComputingUnit | null = mockUnit(7); + service.getSelectedComputingUnit().subscribe(unit => (latest = unit)); + expect(latest).not.toBeNull(); + + service.disconnect(); + expect(latest).toBeNull(); + }); +}); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 831263183b0..66a9e3b5d14 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -225,6 +225,22 @@ export class ComputingUnitStatusService implements OnDestroy { ); } + /** + * Tear down all websocket-related connection state. Called when the user + * leaves the workspace (e.g. returns to the dashboard) so that re-entering a + * workflow — even the same wid, which arrives as a `wid -> null -> wid` + * sequence — starts from a clean connection instead of reusing the previous + * one. See issue #3120. + */ + public disconnect(): void { + this.workflowWebsocketService.closeWebsocket(); + this.workflowStatusService.clearStatus(); + this.stopPollingSelectedUnit(); + this.currentConnectedCuid = undefined; + this.currentConnectedWid = undefined; + this.selectedUnitSubject.next(null); + } + // Clean up on service destroy ngOnDestroy(): void { this.refreshSubscription?.unsubscribe(); diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts b/frontend/src/app/workspace/component/workspace.component.spec.ts index 7659f346f32..6904237c8e6 100644 --- a/frontend/src/app/workspace/component/workspace.component.spec.ts +++ b/frontend/src/app/workspace/component/workspace.component.spec.ts @@ -40,8 +40,11 @@ import { WorkflowCompilingService } from "../service/compile-workflow/workflow-c import { OperatorMetadataService } from "../service/operator-metadata/operator-metadata.service"; import { UndoRedoService } from "../service/undo-redo/undo-redo.service"; import { WorkflowConsoleService } from "../service/workflow-console/workflow-console.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../service/workflow-graph/model/workflow-action.service"; import { OperatorReuseCacheStatusService } from "../service/workflow-status/operator-reuse-cache-status.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; import { EntityType, HubService } from "../../hub/service/hub.service"; import { commonTestProviders } from "../../common/testing/test-utils"; import { WorkspaceComponent } from "./workspace.component"; @@ -62,6 +65,10 @@ describe("WorkspaceComponent", () => { let messageService: any; let routerMock: any; let locationMock: any; + let computingUnitStatusService: any; + let executeWorkflowService: any; + let workflowConsoleService: any; + let workflowResultService: any; let metadataChangedSubject: Subject; let stubGraph: { triggerCenterEvent: ReturnType; hasElementWithID: ReturnType }; @@ -136,6 +143,10 @@ describe("WorkspaceComponent", () => { routerMock = { navigate: vi.fn() }; locationMock = { go: vi.fn() }; + computingUnitStatusService = { disconnect: vi.fn() }; + executeWorkflowService = { resetState: vi.fn() }; + workflowConsoleService = { clearConsoleMessages: vi.fn() }; + workflowResultService = { clearResults: vi.fn() }; // Drop the standalone component's child imports and allow unknown elements via // CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `` @@ -167,8 +178,11 @@ describe("WorkspaceComponent", () => { // The three services listed in the constructor only to force their // initialization aren't exercised by any test here; provide stubs. { provide: WorkflowCompilingService, useValue: {} }, - { provide: WorkflowConsoleService, useValue: {} }, + { provide: WorkflowConsoleService, useValue: workflowConsoleService }, { provide: OperatorReuseCacheStatusService, useValue: {} }, + { provide: ComputingUnitStatusService, useValue: computingUnitStatusService }, + { provide: ExecuteWorkflowService, useValue: executeWorkflowService }, + { provide: WorkflowResultService, useValue: workflowResultService }, ...commonTestProviders, ], schemas: [NO_ERRORS_SCHEMA], @@ -415,6 +429,16 @@ describe("WorkspaceComponent", () => { // Cleanup of the workflow state still happens regardless. expect(workflowActionService.clearWorkflow).toHaveBeenCalled(); }); + + it("tears down every piece of websocket-derived state when leaving the workspace (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + component.ngOnDestroy(); + expect(computingUnitStatusService.disconnect).toHaveBeenCalled(); + expect(executeWorkflowService.resetState).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); }); describe("copilotEnabled", () => { diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index da220feaab7..81030685558 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -51,6 +51,9 @@ import { THROTTLE_TIME_MS } from "../../hub/component/workflow/detail/hub-workfl import { WorkflowCompilingService } from "../service/compile-workflow/workflow-compiling.service"; import { USER_WORKSPACE } from "../../app-routing.constant"; import { GuiConfigService } from "../../common/service/gui-config.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { checkIfWorkflowBroken } from "../../common/util/workflow-check"; import { NzSpinComponent } from "ng-zorro-antd/spin"; import { ResultPanelComponent } from "./result-panel/result-panel.component"; @@ -126,7 +129,10 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { private hubService: HubService, private codeEditorService: CodeEditorService, private config: GuiConfigService, - private changeDetectorRef: ChangeDetectorRef + private changeDetectorRef: ChangeDetectorRef, + private computingUnitStatusService: ComputingUnitStatusService, + private executeWorkflowService: ExecuteWorkflowService, + private workflowResultService: WorkflowResultService ) {} ngOnInit() { @@ -184,6 +190,14 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.codeEditorViewRef.clear(); this.workflowActionService.clearWorkflow(); + // Tear down every piece of websocket-derived state so that re-entering a + // workflow starts from a clean session instead of reusing the previous one + // (issue #3120): the connection itself, plus the execution status, console + // output, and result caches that are populated from websocket events. + this.computingUnitStatusService.disconnect(); + this.executeWorkflowService.resetState(); + this.workflowConsoleService.clearConsoleMessages(); + this.workflowResultService.clearResults(); } registerAutoPersistWorkflow(): void { diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts index b8d2779c0c3..6cb2a95fa61 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts @@ -86,6 +86,16 @@ describe("ExecuteWorkflowService", () => { expect(injectedService).toBeTruthy(); })); + it("resetState() clears the execution state and worker assignments", () => { + (service as any).currentState = { state: ExecutionState.Running }; + (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]); + + service.resetState(); + + expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized); + expect(service.getWorkerIds("op1")).toEqual([]); + }); + it("should generate a logical plan request based on the workflow graph that is passed to the function", () => { const newLogicalPlan: LogicalPlan = ExecuteWorkflowService.getLogicalPlanRequest(mockWorkflowPlan_scan_result); expect(newLogicalPlan).toEqual(mockLogicalPlan_scan_result); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index eb86194e7cf..313899569c5 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -354,6 +354,16 @@ export class ExecuteWorkflowService { }; } + /** + * Reset all websocket-derived execution state. Called when leaving the + * workspace so a re-entered workflow does not inherit the previous + * execution's status or worker assignments (issue #3120). + */ + public resetState(): void { + this.resetExecutionState(); + this.assignedWorkerIds.clear(); + } + private updateExecutionState(stateInfo: ExecutionStateInfo): void { if (isEqual(this.currentState, stateInfo)) { return; diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts index 9be036cdcce..a211eea52bf 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts @@ -34,4 +34,17 @@ describe("WorkflowConsoleService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearConsoleMessages() removes all messages and notifies subscribers", () => { + (service as any).consoleMessages.set("op1", []); + expect(service.hasConsoleMessages("op1")).toBe(true); + + let notified = false; + service.getConsoleMessageUpdateStream().subscribe(() => (notified = true)); + + service.clearConsoleMessages(); + + expect(service.hasConsoleMessages("op1")).toBe(false); + expect(notified).toBe(true); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts index b7d60868aea..501c3e75b52 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts @@ -73,4 +73,14 @@ export class WorkflowConsoleService { getConsoleMessageUpdateStream(): Observable { return this.consoleMessagesUpdateStream.asObservable(); } + + /** + * Clear all console messages. Called when leaving the workspace so a + * re-entered workflow does not show the previous session's console output + * (issue #3120). + */ + public clearConsoleMessages(): void { + this.consoleMessages.clear(); + this.consoleMessagesUpdateStream.next(); + } } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts index 8406a55c75e..456b2bc2120 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts @@ -37,6 +37,26 @@ describe("WorkflowResultService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearResults() drops cached operator results", () => { + (service as any).operatorResultServices.set("op1", {}); + (service as any).paginatedResultServices.set("op2", {}); + expect(service.hasAnyResult("op1")).toBe(true); + expect(service.hasAnyResult("op2")).toBe(true); + + service.clearResults(); + + expect(service.hasAnyResult("op1")).toBe(false); + expect(service.hasAnyResult("op2")).toBe(false); + }); + + it("clearResults() resets table stats to empty for subscribers", () => { + const pairs: [unknown, unknown][] = []; + service.getResultTableStats().subscribe(p => pairs.push(p)); + (service as any).resultTableStats.next({ op1: {} }); + service.clearResults(); + expect(pairs[pairs.length - 1][1]).toEqual({}); + }); }); describe("OperatorPaginationResultService", () => { diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 9fd18e0f161..fb9c1c4b62c 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -95,6 +95,18 @@ export class WorkflowResultService { return this.operatorResultServices.get(operatorID); } + /** + * Drop all cached operator results and reset table stats. Called when leaving + * the workspace so a re-entered workflow does not show the previous session's + * results (issue #3120). resultTableStats is a ReplaySubject, so it is reset + * to an empty snapshot to avoid replaying stale stats to new subscribers. + */ + public clearResults(): void { + this.operatorResultServices.clear(); + this.paginatedResultServices.clear(); + this.resultTableStats.next({}); + } + private handleCleanResultCache(event: WorkflowAvailableResultEvent): void { const removedOrInvalidatedOperators = new Set(); // remove operators that no longer have results diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts index f2862d18d7c..4c45a15fc0d 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts @@ -94,4 +94,12 @@ describe("WorkflowWebsocketService", () => { window.WebSocket = originalWebSocket; } }); + + it("should reset the cached worker count when the websocket is closed", () => { + // numWorkers is populated from ClusterStatusUpdateEvent on the live connection; + // once the socket is closed the count is stale and must reset (see issue #3120). + service.numWorkers = 5; + service.closeWebsocket(); + expect(service.numWorkers).toBe(-1); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts index 017e43ee6a4..4e52d3fa09e 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts @@ -93,6 +93,8 @@ export class WorkflowWebsocketService { this.wsWithReconnectSubscription?.unsubscribe(); this.statusUpdateSubscription?.unsubscribe(); this.websocket?.complete(); + // the worker count comes from the live connection; reset it once the socket is gone + this.numWorkers = -1; this.updateConnectionStatus(false); } From 177eeb3b8705af936569ffb1cbd276de26158a78 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 23:47:40 -0700 Subject: [PATCH 2/7] fix(frontend): clear websocket session state on in-canvas computing-unit switch Follow-up to #3120 review feedback: - Extend the session-state teardown (execution status, console output, results) to the in-canvas computing-unit switch, which previously cleared only operator status and left the prior unit's console/results/execution badge on screen. ComputingUnitStatusService now emits getConnectionResetStream() when it tears down an active connection to reconnect to a different unit; WorkspaceComponent subscribes and runs a shared resetWorkflowSessionState() helper that ngOnDestroy also uses. - Rename ExecuteWorkflowService.resetState() to resetExecutionAndWorkers() to distinguish it from the existing resetExecutionState(). --- .../computing-unit-status.service.spec.ts | 23 +++++++++++++++++++ .../computing-unit-status.service.ts | 20 ++++++++++++++++ .../component/workspace.component.spec.ts | 21 ++++++++++++++--- .../component/workspace.component.ts | 23 ++++++++++++++++--- .../execute-workflow.service.spec.ts | 4 ++-- .../execute-workflow.service.ts | 9 ++++---- 6 files changed, 88 insertions(+), 12 deletions(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts index 6e74fdd36c7..5401e9e9b80 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -97,4 +97,27 @@ describe("ComputingUnitStatusService", () => { service.disconnect(); expect(latest).toBeNull(); }); + + it("emits a connection-reset signal when switching to a different computing unit (issue #3120)", () => { + let connected = false; + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => { + connected = true; + }); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => { + connected = false; + }); + vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() => connected); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch to a different unit while connected → tear-down signal fires once. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); }); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 66a9e3b5d14..b39f47f114b 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -47,6 +47,11 @@ export class ComputingUnitStatusService implements OnDestroy { private readonly refreshComputingUnitListSignal = new Subject(); + // Emits when an active websocket connection is torn down to switch to a + // different computing unit, so session-scoped consumers (execution status, + // console, results) can clear their websocket-derived state. See issue #3120. + private readonly connectionResetSubject = new Subject(); + // Refresh interval in milliseconds private readonly REFRESH_INTERVAL_MS = 2000; private refreshSubscription: Subscription | null = null; @@ -161,6 +166,10 @@ export class ComputingUnitStatusService implements OnDestroy { if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); this.workflowStatusService.clearStatus(); + // We are tearing down an active connection to switch to a different + // unit; tell session consumers to clear their stale execution, + // console, and result state so the new unit starts fresh (#3120). + this.connectionResetSubject.next(); } this.workflowWebsocketService.openWebsocket(wid, this.userService.getCurrentUser()?.uid, cuid); @@ -225,6 +234,16 @@ export class ComputingUnitStatusService implements OnDestroy { ); } + /** + * Emits whenever an active connection is reset to switch to a different + * computing unit. Consumers that hold websocket-derived session state (e.g. + * execution status, console output, results) subscribe to this to clear that + * state so the new unit starts fresh. See issue #3120. + */ + public getConnectionResetStream(): Observable { + return this.connectionResetSubject.asObservable(); + } + /** * Tear down all websocket-related connection state. Called when the user * leaves the workspace (e.g. returns to the dashboard) so that re-entering a @@ -248,6 +267,7 @@ export class ComputingUnitStatusService implements OnDestroy { this.selectedUnitSubject.complete(); this.allComputingUnitsSubject.complete(); + this.connectionResetSubject.complete(); } /** diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts b/frontend/src/app/workspace/component/workspace.component.spec.ts index 6904237c8e6..f85294e42ad 100644 --- a/frontend/src/app/workspace/component/workspace.component.spec.ts +++ b/frontend/src/app/workspace/component/workspace.component.spec.ts @@ -69,6 +69,7 @@ describe("WorkspaceComponent", () => { let executeWorkflowService: any; let workflowConsoleService: any; let workflowResultService: any; + let connectionResetSubject: Subject; let metadataChangedSubject: Subject; let stubGraph: { triggerCenterEvent: ReturnType; hasElementWithID: ReturnType }; @@ -143,8 +144,12 @@ describe("WorkspaceComponent", () => { routerMock = { navigate: vi.fn() }; locationMock = { go: vi.fn() }; - computingUnitStatusService = { disconnect: vi.fn() }; - executeWorkflowService = { resetState: vi.fn() }; + connectionResetSubject = new Subject(); + computingUnitStatusService = { + disconnect: vi.fn(), + getConnectionResetStream: () => connectionResetSubject.asObservable(), + }; + executeWorkflowService = { resetExecutionAndWorkers: vi.fn() }; workflowConsoleService = { clearConsoleMessages: vi.fn() }; workflowResultService = { clearResults: vi.fn() }; @@ -435,7 +440,17 @@ describe("WorkspaceComponent", () => { fixture.detectChanges(); component.ngOnDestroy(); expect(computingUnitStatusService.disconnect).toHaveBeenCalled(); - expect(executeWorkflowService.resetState).toHaveBeenCalled(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); + + it("clears the workflow session state when the computing unit is switched in-canvas (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + // Switching to a different unit emits on the connection-reset stream. + connectionResetSubject.next(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); expect(workflowResultService.clearResults).toHaveBeenCalled(); }); diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index 81030685558..8808265cac5 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -150,6 +150,13 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { */ this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined; this.workflowActionService.setHighlightingEnabled(true); + // When the user switches computing units without leaving the workspace, the + // socket reconnects to the new unit; clear the previous unit's session state + // (execution status, console, results) so it does not linger (issue #3120). + this.computingUnitStatusService + .getConnectionResetStream() + .pipe(untilDestroyed(this)) + .subscribe(() => this.resetWorkflowSessionState()); } ngAfterViewInit(): void { @@ -192,10 +199,20 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.workflowActionService.clearWorkflow(); // Tear down every piece of websocket-derived state so that re-entering a // workflow starts from a clean session instead of reusing the previous one - // (issue #3120): the connection itself, plus the execution status, console - // output, and result caches that are populated from websocket events. + // (issue #3120): the connection itself, plus the session state that is + // populated from websocket events. this.computingUnitStatusService.disconnect(); - this.executeWorkflowService.resetState(); + this.resetWorkflowSessionState(); + } + + /** + * Clear the websocket-derived session state (execution status, console + * output, result caches) so a fresh workflow session does not inherit the + * previous one. Shared by workspace teardown (returning to the dashboard) and + * in-canvas computing-unit switches (issue #3120). + */ + private resetWorkflowSessionState(): void { + this.executeWorkflowService.resetExecutionAndWorkers(); this.workflowConsoleService.clearConsoleMessages(); this.workflowResultService.clearResults(); } diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts index 6cb2a95fa61..e789153cbc3 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts @@ -86,11 +86,11 @@ describe("ExecuteWorkflowService", () => { expect(injectedService).toBeTruthy(); })); - it("resetState() clears the execution state and worker assignments", () => { + it("resetExecutionAndWorkers() clears the execution state and worker assignments", () => { (service as any).currentState = { state: ExecutionState.Running }; (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]); - service.resetState(); + service.resetExecutionAndWorkers(); expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized); expect(service.getWorkerIds("op1")).toEqual([]); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index 313899569c5..52f67e918e6 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -355,11 +355,12 @@ export class ExecuteWorkflowService { } /** - * Reset all websocket-derived execution state. Called when leaving the - * workspace so a re-entered workflow does not inherit the previous - * execution's status or worker assignments (issue #3120). + * Reset all websocket-derived execution state. Unlike resetExecutionState(), + * which only resets the current execution status, this also clears the worker + * assignments. Called when leaving the workspace so a re-entered workflow does + * not inherit the previous execution's status or worker assignments (issue #3120). */ - public resetState(): void { + public resetExecutionAndWorkers(): void { this.resetExecutionState(); this.assignedWorkerIds.clear(); } From 589b073b24d976a75c34eaafc155f6a06b7685e1 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Sun, 7 Jun 2026 23:59:15 -0700 Subject: [PATCH 3/7] style(frontend): simplify code comments for the websocket cleanup (#3120) Tighten the verbose doc comments (and one test comment) added in this PR. No behavior change. --- .../computing-unit-status.service.spec.ts | 5 ++-- .../computing-unit-status.service.ts | 23 +++++++------------ .../component/workspace.component.ts | 17 +++++--------- .../execute-workflow.service.ts | 6 ++--- .../workflow-console.service.ts | 5 ++-- .../workflow-result.service.ts | 7 +++--- 6 files changed, 23 insertions(+), 40 deletions(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts index 5401e9e9b80..553e27c2fc1 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -78,9 +78,8 @@ describe("ComputingUnitStatusService", () => { service.disconnect(); expect(closeSpy).toHaveBeenCalled(); - // Re-enter the SAME workflow (the `wid -> null -> wid` pattern). Without - // cleanup the retained currentConnectedWid/Cuid would suppress the reconnect - // and the stale socket would be reused; after disconnect it must reconnect. + // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without the + // cleanup, the retained currentConnectedWid/Cuid would suppress the reconnect. service.selectComputingUnit(5, 7); expect(openSpy).toHaveBeenCalledTimes(2); }); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index b39f47f114b..768b15e361d 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -47,9 +47,8 @@ export class ComputingUnitStatusService implements OnDestroy { private readonly refreshComputingUnitListSignal = new Subject(); - // Emits when an active websocket connection is torn down to switch to a - // different computing unit, so session-scoped consumers (execution status, - // console, results) can clear their websocket-derived state. See issue #3120. + // Emits when the active connection is torn down to switch computing units, so + // session consumers can clear their websocket-derived state (#3120). private readonly connectionResetSubject = new Subject(); // Refresh interval in milliseconds @@ -166,9 +165,7 @@ export class ComputingUnitStatusService implements OnDestroy { if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); this.workflowStatusService.clearStatus(); - // We are tearing down an active connection to switch to a different - // unit; tell session consumers to clear their stale execution, - // console, and result state so the new unit starts fresh (#3120). + // switching units: signal consumers to clear their stale state (#3120) this.connectionResetSubject.next(); } @@ -235,21 +232,17 @@ export class ComputingUnitStatusService implements OnDestroy { } /** - * Emits whenever an active connection is reset to switch to a different - * computing unit. Consumers that hold websocket-derived session state (e.g. - * execution status, console output, results) subscribe to this to clear that - * state so the new unit starts fresh. See issue #3120. + * Emits when the connection is reset to switch computing units. Consumers + * subscribe to clear their websocket-derived session state (#3120). */ public getConnectionResetStream(): Observable { return this.connectionResetSubject.asObservable(); } /** - * Tear down all websocket-related connection state. Called when the user - * leaves the workspace (e.g. returns to the dashboard) so that re-entering a - * workflow — even the same wid, which arrives as a `wid -> null -> wid` - * sequence — starts from a clean connection instead of reusing the previous - * one. See issue #3120. + * Tear down all websocket connection state when leaving the workspace, so + * re-entering a workflow starts from a clean connection instead of reusing + * the previous one (#3120). */ public disconnect(): void { this.workflowWebsocketService.closeWebsocket(); diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index 8808265cac5..28e054ee103 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -150,9 +150,8 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { */ this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined; this.workflowActionService.setHighlightingEnabled(true); - // When the user switches computing units without leaving the workspace, the - // socket reconnects to the new unit; clear the previous unit's session state - // (execution status, console, results) so it does not linger (issue #3120). + // Clear session state when the user switches computing units in-canvas, so + // the previous unit's status/console/results don't linger (#3120). this.computingUnitStatusService .getConnectionResetStream() .pipe(untilDestroyed(this)) @@ -197,19 +196,15 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.codeEditorViewRef.clear(); this.workflowActionService.clearWorkflow(); - // Tear down every piece of websocket-derived state so that re-entering a - // workflow starts from a clean session instead of reusing the previous one - // (issue #3120): the connection itself, plus the session state that is - // populated from websocket events. + // Tear down the connection and all websocket-derived session state so a + // re-entered workflow starts clean instead of reusing the previous one (#3120). this.computingUnitStatusService.disconnect(); this.resetWorkflowSessionState(); } /** - * Clear the websocket-derived session state (execution status, console - * output, result caches) so a fresh workflow session does not inherit the - * previous one. Shared by workspace teardown (returning to the dashboard) and - * in-canvas computing-unit switches (issue #3120). + * Clear websocket-derived session state (execution status, console, results). + * Shared by workspace teardown and in-canvas unit switches (#3120). */ private resetWorkflowSessionState(): void { this.executeWorkflowService.resetExecutionAndWorkers(); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index 52f67e918e6..85971871495 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -355,10 +355,8 @@ export class ExecuteWorkflowService { } /** - * Reset all websocket-derived execution state. Unlike resetExecutionState(), - * which only resets the current execution status, this also clears the worker - * assignments. Called when leaving the workspace so a re-entered workflow does - * not inherit the previous execution's status or worker assignments (issue #3120). + * Reset execution status and worker assignments. Unlike resetExecutionState(), + * which resets only the status, this also clears the worker assignments (#3120). */ public resetExecutionAndWorkers(): void { this.resetExecutionState(); diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts index 501c3e75b52..8218a829760 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts @@ -75,9 +75,8 @@ export class WorkflowConsoleService { } /** - * Clear all console messages. Called when leaving the workspace so a - * re-entered workflow does not show the previous session's console output - * (issue #3120). + * Clear all console messages so a re-entered workflow doesn't show the + * previous session's output (#3120). */ public clearConsoleMessages(): void { this.consoleMessages.clear(); diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index fb9c1c4b62c..c71049a09c5 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -96,10 +96,9 @@ export class WorkflowResultService { } /** - * Drop all cached operator results and reset table stats. Called when leaving - * the workspace so a re-entered workflow does not show the previous session's - * results (issue #3120). resultTableStats is a ReplaySubject, so it is reset - * to an empty snapshot to avoid replaying stale stats to new subscribers. + * Drop cached operator results and reset table stats so a re-entered workflow + * doesn't show stale results (#3120). resultTableStats is a ReplaySubject, so + * reset it to an empty snapshot to avoid replaying stale stats to subscribers. */ public clearResults(): void { this.operatorResultServices.clear(); From 164c776fd0e614a20bc179004b91262b8e33b218 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 8 Jun 2026 00:02:32 -0700 Subject: [PATCH 4/7] style(frontend): drop issue-number references from code comments No behavior change. --- .../computing-unit-status.service.ts | 8 ++++---- .../src/app/workspace/component/workspace.component.ts | 6 +++--- .../service/execute-workflow/execute-workflow.service.ts | 2 +- .../service/workflow-console/workflow-console.service.ts | 2 +- .../service/workflow-result/workflow-result.service.ts | 2 +- .../workflow-websocket/workflow-websocket.service.spec.ts | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 768b15e361d..7b237600e7e 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -48,7 +48,7 @@ export class ComputingUnitStatusService implements OnDestroy { private readonly refreshComputingUnitListSignal = new Subject(); // Emits when the active connection is torn down to switch computing units, so - // session consumers can clear their websocket-derived state (#3120). + // session consumers can clear their websocket-derived state. private readonly connectionResetSubject = new Subject(); // Refresh interval in milliseconds @@ -165,7 +165,7 @@ export class ComputingUnitStatusService implements OnDestroy { if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); this.workflowStatusService.clearStatus(); - // switching units: signal consumers to clear their stale state (#3120) + // switching units: signal consumers to clear their stale state this.connectionResetSubject.next(); } @@ -233,7 +233,7 @@ export class ComputingUnitStatusService implements OnDestroy { /** * Emits when the connection is reset to switch computing units. Consumers - * subscribe to clear their websocket-derived session state (#3120). + * subscribe to clear their websocket-derived session state. */ public getConnectionResetStream(): Observable { return this.connectionResetSubject.asObservable(); @@ -242,7 +242,7 @@ export class ComputingUnitStatusService implements OnDestroy { /** * Tear down all websocket connection state when leaving the workspace, so * re-entering a workflow starts from a clean connection instead of reusing - * the previous one (#3120). + * the previous one. */ public disconnect(): void { this.workflowWebsocketService.closeWebsocket(); diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index 28e054ee103..e7bfcb7bf4f 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -151,7 +151,7 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined; this.workflowActionService.setHighlightingEnabled(true); // Clear session state when the user switches computing units in-canvas, so - // the previous unit's status/console/results don't linger (#3120). + // the previous unit's status/console/results don't linger. this.computingUnitStatusService .getConnectionResetStream() .pipe(untilDestroyed(this)) @@ -197,14 +197,14 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.codeEditorViewRef.clear(); this.workflowActionService.clearWorkflow(); // Tear down the connection and all websocket-derived session state so a - // re-entered workflow starts clean instead of reusing the previous one (#3120). + // re-entered workflow starts clean instead of reusing the previous one. this.computingUnitStatusService.disconnect(); this.resetWorkflowSessionState(); } /** * Clear websocket-derived session state (execution status, console, results). - * Shared by workspace teardown and in-canvas unit switches (#3120). + * Shared by workspace teardown and in-canvas unit switches. */ private resetWorkflowSessionState(): void { this.executeWorkflowService.resetExecutionAndWorkers(); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index 85971871495..8145f1a5fd5 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -356,7 +356,7 @@ export class ExecuteWorkflowService { /** * Reset execution status and worker assignments. Unlike resetExecutionState(), - * which resets only the status, this also clears the worker assignments (#3120). + * which resets only the status, this also clears the worker assignments. */ public resetExecutionAndWorkers(): void { this.resetExecutionState(); diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts index 8218a829760..5f88a22ba9a 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts @@ -76,7 +76,7 @@ export class WorkflowConsoleService { /** * Clear all console messages so a re-entered workflow doesn't show the - * previous session's output (#3120). + * previous session's output. */ public clearConsoleMessages(): void { this.consoleMessages.clear(); diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index c71049a09c5..a0c21fa0a9f 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -97,7 +97,7 @@ export class WorkflowResultService { /** * Drop cached operator results and reset table stats so a re-entered workflow - * doesn't show stale results (#3120). resultTableStats is a ReplaySubject, so + * doesn't show stale results. resultTableStats is a ReplaySubject, so * reset it to an empty snapshot to avoid replaying stale stats to subscribers. */ public clearResults(): void { diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts index 4c45a15fc0d..4fb0fe1d10d 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts @@ -97,7 +97,7 @@ describe("WorkflowWebsocketService", () => { it("should reset the cached worker count when the websocket is closed", () => { // numWorkers is populated from ClusterStatusUpdateEvent on the live connection; - // once the socket is closed the count is stale and must reset (see issue #3120). + // once the socket is closed the count is stale and must reset. service.numWorkers = 5; service.closeWebsocket(); expect(service.numWorkers).toBe(-1); From cb93c71562d43190ea15db1728edbb4beecd4e07 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Tue, 9 Jun 2026 02:05:43 -0700 Subject: [PATCH 5/7] fix(frontend): propagate websocket state cleanup to the UI on unit switch Address review feedback on the websocket-state-cleanup change: - WorkflowResultService.clearResults() now emits a cleared signal that ResultPanelComponent uses to tear down stale result/console frames; clearing the caches alone does not re-render a still-highlighted operator. - ExecuteWorkflowService.resetExecutionAndWorkers() broadcasts the reset on executionStateStream so MenuComponent and ResultPanelComponent drop the previous unit's execution status instead of leaving it on screen. - ComputingUnitStatusService clears status and emits the connection-reset signal when switching units even if the socket already dropped (e.g. the prior unit was terminated), not only while currently connected. - Tear down the service in the computing-unit-status spec so the interval poll started by selectComputingUnit() doesn't outlive the test. --- .../computing-unit-status.service.spec.ts | 26 +++++++++++++++++++ .../computing-unit-status.service.ts | 6 +++++ .../result-panel/result-panel.component.ts | 19 ++++++++++++++ .../execute-workflow.service.spec.ts | 5 ++++ .../execute-workflow.service.ts | 7 ++++- .../workflow-result.service.spec.ts | 7 +++++ .../workflow-result.service.ts | 14 ++++++++++ 7 files changed, 83 insertions(+), 1 deletion(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts index 553e27c2fc1..3e7e2d8edd9 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -61,6 +61,12 @@ describe("ComputingUnitStatusService", () => { websocketService = TestBed.inject(WorkflowWebsocketService); }); + afterEach(() => { + // selectComputingUnit() starts an RxJS interval poll; tear the service down so + // the timer doesn't outlive the test and make the suite hang/flap. + service.ngOnDestroy(); + }); + it("should be created", () => { expect(service).toBeTruthy(); }); @@ -119,4 +125,24 @@ describe("ComputingUnitStatusService", () => { service.selectComputingUnit(5, 8); expect(resetCount).toBe(1); }); + + it("emits a connection-reset signal when switching units even if the socket already dropped (issue #3120)", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {}); + // socket reports disconnected throughout, e.g. the previous unit was terminated + vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch units while the socket is already disconnected: stale state from + // unit 7 must still be cleared, so the signal fires regardless of isConnected. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); }); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 7b237600e7e..3312140cf8a 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -162,8 +162,14 @@ export class ComputingUnitStatusService implements OnDestroy { // open websocket if needed const shouldReconnect = this.currentConnectedCuid !== cuid || this.currentConnectedWid !== wid; if (isDefined(wid) && shouldReconnect) { + // A previous connection attempt means websocket-derived state may be + // lingering even if the socket has since dropped (e.g. the prior unit was + // terminated), so tear it down regardless of the current isConnected flag. + const hadPreviousConnection = isDefined(this.currentConnectedWid) || isDefined(this.currentConnectedCuid); if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); + } + if (hadPreviousConnection) { this.workflowStatusService.clearStatus(); // switching units: signal consumers to clear their stale state this.connectionResetSubject.next(); diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index 3260afb290d..bc75baa1077 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit, OnDestroy { this.updateReturnPosition(DEFAULT_HEIGHT, this.height); this.registerAutoRerenderResultPanel(); this.registerAutoOpenResultPanel(); + this.registerResultClearedHandler(); this.handleResultPanelForVersionPreview(); this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(() => this.closePanel()); this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(() => { @@ -218,6 +219,24 @@ export class ResultPanelComponent implements OnInit, OnDestroy { }); } + /** + * When all session results are dropped (e.g. switching computing units or + * leaving the workspace), wipe the panel. Clearing the result caches alone does + * not re-render an operator that stays highlighted, so the previous unit's + * result/console frames would otherwise linger on screen. + */ + registerResultClearedHandler() { + this.workflowResultService + .getResultClearedStream() + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.clearResultPanel(); + this.currentOperatorId = undefined; + this.operatorTitle = ""; + this.changeDetectorRef.detectChanges(); + }); + } + registerAutoRerenderResultPanel() { merge( this.executeWorkflowService diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts index e789153cbc3..e1ff418abb9 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts @@ -90,10 +90,15 @@ describe("ExecuteWorkflowService", () => { (service as any).currentState = { state: ExecutionState.Running }; (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]); + const emittedStates: ExecutionState[] = []; + service.getExecutionStateStream().subscribe(event => emittedStates.push(event.current.state)); + service.resetExecutionAndWorkers(); expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized); expect(service.getWorkerIds("op1")).toEqual([]); + // must broadcast on the stream so subscribers (menu, result panel) drop stale status + expect(emittedStates).toContain(ExecutionState.Uninitialized); }); it("should generate a logical plan request based on the workflow graph that is passed to the function", () => { diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index 8145f1a5fd5..f165d6273ec 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -357,9 +357,14 @@ export class ExecuteWorkflowService { /** * Reset execution status and worker assignments. Unlike resetExecutionState(), * which resets only the status, this also clears the worker assignments. + * + * Goes through updateExecutionState() (rather than resetExecutionState()) so the + * change is broadcast on executionStateStream; otherwise subscribers such as + * MenuComponent and ResultPanelComponent would keep showing the previous unit's + * execution status after a computing-unit switch. */ public resetExecutionAndWorkers(): void { - this.resetExecutionState(); + this.updateExecutionState({ state: ExecutionState.Uninitialized }); this.assignedWorkerIds.clear(); } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts index 456b2bc2120..031de9bfa0b 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts @@ -57,6 +57,13 @@ describe("WorkflowResultService", () => { service.clearResults(); expect(pairs[pairs.length - 1][1]).toEqual({}); }); + + it("clearResults() emits on the cleared stream so the UI tears down stale frames", () => { + let clearedCount = 0; + service.getResultClearedStream().subscribe(() => clearedCount++); + service.clearResults(); + expect(clearedCount).toBe(1); + }); }); describe("OperatorPaginationResultService", () => { diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index a0c21fa0a9f..4605f915700 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -49,6 +49,8 @@ export class WorkflowResultService { private resultUpdateStream = new Subject>(); private resultTableStats = new ReplaySubject>>>(1); private resultInitiateStream = new Subject(); + // emits when all cached results are dropped, so the UI can tear down stale result frames + private resultClearedStream = new Subject(); constructor(private wsService: WorkflowWebsocketService) { this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => { @@ -87,6 +89,15 @@ export class WorkflowResultService { return this.resultInitiateStream.asObservable(); } + /** + * Emits when clearResults() drops all cached results. Consumers (e.g. + * ResultPanelComponent) subscribe to tear down stale result frames, since + * clearing the caches alone does not re-render an already-displayed operator. + */ + public getResultClearedStream(): Observable { + return this.resultClearedStream.asObservable(); + } + public getPaginatedResultService(operatorID: string): OperatorPaginationResultService | undefined { return this.paginatedResultServices.get(operatorID); } @@ -99,11 +110,14 @@ export class WorkflowResultService { * Drop cached operator results and reset table stats so a re-entered workflow * doesn't show stale results. resultTableStats is a ReplaySubject, so * reset it to an empty snapshot to avoid replaying stale stats to subscribers. + * Emits on resultClearedStream so subscribers tear down already-displayed + * result frames instead of leaving them on screen. */ public clearResults(): void { this.operatorResultServices.clear(); this.paginatedResultServices.clear(); this.resultTableStats.next({}); + this.resultClearedStream.next(); } private handleCleanResultCache(event: WorkflowAvailableResultEvent): void { From 9e481c7f03736ddb28171907f17b805957c66230 Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Tue, 9 Jun 2026 02:17:35 -0700 Subject: [PATCH 6/7] style(frontend): simplify comments for the websocket-cleanup follow-up --- .../computing-unit-status.service.spec.ts | 6 ++---- .../computing-unit-status.service.ts | 5 ++--- .../result-panel/result-panel.component.ts | 6 ++---- .../execute-workflow/execute-workflow.service.ts | 8 ++------ .../workflow-result/workflow-result.service.ts | 15 ++++++--------- 5 files changed, 14 insertions(+), 26 deletions(-) diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts index 3e7e2d8edd9..d9c3319cf34 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -62,8 +62,7 @@ describe("ComputingUnitStatusService", () => { }); afterEach(() => { - // selectComputingUnit() starts an RxJS interval poll; tear the service down so - // the timer doesn't outlive the test and make the suite hang/flap. + // tear down the interval poll started by selectComputingUnit() so it can't outlive the test service.ngOnDestroy(); }); @@ -140,8 +139,7 @@ describe("ComputingUnitStatusService", () => { service.selectComputingUnit(5, 7); expect(resetCount).toBe(0); - // Switch units while the socket is already disconnected: stale state from - // unit 7 must still be cleared, so the signal fires regardless of isConnected. + // Switch units while disconnected: unit 7's stale state must still be cleared. service.selectComputingUnit(5, 8); expect(resetCount).toBe(1); }); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 3312140cf8a..e5b6c443885 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -162,9 +162,8 @@ export class ComputingUnitStatusService implements OnDestroy { // open websocket if needed const shouldReconnect = this.currentConnectedCuid !== cuid || this.currentConnectedWid !== wid; if (isDefined(wid) && shouldReconnect) { - // A previous connection attempt means websocket-derived state may be - // lingering even if the socket has since dropped (e.g. the prior unit was - // terminated), so tear it down regardless of the current isConnected flag. + // Tear down stale state on switch even if the socket already dropped + // (e.g. the prior unit was terminated), not just while still connected. const hadPreviousConnection = isDefined(this.currentConnectedWid) || isDefined(this.currentConnectedCuid); if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index bc75baa1077..b5b0c045daf 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -220,10 +220,8 @@ export class ResultPanelComponent implements OnInit, OnDestroy { } /** - * When all session results are dropped (e.g. switching computing units or - * leaving the workspace), wipe the panel. Clearing the result caches alone does - * not re-render an operator that stays highlighted, so the previous unit's - * result/console frames would otherwise linger on screen. + * Wipe the panel when results are dropped (e.g. switching computing units): a + * still-highlighted operator isn't re-rendered, so its stale frames would linger. */ registerResultClearedHandler() { this.workflowResultService diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index f165d6273ec..c2ab3eac0d3 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -356,12 +356,8 @@ export class ExecuteWorkflowService { /** * Reset execution status and worker assignments. Unlike resetExecutionState(), - * which resets only the status, this also clears the worker assignments. - * - * Goes through updateExecutionState() (rather than resetExecutionState()) so the - * change is broadcast on executionStateStream; otherwise subscribers such as - * MenuComponent and ResultPanelComponent would keep showing the previous unit's - * execution status after a computing-unit switch. + * this also clears worker assignments and broadcasts the reset on + * executionStateStream so subscribers drop the previous unit's status. */ public resetExecutionAndWorkers(): void { this.updateExecutionState({ state: ExecutionState.Uninitialized }); diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 4605f915700..ffd9b43b91e 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -49,7 +49,7 @@ export class WorkflowResultService { private resultUpdateStream = new Subject>(); private resultTableStats = new ReplaySubject>>>(1); private resultInitiateStream = new Subject(); - // emits when all cached results are dropped, so the UI can tear down stale result frames + // emits when clearResults() drops cached results, so the UI can drop stale frames private resultClearedStream = new Subject(); constructor(private wsService: WorkflowWebsocketService) { @@ -90,9 +90,8 @@ export class WorkflowResultService { } /** - * Emits when clearResults() drops all cached results. Consumers (e.g. - * ResultPanelComponent) subscribe to tear down stale result frames, since - * clearing the caches alone does not re-render an already-displayed operator. + * Emits when clearResults() drops cached results, so consumers can tear down + * stale frames (clearing the caches alone won't re-render a displayed operator). */ public getResultClearedStream(): Observable { return this.resultClearedStream.asObservable(); @@ -107,11 +106,9 @@ export class WorkflowResultService { } /** - * Drop cached operator results and reset table stats so a re-entered workflow - * doesn't show stale results. resultTableStats is a ReplaySubject, so - * reset it to an empty snapshot to avoid replaying stale stats to subscribers. - * Emits on resultClearedStream so subscribers tear down already-displayed - * result frames instead of leaving them on screen. + * Drop cached results and reset table stats so a re-entered workflow doesn't show + * stale results (resultTableStats is a ReplaySubject, so push an empty snapshot). + * Emits resultClearedStream so subscribers tear down already-displayed frames. */ public clearResults(): void { this.operatorResultServices.clear(); From d7f62ebcc11ab1743cf323427b0c029814da0f4e Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Tue, 9 Jun 2026 02:39:05 -0700 Subject: [PATCH 7/7] test(frontend): cover the result-cleared handler in the result-panel spec --- .../result-panel.component.spec.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts index 1017d759b5d..edfbcbde611 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts @@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from "@angular/core/testing"; import { ResultPanelComponent } from "./result-panel.component"; import { ExecuteWorkflowService } from "../../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service"; import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service"; import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service"; @@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => { let fixture: ComponentFixture; let executeWorkflowService: ExecuteWorkflowService; let workflowActionService: WorkflowActionService; + let workflowResultService: WorkflowResultService; beforeEach(async () => { await TestBed.configureTestingModule({ @@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => { component = fixture.componentInstance; executeWorkflowService = TestBed.inject(ExecuteWorkflowService); workflowActionService = TestBed.inject(WorkflowActionService); + workflowResultService = TestBed.inject(WorkflowResultService); fixture.detectChanges(); }); @@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => { const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement; expect(resultPanelHtmlElement).toBeTruthy(); }); + + it("wipes the panel and operator selection when results are cleared, e.g. on a computing-unit switch (#3120)", () => { + // Simulate a result frame on screen for a currently-highlighted operator. + // ResultPanelComponent stands in as a throwaway frame component; it's cleared before it renders. + component.currentOperatorId = "op1"; + component.operatorTitle = "Operator 1"; + component.frameComponentConfigs.set("Result", { component: ResultPanelComponent, componentInputs: {} }); + expect(component.frameComponentConfigs.size).toBe(1); + + // A unit switch drops the cached results and emits on the cleared stream. The operator + // stays highlighted, so the normal rerender path won't tear the frame down — only this + // handler does, which is the part that actually fixes the lingering-stale-frame bug. + workflowResultService.clearResults(); + + expect(component.frameComponentConfigs.size).toBe(0); + expect(component.currentOperatorId).toBeUndefined(); + expect(component.operatorTitle).toBe(""); + }); });