diff --git a/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java new file mode 100644 index 00000000000..49c35255302 --- /dev/null +++ b/compute/src/main/java/org/zstack/compute/host/HostAfterConnectHookExtensionPoint.java @@ -0,0 +1,18 @@ +package org.zstack.compute.host; + +import org.zstack.header.core.workflow.Flow; +import org.zstack.header.host.ConnectHostInfo; +import org.zstack.header.host.HostInventory; + +/** + * Runs after the hypervisor connect hook and before pre-connect extensions. + */ +public interface HostAfterConnectHookExtensionPoint { + /** + * Returns a flow to run after host connection. + * + * @param reconnect whether this connect was triggered by host reconnect + * @return null to skip, or a flow to join the connect chain + */ + Flow createAfterConnectHookFlow(HostInventory host, ConnectHostInfo info, boolean reconnect); +} diff --git a/compute/src/main/java/org/zstack/compute/host/HostBase.java b/compute/src/main/java/org/zstack/compute/host/HostBase.java index 919d92631fc..054dbe53431 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostBase.java +++ b/compute/src/main/java/org/zstack/compute/host/HostBase.java @@ -64,6 +64,7 @@ @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) public abstract class HostBase extends AbstractHost { protected static final CLogger logger = Utils.getLogger(HostBase.class); + private static final String AFTER_CONNECT_HOOK_RECONNECT = "afterConnectHookReconnect"; protected HostVO self; @Autowired @@ -1063,6 +1064,7 @@ public void run(final SyncTaskChain chain) { ConnectHostMsg connectMsg = new ConnectHostMsg(self.getUuid()); connectMsg.setNewAdd(false); connectMsg.setCalledByAPI(msg.isCalledByAPI()); + connectMsg.putHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT, Boolean.TRUE.toString()); bus.makeTargetServiceIdByResourceUuid(connectMsg, HostConstant.SERVICE_ID, self.getUuid()); bus.send(connectMsg, new CloudBusCallBack(msg, chain, completion) { @Override @@ -1325,6 +1327,111 @@ public void fail(ErrorCode errorCode) { } }); + flow(new Flow() { + String __name__ = "call-after-connect-hook-extensions"; + private final List afterConnectHookFlows = new ArrayList<>(); + private boolean done; + + @Override + public void run(FlowTrigger trigger, Map data) { + FlowChain afterConnectHookChain = FlowChainBuilder.newSimpleFlowChain(); + afterConnectHookChain.allowEmptyFlow(); + + self = dbf.reload(self); + HostInventory inv = getSelfInventory(); + ConnectHostInfo info = ConnectHostInfo.fromConnectHostMsg(msg); + boolean reconnect = Boolean.parseBoolean(msg.getHeaderEntry(AFTER_CONNECT_HOOK_RECONNECT)); + + for (HostAfterConnectHookExtensionPoint p : pluginRgty.getExtensionList(HostAfterConnectHookExtensionPoint.class)) { + Flow flow = p.createAfterConnectHookFlow(inv, info, reconnect); + if (flow != null) { + Flow wrapper = new Flow() { + private boolean ran; + private Map runData; + + @Override + public void run(FlowTrigger trigger, Map data) { + ran = true; + runData = data; + flow.run(trigger, data); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (!ran) { + trigger.rollback(); + return; + } + + flow.rollback(trigger, runData); + } + + @Override + public boolean skip(Map data) { + return flow.skip(data); + } + }; + afterConnectHookFlows.add(wrapper); + afterConnectHookChain.then(wrapper); + } + } + + afterConnectHookChain.done(new FlowDoneHandler(trigger) { + @Override + public void handle(Map data) { + done = true; + trigger.next(); + } + }).error(new FlowErrorHandler(trigger) { + @Override + public void handle(ErrorCode errCode, Map data) { + trigger.fail(errCode); + } + }).start(); + } + + @Override + public void rollback(FlowRollback trigger, Map data) { + if (!done) { + trigger.rollback(); + return; + } + + ListIterator iterator = afterConnectHookFlows.listIterator(afterConnectHookFlows.size()); + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + + private void rollbackAfterConnectHookFlows(ListIterator iterator, FlowRollback trigger, Map data) { + if (!iterator.hasPrevious()) { + trigger.rollback(); + return; + } + + try { + iterator.previous().rollback(new FlowRollback() { + @Override + public void rollback() { + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + + @Override + public void skipRestRollbacks() { + trigger.skipRestRollbacks(); + } + + @Override + public ErrorCode getErrorCode() { + return trigger.getErrorCode(); + } + + }, data); + } catch (Throwable t) { + logger.warn("unhandled exception when rolling back after-connect-hook flow", t); + rollbackAfterConnectHookFlows(iterator, trigger, data); + } + } + }); + flow(new NoRollbackFlow() { String __name__ = "call-pre-connect-extensions"; diff --git a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java index 3e478c319e8..269caf4a4e5 100755 --- a/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java +++ b/compute/src/main/java/org/zstack/compute/host/HostManagerImpl.java @@ -818,6 +818,7 @@ private void reportHostCapacity() { new While<>(hostUuids).step((hostUuid, completion) -> { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(hostUuid); + msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); bus.send(msg, new CloudBusCallBack(completion) { @Override diff --git a/header/src/main/java/org/zstack/header/host/HostConstant.java b/header/src/main/java/org/zstack/header/host/HostConstant.java index 805fa985584..89da27621fa 100755 --- a/header/src/main/java/org/zstack/header/host/HostConstant.java +++ b/header/src/main/java/org/zstack/header/host/HostConstant.java @@ -1,11 +1,14 @@ package org.zstack.header.host; +import java.util.concurrent.TimeUnit; public interface HostConstant { public static final String SERVICE_ID = "host"; public static final String ACTION_CATEGORY = "host"; + long CHECK_HOST_CAPACITY_TIMEOUT = TimeUnit.MINUTES.toMillis(30); + String HOST_SYNC_SIGNATURE_PREFIX = "Host-"; String HOST_ARCHITECTURE_X86_64 = "x86_64"; diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java index c3b01dc3c8b..c58985f7ca1 100755 --- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java +++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java @@ -4833,7 +4833,7 @@ public String call(CephPrimaryStorageMonVO arg) { } final SetupSelfFencerOnKvmHostReply reply = new SetupSelfFencerOnKvmHostReply(); - new KvmCommandSender(param.getHostUuid()).send(cmd, KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() { + new KvmCommandSender(param.getHostUuid(), param.isNoStatusCheck()).send(cmd, KVM_HA_SETUP_SELF_FENCER, new KvmCommandFailureChecker() { @Override public ErrorCode getError(KvmResponseWrapper wrapper) { AgentResponse rsp = wrapper.getResponse(AgentResponse.class); diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java index 5ad18e49c9f..4650053e247 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostCapacityExtension.java @@ -4,7 +4,6 @@ import org.zstack.core.cloudbus.CloudBus; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.errorcode.ErrorFacade; -import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.header.core.Completion; import org.zstack.header.core.NopeCompletion; import org.zstack.header.core.workflow.Flow; @@ -24,13 +23,12 @@ public class KVMHostCapacityExtension implements KVMHostConnectExtensionPoint, H @Autowired private ErrorFacade errf; @Autowired - private ApiTimeoutManager timeoutMgr; - @Autowired private ResourceConfigFacade rcf; public void reportCapacity(HostInventory host, Completion completion) { CheckHostCapacityMsg msg = new CheckHostCapacityMsg(); msg.setHostUuid(host.getUuid()); + msg.setTimeout(HostConstant.CHECK_HOST_CAPACITY_TIMEOUT); bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host.getUuid()); bus.send(msg, new CloudBusCallBack(completion) { @Override diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java b/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java index a202505f708..902b462a9c2 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KvmSetupSelfFencerExtensionPoint.java @@ -38,6 +38,7 @@ class KvmSetupSelfFencerParam { private PrimaryStorageInventory primaryStorage; private String strategy; private List fencers; + private boolean noStatusCheck; // all previous self-fencer configurations on the ps will be removed after applying the new one private boolean flushPrevious = true; @@ -97,6 +98,14 @@ public void setFencers(List fencers) { this.fencers = fencers; } + public boolean isNoStatusCheck() { + return noStatusCheck; + } + + public void setNoStatusCheck(boolean noStatusCheck) { + this.noStatusCheck = noStatusCheck; + } + public boolean isFlushPrevious() { return flushPrevious; }