Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { TestBed } from "@angular/core/testing";
import { HttpClientTestingModule } from "@angular/common/http/testing";
import { of } from "rxjs";
import { ComputingUnitStatusService } from "./computing-unit-status.service";
import { WorkflowComputingUnitManagingService } from "../workflow-computing-unit/workflow-computing-unit-managing.service";
import { WorkflowWebsocketService } from "../../../../workspace/service/workflow-websocket/workflow-websocket.service";
import { WorkflowStatusService } from "../../../../workspace/service/workflow-status/workflow-status.service";
import { UserService } from "../../user/user.service";
import { StubUserService } from "../../user/stub-user.service";
import { AuthService } from "../../user/auth.service";
import { StubAuthService } from "../../user/stub-auth.service";
import { DashboardWorkflowComputingUnit } from "../../../type/workflow-computing-unit";
import { commonTestProviders } from "../../../testing/test-utils";

describe("ComputingUnitStatusService", () => {
let service: ComputingUnitStatusService;
let websocketService: WorkflowWebsocketService;

const mockUnit = (cuid: number) => ({ computingUnit: { cuid } }) as unknown as DashboardWorkflowComputingUnit;

beforeEach(() => {
const managingStub = {
listComputingUnits: () => of([]),
getComputingUnit: (cuid: number) => of(mockUnit(cuid)),
terminateComputingUnit: () => of(undefined),
};

TestBed.configureTestingModule({
imports: [HttpClientTestingModule],
providers: [
ComputingUnitStatusService,
WorkflowWebsocketService,
WorkflowStatusService,
{ provide: WorkflowComputingUnitManagingService, useValue: managingStub },
{ provide: UserService, useClass: StubUserService },
{ provide: AuthService, useClass: StubAuthService },
...commonTestProviders,
],
});

service = TestBed.inject(ComputingUnitStatusService);
websocketService = TestBed.inject(WorkflowWebsocketService);
});

afterEach(() => {
// tear down the interval poll started by selectComputingUnit() so it can't outlive the test
service.ngOnDestroy();
});

it("should be created", () => {
expect(service).toBeTruthy();
});

it("reconnects when re-selecting the same workflow after disconnect (regression #3120)", () => {
const openSpy = vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
const closeSpy = vi.spyOn(websocketService, "closeWebsocket");
(service as any).allComputingUnitsSubject.next([mockUnit(7)]);

// Enter workflow 5 on computing unit 7 → opens the websocket once.
service.selectComputingUnit(5, 7);
expect(openSpy).toHaveBeenCalledTimes(1);

// User returns to the dashboard.
service.disconnect();
expect(closeSpy).toHaveBeenCalled();

// Re-enter the SAME workflow (the `wid -> null -> wid` pattern): without the
// cleanup, the retained currentConnectedWid/Cuid would suppress the reconnect.
service.selectComputingUnit(5, 7);
expect(openSpy).toHaveBeenCalledTimes(2);
});

it("disconnect() clears the selected computing unit", () => {
vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
(service as any).allComputingUnitsSubject.next([mockUnit(7)]);
service.selectComputingUnit(5, 7);

let latest: DashboardWorkflowComputingUnit | null = mockUnit(7);
service.getSelectedComputingUnit().subscribe(unit => (latest = unit));
expect(latest).not.toBeNull();

service.disconnect();
expect(latest).toBeNull();
});

it("emits a connection-reset signal when switching to a different computing unit (issue #3120)", () => {
let connected = false;
vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {
connected = true;
});
vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {
connected = false;
});
vi.spyOn(websocketService, "isConnected", "get").mockImplementation(() => connected);
(service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);

let resetCount = 0;
service.getConnectionResetStream().subscribe(() => resetCount++);

// First connection on unit 7: nothing to tear down yet → no signal.
service.selectComputingUnit(5, 7);
expect(resetCount).toBe(0);

// Switch to a different unit while connected → tear-down signal fires once.
service.selectComputingUnit(5, 8);
expect(resetCount).toBe(1);
});

