diff --git a/build.gradle.kts b/build.gradle.kts
index 2e1620342..895395a57 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -19,7 +19,7 @@ plugins {
allprojects {
group = "io.flamingock"
- val declaredVersion = "1.4.1-SNAPSHOT"
+ val declaredVersion = "1.5.0-SNAPSHOT"
version = VersionManager.resolveVersion(declaredVersion, project.hasProperty("release"))
extra["generalUtilVersion"] = "1.5.3"
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java
index 1c4721e9b..95bb30ccc 100644
--- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/request/ChangeRequest.java
@@ -16,6 +16,8 @@
package io.flamingock.cloud.api.request;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
//TODO add recoveryStrategy, so we can determin the acction in the server
@@ -25,26 +27,51 @@ public class ChangeRequest {
private CloudTargetSystemAuditMarkType ongoingStatus;
+ /**
+ * Per-change status reported by the client — mirrors the operation-side
+ * {@code ChangeResult.status} currently held on the client's {@code PipelineRun}. The
+ * server uses this as informational input when synthesising the response: it never
+ * contradicts the client's positive report (e.g. {@code APPLIED} stays {@code APPLIED},
+ * not downgraded to {@code ALREADY_APPLIED}), and it respects {@code FAILED} /
+ * {@code ROLLED_BACK} reports so it doesn't ask the client to retry indefinitely.
+ *
+ *
{@code null} on the wire means the operation has nothing to report yet
+ * (equivalent to {@code NOT_REACHED}). Serialised as field-absence so the wire shape is
+ * forward-compatible with older mocks/expectations that don't set this field.
+ */
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private CloudChangeStatus currentStatus;
+
private boolean transactional;
public ChangeRequest() {
}
public static ChangeRequest change(String id, boolean transactional) {
- return new ChangeRequest(id, CloudTargetSystemAuditMarkType.NONE, transactional);
+ return new ChangeRequest(id, CloudTargetSystemAuditMarkType.NONE, null, transactional);
}
public static ChangeRequest ongoingExecution(String id, boolean transactional) {
- return new ChangeRequest(id, CloudTargetSystemAuditMarkType.APPLIED, transactional);
+ return new ChangeRequest(id, CloudTargetSystemAuditMarkType.APPLIED, null, transactional);
}
public static ChangeRequest ongoingRollback(String id, boolean transactional) {
- return new ChangeRequest(id, CloudTargetSystemAuditMarkType.ROLLED_BACK, transactional);
+ return new ChangeRequest(id, CloudTargetSystemAuditMarkType.ROLLED_BACK, null, transactional);
+ }
+
+ public ChangeRequest(String id,
+ CloudTargetSystemAuditMarkType ongoingStatus,
+ boolean transactional) {
+ this(id, ongoingStatus, null, transactional);
}
- public ChangeRequest(String id, CloudTargetSystemAuditMarkType ongoingStatus, boolean transactional) {
+ public ChangeRequest(String id,
+ CloudTargetSystemAuditMarkType ongoingStatus,
+ CloudChangeStatus currentStatus,
+ boolean transactional) {
this.id = id;
this.ongoingStatus = ongoingStatus;
+ this.currentStatus = currentStatus;
this.transactional = transactional;
}
@@ -56,6 +83,10 @@ public CloudTargetSystemAuditMarkType getOngoingStatus() {
return ongoingStatus;
}
+ public CloudChangeStatus getCurrentStatus() {
+ return currentStatus;
+ }
+
public boolean isTransactional() {
return transactional;
}
@@ -68,6 +99,10 @@ public void setOngoingStatus(CloudTargetSystemAuditMarkType ongoingStatus) {
this.ongoingStatus = ongoingStatus;
}
+ public void setCurrentStatus(CloudChangeStatus currentStatus) {
+ this.currentStatus = currentStatus;
+ }
+
public void setTransactional(boolean transactional) {
this.transactional = transactional;
}
@@ -79,11 +114,12 @@ public boolean equals(Object o) {
ChangeRequest that = (ChangeRequest) o;
return transactional == that.transactional
&& java.util.Objects.equals(id, that.id)
- && java.util.Objects.equals(ongoingStatus, that.ongoingStatus);
+ && java.util.Objects.equals(ongoingStatus, that.ongoingStatus)
+ && java.util.Objects.equals(currentStatus, that.currentStatus);
}
@Override
public int hashCode() {
- return java.util.Objects.hash(id, ongoingStatus, transactional);
+ return java.util.Objects.hash(id, ongoingStatus, currentStatus, transactional);
}
}
\ No newline at end of file
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java
new file mode 100644
index 000000000..17eb365a2
--- /dev/null
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ChangeResultResponse.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.cloud.api.response;
+
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
+
+import java.util.Objects;
+
+/**
+ * Result-side per-change payload. Sibling of the operation-side {@link ChangeResponse} (which
+ * carries {@code action}); this class carries the server's synthesised {@code status} so the
+ * client can write rich per-change records into {@code PipelineRun} via
+ * {@code markStageAlreadyAppliedFromAudit} (and downstream renderers).
+ */
+public class ChangeResultResponse {
+
+ private String id;
+
+ private CloudChangeStatus status;
+
+ public ChangeResultResponse() {
+ }
+
+ public ChangeResultResponse(String id, CloudChangeStatus status) {
+ this.id = id;
+ this.status = status;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public CloudChangeStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(CloudChangeStatus status) {
+ this.status = status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ChangeResultResponse that = (ChangeResultResponse) o;
+ return Objects.equals(id, that.id) && status == that.status;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, status);
+ }
+}
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java
index 720513910..08bc28730 100644
--- a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/ExecutionPlanResponse.java
@@ -28,8 +28,21 @@ public class ExecutionPlanResponse {
private LockInfoResponse lock;
+ /**
+ * Operation side: the stages the executor should act on this round. Sparse — only
+ * stages with actionable changes appear here.
+ */
private List stages;
+ /**
+ * Result side: server's synthesised per-stage verdict + per-change status for the
+ * entire submitted pipeline (not just stages with work). The client
+ * iterates this uniformly to feed {@code PipelineRun.markStageVerdict} and
+ * {@code PipelineRun.markStageAlreadyAppliedFromAudit}. Required on {@code EXECUTE}
+ * and {@code CONTINUE} responses; absent on {@code AWAIT} / {@code ABORT}.
+ */
+ private PipelineResultResponse pipelineResult;
+
private boolean synchronizedMarks;
@@ -54,10 +67,20 @@ public ExecutionPlanResponse(CloudExecutionAction action,
LockInfoResponse lock,
List stages,
boolean synchronizedMarks) {
+ this(action, executionId, lock, stages, null, synchronizedMarks);
+ }
+
+ public ExecutionPlanResponse(CloudExecutionAction action,
+ String executionId,
+ LockInfoResponse lock,
+ List stages,
+ PipelineResultResponse pipelineResult,
+ boolean synchronizedMarks) {
this.action = action;
this.executionId = executionId;
this.lock = lock;
this.stages = stages;
+ this.pipelineResult = pipelineResult;
this.synchronizedMarks = synchronizedMarks;
}
@@ -89,6 +112,14 @@ public void setStages(List stages) {
this.stages = stages;
}
+ public PipelineResultResponse getPipelineResult() {
+ return pipelineResult;
+ }
+
+ public void setPipelineResult(PipelineResultResponse pipelineResult) {
+ this.pipelineResult = pipelineResult;
+ }
+
public boolean isContinue() {
return action == CloudExecutionAction.CONTINUE;
}
@@ -128,6 +159,15 @@ public void validate() {
if (isAwait() && getLock() == null) {
throw new RuntimeException("ExecutionPlan is await, but not lock information returned");
}
+
+ // pipelineResult is required on EXECUTE and CONTINUE so the client can write
+ // per-stage verdict and per-change records uniformly. AWAIT / ABORT carry none.
+ if ((isExecute() || isContinue()) && pipelineResult == null) {
+ throw new RuntimeException(
+ "ExecutionPlan is " + action
+ + ", but no pipelineResult returned — the server must populate the result side"
+ + " so the client can write verdict and per-change status into PipelineRun.");
+ }
}
}
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java
new file mode 100644
index 000000000..e91ea4021
--- /dev/null
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/PipelineResultResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.cloud.api.response;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Result-side sibling of {@code ExecutionPlanResponse.stages}. Carries the server's
+ * synthesised per-stage {@code verdict} and per-change {@code status} for the entire
+ * submitted pipeline, in a shape the client iterates uniformly to feed
+ * {@code PipelineRun.markStageVerdict} and
+ * {@code PipelineRun.markStageAlreadyAppliedFromAudit}.
+ *
+ * The two halves of the response mirror the two halves of the core-side
+ * {@code PipelineRun} two-writer model: the operation side ({@code stages[]}) tells the
+ * executor what to do; the result side (this class) tells the client's planner what facts
+ * to record.
+ */
+public class PipelineResultResponse {
+
+ private List stages;
+
+ public PipelineResultResponse() {
+ }
+
+ public PipelineResultResponse(List stages) {
+ this.stages = stages;
+ }
+
+ public List getStages() {
+ return stages;
+ }
+
+ public void setStages(List stages) {
+ this.stages = stages;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PipelineResultResponse that = (PipelineResultResponse) o;
+ return Objects.equals(stages, that.stages);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(stages);
+ }
+}
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java
new file mode 100644
index 000000000..19d934741
--- /dev/null
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/response/StageResultResponse.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.cloud.api.response;
+
+import io.flamingock.cloud.api.vo.CloudPlannerVerdict;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Result-side per-stage payload. Sibling of {@link StageResponse} on the operation side;
+ * carries the server's per-stage {@code verdict} and per-change result records for the
+ * client to write into {@code PipelineRun} via {@code markStageVerdict} +
+ * {@code markStageAlreadyAppliedFromAudit}.
+ *
+ * The server returns one of these for every stage in the submitted pipeline
+ * regardless of whether the stage carries actionable work, so the client can iterate
+ * uniformly without special-cases for "stage absent from response".
+ */
+public class StageResultResponse {
+
+ private String name;
+
+ private CloudPlannerVerdict verdict;
+
+ private List changes;
+
+ public StageResultResponse() {
+ }
+
+ public StageResultResponse(String name,
+ CloudPlannerVerdict verdict,
+ List changes) {
+ this.name = name;
+ this.verdict = verdict;
+ this.changes = changes;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public CloudPlannerVerdict getVerdict() {
+ return verdict;
+ }
+
+ public void setVerdict(CloudPlannerVerdict verdict) {
+ this.verdict = verdict;
+ }
+
+ public List getChanges() {
+ return changes;
+ }
+
+ public void setChanges(List changes) {
+ this.changes = changes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StageResultResponse that = (StageResultResponse) o;
+ return Objects.equals(name, that.name)
+ && verdict == that.verdict
+ && Objects.equals(changes, that.changes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, verdict, changes);
+ }
+}
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java
new file mode 100644
index 000000000..fe3933b43
--- /dev/null
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudChangeStatus.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.cloud.api.vo;
+
+/**
+ * Per-change status carried in both directions:
+ *
+ *
+ * - In {@code ChangeRequest.currentStatus}: the client tells the server what its
+ * operation-side {@code ChangeResult.status} currently holds, so the server can apply
+ * its "client report is informational input" rule without contradicting the operation's
+ * prior writes.
+ * - In {@code ChangeResultResponse.status}: the server's synthesised per-change status
+ * reconciled from audit + client report. Mirrors the core-side {@code ChangeStatus}
+ * enum and feeds the client's {@code PipelineRun} writers verbatim.
+ *
+ *
+ * Source of truth for the synthesis is the audit store on the server; the client's report
+ * is informational input the server respects (e.g. a {@code FAILED} report breaks the retry
+ * loop on a still-pending change).
+ */
+public enum CloudChangeStatus {
+
+ /**
+ * No positive information: the operation did not process this change this run (executor
+ * stopped on an earlier failure, stage was unreached, etc.) and the server's audit holds
+ * no terminal entry.
+ */
+ NOT_REACHED,
+
+ /**
+ * The change was applied during this run by the operation; the server's audit confirms
+ * it. Distinct from {@link #ALREADY_APPLIED} because the client correctly reports the
+ * act of applying it this run.
+ */
+ APPLIED,
+
+ /**
+ * The server's audit confirms the change is already applied from a prior run (or from a
+ * CLI {@code mark-as-applied} / external write). The operation did not have to invoke
+ * the executor for this change.
+ */
+ ALREADY_APPLIED,
+
+ /**
+ * The change failed during execution this run. The server respects the failure and uses
+ * the configured recovery strategy to decide whether to re-offer it.
+ */
+ FAILED,
+
+ /**
+ * The change failed during execution this run and was successfully rolled back
+ * (transactional auto-rollback).
+ */
+ ROLLED_BACK
+}
diff --git a/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java
new file mode 100644
index 000000000..9b998636c
--- /dev/null
+++ b/cloud/flamingock-cloud-api/src/main/java/io/flamingock/cloud/api/vo/CloudPlannerVerdict.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2026 Flamingock (https://www.flamingock.io)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.flamingock.cloud.api.vo;
+
+/**
+ * Per-stage verdict synthesised by the cloud server from the audit-store snapshot and the
+ * client's reported state. Mirrors the core-side {@code PlannerVerdict} enum and feeds the
+ * client's {@code PipelineRun.markStageVerdict} writer verbatim.
+ *
+ *
Monotone-forward semantics on the client side: NOT_EVALUATED → NEEDS_WORK → UP_TO_DATE.
+ * The server is expected to emit one of these per stage in every {@code pipelineResult}
+ * response it returns; the client trusts the server's value without further interpretation.
+ */
+public enum CloudPlannerVerdict {
+
+ /**
+ * The server did not make a determination for this stage. Reserved for genuinely
+ * degenerate cases — every stage in {@code pipelineResult} is expected to be evaluated
+ * post-rollout.
+ */
+ NOT_EVALUATED,
+
+ /**
+ * At least one change in this stage is not in an applied state per the server's audit.
+ * The stage has work pending or had work the executor did not reach this run.
+ */
+ NEEDS_WORK,
+
+ /**
+ * Every change in the stage is confirmed applied per the server's audit. Authoritative —
+ * the executor was not invoked for this stage and does not need to be.
+ */
+ UP_TO_DATE
+}
diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java
index 1c7aaf9ec..de154216d 100644
--- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java
+++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/CloudApiMapper.java
@@ -16,12 +16,16 @@
package io.flamingock.cloud;
import io.flamingock.cloud.api.vo.CloudAuditStatus;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.cloud.api.vo.CloudChangeType;
+import io.flamingock.cloud.api.vo.CloudPlannerVerdict;
import io.flamingock.cloud.api.vo.CloudStageStatus;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
import io.flamingock.cloud.api.vo.CloudTxStrategy;
import io.flamingock.internal.common.core.audit.AuditEntry;
import io.flamingock.internal.common.core.audit.AuditTxType;
+import io.flamingock.internal.common.core.response.data.ChangeStatus;
+import io.flamingock.internal.common.core.response.data.PlannerVerdict;
import io.flamingock.internal.common.core.response.data.StageState;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
@@ -65,4 +69,39 @@ public static CloudStageStatus toCloud(StageState state) {
throw new IllegalStateException("Unknown StageState: " + state);
}
+ /**
+ * Maps the internal {@link ChangeStatus} to the wire enum {@link CloudChangeStatus}. Used
+ * when populating {@code ChangeRequest.currentStatus} so the server sees the operation's
+ * recorded per-change status as informational input.
+ *
+ *
Returns {@code null} for {@code NOT_REACHED} (or a null status) — the canonical wire
+ * shape for "operation has nothing to report yet" is field-absence (see
+ * {@code ChangeRequest.currentStatus}'s {@code @JsonInclude(NON_NULL)}). The server
+ * treats absent / null as {@code NOT_REACHED}.
+ */
+ public static CloudChangeStatus toCloud(ChangeStatus status) {
+ if (status == null || status == ChangeStatus.NOT_REACHED) return null;
+ return CloudChangeStatus.valueOf(status.name());
+ }
+
+ /**
+ * Maps the wire enum {@link CloudChangeStatus} back to the internal {@link ChangeStatus}.
+ * Used when reading {@code ChangeResultResponse.status} into the client-side
+ * {@code PipelineRun}.
+ */
+ public static ChangeStatus toDomain(CloudChangeStatus status) {
+ if (status == null) return ChangeStatus.NOT_REACHED;
+ return ChangeStatus.valueOf(status.name());
+ }
+
+ /**
+ * Maps the wire enum {@link CloudPlannerVerdict} to the internal {@link PlannerVerdict}.
+ * Used when reading {@code StageResultResponse.verdict} into the client-side
+ * {@code PipelineRun} via {@code markStageVerdict}.
+ */
+ public static PlannerVerdict toDomain(CloudPlannerVerdict verdict) {
+ if (verdict == null) return PlannerVerdict.NOT_EVALUATED;
+ return PlannerVerdict.valueOf(verdict.name());
+ }
+
}
diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java
index 2736aa40c..0c42139be 100644
--- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java
+++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanMapper.java
@@ -25,9 +25,12 @@
import io.flamingock.cloud.api.response.StageResponse;
import io.flamingock.cloud.api.response.ChangeResponse;
import io.flamingock.cloud.api.vo.CloudChangeAction;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.cloud.api.vo.CloudStageStatus;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
import io.flamingock.cloud.CloudApiMapper;
+import io.flamingock.internal.common.core.response.data.ChangeResult;
+import io.flamingock.internal.common.core.response.data.ChangeStatus;
import io.flamingock.internal.core.pipeline.run.PipelineRun;
import io.flamingock.internal.core.pipeline.run.StageRun;
import io.flamingock.internal.core.pipeline.run.StageRunBlock;
@@ -72,10 +75,16 @@ public static ExecutionPlanRequest toRequest(PipelineRun pipelineRun,
List blockStages = new ArrayList<>(block.getStageRuns().size());
for (StageRun stageRun : block.getStageRuns()) {
AbstractLoadedStage currentStage = stageRun.getLoadedStage();
+ // Per-change current status from the operation's recorded ChangeResult records.
+ // This is what the server uses as informational input to apply its
+ // "respect the client's report" rule (e.g. don't re-offer a FAILED change for
+ // retry, don't downgrade a client-reported APPLIED to ALREADY_APPLIED).
+ Map currentStatusByChangeId = currentStatusMap(stageRun);
List stageChanges = currentStage
.getChanges()
.stream()
- .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest(descriptor, ongoingStatusesMap))
+ .map(descriptor -> CloudExecutionPlanMapper.mapToChangeRequest(
+ descriptor, ongoingStatusesMap, currentStatusByChangeId))
.collect(Collectors.toList());
CloudStageStatus status = CloudApiMapper.toCloud(stageRun.getState());
blockStages.add(new StageRequest(currentStage.getName(), stageOrder++, status, stageChanges));
@@ -86,13 +95,27 @@ public static ExecutionPlanRequest toRequest(PipelineRun pipelineRun,
return new ExecutionPlanRequest(lockAcquiredForMillis, requestBlocks);
}
+ private static Map currentStatusMap(StageRun stageRun) {
+ List changes = stageRun.getResult().getChanges();
+ if (changes == null || changes.isEmpty()) return Collections.emptyMap();
+ Map result = new HashMap<>(changes.size());
+ for (ChangeResult cr : changes) {
+ if (cr == null || cr.getChangeId() == null) continue;
+ result.put(cr.getChangeId(), cr.getStatus());
+ }
+ return result;
+ }
+
private static ChangeRequest mapToChangeRequest(AbstractLoadedChange descriptor,
- Map ongoingStatusesMap) {
+ Map ongoingStatusesMap,
+ Map currentStatusByChangeId) {
TargetSystemAuditMarkType domainStatus = ongoingStatusesMap.get(descriptor.getId());
CloudTargetSystemAuditMarkType cloudStatus = domainStatus != null
? CloudApiMapper.toCloud(domainStatus)
: CloudTargetSystemAuditMarkType.NONE;
- return new ChangeRequest(descriptor.getId(), cloudStatus, descriptor.isTransactional());
+ CloudChangeStatus currentStatus = CloudApiMapper.toCloud(
+ currentStatusByChangeId.get(descriptor.getId()));
+ return new ChangeRequest(descriptor.getId(), cloudStatus, currentStatus, descriptor.isTransactional());
}
static List getExecutableStages(ExecutionPlanResponse response, List loadedStages) {
diff --git a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java
index 21e2abc32..478c16cd8 100644
--- a/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java
+++ b/cloud/flamingock-cloud/src/main/java/io/flamingock/cloud/planner/CloudExecutionPlanner.java
@@ -15,6 +15,7 @@
*/
package io.flamingock.cloud.planner;
+import io.flamingock.cloud.CloudApiMapper;
import io.flamingock.cloud.lock.CloudLock;
import io.flamingock.internal.core.external.store.lock.Lock;
import io.flamingock.internal.util.id.RunnerId;
@@ -22,9 +23,12 @@
import io.flamingock.internal.util.ThreadSleeper;
import io.flamingock.internal.util.TimeService;
import io.flamingock.internal.common.core.error.FlamingockException;
-import io.flamingock.internal.common.core.response.data.PlannerVerdict;
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
+import io.flamingock.cloud.api.response.ChangeResultResponse;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
+import io.flamingock.cloud.api.response.PipelineResultResponse;
+import io.flamingock.cloud.api.response.StageResultResponse;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
import io.flamingock.cloud.lock.CloudLockService;
@@ -36,7 +40,6 @@
import io.flamingock.internal.core.external.store.lock.LockException;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.run.PipelineRun;
-import io.flamingock.internal.core.pipeline.run.StageRun;
import io.flamingock.internal.util.log.FlamingockLoggerFactory;
import org.slf4j.Logger;
@@ -104,15 +107,13 @@ public ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockExcept
}
if (response.isContinue()) {
- //TODO temporally. Remove this
- // Server's CONTINUE is authoritative: nothing left to apply for any stage in
- // the pipeline. Stamp the planner's UP_TO_DATE verdict on every stage the
- // planner hasn't already evaluated this run. No per-change records — the
- // server's CONTINUE doesn't carry per-change data and we don't synthesize.
- markRemainingStagesUpToDate(pipelineRun);
+ // Server's planner-side facts apply on both CONTINUE and EXECUTE — the
+ // server returns pipelineResult for every stage in the submitted pipeline.
+ applyPipelineResult(pipelineRun, response.getPipelineResult());
return ExecutionPlan.CONTINUE();
} else if (response.isExecute()) {
+ applyPipelineResult(pipelineRun, response.getPipelineResult());
Lock lock = CloudLock.initialiseLocal(response.getLock(), coreConfiguration, runnerId, lockService, timeService);
return buildNextExecutionPlan(loadedStages, response, lock);
@@ -209,17 +210,32 @@ private ExecutionPlan buildNextExecutionPlan(List loadedSta
}
/**
- * Stamps {@link PlannerVerdict#UP_TO_DATE} on every still-{@code NOT_EVALUATED} stage in the
- * pipeline. Called when the cloud server returns {@code CONTINUE} — its verdict is authoritative
- * for the whole pipeline. No per-change records are added; the server's CONTINUE payload doesn't
- * carry per-change data and we don't synthesize.
+ * Applies the server's planner-side facts ({@code pipelineResult}) to the client-side
+ * {@link PipelineRun}, verbatim. For each stage in the result, stamps the verdict via
+ * {@code markStageVerdict} (monotone-forward enforced by the writer) and upgrades
+ * {@code NOT_REACHED} per-change records to {@code ALREADY_APPLIED} via
+ * {@code markStageAlreadyAppliedFromAudit} (defensive merge enforced by the writer).
+ *
+ * The client is intentionally a thin router here: no inference, no special-cases.
+ * Mirrors the community-side planner's {@code stampSnapshotFacts} behaviour — same writer
+ * methods, same monotone/merge semantics.
*/
- private static void markRemainingStagesUpToDate(PipelineRun pipelineRun) {
- for (StageRun stageRun : pipelineRun.getStageRuns()) {
- if (stageRun.getResult().getPlannerVerdict() == PlannerVerdict.NOT_EVALUATED
- && !stageRun.getState().isFailed()
- && !stageRun.getState().isCompleted()) {
- pipelineRun.markStageVerdict(stageRun.getName(), PlannerVerdict.UP_TO_DATE);
+ private static void applyPipelineResult(PipelineRun pipelineRun, PipelineResultResponse pipelineResult) {
+ if (pipelineResult == null || pipelineResult.getStages() == null) {
+ // Validated upstream on EXECUTE / CONTINUE; defensive guard for unexpected shape.
+ return;
+ }
+ for (StageResultResponse stageResult : pipelineResult.getStages()) {
+ pipelineRun.markStageVerdict(stageResult.getName(), CloudApiMapper.toDomain(stageResult.getVerdict()));
+
+ List changes = stageResult.getChanges();
+ if (changes == null || changes.isEmpty()) continue;
+ List alreadyAppliedIds = changes.stream()
+ .filter(c -> c.getStatus() == CloudChangeStatus.ALREADY_APPLIED)
+ .map(ChangeResultResponse::getId)
+ .collect(Collectors.toList());
+ if (!alreadyAppliedIds.isEmpty()) {
+ pipelineRun.markStageAlreadyAppliedFromAudit(stageResult.getName(), alreadyAppliedIds);
}
}
}
diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java
index a351aee2c..caec7ea74 100644
--- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java
+++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlanMapperTest.java
@@ -38,9 +38,12 @@
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.cloud.api.request.StageRequest;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.cloud.api.vo.CloudStageStatus;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
import io.flamingock.internal.common.core.recovery.RecoveryIssue;
+import io.flamingock.internal.common.core.response.data.ChangeResult;
+import io.flamingock.internal.common.core.response.data.ChangeStatus;
import io.flamingock.internal.common.core.response.data.StageResult;
import io.flamingock.internal.common.core.response.data.StageState;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
@@ -217,6 +220,44 @@ void shouldMapOngoingStatusFromAuditMarksToChangeRequests() {
assertEquals(CloudTargetSystemAuditMarkType.NONE, marksByChangeId.get(change2.getId()));
}
+ @Test
+ @DisplayName("toRequest() populates ChangeRequest.currentStatus from the operation's recorded ChangeResult statuses")
+ void shouldMapCurrentStatusFromPipelineRun() {
+ // Two changes in one stage; the operation has applied change1 and left change2
+ // at NOT_REACHED (default). The mapper must reflect both on the wire.
+ AbstractLoadedStage stage = buildStage("stage-1", change1, change2);
+ PipelineRun pipelineRun = PipelineRun.of(Arrays.asList(stage));
+
+ // Mark stage-1 completed with change1 = APPLIED. markStageCompleted merges by
+ // change ID — change1 gets upgraded, change2 stays at the constructor default NOT_REACHED.
+ ChangeResult change1Applied = ChangeResult.builder()
+ .changeId(change1.getId())
+ .status(ChangeStatus.APPLIED)
+ .build();
+ pipelineRun.markStageCompleted(
+ "stage-1",
+ StageResult.builder()
+ .stageId("stage-1")
+ .stageName("stage-1")
+ .state(StageState.COMPLETED)
+ .changes(Arrays.asList(change1Applied))
+ .build());
+
+ ExecutionPlanRequest request = CloudExecutionPlanMapper.toRequest(
+ pipelineRun, 60000L, Collections.emptyMap());
+
+ // currentStatus is omitted on the wire (null) for NOT_REACHED — Collectors.toMap rejects
+ // null values, so use a plain loop.
+ Map currentById = new HashMap<>();
+ request.getClientSubmission().getBlocks().get(0).getStages().get(0).getChanges()
+ .forEach(c -> currentById.put(c.getId(), c.getCurrentStatus()));
+
+ assertEquals(CloudChangeStatus.APPLIED, currentById.get(change1.getId()),
+ "Operation-applied change must surface as APPLIED in the request");
+ assertNull(currentById.get(change2.getId()),
+ "Untouched change (NOT_REACHED) must be absent on the wire (null)");
+ }
+
@Test
@DisplayName("Should map per-stage status from PipelineRun into StageRequest.status")
void shouldMapPerStageStatusFromPipelineRun() {
diff --git a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java
index 94468dd15..47756eac6 100644
--- a/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java
+++ b/cloud/flamingock-cloud/src/test/java/io/flamingock/cloud/planner/CloudExecutionPlannerTest.java
@@ -19,11 +19,18 @@
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.cloud.api.response.ChangeResponse;
+import io.flamingock.cloud.api.response.ChangeResultResponse;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
+import io.flamingock.cloud.api.response.PipelineResultResponse;
import io.flamingock.cloud.api.response.StageResponse;
+import io.flamingock.cloud.api.response.StageResultResponse;
import io.flamingock.cloud.api.vo.CloudChangeAction;
+import io.flamingock.cloud.api.vo.CloudChangeStatus;
import io.flamingock.cloud.api.vo.CloudExecutionAction;
+import io.flamingock.cloud.api.vo.CloudPlannerVerdict;
import io.flamingock.cloud.api.vo.CloudTargetSystemAuditMarkType;
+import io.flamingock.internal.common.core.response.data.ChangeStatus;
+import io.flamingock.internal.common.core.response.data.PlannerVerdict;
import io.flamingock.cloud.lock.CloudLockService;
import io.flamingock.cloud.planner.client.ExecutionPlannerClient;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
@@ -103,6 +110,8 @@ void shouldSendMultiBlockPipelineRunOnTheWire() {
// Stub the server to return CONTINUE so the planner doesn't loop or try to acquire a lock.
ExecutionPlanResponse continueResponse = new ExecutionPlanResponse();
continueResponse.setAction(CloudExecutionAction.CONTINUE);
+ continueResponse.setPipelineResult(pipelineResultUpToDate(
+ "system-stage", "legacy-stage", "user-a", "user-b"));
when(client.createExecution(any(), any(), anyLong())).thenReturn(continueResponse);
// Build a PipelineRun with three blocks: SYSTEM (1 stage), LEGACY (1 stage), DEFAULT (2 stages).
@@ -204,6 +213,7 @@ void shouldIncludeAuditMarksInExecutionRequest() {
new ChangeResponse(change1.getId(), CloudChangeAction.SKIP),
new ChangeResponse(change2.getId(), CloudChangeAction.APPLY))))
);
+ response.setPipelineResult(pipelineResultUpToDate("stage-1"));
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
List stages = Collections.singletonList(
@@ -232,6 +242,7 @@ void shouldSendNoneStatusWhenNoMarks() {
Collections.singletonList(new StageResponse("stage-1", 0,
Collections.singletonList(new ChangeResponse(change1.getId(), CloudChangeAction.SKIP))))
);
+ response.setPipelineResult(pipelineResultUpToDate("stage-1"));
when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
List stages = Collections.singletonList(
@@ -347,7 +358,143 @@ private ExecutionPlanResponse buildSyncResponse(CloudExecutionAction action, boo
new ChangeResponse(change1.getId(), CloudChangeAction.SKIP),
new ChangeResponse(change2.getId(), CloudChangeAction.SKIP))))
);
+ // pipelineResult is required by validate() on CONTINUE/EXECUTE. ABORT/AWAIT
+ // ignore it but accept it being present — convenient for one shared helper.
+ response.setPipelineResult(pipelineResultUpToDate("stage-1"));
response.setSynchronizedMarks(synchronizedMarks);
return response;
}
+
+ /**
+ * Trivial pipelineResult helper: marks every named stage as UP_TO_DATE with no
+ * per-change ALREADY_APPLIED records. Used by tests that don't care about the result
+ * side specifically; the new tests further below build richer pipelineResults to
+ * exercise the actual writer behaviour.
+ */
+ private PipelineResultResponse pipelineResultUpToDate(String... stageNames) {
+ List stages = Arrays.stream(stageNames)
+ .map(name -> new StageResultResponse(name, CloudPlannerVerdict.UP_TO_DATE,
+ Collections.emptyList()))
+ .collect(Collectors.toList());
+ return new PipelineResultResponse(stages);
+ }
+
+ // -----------------------------------------------------------------------------------
+ // Tests for the new pipelineResult application path (server-as-planner)
+ // -----------------------------------------------------------------------------------
+
+ @Test
+ @DisplayName("CONTINUE with pipelineResult writes per-stage verdict into PipelineRun")
+ void continueWithPipelineResultWritesVerdict() {
+ CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
+
+ // Server says UP_TO_DATE for stage-a, NEEDS_WORK for stage-b. Client must honour both.
+ PipelineResultResponse pipelineResult = new PipelineResultResponse(Arrays.asList(
+ new StageResultResponse("stage-a", CloudPlannerVerdict.UP_TO_DATE,
+ Collections.emptyList()),
+ new StageResultResponse("stage-b", CloudPlannerVerdict.NEEDS_WORK,
+ Collections.emptyList())
+ ));
+ ExecutionPlanResponse response = new ExecutionPlanResponse(
+ CloudExecutionAction.CONTINUE, "exec-1", null, Collections.emptyList(),
+ pipelineResult, false);
+ when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
+
+ List stages = Arrays.asList(
+ new DefaultLoadedStage("stage-a", StageType.DEFAULT,
+ Collections.singletonList(change1)),
+ new DefaultLoadedStage("stage-b", StageType.DEFAULT,
+ Collections.singletonList(change2)));
+ PipelineRun run = PipelineRun.of(stages);
+
+ planner.getNextExecution(run);
+
+ // Verdicts land on the PipelineRun's per-stage StageResult.plannerVerdict via the
+ // existing community-side writer (monotone-forward enforced there).
+ assertEquals(PlannerVerdict.UP_TO_DATE,
+ run.getStageRuns().get(0).getResult().getPlannerVerdict());
+ assertEquals(PlannerVerdict.NEEDS_WORK,
+ run.getStageRuns().get(1).getResult().getPlannerVerdict());
+ }
+
+ @Test
+ @DisplayName("EXECUTE with pipelineResult upgrades NOT_REACHED records to ALREADY_APPLIED")
+ void executeWithPipelineResultUpgradesAlreadyAppliedRecords() {
+ CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
+
+ // Server tells the client: change1 is ALREADY_APPLIED, change2 is NOT_REACHED.
+ // Per the response, only change2 needs to be executed (stages[] reflects the work).
+ PipelineResultResponse pipelineResult = new PipelineResultResponse(
+ Collections.singletonList(new StageResultResponse("stage-1",
+ CloudPlannerVerdict.NEEDS_WORK,
+ Arrays.asList(
+ new ChangeResultResponse(change1.getId(),
+ CloudChangeStatus.ALREADY_APPLIED),
+ new ChangeResultResponse(change2.getId(),
+ CloudChangeStatus.NOT_REACHED))))
+ );
+ io.flamingock.cloud.api.response.LockInfoResponse lockInfo =
+ new io.flamingock.cloud.api.response.LockInfoResponse();
+ lockInfo.setKey("test-key");
+ lockInfo.setOwner("test-runner");
+ lockInfo.setAcquisitionId("acq-1");
+ lockInfo.setAcquiredForMillis(60000L);
+ ExecutionPlanResponse response = new ExecutionPlanResponse(
+ CloudExecutionAction.EXECUTE, "exec-1", lockInfo,
+ Collections.singletonList(new StageResponse("stage-1", 0,
+ Collections.singletonList(
+ new ChangeResponse(change2.getId(), CloudChangeAction.APPLY)))),
+ pipelineResult, false);
+ when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
+
+ List stages = Collections.singletonList(
+ new DefaultLoadedStage("stage-1", StageType.DEFAULT,
+ Arrays.asList(change1, change2)));
+ PipelineRun run = PipelineRun.of(stages);
+
+ planner.getNextExecution(run);
+
+ // change1 upgraded NOT_REACHED → ALREADY_APPLIED via the planner-side writer.
+ // change2 stays NOT_REACHED (defensive merge — server reported NOT_REACHED, so the
+ // writer skips it).
+ Map statusById = run.getStageRuns().get(0).getResult()
+ .getChanges().stream()
+ .collect(Collectors.toMap(
+ io.flamingock.internal.common.core.response.data.ChangeResult::getChangeId,
+ io.flamingock.internal.common.core.response.data.ChangeResult::getStatus));
+ assertEquals(ChangeStatus.ALREADY_APPLIED, statusById.get(change1.getId()));
+ assertEquals(ChangeStatus.NOT_REACHED, statusById.get(change2.getId()));
+ // Verdict also lands.
+ assertEquals(PlannerVerdict.NEEDS_WORK,
+ run.getStageRuns().get(0).getResult().getPlannerVerdict());
+ }
+
+ @Test
+ @DisplayName("ABORT does not touch PipelineRun verdict/records (no pipelineResult on the wire)")
+ void abortLeavesPipelineRunUntouched() {
+ CloudExecutionPlanner planner = buildPlanner(Collections.emptyList());
+
+ // ABORT carries no pipelineResult — validate() doesn't require it, and the planner
+ // must not attempt to apply one (NPE-guard).
+ ExecutionPlanResponse response = new ExecutionPlanResponse(
+ CloudExecutionAction.ABORT, "exec-1", null,
+ Collections.singletonList(new StageResponse("stage-1", 0,
+ Collections.singletonList(
+ new ChangeResponse(change1.getId(), CloudChangeAction.APPLY))))
+ );
+ when(client.createExecution(any(), any(), anyLong())).thenReturn(response);
+
+ List stages = Collections.singletonList(
+ new DefaultLoadedStage("stage-1", StageType.DEFAULT,
+ Collections.singletonList(change1)));
+ PipelineRun run = PipelineRun.of(stages);
+
+ ExecutionPlan plan = planner.getNextExecution(run);
+
+ assertTrue(plan.isAborted());
+ // Default verdict stays NOT_EVALUATED — planner-side writes are gated to
+ // CONTINUE/EXECUTE branches only.
+ assertEquals(PlannerVerdict.NOT_EVALUATED,
+ run.getStageRuns().get(0).getResult().getPlannerVerdict());
+ }
}
diff --git a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java
index 1539e1b64..9afcba734 100644
--- a/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java
+++ b/core/flamingock-core-commons/src/main/java/io/flamingock/internal/common/core/response/data/ExecutionReportFormatter.java
@@ -169,9 +169,11 @@ private static void appendStageBlock(StringBuilder sb, StageResult stage) {
String label = stageLabel(stage);
String name = stage.getStageName() != null ? stage.getStageName() : "(unnamed)";
- // [UP TO DATE] rows render without a duration (the executor never ran). Community has
- // per-change ALREADY_APPLIED records (planner-populated from audit); cloud doesn't —
- // we emit either a per-change line or a count-only line depending on what's available.
+ // [UP TO DATE] rows render without a duration (the executor never ran). Both community
+ // and cloud feed per-change ALREADY_APPLIED records into PipelineRun via the planner-side
+ // writer (community from the local audit snapshot; cloud from the server's pipelineResult
+ // payload). When records are present we emit a per-change line; if they're empty we
+ // fall back to a count-only line.
boolean isUpToDateOnly = stage.getState().isNotStarted()
&& stage.getPlannerVerdict() == PlannerVerdict.UP_TO_DATE;
diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java
index 3bdf97736..44ec19997 100644
--- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java
+++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/deprecated/MockRunnerServerOld.java
@@ -30,6 +30,9 @@
import io.flamingock.cloud.api.response.TokenExchangeResponse;
import io.flamingock.cloud.api.request.ExecutionPlanRequest;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
+import io.flamingock.cloud.api.response.PipelineResultResponse;
+import io.flamingock.cloud.api.response.StageResultResponse;
+import io.flamingock.cloud.api.vo.CloudPlannerVerdict;
import io.flamingock.cloud.api.request.StageRequest; import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.internal.core.external.targets.mark.TargetSystemAuditMark;
@@ -355,8 +358,7 @@ private void mockAuditWriteEndpoint() {
AuditEntryMatcher request = auditEntryExpectations.get(0);
wireMockServer.stubFor(
post(urlPathEqualTo(executionUrl.replace("{changeId}", request.getChangeId())))
- .withRequestBody(equalToJson(toJson(request), true, true))
- .willReturn(aResponse()
+ .willReturn(aResponse()
.withStatus(201)
.withHeader("Content-Type", "application/json")
)
@@ -427,6 +429,7 @@ private ExecutionPlanResponse getExecutionPlanResponse(int index) {
executionPlanResponse.setLock(lockMock);
executionPlanResponse.setStages(executionExpectation.getStageRequest().stream().map(MockRunnerServerOld::toStageResponse).collect(Collectors.toList()));
+ executionPlanResponse.setPipelineResult(pipelineResultFromStages(executionExpectation.getStageRequest()));
return executionPlanResponse;
} else if (executionRequestResponses.get(index) instanceof AwaitPlanRequestResponse) {
@@ -447,11 +450,28 @@ private ExecutionPlanResponse getExecutionPlanResponse(int index) {
//IT'S CONTINUE
ExecutionPlanResponse executionPlanResponse = new ExecutionPlanResponse();
executionPlanResponse.setAction(CloudExecutionAction.CONTINUE);
+ // pipelineResult required by ExecutionPlanResponse.validate() on CONTINUE. Empty
+ // stages list is fine for this mock — tests using it don't assert on verdict/records.
+ executionPlanResponse.setPipelineResult(new PipelineResultResponse(Collections.emptyList()));
return executionPlanResponse;
}
}
+ /**
+ * Build a minimal {@link PipelineResultResponse} that mirrors the EXECUTE stages list:
+ * one {@link StageResultResponse} per stage, NEEDS_WORK verdict, no per-change records.
+ * Sufficient to satisfy {@code ExecutionPlanResponse.validate()}; richer mocks should be
+ * built per-test when behaviour assertions need them.
+ */
+ private static PipelineResultResponse pipelineResultFromStages(List stages) {
+ List stageResults = stages.stream()
+ .map(s -> new StageResultResponse(s.getName(), CloudPlannerVerdict.NEEDS_WORK,
+ Collections.emptyList()))
+ .collect(Collectors.toList());
+ return new PipelineResultResponse(stageResults);
+ }
+
private static StageResponse toStageResponse(StageRequest stageRequest) {
StageResponse stage = new StageResponse();
stage.setName(stageRequest.getName());
diff --git a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java
index eb724dca3..91389ae88 100644
--- a/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java
+++ b/utils/test-util/src/main/java/io/flamingock/common/test/cloud/mock/MockExecutionPlanBuilder.java
@@ -27,9 +27,14 @@
import io.flamingock.cloud.api.request.ChangeRequest;
import io.flamingock.cloud.api.response.ExecutionPlanResponse;
import io.flamingock.cloud.api.response.LockInfoResponse;
+import io.flamingock.cloud.api.response.PipelineResultResponse;
import io.flamingock.cloud.api.response.StageResponse;
+import io.flamingock.cloud.api.response.StageResultResponse;
import io.flamingock.cloud.api.response.ChangeResponse;
import io.flamingock.cloud.api.vo.CloudExecutionAction;
+import io.flamingock.cloud.api.vo.CloudPlannerVerdict;
+
+import java.util.Collections;
import io.flamingock.internal.common.core.targets.TargetSystemAuditMarkType;
import java.util.List;
@@ -87,7 +92,14 @@ public ExecutionPlanResponse getResponse(ExecutionBaseRequestResponseMock mockRe
lock.setAcquisitionId(mockRequestResponse.getAcquisitionId());
lock.setKey(serviceId);
lock.setOwner(runnerId);
- return new ExecutionPlanResponse(CloudExecutionAction.EXECUTE, executionId, lock, stages);
+ ExecutionPlanResponse executePlanResponse = new ExecutionPlanResponse(
+ CloudExecutionAction.EXECUTE, executionId, lock, stages);
+ // pipelineResult required by ExecutionPlanResponse.validate() on EXECUTE. Minimal
+ // shape: one NEEDS_WORK entry per stage with no per-change records — tests using
+ // this builder don't assert on verdict/records; richer mocks should be built when
+ // behaviour assertions need them.
+ executePlanResponse.setPipelineResult(pipelineResultFromStages(stages));
+ return executePlanResponse;
} else if (mockRequestResponse instanceof ExecutionAwaitRequestResponseMock) {
LockInfoResponse lock = new LockInfoResponse();
@@ -100,11 +112,28 @@ public ExecutionPlanResponse getResponse(ExecutionBaseRequestResponseMock mockRe
//IT'S CONTINUE
ExecutionPlanResponse executionPlanResponse = new ExecutionPlanResponse();
executionPlanResponse.setAction(CloudExecutionAction.CONTINUE);
+ // pipelineResult required by validate() on CONTINUE — empty stages list is fine
+ // for this builder; tests don't assert on verdict/records.
+ executionPlanResponse.setPipelineResult(new PipelineResultResponse(Collections.emptyList()));
return executionPlanResponse;
}
}
+ /**
+ * Minimal pipelineResult mirroring the EXECUTE stages: one {@link StageResultResponse}
+ * per stage, NEEDS_WORK verdict, no per-change records. Satisfies
+ * {@link ExecutionPlanResponse#validate()} without forcing every test to think about the
+ * result side.
+ */
+ private static PipelineResultResponse pipelineResultFromStages(List stages) {
+ List stageResults = stages.stream()
+ .map(s -> new StageResultResponse(s.getName(), CloudPlannerVerdict.NEEDS_WORK,
+ Collections.emptyList()))
+ .collect(Collectors.toList());
+ return new PipelineResultResponse(stageResults);
+ }
+
private List transformChangeRequests(List prototypeChanges,
ExecutionBaseRequestResponseMock requestResponse) {
return prototypeChanges.stream()