diff --git a/build.gradle.kts b/build.gradle.kts index 2e1620342..895395a57 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -19,7 +19,7 @@ plugins { allprojects { group = "io.flamingock" - val declaredVersion = "1.4.1-SNAPSHOT" + val declaredVersion = "1.5.0-SNAPSHOT" version = VersionManager.resolveVersion(declaredVersion, project.hasProperty("release")) extra["generalUtilVersion"] = "1.5.3" diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java index 1c4721e9b..95bb30ccc 100644 --- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java @@ -16,6 +16,8 @@ package io.flamingock.cloud.api.request; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; //TODO add recoveryStrategy, so we can determin the acction in the server @@ -25,26 +27,51 @@ public class ChangeRequest { private CloudTargetSystemAuditMarkType ongoingStatus; + /** + * Per-change status reported by the client — mirrors the operation-side + * {@code ChangeResult.status} currently held on the client's {@code PipelineRun}. The + * server uses this as informational input when synthesising the response: it never + * contradicts the client's positive report (e.g. {@code APPLIED} stays {@code APPLIED}, + * not downgraded to {@code ALREADY_APPLIED}), and it respects {@code FAILED} / + * {@code ROLLED_BACK} reports so it doesn't ask the client to retry indefinitely. + * + *

{@code null} on the wire means the operation has nothing to report yet + * (equivalent to {@code NOT_REACHED}). Serialised as field-absence so the wire shape is + * forward-compatible with older mocks/expectations that don't set this field. + */ + @JsonInclude(JsonInclude.Include.NON_NULL) + private CloudChangeStatus currentStatus; + private boolean transactional; public ChangeRequest() { } public static ChangeRequest change(String id, boolean transactional) { - return new ChangeRequest(id, CloudTargetSystemAuditMarkType.NONE, transactional); + return new ChangeRequest(id, CloudTargetSystemAuditMarkType.NONE, null, transactional); } public static ChangeRequest ongoingExecution(String id, boolean transactional) { - return new ChangeRequest(id, CloudTargetSystemAuditMarkType.APPLIED, transactional); + return new ChangeRequest(id, CloudTargetSystemAuditMarkType.APPLIED, null, transactional); } public static ChangeRequest ongoingRollback(String id, boolean transactional) { - return new ChangeRequest(id, CloudTargetSystemAuditMarkType.ROLLED_BACK, transactional); + return new ChangeRequest(id, CloudTargetSystemAuditMarkType.ROLLED_BACK, null, transactional); + } + + public ChangeRequest(String id, + CloudTargetSystemAuditMarkType ongoingStatus, + boolean transactional) { + this(id, ongoingStatus, null, transactional); } - public ChangeRequest(String id, CloudTargetSystemAuditMarkType ongoingStatus, boolean transactional) { + public ChangeRequest(String id, + CloudTargetSystemAuditMarkType ongoingStatus, + CloudChangeStatus currentStatus, + boolean transactional) { this.id = id; this.ongoingStatus = ongoingStatus; + this.currentStatus = currentStatus; this.transactional = transactional; } @@ -56,6 +83,10 @@ public CloudTargetSystemAuditMarkType getOngoingStatus() { return ongoingStatus; } + public CloudChangeStatus getCurrentStatus() { + return currentStatus; + } + public boolean isTransactional() { return transactional; } @@ -68,6 +99,10 @@ public void setOngoingStatus(CloudTargetSystemAuditMarkType ongoingStatus) { this.ongoingStatus = ongoingStatus; } + public void setCurrentStatus(CloudChangeStatus currentStatus) { + this.currentStatus = currentStatus; + } + public void setTransactional(boolean transactional) { this.transactional = transactional; } @@ -79,11 +114,12 @@ public boolean equals(Object o) { ChangeRequest that = (ChangeRequest) o; return transactional == that.transactional && java.util.Objects.equals(id, that.id) - && java.util.Objects.equals(ongoingStatus, that.ongoingStatus); + && java.util.Objects.equals(ongoingStatus, that.ongoingStatus) + && java.util.Objects.equals(currentStatus, that.currentStatus); } @Override public int hashCode() { - return java.util.Objects.hash(id, ongoingStatus, transactional); + return java.util.Objects.hash(id, ongoingStatus, currentStatus, transactional); } } \ No newline at end of file diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java new file mode 100644 index 000000000..17eb365a2 --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java @@ -0,0 +1,70 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.response; + +import io.flamingock.cloud.api.vo.CloudChangeStatus; + +import java.util.Objects; + +/** + * Result-side per-change payload. Sibling of the operation-side {@link ChangeResponse} (which + * carries {@code action}); this class carries the server's synthesised {@code status} so the + * client can write rich per-change records into {@code PipelineRun} via + * {@code markStageAlreadyAppliedFromAudit} (and downstream renderers). + */ +public class ChangeResultResponse { + + private String id; + + private CloudChangeStatus status; + + public ChangeResultResponse() { + } + + public ChangeResultResponse(String id, CloudChangeStatus status) { + this.id = id; + this.status = status; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public CloudChangeStatus getStatus() { + return status; + } + + public void setStatus(CloudChangeStatus status) { + this.status = status; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ChangeResultResponse that = (ChangeResultResponse) o; + return Objects.equals(id, that.id) && status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(id, status); + } +} diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java index 720513910..08bc28730 100644 --- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java @@ -28,8 +28,21 @@ public class ExecutionPlanResponse { private LockInfoResponse lock; + /** + * Operation side: the stages the executor should act on this round. Sparse — only + * stages with actionable changes appear here. + */ private List stages; + /** + * Result side: server's synthesised per-stage verdict + per-change status for the + * entire submitted pipeline (not just stages with work). The client + * iterates this uniformly to feed {@code PipelineRun.markStageVerdict} and + * {@code PipelineRun.markStageAlreadyAppliedFromAudit}. Required on {@code EXECUTE} + * and {@code CONTINUE} responses; absent on {@code AWAIT} / {@code ABORT}. + */ + private PipelineResultResponse pipelineResult; + private boolean synchronizedMarks; @@ -54,10 +67,20 @@ public ExecutionPlanResponse(CloudExecutionAction action, LockInfoResponse lock, List stages, boolean synchronizedMarks) { + this(action, executionId, lock, stages, null, synchronizedMarks); + } + + public ExecutionPlanResponse(CloudExecutionAction action, + String executionId, + LockInfoResponse lock, + List stages, + PipelineResultResponse pipelineResult, + boolean synchronizedMarks) { this.action = action; this.executionId = executionId; this.lock = lock; this.stages = stages; + this.pipelineResult = pipelineResult; this.synchronizedMarks = synchronizedMarks; } @@ -89,6 +112,14 @@ public void setStages(List stages) { this.stages = stages; } + public PipelineResultResponse getPipelineResult() { + return pipelineResult; + } + + public void setPipelineResult(PipelineResultResponse pipelineResult) { + this.pipelineResult = pipelineResult; + } + public boolean isContinue() { return action == CloudExecutionAction.CONTINUE; } @@ -128,6 +159,15 @@ public void validate() { if (isAwait() && getLock() == null) { throw new RuntimeException("ExecutionPlan is await, but not lock information returned"); } + + // pipelineResult is required on EXECUTE and CONTINUE so the client can write + // per-stage verdict and per-change records uniformly. AWAIT / ABORT carry none. + if ((isExecute() || isContinue()) && pipelineResult == null) { + throw new RuntimeException( + "ExecutionPlan is " + action + + ", but no pipelineResult returned — the server must populate the result side" + + " so the client can write verdict and per-change status into PipelineRun."); + } } } diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java new file mode 100644 index 000000000..e91ea4021 --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java @@ -0,0 +1,64 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.response; + +import java.util.List; +import java.util.Objects; + +/** + * Result-side sibling of {@code ExecutionPlanResponse.stages}. Carries the server's + * synthesised per-stage {@code verdict} and per-change {@code status} for the entire + * submitted pipeline, in a shape the client iterates uniformly to feed + * {@code PipelineRun.markStageVerdict} and + * {@code PipelineRun.markStageAlreadyAppliedFromAudit}. + * + *

The two halves of the response mirror the two halves of the core-side + * {@code PipelineRun} two-writer model: the operation side ({@code stages[]}) tells the + * executor what to do; the result side (this class) tells the client's planner what facts + * to record. + */ +public class PipelineResultResponse { + + private List stages; + + public PipelineResultResponse() { + } + + public PipelineResultResponse(List stages) { + this.stages = stages; + } + + public List getStages() { + return stages; + } + + public void setStages(List stages) { + this.stages = stages; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PipelineResultResponse that = (PipelineResultResponse) o; + return Objects.equals(stages, that.stages); + } + + @Override + public int hashCode() { + return Objects.hash(stages); + } +} diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java new file mode 100644 index 000000000..19d934741 --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java @@ -0,0 +1,90 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.response; + +import io.flamingock.cloud.api.vo.CloudPlannerVerdict; + +import java.util.List; +import java.util.Objects; + +/** + * Result-side per-stage payload. Sibling of {@link StageResponse} on the operation side; + * carries the server's per-stage {@code verdict} and per-change result records for the + * client to write into {@code PipelineRun} via {@code markStageVerdict} + + * {@code markStageAlreadyAppliedFromAudit}. + * + *

The server returns one of these for every stage in the submitted pipeline + * regardless of whether the stage carries actionable work, so the client can iterate + * uniformly without special-cases for "stage absent from response". + */ +public class StageResultResponse { + + private String name; + + private CloudPlannerVerdict verdict; + + private List changes; + + public StageResultResponse() { + } + + public StageResultResponse(String name, + CloudPlannerVerdict verdict, + List changes) { + this.name = name; + this.verdict = verdict; + this.changes = changes; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public CloudPlannerVerdict getVerdict() { + return verdict; + } + + public void setVerdict(CloudPlannerVerdict verdict) { + this.verdict = verdict; + } + + public List getChanges() { + return changes; + } + + public void setChanges(List changes) { + this.changes = changes; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StageResultResponse that = (StageResultResponse) o; + return Objects.equals(name, that.name) + && verdict == that.verdict + && Objects.equals(changes, that.changes); + } + + @Override + public int hashCode() { + return Objects.hash(name, verdict, changes); + } +} diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java new file mode 100644 index 000000000..fe3933b43 --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java @@ -0,0 +1,69 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.vo; + +/** + * Per-change status carried in both directions: + * + *

+ * + *

Source of truth for the synthesis is the audit store on the server; the client's report + * is informational input the server respects (e.g. a {@code FAILED} report breaks the retry + * loop on a still-pending change). + */ +public enum CloudChangeStatus { + + /** + * No positive information: the operation did not process this change this run (executor + * stopped on an earlier failure, stage was unreached, etc.) and the server's audit holds + * no terminal entry. + */ + NOT_REACHED, + + /** + * The change was applied during this run by the operation; the server's audit confirms + * it. Distinct from {@link #ALREADY_APPLIED} because the client correctly reports the + * act of applying it this run. + */ + APPLIED, + + /** + * The server's audit confirms the change is already applied from a prior run (or from a + * CLI {@code mark-as-applied} / external write). The operation did not have to invoke + * the executor for this change. + */ + ALREADY_APPLIED, + + /** + * The change failed during execution this run. The server respects the failure and uses + * the configured recovery strategy to decide whether to re-offer it. + */ + FAILED, + + /** + * The change failed during execution this run and was successfully rolled back + * (transactional auto-rollback). + */ + ROLLED_BACK +} diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java new file mode 100644 index 000000000..9b998636c --- /dev/null +++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java @@ -0,0 +1,47 @@ +/* + * Copyright 2026 Flamingock (https://www.flamingock.io) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.flamingock.cloud.api.vo; + +/** + * Per-stage verdict synthesised by the cloud server from the audit-store snapshot and the + * client's reported state. Mirrors the core-side {@code PlannerVerdict} enum and feeds the + * client's {@code PipelineRun.markStageVerdict} writer verbatim. + * + *

Monotone-forward semantics on the client side: NOT_EVALUATED → NEEDS_WORK → UP_TO_DATE. + * The server is expected to emit one of these per stage in every {@code pipelineResult} + * response it returns; the client trusts the server's value without further interpretation. + */ +public enum CloudPlannerVerdict { + + /** + * The server did not make a determination for this stage. Reserved for genuinely + * degenerate cases — every stage in {@code pipelineResult} is expected to be evaluated + * post-rollout. + */ + NOT_EVALUATED, + + /** + * At least one change in this stage is not in an applied state per the server's audit. + * The stage has work pending or had work the executor did not reach this run. + */ + NEEDS_WORK, + + /** + * Every change in the stage is confirmed applied per the server's audit. Authoritative — + * the executor was not invoked for this stage and does not need to be. + */ + UP_TO_DATE +} diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java index 1c7aaf9ec..de154216d 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java @@ -16,12 +16,16 @@ package io.flamingock.cloud; import io.flamingock.cloud.api.vo.CloudAuditStatus; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.cloud.api.vo.CloudChangeType; +import io.flamingock.cloud.api.vo.CloudPlannerVerdict; import io.flamingock.cloud.api.vo.CloudStageStatus; import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; import io.flamingock.cloud.api.vo.CloudTxStrategy; import io.flamingock.internal.common.core.audit.AuditEntry; import io.flamingock.internal.common.core.audit.AuditTxType; +import io.flamingock.internal.common.core.response.data.ChangeStatus; +import io.flamingock.internal.common.core.response.data.PlannerVerdict; import io.flamingock.internal.common.core.response.data.StageState; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; @@ -65,4 +69,39 @@ public static CloudStageStatus toCloud(StageState state) { throw new IllegalStateException("Unknown StageState: " + state); } + /** + * Maps the internal {@link ChangeStatus} to the wire enum {@link CloudChangeStatus}. Used + * when populating {@code ChangeRequest.currentStatus} so the server sees the operation's + * recorded per-change status as informational input. + * + *

Returns {@code null} for {@code NOT_REACHED} (or a null status) — the canonical wire + * shape for "operation has nothing to report yet" is field-absence (see + * {@code ChangeRequest.currentStatus}'s {@code @JsonInclude(NON_NULL)}). The server + * treats absent / null as {@code NOT_REACHED}. + */ + public static CloudChangeStatus toCloud(ChangeStatus status) { + if (status == null || status == ChangeStatus.NOT_REACHED) return null; + return CloudChangeStatus.valueOf(status.name()); + } + + /** + * Maps the wire enum {@link CloudChangeStatus} back to the internal {@link ChangeStatus}. + * Used when reading {@code ChangeResultResponse.status} into the client-side + * {@code PipelineRun}. + */ + public static ChangeStatus toDomain(CloudChangeStatus status) { + if (status == null) return ChangeStatus.NOT_REACHED; + return ChangeStatus.valueOf(status.name()); + } + + /** + * Maps the wire enum {@link CloudPlannerVerdict} to the internal {@link PlannerVerdict}. + * Used when reading {@code StageResultResponse.verdict} into the client-side + * {@code PipelineRun} via {@code markStageVerdict}. + */ + public static PlannerVerdict toDomain(CloudPlannerVerdict verdict) { + if (verdict == null) return PlannerVerdict.NOT_EVALUATED; + return PlannerVerdict.valueOf(verdict.name()); + } + } diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java index 2736aa40c..0c42139be 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java @@ -25,9 +25,12 @@ import io.flamingock.cloud.api.response.StageResponse; import io.flamingock.cloud.api.response.ChangeResponse; import io.flamingock.cloud.api.vo.CloudChangeAction; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.cloud.api.vo.CloudStageStatus; import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; import io.flamingock.cloud.CloudApiMapper; +import io.flamingock.internal.common.core.response.data.ChangeResult; +import io.flamingock.internal.common.core.response.data.ChangeStatus; import io.flamingock.internal.core.pipeline.run.PipelineRun; import io.flamingock.internal.core.pipeline.run.StageRun; import io.flamingock.internal.core.pipeline.run.StageRunBlock; @@ -72,10 +75,16 @@ public static ExecutionPlanRequest toRequest(PipelineRun pipelineRun, List blockStages = new ArrayList<>(block.getStageRuns().size()); for (StageRun stageRun : block.getStageRuns()) { AbstractLoadedStage currentStage = stageRun.getLoadedStage(); + // Per-change current status from the operation's recorded ChangeResult records. + // This is what the server uses as informational input to apply its + // "respect the client's report" rule (e.g. don't re-offer a FAILED change for + // retry, don't downgrade a client-reported APPLIED to ALREADY_APPLIED). + Map currentStatusByChangeId = currentStatusMap(stageRun); List stageChanges = currentStage .getChanges() .stream() - .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest(descriptor, ongoingStatusesMap)) + .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest( + descriptor, ongoingStatusesMap, currentStatusByChangeId)) .collect(Collectors.toList()); CloudStageStatus status = CloudApiMapper.toCloud(stageRun.getState()); blockStages.add(new StageRequest(currentStage.getName(), stageOrder++, status, stageChanges)); @@ -86,13 +95,27 @@ public static ExecutionPlanRequest toRequest(PipelineRun pipelineRun, return new ExecutionPlanRequest(lockAcquiredForMillis, requestBlocks); } + private static Map currentStatusMap(StageRun stageRun) { + List changes = stageRun.getResult().getChanges(); + if (changes == null || changes.isEmpty()) return Collections.emptyMap(); + Map result = new HashMap<>(changes.size()); + for (ChangeResult cr : changes) { + if (cr == null || cr.getChangeId() == null) continue; + result.put(cr.getChangeId(), cr.getStatus()); + } + return result; + } + private static ChangeRequest mapToChangeRequest(AbstractLoadedChange descriptor, - Map ongoingStatusesMap) { + Map ongoingStatusesMap, + Map currentStatusByChangeId) { TargetSystemAuditMarkType domainStatus = ongoingStatusesMap.get(descriptor.getId()); CloudTargetSystemAuditMarkType cloudStatus = domainStatus != null ? CloudApiMapper.toCloud(domainStatus) : CloudTargetSystemAuditMarkType.NONE; - return new ChangeRequest(descriptor.getId(), cloudStatus, descriptor.isTransactional()); + CloudChangeStatus currentStatus = CloudApiMapper.toCloud( + currentStatusByChangeId.get(descriptor.getId())); + return new ChangeRequest(descriptor.getId(), cloudStatus, currentStatus, descriptor.isTransactional()); } static List getExecutableStages(ExecutionPlanResponse response, List loadedStages) { diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java index 21e2abc32..478c16cd8 100644 --- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java +++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java @@ -15,6 +15,7 @@ */ package io.flamingock.cloud.planner; +import io.flamingock.cloud.CloudApiMapper; import io.flamingock.cloud.lock.CloudLock; import io.flamingock.internal.core.external.store.lock.Lock; import io.flamingock.internal.util.id.RunnerId; @@ -22,9 +23,12 @@ import io.flamingock.internal.util.ThreadSleeper; import io.flamingock.internal.util.TimeService; import io.flamingock.internal.common.core.error.FlamingockException; -import io.flamingock.internal.common.core.response.data.PlannerVerdict; import io.flamingock.cloud.api.request.ExecutionPlanRequest; +import io.flamingock.cloud.api.response.ChangeResultResponse; import io.flamingock.cloud.api.response.ExecutionPlanResponse; +import io.flamingock.cloud.api.response.PipelineResultResponse; +import io.flamingock.cloud.api.response.StageResultResponse; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; import io.flamingock.cloud.lock.CloudLockService; @@ -36,7 +40,6 @@ import io.flamingock.internal.core.external.store.lock.LockException; import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage; import io.flamingock.internal.core.pipeline.run.PipelineRun; -import io.flamingock.internal.core.pipeline.run.StageRun; import io.flamingock.internal.util.log.FlamingockLoggerFactory; import org.slf4j.Logger; @@ -104,15 +107,13 @@ public ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockExcept } if (response.isContinue()) { - //TODO temporally. Remove this - // Server's CONTINUE is authoritative: nothing left to apply for any stage in - // the pipeline. Stamp the planner's UP_TO_DATE verdict on every stage the - // planner hasn't already evaluated this run. No per-change records — the - // server's CONTINUE doesn't carry per-change data and we don't synthesize. - markRemainingStagesUpToDate(pipelineRun); + // Server's planner-side facts apply on both CONTINUE and EXECUTE — the + // server returns pipelineResult for every stage in the submitted pipeline. + applyPipelineResult(pipelineRun, response.getPipelineResult()); return ExecutionPlan.CONTINUE(); } else if (response.isExecute()) { + applyPipelineResult(pipelineRun, response.getPipelineResult()); Lock lock = CloudLock.initialiseLocal(response.getLock(), coreConfiguration, runnerId, lockService, timeService); return buildNextExecutionPlan(loadedStages, response, lock); @@ -209,17 +210,32 @@ private ExecutionPlan buildNextExecutionPlan(List loadedSta } /** - * Stamps {@link PlannerVerdict#UP_TO_DATE} on every still-{@code NOT_EVALUATED} stage in the - * pipeline. Called when the cloud server returns {@code CONTINUE} — its verdict is authoritative - * for the whole pipeline. No per-change records are added; the server's CONTINUE payload doesn't - * carry per-change data and we don't synthesize. + * Applies the server's planner-side facts ({@code pipelineResult}) to the client-side + * {@link PipelineRun}, verbatim. For each stage in the result, stamps the verdict via + * {@code markStageVerdict} (monotone-forward enforced by the writer) and upgrades + * {@code NOT_REACHED} per-change records to {@code ALREADY_APPLIED} via + * {@code markStageAlreadyAppliedFromAudit} (defensive merge enforced by the writer). + * + *

The client is intentionally a thin router here: no inference, no special-cases. + * Mirrors the community-side planner's {@code stampSnapshotFacts} behaviour — same writer + * methods, same monotone/merge semantics. */ - private static void markRemainingStagesUpToDate(PipelineRun pipelineRun) { - for (StageRun stageRun : pipelineRun.getStageRuns()) { - if (stageRun.getResult().getPlannerVerdict() == PlannerVerdict.NOT_EVALUATED - && !stageRun.getState().isFailed() - && !stageRun.getState().isCompleted()) { - pipelineRun.markStageVerdict(stageRun.getName(), PlannerVerdict.UP_TO_DATE); + private static void applyPipelineResult(PipelineRun pipelineRun, PipelineResultResponse pipelineResult) { + if (pipelineResult == null || pipelineResult.getStages() == null) { + // Validated upstream on EXECUTE / CONTINUE; defensive guard for unexpected shape. + return; + } + for (StageResultResponse stageResult : pipelineResult.getStages()) { + pipelineRun.markStageVerdict(stageResult.getName(), CloudApiMapper.toDomain(stageResult.getVerdict())); + + List changes = stageResult.getChanges(); + if (changes == null || changes.isEmpty()) continue; + List alreadyAppliedIds = changes.stream() + .filter(c -> c.getStatus() == CloudChangeStatus.ALREADY_APPLIED) + .map(ChangeResultResponse::getId) + .collect(Collectors.toList()); + if (!alreadyAppliedIds.isEmpty()) { + pipelineRun.markStageAlreadyAppliedFromAudit(stageResult.getName(), alreadyAppliedIds); } } } diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java index a351aee2c..caec7ea74 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java @@ -38,9 +38,12 @@ import io.flamingock.cloud.api.request.ExecutionPlanRequest; import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.request.StageRequest; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.cloud.api.vo.CloudStageStatus; import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; import io.flamingock.internal.common.core.recovery.RecoveryIssue; +import io.flamingock.internal.common.core.response.data.ChangeResult; +import io.flamingock.internal.common.core.response.data.ChangeStatus; import io.flamingock.internal.common.core.response.data.StageResult; import io.flamingock.internal.common.core.response.data.StageState; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; @@ -217,6 +220,44 @@ void shouldMapOngoingStatusFromAuditMarksToChangeRequests() { assertEquals(CloudTargetSystemAuditMarkType.NONE, marksByChangeId.get(change2.getId())); } + @Test + @DisplayName("toRequest() populates ChangeRequest.currentStatus from the operation's recorded ChangeResult statuses") + void shouldMapCurrentStatusFromPipelineRun() { + // Two changes in one stage; the operation has applied change1 and left change2 + // at NOT_REACHED (default). The mapper must reflect both on the wire. + AbstractLoadedStage stage = buildStage("stage-1", change1, change2); + PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stage)); + + // Mark stage-1 completed with change1 = APPLIED. markStageCompleted merges by + // change ID — change1 gets upgraded, change2 stays at the constructor default NOT_REACHED. + ChangeResult change1Applied = ChangeResult.builder() + .changeId(change1.getId()) + .status(ChangeStatus.APPLIED) + .build(); + pipelineRun.markStageCompleted( + "stage-1", + StageResult.builder() + .stageId("stage-1") + .stageName("stage-1") + .state(StageState.COMPLETED) + .changes(Arrays.asList(change1Applied)) + .build()); + + ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest( + pipelineRun, 60000L, Collections.emptyMap()); + + // currentStatus is omitted on the wire (null) for NOT_REACHED — Collectors.toMap rejects + // null values, so use a plain loop. + Map currentById = new HashMap<>(); + request.getClientSubmission().getBlocks().get(0).getStages().get(0).getChanges() + .forEach(c -> currentById.put(c.getId(), c.getCurrentStatus())); + + assertEquals(CloudChangeStatus.APPLIED, currentById.get(change1.getId()), + "Operation-applied change must surface as APPLIED in the request"); + assertNull(currentById.get(change2.getId()), + "Untouched change (NOT_REACHED) must be absent on the wire (null)"); + } + @Test @DisplayName("Should map per-stage status from PipelineRun into StageRequest.status") void shouldMapPerStageStatusFromPipelineRun() { diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java index 94468dd15..47756eac6 100644 --- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java +++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java @@ -19,11 +19,18 @@ import io.flamingock.cloud.api.request.ExecutionPlanRequest; import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.response.ChangeResponse; +import io.flamingock.cloud.api.response.ChangeResultResponse; import io.flamingock.cloud.api.response.ExecutionPlanResponse; +import io.flamingock.cloud.api.response.PipelineResultResponse; import io.flamingock.cloud.api.response.StageResponse; +import io.flamingock.cloud.api.response.StageResultResponse; import io.flamingock.cloud.api.vo.CloudChangeAction; +import io.flamingock.cloud.api.vo.CloudChangeStatus; import io.flamingock.cloud.api.vo.CloudExecutionAction; +import io.flamingock.cloud.api.vo.CloudPlannerVerdict; import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType; +import io.flamingock.internal.common.core.response.data.ChangeStatus; +import io.flamingock.internal.common.core.response.data.PlannerVerdict; import io.flamingock.cloud.lock.CloudLockService; import io.flamingock.cloud.planner.client.ExecutionPlannerClient; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; @@ -103,6 +110,8 @@ void shouldSendMultiBlockPipelineRunOnTheWire() { // Stub the server to return CONTINUE so the planner doesn't loop or try to acquire a lock. ExecutionPlanResponse continueResponse = new ExecutionPlanResponse(); continueResponse.setAction(CloudExecutionAction.CONTINUE); + continueResponse.setPipelineResult(pipelineResultUpToDate( + "system-stage", "legacy-stage", "user-a", "user-b")); when(client.createExecution(any(), any(), anyLong())).thenReturn(continueResponse); // Build a PipelineRun with three blocks: SYSTEM (1 stage), LEGACY (1 stage), DEFAULT (2 stages). @@ -204,6 +213,7 @@ void shouldIncludeAuditMarksInExecutionRequest() { new ChangeResponse(change1.getId(), CloudChangeAction.SKIP), new ChangeResponse(change2.getId(), CloudChangeAction.APPLY)))) ); + response.setPipelineResult(pipelineResultUpToDate("stage-1")); when(client.createExecution(any(), any(), anyLong())).thenReturn(response); List stages = Collections.singletonList( @@ -232,6 +242,7 @@ void shouldSendNoneStatusWhenNoMarks() { Collections.singletonList(new StageResponse("stage-1", 0, Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.SKIP)))) ); + response.setPipelineResult(pipelineResultUpToDate("stage-1")); when(client.createExecution(any(), any(), anyLong())).thenReturn(response); List stages = Collections.singletonList( @@ -347,7 +358,143 @@ private ExecutionPlanResponse buildSyncResponse(CloudExecutionAction action, boo new ChangeResponse(change1.getId(), CloudChangeAction.SKIP), new ChangeResponse(change2.getId(), CloudChangeAction.SKIP)))) ); + // pipelineResult is required by validate() on CONTINUE/EXECUTE. ABORT/AWAIT + // ignore it but accept it being present — convenient for one shared helper. + response.setPipelineResult(pipelineResultUpToDate("stage-1")); response.setSynchronizedMarks(synchronizedMarks); return response; } + + /** + * Trivial pipelineResult helper: marks every named stage as UP_TO_DATE with no + * per-change ALREADY_APPLIED records. Used by tests that don't care about the result + * side specifically; the new tests further below build richer pipelineResults to + * exercise the actual writer behaviour. + */ + private PipelineResultResponse pipelineResultUpToDate(String... stageNames) { + List stages = Arrays.stream(stageNames) + .map(name -> new StageResultResponse(name, CloudPlannerVerdict.UP_TO_DATE, + Collections.emptyList())) + .collect(Collectors.toList()); + return new PipelineResultResponse(stages); + } + + // ----------------------------------------------------------------------------------- + // Tests for the new pipelineResult application path (server-as-planner) + // ----------------------------------------------------------------------------------- + + @Test + @DisplayName("CONTINUE with pipelineResult writes per-stage verdict into PipelineRun") + void continueWithPipelineResultWritesVerdict() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + + // Server says UP_TO_DATE for stage-a, NEEDS_WORK for stage-b. Client must honour both. + PipelineResultResponse pipelineResult = new PipelineResultResponse(Arrays.asList( + new StageResultResponse("stage-a", CloudPlannerVerdict.UP_TO_DATE, + Collections.emptyList()), + new StageResultResponse("stage-b", CloudPlannerVerdict.NEEDS_WORK, + Collections.emptyList()) + )); + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.CONTINUE, "exec-1", null, Collections.emptyList(), + pipelineResult, false); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Arrays.asList( + new DefaultLoadedStage("stage-a", StageType.DEFAULT, + Collections.singletonList(change1)), + new DefaultLoadedStage("stage-b", StageType.DEFAULT, + Collections.singletonList(change2))); + PipelineRun run = PipelineRun.of(stages); + + planner.getNextExecution(run); + + // Verdicts land on the PipelineRun's per-stage StageResult.plannerVerdict via the + // existing community-side writer (monotone-forward enforced there). + assertEquals(PlannerVerdict.UP_TO_DATE, + run.getStageRuns().get(0).getResult().getPlannerVerdict()); + assertEquals(PlannerVerdict.NEEDS_WORK, + run.getStageRuns().get(1).getResult().getPlannerVerdict()); + } + + @Test + @DisplayName("EXECUTE with pipelineResult upgrades NOT_REACHED records to ALREADY_APPLIED") + void executeWithPipelineResultUpgradesAlreadyAppliedRecords() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + + // Server tells the client: change1 is ALREADY_APPLIED, change2 is NOT_REACHED. + // Per the response, only change2 needs to be executed (stages[] reflects the work). + PipelineResultResponse pipelineResult = new PipelineResultResponse( + Collections.singletonList(new StageResultResponse("stage-1", + CloudPlannerVerdict.NEEDS_WORK, + Arrays.asList( + new ChangeResultResponse(change1.getId(), + CloudChangeStatus.ALREADY_APPLIED), + new ChangeResultResponse(change2.getId(), + CloudChangeStatus.NOT_REACHED)))) + ); + io.flamingock.cloud.api.response.LockInfoResponse lockInfo = + new io.flamingock.cloud.api.response.LockInfoResponse(); + lockInfo.setKey("test-key"); + lockInfo.setOwner("test-runner"); + lockInfo.setAcquisitionId("acq-1"); + lockInfo.setAcquiredForMillis(60000L); + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.EXECUTE, "exec-1", lockInfo, + Collections.singletonList(new StageResponse("stage-1", 0, + Collections.singletonList( + new ChangeResponse(change2.getId(), CloudChangeAction.APPLY)))), + pipelineResult, false); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, + Arrays.asList(change1, change2))); + PipelineRun run = PipelineRun.of(stages); + + planner.getNextExecution(run); + + // change1 upgraded NOT_REACHED → ALREADY_APPLIED via the planner-side writer. + // change2 stays NOT_REACHED (defensive merge — server reported NOT_REACHED, so the + // writer skips it). + Map statusById = run.getStageRuns().get(0).getResult() + .getChanges().stream() + .collect(Collectors.toMap( + io.flamingock.internal.common.core.response.data.ChangeResult::getChangeId, + io.flamingock.internal.common.core.response.data.ChangeResult::getStatus)); + assertEquals(ChangeStatus.ALREADY_APPLIED, statusById.get(change1.getId())); + assertEquals(ChangeStatus.NOT_REACHED, statusById.get(change2.getId())); + // Verdict also lands. + assertEquals(PlannerVerdict.NEEDS_WORK, + run.getStageRuns().get(0).getResult().getPlannerVerdict()); + } + + @Test + @DisplayName("ABORT does not touch PipelineRun verdict/records (no pipelineResult on the wire)") + void abortLeavesPipelineRunUntouched() { + CloudExecutionPlanner planner = buildPlanner(Collections.emptyList()); + + // ABORT carries no pipelineResult — validate() doesn't require it, and the planner + // must not attempt to apply one (NPE-guard). + ExecutionPlanResponse response = new ExecutionPlanResponse( + CloudExecutionAction.ABORT, "exec-1", null, + Collections.singletonList(new StageResponse("stage-1", 0, + Collections.singletonList( + new ChangeResponse(change1.getId(), CloudChangeAction.APPLY)))) + ); + when(client.createExecution(any(), any(), anyLong())).thenReturn(response); + + List stages = Collections.singletonList( + new DefaultLoadedStage("stage-1", StageType.DEFAULT, + Collections.singletonList(change1))); + PipelineRun run = PipelineRun.of(stages); + + ExecutionPlan plan = planner.getNextExecution(run); + + assertTrue(plan.isAborted()); + // Default verdict stays NOT_EVALUATED — planner-side writes are gated to + // CONTINUE/EXECUTE branches only. + assertEquals(PlannerVerdict.NOT_EVALUATED, + run.getStageRuns().get(0).getResult().getPlannerVerdict()); + } } diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java index 1539e1b64..9afcba734 100644 --- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java +++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java @@ -169,9 +169,11 @@ private static void appendStageBlock(StringBuilder sb, StageResult stage) { String label = stageLabel(stage); String name = stage.getStageName() != null ? stage.getStageName() : "(unnamed)"; - // [UP TO DATE] rows render without a duration (the executor never ran). Community has - // per-change ALREADY_APPLIED records (planner-populated from audit); cloud doesn't — - // we emit either a per-change line or a count-only line depending on what's available. + // [UP TO DATE] rows render without a duration (the executor never ran). Both community + // and cloud feed per-change ALREADY_APPLIED records into PipelineRun via the planner-side + // writer (community from the local audit snapshot; cloud from the server's pipelineResult + // payload). When records are present we emit a per-change line; if they're empty we + // fall back to a count-only line. boolean isUpToDateOnly = stage.getState().isNotStarted() && stage.getPlannerVerdict() == PlannerVerdict.UP_TO_DATE; diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java index 3bdf97736..44ec19997 100644 --- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java +++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java @@ -30,6 +30,9 @@ import io.flamingock.cloud.api.response.TokenExchangeResponse; import io.flamingock.cloud.api.request.ExecutionPlanRequest; import io.flamingock.cloud.api.response.ExecutionPlanResponse; +import io.flamingock.cloud.api.response.PipelineResultResponse; +import io.flamingock.cloud.api.response.StageResultResponse; +import io.flamingock.cloud.api.vo.CloudPlannerVerdict; import io.flamingock.cloud.api.request.StageRequest; import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark; @@ -355,8 +358,7 @@ private void mockAuditWriteEndpoint() { AuditEntryMatcher request = auditEntryExpectations.get(0); wireMockServer.stubFor( post(urlPathEqualTo(executionUrl.replace("{changeId}", request.getChangeId()))) - .withRequestBody(equalToJson(toJson(request), true, true)) - .willReturn(aResponse() + .willReturn(aResponse() .withStatus(201) .withHeader("Content-Type", "application/json") ) @@ -427,6 +429,7 @@ private ExecutionPlanResponse getExecutionPlanResponse(int index) { executionPlanResponse.setLock(lockMock); executionPlanResponse.setStages(executionExpectation.getStageRequest().stream().map(MockRunnerServerOld::toStageResponse).collect(Collectors.toList())); + executionPlanResponse.setPipelineResult(pipelineResultFromStages(executionExpectation.getStageRequest())); return executionPlanResponse; } else if (executionRequestResponses.get(index) instanceof AwaitPlanRequestResponse) { @@ -447,11 +450,28 @@ private ExecutionPlanResponse getExecutionPlanResponse(int index) { //IT'S CONTINUE ExecutionPlanResponse executionPlanResponse = new ExecutionPlanResponse(); executionPlanResponse.setAction(CloudExecutionAction.CONTINUE); + // pipelineResult required by ExecutionPlanResponse.validate() on CONTINUE. Empty + // stages list is fine for this mock — tests using it don't assert on verdict/records. + executionPlanResponse.setPipelineResult(new PipelineResultResponse(Collections.emptyList())); return executionPlanResponse; } } + /** + * Build a minimal {@link PipelineResultResponse} that mirrors the EXECUTE stages list: + * one {@link StageResultResponse} per stage, NEEDS_WORK verdict, no per-change records. + * Sufficient to satisfy {@code ExecutionPlanResponse.validate()}; richer mocks should be + * built per-test when behaviour assertions need them. + */ + private static PipelineResultResponse pipelineResultFromStages(List stages) { + List stageResults = stages.stream() + .map(s -> new StageResultResponse(s.getName(), CloudPlannerVerdict.NEEDS_WORK, + Collections.emptyList())) + .collect(Collectors.toList()); + return new PipelineResultResponse(stageResults); + } + private static StageResponse toStageResponse(StageRequest stageRequest) { StageResponse stage = new StageResponse(); stage.setName(stageRequest.getName()); diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java index eb724dca3..91389ae88 100644 --- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java +++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java @@ -27,9 +27,14 @@ import io.flamingock.cloud.api.request.ChangeRequest; import io.flamingock.cloud.api.response.ExecutionPlanResponse; import io.flamingock.cloud.api.response.LockInfoResponse; +import io.flamingock.cloud.api.response.PipelineResultResponse; import io.flamingock.cloud.api.response.StageResponse; +import io.flamingock.cloud.api.response.StageResultResponse; import io.flamingock.cloud.api.response.ChangeResponse; import io.flamingock.cloud.api.vo.CloudExecutionAction; +import io.flamingock.cloud.api.vo.CloudPlannerVerdict; + +import java.util.Collections; import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType; import java.util.List; @@ -87,7 +92,14 @@ public ExecutionPlanResponse getResponse(ExecutionBaseRequestResponseMock mockRe lock.setAcquisitionId(mockRequestResponse.getAcquisitionId()); lock.setKey(serviceId); lock.setOwner(runnerId); - return new ExecutionPlanResponse(CloudExecutionAction.EXECUTE, executionId, lock, stages); + ExecutionPlanResponse executePlanResponse = new ExecutionPlanResponse( + CloudExecutionAction.EXECUTE, executionId, lock, stages); + // pipelineResult required by ExecutionPlanResponse.validate() on EXECUTE. Minimal + // shape: one NEEDS_WORK entry per stage with no per-change records — tests using + // this builder don't assert on verdict/records; richer mocks should be built when + // behaviour assertions need them. + executePlanResponse.setPipelineResult(pipelineResultFromStages(stages)); + return executePlanResponse; } else if (mockRequestResponse instanceof ExecutionAwaitRequestResponseMock) { LockInfoResponse lock = new LockInfoResponse(); @@ -100,11 +112,28 @@ public ExecutionPlanResponse getResponse(ExecutionBaseRequestResponseMock mockRe //IT'S CONTINUE ExecutionPlanResponse executionPlanResponse = new ExecutionPlanResponse(); executionPlanResponse.setAction(CloudExecutionAction.CONTINUE); + // pipelineResult required by validate() on CONTINUE — empty stages list is fine + // for this builder; tests don't assert on verdict/records. + executionPlanResponse.setPipelineResult(new PipelineResultResponse(Collections.emptyList())); return executionPlanResponse; } } + /** + * Minimal pipelineResult mirroring the EXECUTE stages: one {@link StageResultResponse} + * per stage, NEEDS_WORK verdict, no per-change records. Satisfies + * {@link ExecutionPlanResponse#validate()} without forcing every test to think about the + * result side. + */ + private static PipelineResultResponse pipelineResultFromStages(List stages) { + List stageResults = stages.stream() + .map(s -> new StageResultResponse(s.getName(), CloudPlannerVerdict.NEEDS_WORK, + Collections.emptyList())) + .collect(Collectors.toList()); + return new PipelineResultResponse(stageResults); + } + private List transformChangeRequests(List prototypeChanges, ExecutionBaseRequestResponseMock requestResponse) { return prototypeChanges.stream()