it("emits a connection-reset signal when switching units even if the socket already dropped (issue #3120)", () => {
vi.spyOn(websocketService, "openWebsocket").mockImplementation(() => {});
vi.spyOn(websocketService, "closeWebsocket").mockImplementation(() => {});
// socket reports disconnected throughout, e.g. the previous unit was terminated
vi.spyOn(websocketService, "isConnected", "get").mockReturnValue(false);
(service as any).allComputingUnitsSubject.next([mockUnit(7), mockUnit(8)]);

let resetCount = 0;
service.getConnectionResetStream().subscribe(() => resetCount++);

// First connection on unit 7: nothing to tear down yet → no signal.
service.selectComputingUnit(5, 7);
expect(resetCount).toBe(0);

// Switch units while disconnected: unit 7's stale state must still be cleared.
service.selectComputingUnit(5, 8);
expect(resetCount).toBe(1);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ export class ComputingUnitStatusService implements OnDestroy {

private readonly refreshComputingUnitListSignal = new Subject<void>();

// Emits when the active connection is torn down to switch computing units, so
// session consumers can clear their websocket-derived state.
private readonly connectionResetSubject = new Subject<void>();

// Refresh interval in milliseconds
private readonly REFRESH_INTERVAL_MS = 2000;
private refreshSubscription: Subscription | null = null;
Expand Down Expand Up @@ -158,9 +162,16 @@ export class ComputingUnitStatusService implements OnDestroy {
// open websocket if needed
const shouldReconnect = this.currentConnectedCuid !== cuid || this.currentConnectedWid !== wid;
if (isDefined(wid) && shouldReconnect) {
// Tear down stale state on switch even if the socket already dropped
// (e.g. the prior unit was terminated), not just while still connected.
const hadPreviousConnection = isDefined(this.currentConnectedWid) || isDefined(this.currentConnectedCuid);
if (this.workflowWebsocketService.isConnected) {
this.workflowWebsocketService.closeWebsocket();
}
if (hadPreviousConnection) {
this.workflowStatusService.clearStatus();
// switching units: signal consumers to clear their stale state
this.connectionResetSubject.next();
}

this.workflowWebsocketService.openWebsocket(wid, this.userService.getCurrentUser()?.uid, cuid);
Expand Down Expand Up @@ -225,13 +236,36 @@ export class ComputingUnitStatusService implements OnDestroy {
);
}

/**
* Emits when the connection is reset to switch computing units. Consumers
* subscribe to clear their websocket-derived session state.
*/
public getConnectionResetStream(): Observable<void> {
return this.connectionResetSubject.asObservable();
}

/**
* Tear down all websocket connection state when leaving the workspace, so
* re-entering a workflow starts from a clean connection instead of reusing
* the previous one.
*/
public disconnect(): void {
this.workflowWebsocketService.closeWebsocket();
this.workflowStatusService.clearStatus();
this.stopPollingSelectedUnit();
this.currentConnectedCuid = undefined;
this.currentConnectedWid = undefined;
this.selectedUnitSubject.next(null);
}

// Clean up on service destroy
ngOnDestroy(): void {
this.refreshSubscription?.unsubscribe();
this.selectedUnitPoll?.unsubscribe();

this.selectedUnitSubject.complete();
this.allComputingUnitsSubject.complete();
this.connectionResetSubject.complete();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ComponentFixture, TestBed } from "@angular/core/testing";

import { ResultPanelComponent } from "./result-panel.component";
import { ExecuteWorkflowService } from "../../service/execute-workflow/execute-workflow.service";
import { WorkflowResultService } from "../../service/workflow-result/workflow-result.service";
import { WorkflowActionService } from "../../service/workflow-graph/model/workflow-action.service";
import { OperatorMetadataService } from "../../service/operator-metadata/operator-metadata.service";
import { StubOperatorMetadataService } from "../../service/operator-metadata/stub-operator-metadata.service";
Expand All @@ -38,6 +39,7 @@ describe("ResultPanelComponent", () => {
let fixture: ComponentFixture<ResultPanelComponent>;
let executeWorkflowService: ExecuteWorkflowService;
let workflowActionService: WorkflowActionService;
let workflowResultService: WorkflowResultService;

beforeEach(async () => {
await TestBed.configureTestingModule({
Expand All @@ -60,6 +62,7 @@ describe("ResultPanelComponent", () => {
component = fixture.componentInstance;
executeWorkflowService = TestBed.inject(ExecuteWorkflowService);
workflowActionService = TestBed.inject(WorkflowActionService);
workflowResultService = TestBed.inject(WorkflowResultService);
fixture.detectChanges();
});

Expand All @@ -82,4 +85,22 @@ describe("ResultPanelComponent", () => {
const resultPanelHtmlElement: HTMLElement = resultPanelDiv.nativeElement;
expect(resultPanelHtmlElement).toBeTruthy();
});

it("wipes the panel and operator selection when results are cleared, e.g. on a computing-unit switch (#3120)", () => {
// Simulate a result frame on screen for a currently-highlighted operator.
// ResultPanelComponent stands in as a throwaway frame component; it's cleared before it renders.
component.currentOperatorId = "op1";
component.operatorTitle = "Operator 1";
component.frameComponentConfigs.set("Result", { component: ResultPanelComponent, componentInputs: {} });
expect(component.frameComponentConfigs.size).toBe(1);

// A unit switch drops the cached results and emits on the cleared stream. The operator
// stays highlighted, so the normal rerender path won't tear the frame down — only this
// handler does, which is the part that actually fixes the lingering-stale-frame bug.
workflowResultService.clearResults();

expect(component.frameComponentConfigs.size).toBe(0);
expect(component.currentOperatorId).toBeUndefined();
expect(component.operatorTitle).toBe("");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ export class ResultPanelComponent implements OnInit, OnDestroy {
this.updateReturnPosition(DEFAULT_HEIGHT, this.height);
this.registerAutoRerenderResultPanel();
this.registerAutoOpenResultPanel();
this.registerResultClearedHandler();
this.handleResultPanelForVersionPreview();
this.panelService.closePanelStream.pipe(untilDestroyed(this)).subscribe(() => this.closePanel());
this.panelService.resetPanelStream.pipe(untilDestroyed(this)).subscribe(() => {
Expand Down Expand Up @@ -218,6 +219,22 @@ export class ResultPanelComponent implements OnInit, OnDestroy {
});
}

/**
* Wipe the panel when results are dropped (e.g. switching computing units): a
* still-highlighted operator isn't re-rendered, so its stale frames would linger.
*/
registerResultClearedHandler() {
this.workflowResultService
.getResultClearedStream()
.pipe(untilDestroyed(this))
.subscribe(() => {
this.clearResultPanel();
this.currentOperatorId = undefined;
this.operatorTitle = "";
this.changeDetectorRef.detectChanges();
});
}

registerAutoRerenderResultPanel() {
merge(
this.executeWorkflowService
Expand Down
41 changes: 40 additions & 1 deletion frontend/src/app/workspace/component/workspace.component.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ import { WorkflowCompilingService } from "../service/compile-workflow/workflow-c
import { OperatorMetadataService } from "../service/operator-metadata/operator-metadata.service";
import { UndoRedoService } from "../service/undo-redo/undo-redo.service";
import { WorkflowConsoleService } from "../service/workflow-console/workflow-console.service";
import { ExecuteWorkflowService } from "../service/execute-workflow/execute-workflow.service";
import { WorkflowResultService } from "../service/workflow-result/workflow-result.service";
import { WorkflowActionService } from "../service/workflow-graph/model/workflow-action.service";
import { OperatorReuseCacheStatusService } from "../service/workflow-status/operator-reuse-cache-status.service";
import { ComputingUnitStatusService } from "../../common/service/computing-unit/computing-unit-status/computing-unit-status.service";
import { EntityType, HubService } from "../../hub/service/hub.service";
import { commonTestProviders } from "../../common/testing/test-utils";
import { WorkspaceComponent } from "./workspace.component";
Expand All @@ -62,6 +65,11 @@ describe("WorkspaceComponent", () => {
let messageService: any;
let routerMock: any;
let locationMock: any;
let computingUnitStatusService: any;
let executeWorkflowService: any;
let workflowConsoleService: any;
let workflowResultService: any;
let connectionResetSubject: Subject<void>;
let metadataChangedSubject: Subject<void>;
let stubGraph: { triggerCenterEvent: ReturnType<typeof vi.fn>; hasElementWithID: ReturnType<typeof vi.fn> };

Expand Down Expand Up @@ -136,6 +144,14 @@ describe("WorkspaceComponent", () => {

routerMock = { navigate: vi.fn() };
locationMock = { go: vi.fn() };
connectionResetSubject = new Subject<void>();
computingUnitStatusService = {
disconnect: vi.fn(),
getConnectionResetStream: () => connectionResetSubject.asObservable(),
};
executeWorkflowService = { resetExecutionAndWorkers: vi.fn() };
workflowConsoleService = { clearConsoleMessages: vi.fn() };
workflowResultService = { clearResults: vi.fn() };

// Drop the standalone component's child imports and allow unknown elements via
// CUSTOM_ELEMENTS_SCHEMA. The template still renders, so `<ng-template #codeEditor>`
Expand Down Expand Up @@ -167,8 +183,11 @@ describe("WorkspaceComponent", () => {
// The three services listed in the constructor only to force their
// initialization aren't exercised by any test here; provide stubs.
{ provide: WorkflowCompilingService, useValue: {} },
{ provide: WorkflowConsoleService, useValue: {} },
{ provide: WorkflowConsoleService, useValue: workflowConsoleService },
{ provide: OperatorReuseCacheStatusService, useValue: {} },
{ provide: ComputingUnitStatusService, useValue: computingUnitStatusService },
{ provide: ExecuteWorkflowService, useValue: executeWorkflowService },
{ provide: WorkflowResultService, useValue: workflowResultService },
...commonTestProviders,
],
schemas: [NO_ERRORS_SCHEMA],
Expand Down Expand Up @@ -415,6 +434,26 @@ describe("WorkspaceComponent", () => {
// Cleanup of the workflow state still happens regardless.
expect(workflowActionService.clearWorkflow).toHaveBeenCalled();
});

it("tears down every piece of websocket-derived state when leaving the workspace (issue #3120)", async () => {
await createFixture();
fixture.detectChanges();
component.ngOnDestroy();
expect(computingUnitStatusService.disconnect).toHaveBeenCalled();
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
expect(workflowResultService.clearResults).toHaveBeenCalled();
});

it("clears the workflow session state when the computing unit is switched in-canvas (issue #3120)", async () => {
await createFixture();
fixture.detectChanges();
// Switching to a different unit emits on the connection-reset stream.
connectionResetSubject.next();
expect(executeWorkflowService.resetExecutionAndWorkers).toHaveBeenCalled();
expect(workflowConsoleService.clearConsoleMessages).toHaveBeenCalled();
expect(workflowResultService.clearResults).toHaveBeenCalled();
});
});

describe("copilotEnabled", () => {
Expand Down
Loading
Loading