From 50c796fa313a860fe9463655bcda5cf93c534782 Mon Sep 17 00:00:00 2001 From: littleya Date: Sun, 17 May 2026 13:59:35 +0800 Subject: [PATCH 1/2] [ha]: set up self-fencer earlier during host reconnect This change is necessary because HA self-fencer setup needs to run soon after the hypervisor connect hook completes during host reconnect. This commit adds a host-level after-connect-hook extension point and marks reconnect-generated connect messages so HA can distinguish real host reconnects without changing existing connect single-flight behavior. It also propagates the one-time status-check bypass to KVM self-fencer setup paths and fixes host capacity check timeout setup. DBImpact Resolves: ZSTAC-69133 Change-Id: I7a73746163693639313333636f6d70757465 Co-Authored-By: Claude Opus 4.7 --- .../HostAfterConnectHookExtensionPoint.java | 9 ++ .../org/zstack/compute/host/HostBase.java | 107 ++++++++++++++++++ .../zstack/compute/host/HostManagerImpl.java | 4 + .../ceph/primary/CephPrimaryStorageBase.java | 2 +- .../zstack/kvm/KVMHostCapacityExtension.java | 1 + .../kvm/KvmSetupSelfFencerExtensionPoint.java | 9 ++ 6 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java diff --git a/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java new file mode 100644 index 00000000000..767f1d443d7 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java @@ -0,0 +1,9 @@ +package org.zstack.compute.host; + +import org.zstack.header.core.workflow.Flow; +import org.zstack.header.host.ConnectHostInfo; +import org.zstack.header.host.HostInventory; + +public interface HostAfterConnectHookExtensionPoint { + Flow createAfterConnectHookFlow(HostInventory host, ConnectHostInfo info, boolean reconnect); +} diff --git 
a/compute/src/main/java/org/zstack/compute/host/HostBase.java b/compute/src/main/java/org/zstack/compute/host/HostBase.java index 919d92631fc..054dbe53431 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostBase.java +++ b/compute/src/main/java/org/zstack/compute/host/HostBase.java @@ -64,6 +64,7 @@ @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) public abstract class HostBase extends AbstractHost { protected static final CLogger logger = Utils.getLogger(HostBase.class); + private static final String AFTER_CONNECT_HOOK_RECONNECT = "afterConnectHookReconnect"; protected HostVO self; @Autowired @@ -1063,6 +1064,7 @@ public void run(final SyncTaskChain chain) { ConnectHostMsg connectMsg = new ConnectHostMsg(self.getUuid()); connectMsg.setNewAdd(false); connectMsg.setCalledByAPI(msg.isCalledByAPI()); + connectMsg.putHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT, Boolean.TRUE.toString()); bus.makeTargetServiceIdByResourceUuid(connectMsg, HostConstant.SERVICE_ID, self.getUuid()); bus.send(connectMsg, new CloudBusCallBack(msg, chain, completion) { @Override @@ -1325,6 +1327,111 @@ public void fail(ErrorCode errorCode) { } }); + flow(new Flow() { + String __name__ = "call-after-connect-hook-extensions"; + private final List afterConnectHookFlows = new ArrayList<>(); + private boolean done; + + @Override + public void run(FlowTrigger trigger, Map data) { + FlowChain afterConnectHookChain = FlowChainBuilder.newSimpleFlowChain(); + afterConnectHookChain.allowEmptyFlow(); + + self = dbf.reload(self); + HostInventory inv = getSelfInventory(); + ConnectHostInfo info = ConnectHostInfo.fromConnectHostMsg(msg); + boolean reconnect = Boolean.parseBoolean(msg.getHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT)); + + for (HostAfterConnectHookExtensionPoint p : pluginRgty.getExtensionList(HostAfterConnectHookExtensionPoint.class)) { + Flow flow = p.createAfterConnectHookFlow(inv, info, reconnect); + if (flow != null) { + Flow wrapper = new Flow() { + private boolean 
ran; + private Map runData; + + @Override + public void run(FlowTrigger trigger, Map data) { + ran = true; + runData = data; + flow.run(trigger, data); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (!ran) { + trigger.rollback(); + return; + } + + flow.rollback(trigger, runData); + } + + @Override + public boolean skip(Map data) { + return flow.skip(data); + } + }; + afterConnectHookFlows.add(wrapper); + afterConnectHookChain.then(wrapper); + } + } + + afterConnectHookChain.done(new FlowDoneHandler(trigger) { + @Override + public void handle(Map data) { + done = true; + trigger.next(); + } + }).error(new FlowErrorHandler(trigger) { + @Override + public void handle(ErrorCode errCode, Map data) { + trigger.fail(errCode); + } + }).start(); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (!done) { + trigger.rollback(); + return; + } + + ListIterator iterator = afterConnectHookFlows.listIterator(afterConnectHookFlows.size()); + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + + private void rollbackAfterConnectHookFlows(ListIterator iterator, FlowRollback trigger, Map data) { + if (!iterator.hasPrevious()) { + trigger.rollback(); + return; + } + + try { + iterator.previous().rollback(new FlowRollback() { + @Override + public void rollback() { + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + + @Override + public void skipRestRollbacks() { + trigger.skipRestRollbacks(); + } + + @Override + public ErrorCode getErrorCode() { + return trigger.getErrorCode(); + } + + }, data); + } catch (Throwable t) { + logger.warn("unhandled exception when rolling back after-connect-hook flow", t); + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + } + }); + flow(new NoRollbackFlow() { String __name__ = "call-pre-connect-extensions"; diff --git a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java index 
3e478c319e8..d0cea298291 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java +++ b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java @@ -14,6 +14,7 @@ import org.zstack.core.defer.Deferred; import org.zstack.core.errorcode.ErrorFacade; import org.zstack.core.thread.*; +import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.header.AbstractService; import org.zstack.header.allocator.HostAllocatorConstant; @@ -92,6 +93,8 @@ public class HostManagerImpl extends AbstractService implements HostManager, Man private ResourceConfigFacade rcf; @Autowired private ClusterResourceConfigInitializer crci; + @Autowired + private ApiTimeoutManager timeoutMgr; private Map hostBaseExtensionFactories = new HashMap<>(); private List hostExtensionManagers = new ArrayList<>(); @@ -818,6 +821,7 @@ private void reportHostCapacity() { new While<>(hostUuids).step((hostUuid, completion) -> { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(hostUuid); + timeoutMgr.setMessageTimeout(msg); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); bus.send(msg, new CloudBusCallBack(completion) { @Override diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java index c3b01dc3c8b..c58985f7ca1 100755 --- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java +++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java @@ -4833,7 +4833,7 @@ public String call(CephPrimaryStorageMonVO arg) { } final SetupSelfFencerOnKvmHostReply reply = new SetupSelfFencerOnKvmHostReply(); - new KvmCommandSender(param.getHostUuid()).send(cmd, KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() { + new KvmCommandSender(param.getHostUuid(), param.isNoStatusCheck()).send(cmd, 
KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() { @Override public ErrorCode getError(KvmResponseWrapper wrapper) { AgentResponse rsp = wrapper.getResponse(AgentResponse.class); diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java index 5ad18e49c9f..6f807d34849 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java @@ -31,6 +31,7 @@ public class KVMHostCapacityExtension implements KVMHostConnectExtensionPoint, H public void reportCapacity(HostInventory host, Completion completion) { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(host.getUuid()); + timeoutMgr.setMessageTimeout(msg); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host.getUuid()); bus.send(msg, new CloudBusCallBack(completion) { @Override diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java b/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java index a202505f708..902b462a9c2 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java @@ -38,6 +38,7 @@ class KvmSetupSelfFencerParam { private PrimaryStorageInventory primaryStorage; private String strategy; private List fencers; + private boolean noStatusCheck; // all previous self-fencer configurations on the ps will be removed after applying the new one private boolean flushPrevious = true; @@ -97,6 +98,14 @@ public void setFencers(List fencers) { this.fencers = fencers; } + public boolean isNoStatusCheck() { + return noStatusCheck; + } + + public void setNoStatusCheck(boolean noStatusCheck) { + this.noStatusCheck = noStatusCheck; + } + public boolean isFlushPrevious() { return flushPrevious; } From 
9d2004f088364eb66d757e4bb4e3e23fc3069968 Mon Sep 17 00:00:00 2001 From: littleya Date: Sun, 17 May 2026 21:24:13 +0800 Subject: [PATCH 2/2] [host]: avoid stale context timeout for capacity checks Co-Authored-By: Claude Opus 4.7 --- .../compute/host/HostAfterConnectHookExtensionPoint.java | 9 +++++++++ .../java/org/zstack/compute/host/HostManagerImpl.java | 5 +---- .../main/java/org/zstack/header/host/HostConstant.java | 3 +++ .../java/org/zstack/kvm/KVMHostCapacityExtension.java | 5 +---- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java index 767f1d443d7..49c35255302 100644 --- a/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java +++ b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java @@ -4,6 +4,15 @@ import org.zstack.header.host.ConnectHostInfo; import org.zstack.header.host.HostInventory; +/** + * Runs after the hypervisor connect hook and before pre-connect extensions. + */ public interface HostAfterConnectHookExtensionPoint { + /** + * Returns a flow to run after the hypervisor connect hook completes. 
+ * + * @param reconnect whether this connect was triggered by host reconnect + * @return null to skip, or a flow to join the connect chain + */ Flow createAfterConnectHookFlow(HostInventory host, ConnectHostInfo info, boolean reconnect); } diff --git a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java index d0cea298291..269caf4a4e5 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java +++ b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java @@ -14,7 +14,6 @@ import org.zstack.core.defer.Deferred; import org.zstack.core.errorcode.ErrorFacade; import org.zstack.core.thread.*; -import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.header.AbstractService; import org.zstack.header.allocator.HostAllocatorConstant; @@ -93,8 +92,6 @@ public class HostManagerImpl extends AbstractService implements HostManager, Man private ResourceConfigFacade rcf; @Autowired private ClusterResourceConfigInitializer crci; - @Autowired - private ApiTimeoutManager timeoutMgr; private Map hostBaseExtensionFactories = new HashMap<>(); private List hostExtensionManagers = new ArrayList<>(); @@ -821,7 +818,7 @@ private void reportHostCapacity() { new While<>(hostUuids).step((hostUuid, completion) -> { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(hostUuid); - timeoutMgr.setMessageTimeout(msg); + msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); bus.send(msg, new CloudBusCallBack(completion) { @Override diff --git a/header/src/main/java/org/zstack/header/host/HostConstant.java b/header/src/main/java/org/zstack/header/host/HostConstant.java index 805fa985584..89da27621fa 100755 --- a/header/src/main/java/org/zstack/header/host/HostConstant.java +++ 
b/header/src/main/java/org/zstack/header/host/HostConstant.java @@ -1,11 +1,14 @@ package org.zstack.header.host; +import java.util.concurrent.TimeUnit; public interface HostConstant { public static final String SERVICE_ID = "host"; public static final String ACTION_CATEGORY = "host"; + long CHECK_HOST_CAPACITY_TIMEOUT = TimeUnit.MINUTES.toMillis(30); + String HOST_SYNC_SIGNATURE_PREFIX = "Host-"; String HOST_ARCHITECTURE_X86_64 = "x86_64"; diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java index 6f807d34849..4650053e247 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java @@ -4,7 +4,6 @@ import org.zstack.core.cloudbus.CloudBus; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.errorcode.ErrorFacade; -import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.header.core.Completion; import org.zstack.header.core.NopeCompletion; import org.zstack.header.core.workflow.Flow; @@ -24,14 +23,12 @@ public class KVMHostCapacityExtension implements KVMHostConnectExtensionPoint, H @Autowired private ErrorFacade errf; @Autowired - private ApiTimeoutManager timeoutMgr; - @Autowired private ResourceConfigFacade rcf; public void reportCapacity(HostInventory host, Completion completion) { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(host.getUuid()); - timeoutMgr.setMessageTimeout(msg); + msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host.getUuid()); bus.send(msg, new CloudBusCallBack(completion) { @Override