diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts new file mode 100644 index 00000000000..d9c3319cf34 --- /dev/null +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.spec.ts @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { TestBed } from "@angular/core/testing"; +import { HttpClientTestingModule } from "@angular/common/http/testing"; +import { of } from "rxjs"; +import { ComputingUnitStatusService } from "./computing-unit-status.service"; +import { WorkflowComputingUnitManagingService } from "../workflow-computing-unit/workflow-computing-unit-managing.service"; +import { WorkflowWebsocketService } from "../../../../workspace/service/workflow-websocket/workflow-websocket.service"; +import { WorkflowStatusService } from "../../../../workspace/service/workflow-status/workflow-status.service"; +import { UserService } from "../../user/user.service"; +import { StubUserService } from "../../user/stub-user.service"; +import { AuthService } from "../../user/auth.service"; +import { StubAuthService } from "../../user/stub-auth.service"; +import { DashboardWorkflowComputingUnit } from "../../../type/workflow-computing-unit"; +import { commonTestProviders } from "../../../testing/test-utils"; + +describe("ComputingUnitStatusService", () => { + let service: ComputingUnitStatusService; + let websocketService: WorkflowWebsocketService; + + const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown as DashboardWorkflowComputingUnit; + + beforeEach(() => { + const managingStub = { + listComputingUnits: () => of([]), + getComputingUnit: (cuid: number) => of(mockUnit(cuid)), + terminateComputingUnit: () => of(undefined), + }; + + TestBed.configureTestingModule({ + imports: [HttpClientTestingModule], + providers: [ + ComputingUnitStatusService, + WorkflowWebsocketService, + WorkflowStatusService, + { provide: WorkflowComputingUnitManagingService, useValue: managingStub }, + { provide: UserService, useClass: StubUserService }, + { provide: AuthService, useClass: StubAuthService }, + ...commonTestProviders, + ], + }); + + service = TestBed.inject(ComputingUnitStatusService); + websocketService = TestBed.inject(WorkflowWebsocketService); + }); + + afterEach(() => { + // tear down the interval poll started by selectComputingUnit() so it can't outlive the test + service.ngOnDestroy(); + }); + + it("should be created", () => { + expect(service).toBeTruthy(); + }); + + it("reconnects when re-selecting the same workflow after disconnect (regression #3120)", () => { + const openSpy = vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + const closeSpy = vi.spyOn(websocketService, "closeWebsocket"); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + + // Enter workflow 5 on computing unit 7 → opens the websocket once. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(1); + + // User returns to the dashboard. + service.disconnect(); + expect(closeSpy).toHaveBeenCalled(); + + // Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without the + // cleanup, the retained currentConnectedWid/Cuid would suppress the reconnect. + service.selectComputingUnit(5, 7); + expect(openSpy).toHaveBeenCalledTimes(2); + }); + + it("disconnect() clears the selected computing unit", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + (service as any).allComputingUnitsSubject.next([mockUnit(7)]); + service.selectComputingUnit(5, 7); + + let latest: DashboardWorkflowComputingUnit | null = mockUnit(7); + service.getSelectedComputingUnit().subscribe(unit => (latest = unit)); + expect(latest).not.toBeNull(); + + service.disconnect(); + expect(latest).toBeNull(); + }); + + it("emits a connection-reset signal when switching to a different computing unit (issue #3120)", () => { + let connected = false; + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => { + connected = true; + }); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => { + connected = false; + }); + vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() => connected); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch to a different unit while connected → tear-down signal fires once. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); + + it("emits a connection-reset signal when switching units even if the socket already dropped (issue #3120)", () => { + vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {}); + vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {}); + // socket reports disconnected throughout, e.g. the previous unit was terminated + vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false); + (service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]); + + let resetCount = 0; + service.getConnectionResetStream().subscribe(() => resetCount++); + + // First connection on unit 7: nothing to tear down yet → no signal. + service.selectComputingUnit(5, 7); + expect(resetCount).toBe(0); + + // Switch units while disconnected: unit 7's stale state must still be cleared. + service.selectComputingUnit(5, 8); + expect(resetCount).toBe(1); + }); +}); diff --git a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts index 831263183b0..e5b6c443885 100644 --- a/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts +++ b/frontend/src/app/common/service/computing-unit/computing-unit-status/computing-unit-status.service.ts @@ -47,6 +47,10 @@ export class ComputingUnitStatusService implements OnDestroy { private readonly refreshComputingUnitListSignal = new Subject(); + // Emits when the active connection is torn down to switch computing units, so + // session consumers can clear their websocket-derived state. + private readonly connectionResetSubject = new Subject(); + // Refresh interval in milliseconds private readonly REFRESH_INTERVAL_MS = 2000; private refreshSubscription: Subscription | null = null; @@ -158,9 +162,16 @@ export class ComputingUnitStatusService implements OnDestroy { // open websocket if needed const shouldReconnect = this.currentConnectedCuid !== cuid || this.currentConnectedWid !== wid; if (isDefined(wid) && shouldReconnect) { + // Tear down stale state on switch even if the socket already dropped + // (e.g. the prior unit was terminated), not just while still connected. + const hadPreviousConnection = isDefined(this.currentConnectedWid) || isDefined(this.currentConnectedCuid); if (this.workflowWebsocketService.isConnected) { this.workflowWebsocketService.closeWebsocket(); + } + if (hadPreviousConnection) { this.workflowStatusService.clearStatus(); + // switching units: signal consumers to clear their stale state + this.connectionResetSubject.next(); } this.workflowWebsocketService.openWebsocket(wid, this.userService.getCurrentUser()?.uid, cuid); @@ -225,6 +236,28 @@ export class ComputingUnitStatusService implements OnDestroy { ); } + /** + * Emits when the connection is reset to switch computing units. Consumers + * subscribe to clear their websocket-derived session state. + */ + public getConnectionResetStream(): Observable { + return this.connectionResetSubject.asObservable(); + } + + /** + * Tear down all websocket connection state when leaving the workspace, so + * re-entering a workflow starts from a clean connection instead of reusing + * the previous one. + */ + public disconnect(): void { + this.workflowWebsocketService.closeWebsocket(); + this.workflowStatusService.clearStatus(); + this.stopPollingSelectedUnit(); + this.currentConnectedCuid = undefined; + this.currentConnectedWid = undefined; + this.selectedUnitSubject.next(null); + } + // Clean up on service destroy ngOnDestroy(): void { this.refreshSubscription?.unsubscribe(); @@ -232,6 +265,7 @@ export class ComputingUnitStatusService implements OnDestroy { this.selectedUnitSubject.complete(); this.allComputingUnitsSubject.complete(); + this.connectionResetSubject.complete(); } /** diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts index 1017d759b5d..edfbcbde611 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.spec.ts @@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from "@angular/core/testing"; import { ResultPanelComponent } from "./result-panel.component"; import { ExecuteWorkflowService } from "../../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service"; import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service"; import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service"; @@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => { let fixture: ComponentFixture; let executeWorkflowService: ExecuteWorkflowService; let workflowActionService: WorkflowActionService; + let workflowResultService: WorkflowResultService; beforeEach(async () => { await TestBed.configureTestingModule({ @@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => { component = fixture.componentInstance; executeWorkflowService = TestBed.inject(ExecuteWorkflowService); workflowActionService = TestBed.inject(WorkflowActionService); + workflowResultService = TestBed.inject(WorkflowResultService); fixture.detectChanges(); }); @@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => { const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement; expect(resultPanelHtmlElement).toBeTruthy(); }); + + it("wipes the panel and operator selection when results are cleared, e.g. on a computing-unit switch (#3120)", () => { + // Simulate a result frame on screen for a currently-highlighted operator. + // ResultPanelComponent stands in as a throwaway frame component; it's cleared before it renders. + component.currentOperatorId = "op1"; + component.operatorTitle = "Operator 1"; + component.frameComponentConfigs.set("Result", { component: ResultPanelComponent, componentInputs: {} }); + expect(component.frameComponentConfigs.size).toBe(1); + + // A unit switch drops the cached results and emits on the cleared stream. The operator + // stays highlighted, so the normal rerender path won't tear the frame down — only this + // handler does, which is the part that actually fixes the lingering-stale-frame bug. + workflowResultService.clearResults(); + + expect(component.frameComponentConfigs.size).toBe(0); + expect(component.currentOperatorId).toBeUndefined(); + expect(component.operatorTitle).toBe(""); + }); }); diff --git a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts index 3260afb290d..b5b0c045daf 100644 --- a/frontend/src/app/workspace/component/result-panel/result-panel.component.ts +++ b/frontend/src/app/workspace/component/result-panel/result-panel.component.ts @@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit, OnDestroy { this.updateReturnPosition(DEFAULT_HEIGHT, this.height); this.registerAutoRerenderResultPanel(); this.registerAutoOpenResultPanel(); + this.registerResultClearedHandler(); this.handleResultPanelForVersionPreview(); this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(() => this.closePanel()); this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(() => { @@ -218,6 +219,22 @@ export class ResultPanelComponent implements OnInit, OnDestroy { }); } + /** + * Wipe the panel when results are dropped (e.g. switching computing units): a + * still-highlighted operator isn't re-rendered, so its stale frames would linger. + */ + registerResultClearedHandler() { + this.workflowResultService + .getResultClearedStream() + .pipe(untilDestroyed(this)) + .subscribe(() => { + this.clearResultPanel(); + this.currentOperatorId = undefined; + this.operatorTitle = ""; + this.changeDetectorRef.detectChanges(); + }); + } + registerAutoRerenderResultPanel() { merge( this.executeWorkflowService diff --git a/frontend/src/app/workspace/component/workspace.component.spec.ts b/frontend/src/app/workspace/component/workspace.component.spec.ts index 7659f346f32..f85294e42ad 100644 --- a/frontend/src/app/workspace/component/workspace.component.spec.ts +++ b/frontend/src/app/workspace/component/workspace.component.spec.ts @@ -40,8 +40,11 @@ import { WorkflowCompilingService } from "../service/compile-workflow/workflow-c import { OperatorMetadataService } from "../service/operator-metadata/operator-metadata.service"; import { UndoRedoService } from "../service/undo-redo/undo-redo.service"; import { WorkflowConsoleService } from "../service/workflow-console/workflow-console.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { WorkflowActionService } from "../service/workflow-graph/model/workflow-action.service"; import { OperatorReuseCacheStatusService } from "../service/workflow-status/operator-reuse-cache-status.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; import { EntityType, HubService } from "../../hub/service/hub.service"; import { commonTestProviders } from "../../common/testing/test-utils"; import { WorkspaceComponent } from "./workspace.component"; @@ -62,6 +65,11 @@ describe("WorkspaceComponent", () => { let messageService: any; let routerMock: any; let locationMock: any; + let computingUnitStatusService: any; + let executeWorkflowService: any; + let workflowConsoleService: any; + let workflowResultService: any; + let connectionResetSubject: Subject; let metadataChangedSubject: Subject; let stubGraph: { triggerCenterEvent: ReturnType; hasElementWithID: ReturnType }; @@ -136,6 +144,14 @@ describe("WorkspaceComponent", () => { routerMock = { navigate: vi.fn() }; locationMock = { go: vi.fn() }; + connectionResetSubject = new Subject(); + computingUnitStatusService = { + disconnect: vi.fn(), + getConnectionResetStream: () => connectionResetSubject.asObservable(), + }; + executeWorkflowService = { resetExecutionAndWorkers: vi.fn() }; + workflowConsoleService = { clearConsoleMessages: vi.fn() }; + workflowResultService = { clearResults: vi.fn() }; // Drop the standalone component's child imports and allow unknown elements via // CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `` @@ -167,8 +183,11 @@ describe("WorkspaceComponent", () => { // The three services listed in the constructor only to force their // initialization aren't exercised by any test here; provide stubs. { provide: WorkflowCompilingService, useValue: {} }, - { provide: WorkflowConsoleService, useValue: {} }, + { provide: WorkflowConsoleService, useValue: workflowConsoleService }, { provide: OperatorReuseCacheStatusService, useValue: {} }, + { provide: ComputingUnitStatusService, useValue: computingUnitStatusService }, + { provide: ExecuteWorkflowService, useValue: executeWorkflowService }, + { provide: WorkflowResultService, useValue: workflowResultService }, ...commonTestProviders, ], schemas: [NO_ERRORS_SCHEMA], @@ -415,6 +434,26 @@ describe("WorkspaceComponent", () => { // Cleanup of the workflow state still happens regardless. expect(workflowActionService.clearWorkflow).toHaveBeenCalled(); }); + + it("tears down every piece of websocket-derived state when leaving the workspace (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + component.ngOnDestroy(); + expect(computingUnitStatusService.disconnect).toHaveBeenCalled(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); + + it("clears the workflow session state when the computing unit is switched in-canvas (issue #3120)", async () => { + await createFixture(); + fixture.detectChanges(); + // Switching to a different unit emits on the connection-reset stream. + connectionResetSubject.next(); + expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled(); + expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled(); + expect(workflowResultService.clearResults).toHaveBeenCalled(); + }); }); describe("copilotEnabled", () => { diff --git a/frontend/src/app/workspace/component/workspace.component.ts b/frontend/src/app/workspace/component/workspace.component.ts index da220feaab7..e7bfcb7bf4f 100644 --- a/frontend/src/app/workspace/component/workspace.component.ts +++ b/frontend/src/app/workspace/component/workspace.component.ts @@ -51,6 +51,9 @@ import { THROTTLE_TIME_MS } from "../../hub/component/workflow/detail/hub-workfl import { WorkflowCompilingService } from "../service/compile-workflow/workflow-compiling.service"; import { USER_WORKSPACE } from "../../app-routing.constant"; import { GuiConfigService } from "../../common/service/gui-config.service"; +import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service"; +import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service"; +import { WorkflowResultService } from "../service/workflow-result/workflow-result.service"; import { checkIfWorkflowBroken } from "../../common/util/workflow-check"; import { NzSpinComponent } from "ng-zorro-antd/spin"; import { ResultPanelComponent } from "./result-panel/result-panel.component"; @@ -126,7 +129,10 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { private hubService: HubService, private codeEditorService: CodeEditorService, private config: GuiConfigService, - private changeDetectorRef: ChangeDetectorRef + private changeDetectorRef: ChangeDetectorRef, + private computingUnitStatusService: ComputingUnitStatusService, + private executeWorkflowService: ExecuteWorkflowService, + private workflowResultService: WorkflowResultService ) {} ngOnInit() { @@ -144,6 +150,12 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { */ this.pid = parseInt(this.route.snapshot.queryParams.pid) || undefined; this.workflowActionService.setHighlightingEnabled(true); + // Clear session state when the user switches computing units in-canvas, so + // the previous unit's status/console/results don't linger. + this.computingUnitStatusService + .getConnectionResetStream() + .pipe(untilDestroyed(this)) + .subscribe(() => this.resetWorkflowSessionState()); } ngAfterViewInit(): void { @@ -184,6 +196,20 @@ export class WorkspaceComponent implements AfterViewInit, OnInit, OnDestroy { this.codeEditorViewRef.clear(); this.workflowActionService.clearWorkflow(); + // Tear down the connection and all websocket-derived session state so a + // re-entered workflow starts clean instead of reusing the previous one. + this.computingUnitStatusService.disconnect(); + this.resetWorkflowSessionState(); + } + + /** + * Clear websocket-derived session state (execution status, console, results). + * Shared by workspace teardown and in-canvas unit switches. + */ + private resetWorkflowSessionState(): void { + this.executeWorkflowService.resetExecutionAndWorkers(); + this.workflowConsoleService.clearConsoleMessages(); + this.workflowResultService.clearResults(); } registerAutoPersistWorkflow(): void { diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts index b8d2779c0c3..e1ff418abb9 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.spec.ts @@ -86,6 +86,21 @@ describe("ExecuteWorkflowService", () => { expect(injectedService).toBeTruthy(); })); + it("resetExecutionAndWorkers() clears the execution state and worker assignments", () => { + (service as any).currentState = { state: ExecutionState.Running }; + (service as any).assignedWorkerIds.set("op1", ["w1", "w2"]); + + const emittedStates: ExecutionState[] = []; + service.getExecutionStateStream().subscribe(event => emittedStates.push(event.current.state)); + + service.resetExecutionAndWorkers(); + + expect(service.getExecutionState().state).toBe(ExecutionState.Uninitialized); + expect(service.getWorkerIds("op1")).toEqual([]); + // must broadcast on the stream so subscribers (menu, result panel) drop stale status + expect(emittedStates).toContain(ExecutionState.Uninitialized); + }); + it("should generate a logical plan request based on the workflow graph that is passed to the function", () => { const newLogicalPlan: LogicalPlan = ExecuteWorkflowService.getLogicalPlanRequest(mockWorkflowPlan_scan_result); expect(newLogicalPlan).toEqual(mockLogicalPlan_scan_result); diff --git a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts index eb86194e7cf..c2ab3eac0d3 100644 --- a/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts +++ b/frontend/src/app/workspace/service/execute-workflow/execute-workflow.service.ts @@ -354,6 +354,16 @@ export class ExecuteWorkflowService { }; } + /** + * Reset execution status and worker assignments. Unlike resetExecutionState(), + * this also clears worker assignments and broadcasts the reset on + * executionStateStream so subscribers drop the previous unit's status. + */ + public resetExecutionAndWorkers(): void { + this.updateExecutionState({ state: ExecutionState.Uninitialized }); + this.assignedWorkerIds.clear(); + } + private updateExecutionState(stateInfo: ExecutionStateInfo): void { if (isEqual(this.currentState, stateInfo)) { return; diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts index 9be036cdcce..a211eea52bf 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.spec.ts @@ -34,4 +34,17 @@ describe("WorkflowConsoleService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearConsoleMessages() removes all messages and notifies subscribers", () => { + (service as any).consoleMessages.set("op1", []); + expect(service.hasConsoleMessages("op1")).toBe(true); + + let notified = false; + service.getConsoleMessageUpdateStream().subscribe(() => (notified = true)); + + service.clearConsoleMessages(); + + expect(service.hasConsoleMessages("op1")).toBe(false); + expect(notified).toBe(true); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts index b7d60868aea..5f88a22ba9a 100644 --- a/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts +++ b/frontend/src/app/workspace/service/workflow-console/workflow-console.service.ts @@ -73,4 +73,13 @@ export class WorkflowConsoleService { getConsoleMessageUpdateStream(): Observable { return this.consoleMessagesUpdateStream.asObservable(); } + + /** + * Clear all console messages so a re-entered workflow doesn't show the + * previous session's output. + */ + public clearConsoleMessages(): void { + this.consoleMessages.clear(); + this.consoleMessagesUpdateStream.next(); + } } diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts index 8406a55c75e..031de9bfa0b 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.spec.ts @@ -37,6 +37,33 @@ describe("WorkflowResultService", () => { it("should be created", () => { expect(service).toBeTruthy(); }); + + it("clearResults() drops cached operator results", () => { + (service as any).operatorResultServices.set("op1", {}); + (service as any).paginatedResultServices.set("op2", {}); + expect(service.hasAnyResult("op1")).toBe(true); + expect(service.hasAnyResult("op2")).toBe(true); + + service.clearResults(); + + expect(service.hasAnyResult("op1")).toBe(false); + expect(service.hasAnyResult("op2")).toBe(false); + }); + + it("clearResults() resets table stats to empty for subscribers", () => { + const pairs: [unknown, unknown][] = []; + service.getResultTableStats().subscribe(p => pairs.push(p)); + (service as any).resultTableStats.next({ op1: {} }); + service.clearResults(); + expect(pairs[pairs.length - 1][1]).toEqual({}); + }); + + it("clearResults() emits on the cleared stream so the UI tears down stale frames", () => { + let clearedCount = 0; + service.getResultClearedStream().subscribe(() => clearedCount++); + service.clearResults(); + expect(clearedCount).toBe(1); + }); }); describe("OperatorPaginationResultService", () => { diff --git a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts index 9fd18e0f161..ffd9b43b91e 100644 --- a/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts +++ b/frontend/src/app/workspace/service/workflow-result/workflow-result.service.ts @@ -49,6 +49,8 @@ export class WorkflowResultService { private resultUpdateStream = new Subject>(); private resultTableStats = new ReplaySubject>>>(1); private resultInitiateStream = new Subject(); + // emits when clearResults() drops cached results, so the UI can drop stale frames + private resultClearedStream = new Subject(); constructor(private wsService: WorkflowWebsocketService) { this.wsService.subscribeToEvent("WebResultUpdateEvent").subscribe(event => { @@ -87,6 +89,14 @@ export class WorkflowResultService { return this.resultInitiateStream.asObservable(); } + /** + * Emits when clearResults() drops cached results, so consumers can tear down + * stale frames (clearing the caches alone won't re-render a displayed operator). + */ + public getResultClearedStream(): Observable { + return this.resultClearedStream.asObservable(); + } + public getPaginatedResultService(operatorID: string): OperatorPaginationResultService | undefined { return this.paginatedResultServices.get(operatorID); } @@ -95,6 +105,18 @@ export class WorkflowResultService { return this.operatorResultServices.get(operatorID); } + /** + * Drop cached results and reset table stats so a re-entered workflow doesn't show + * stale results (resultTableStats is a ReplaySubject, so push an empty snapshot). + * Emits resultClearedStream so subscribers tear down already-displayed frames. + */ + public clearResults(): void { + this.operatorResultServices.clear(); + this.paginatedResultServices.clear(); + this.resultTableStats.next({}); + this.resultClearedStream.next(); + } + private handleCleanResultCache(event: WorkflowAvailableResultEvent): void { const removedOrInvalidatedOperators = new Set(); // remove operators that no longer have results diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts index f2862d18d7c..4fb0fe1d10d 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.spec.ts @@ -94,4 +94,12 @@ describe("WorkflowWebsocketService", () => { window.WebSocket = originalWebSocket; } }); + + it("should reset the cached worker count when the websocket is closed", () => { + // numWorkers is populated from ClusterStatusUpdateEvent on the live connection; + // once the socket is closed the count is stale and must reset. + service.numWorkers = 5; + service.closeWebsocket(); + expect(service.numWorkers).toBe(-1); + }); }); diff --git a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts index 017e43ee6a4..4e52d3fa09e 100644 --- a/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts +++ b/frontend/src/app/workspace/service/workflow-websocket/workflow-websocket.service.ts @@ -93,6 +93,8 @@ export class WorkflowWebsocketService { this.wsWithReconnectSubscription?.unsubscribe(); this.statusUpdateSubscription?.unsubscribe(); this.websocket?.complete(); + // the worker count comes from the live connection; reset it once the socket is gone + this.numWorkers = -1; this.updateConnectionStatus(false); }