diff --git a/README.md b/README.md index 7a8ba93..1df7cc7 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,24 @@ End-to-end tests for Deckhouse storage components. 5. Write your test in `tests//_test.go` (Section marked `---=== TESTS START HERE ===---`) 6. Run the test: `go test -timeout=240m -v ./tests/ -count=1` +### Run using an existing cluster (no VM creation) + +Use this mode to run tests against a cluster that is already running (faster iterations, no virtualization/VM setup). + +1. Set cluster creation mode to use existing cluster: + ```bash + export TEST_CLUSTER_CREATE_MODE=alwaysUseExisting + ``` +2. Point SSH to the **test cluster** (the Kubernetes API master you want to run tests on): + - **Direct access:** `SSH_HOST` = IP/hostname of the cluster master, `SSH_USER` = user that can run `sudo cat /etc/kubernetes/admin.conf` on that host. + - **Via jump host:** set `SSH_JUMP_HOST`, `SSH_JUMP_USER`, `SSH_JUMP_KEY_PATH` (optional); `SSH_HOST`/`SSH_USER` are the target cluster master. +3. Source the rest of your test env (e.g. `source tests//test_exports`), then run: + ```bash + go test -timeout=240m -v ./tests/ -count=1 + ``` + +Kubeconfig is written to `temp//` (e.g. `temp/sds_node_configurator_test/kubeconfig-.yml`). The framework acquires a cluster lock so only one test run uses the cluster at a time. If a previous run left the lock (crash, Ctrl+C), set `TEST_CLUSTER_FORCE_LOCK_RELEASE=true` for the next run (do not use if another test might be using the cluster). + The `-count=1` flag prevents Go from using cached test results. Timeout `240m` is a global timeout for entire testkit. Adjust it on your needs. @@ -48,6 +66,23 @@ Designed to validate any CSI driver stability under high load with concurrent PV Run the test: `go test -timeout=120m -v ./tests/csi-all-stress-tests -count=1` +### sds-node-configurator-stress-tests + +Stress test for **sds-node-configurator**: ramps independent **LVMVolumeGroups** on a single node (one VirtualDisk → one BlockDevice → one VG per slot). Empirically finds how many VGs the node and agent can reconcile; LVM2 has no fixed VG count limit. + +- Creates a nested cluster with `sds-node-configurator` and `sds-local-volume` (see `cluster_config.yml`) +- Implementation: `pkg/testkit/snc_max_vgs_stress.go` (`MaxVGsStressRunner`) +- Probe mode (default): pass if at least one LVG is `Ready`; see report in logs for the empirical ceiling +- Strict mode: `STRESS_MAX_VG_STRICT=true` requires all `STRESS_MAX_VG_TARGET` to become `Ready` + +Run: + +```bash +go test -timeout=240m -v ./tests/sds-node-configurator-stress-tests -count=1 +``` + +Tuning: `STRESS_MAX_VG_TARGET` (default `30`), `STRESS_MAX_VG_BATCH_SIZE` (default `5`), `STRESS_MAX_VG_DISK_SIZE` (default `1Gi`), `STRESS_MAX_VG_MIN_READY`, `STRESS_MAX_VG_STRICT`. + ## Functions Glossary (exportable only) @@ -71,6 +106,7 @@ See [pkg/FUNCTIONS_GLOSSARY.md](pkg/FUNCTIONS_GLOSSARY.md) for a full list of al - `SSH_PUBLIC_KEY` -- Path to SSH public key file, or plain-text key content. Default: `~/.ssh/id_rsa.pub` - `SSH_PASSPHRASE` -- Passphrase for the SSH private key. Required for non-interactive mode with encrypted keys - `SSH_VM_USER` -- SSH user for connecting to VMs deployed inside the test cluster. Default: `cloud` +- `SSH_VM_PASSWORD` -- Password for SSH to VMs (e.g. `cloud`) when connecting from jump host for lsblk checks. If set, uses `sshpass`; leave empty for key-based auth. Required when VMs accept only password auth. - `SSH_JUMP_HOST` -- Jump host address for connecting to clusters behind a bastion - `SSH_JUMP_USER` -- Jump host SSH user. Defaults to `SSH_USER` if jump host is set - `SSH_JUMP_KEY_PATH` -- Jump host SSH key path. Defaults to `SSH_PRIVATE_KEY` if jump host is set @@ -79,8 +115,10 @@ See [pkg/FUNCTIONS_GLOSSARY.md](pkg/FUNCTIONS_GLOSSARY.md) for a full list of al - `YAML_CONFIG_FILENAME` -- Filename of the cluster definition YAML. Default: `cluster_config.yml` - `TEST_CLUSTER_CLEANUP` -- Set to `true` to remove the test cluster after tests complete. Default: `false` +- `TEST_CLUSTER_RESUME` -- Set to `true` to continue from a previous failed run (only for `alwaysCreateNew`). If the test failed in the middle of cluster creation, re-run with `TEST_CLUSTER_RESUME=true`; the framework will load saved state from `temp//cluster-state.json` (written after step 6), restore VM hostnames, and run the remaining steps (connect to first master, add nodes, enable modules). Requires that step 6 (VMs created, VM info gathered) completed before the failure. - `TEST_CLUSTER_NAMESPACE` -- Namespace for DKP cluster deployment. Default: `e2e-test-cluster` - `KUBE_CONFIG_PATH` -- Path to a kubeconfig file. Used as fallback if SSH-based kubeconfig retrieval fails +- `KUBE_INSECURE_SKIP_TLS_VERIFY` -- Set to `true` to skip TLS certificate verification for the Kubernetes API (e.g. self-signed certs or tunnel to 127.0.0.1). Default: not set (verify TLS) - `IMAGE_PULL_POLICY` -- Image pull policy for ClusterVirtualImages: `Always` or `IfNotExists`. Default: `IfNotExists` ### Logging @@ -116,3 +154,11 @@ See [pkg/FUNCTIONS_GLOSSARY.md](pkg/FUNCTIONS_GLOSSARY.md) for a full list of al - `STRESS_TEST_MAX_ATTEMPTS` -- Maximum attempts for waiting operations. Default: `360` - `STRESS_TEST_INTERVAL` -- Interval between attempts in seconds. Default: `5` - `STRESS_TEST_CLEANUP` -- Whether to cleanup resources after stress tests. Default: `true` + +**sds-node-configurator max-VG stress** (`sds-node-configurator-stress-tests`): + +- `STRESS_MAX_VG_TARGET` -- How many independent LVMVolumeGroups to attempt. Default: `30` +- `STRESS_MAX_VG_BATCH_SIZE` -- Ramp batch size. Default: `5` +- `STRESS_MAX_VG_DISK_SIZE` -- VirtualDisk size per slot. Default: `1Gi` +- `STRESS_MAX_VG_STRICT` -- If `true`, fail unless all targets are Ready. Default: `false` (probe) +- `STRESS_MAX_VG_MIN_READY` -- Minimum Ready count in probe mode. Default: `1` diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index f5a8154..8aed5df 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -183,7 +183,8 @@ func expandPath(path string) (string, error) { // and returns a rest.Config that can be used with Kubernetes clients, along with the path to the kubeconfig file. // If sshClient is provided, it will be used instead of creating a new connection. // If sshClient is nil, a new connection will be created and closed automatically. -func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient) (*rest.Config, string, error) { +// If kubeconfigOutputDir is non-empty, the kubeconfig file is written there; otherwise temp// is used. +func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient, kubeconfigOutputDir string) (*rest.Config, string, error) { // Create SSH client if not provided shouldClose := false if sshClient == nil { @@ -198,23 +199,24 @@ func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClien defer sshClient.Close() } - // Get the test file name from the caller - _, callerFile, _, ok := runtime.Caller(1) - if !ok { - return nil, "", fmt.Errorf("failed to get caller file information") - } - testFileName := strings.TrimSuffix(filepath.Base(callerFile), filepath.Ext(callerFile)) - - // Determine the temp directory path in the repo root - // callerFile is in tests/{test-dir}/, so we go up two levels to reach repo root - callerDir := filepath.Dir(callerFile) - repoRootPath := filepath.Join(callerDir, "..", "..") - // Resolve the .. parts to get absolute path - repoRoot, err := filepath.Abs(repoRootPath) - if err != nil { - return nil, "", fmt.Errorf("failed to resolve repo root path: %w", err) + var tempDir string + if kubeconfigOutputDir != "" { + tempDir = kubeconfigOutputDir + } else { + // Get the test file name from the caller (creates temp/cluster when called from pkg/cluster) + _, callerFile, _, ok := runtime.Caller(1) + if !ok { + return nil, "", fmt.Errorf("failed to get caller file information") + } + testFileName := strings.TrimSuffix(filepath.Base(callerFile), filepath.Ext(callerFile)) + callerDir := filepath.Dir(callerFile) + repoRootPath := filepath.Join(callerDir, "..", "..") + repoRoot, err := filepath.Abs(repoRootPath) + if err != nil { + return nil, "", fmt.Errorf("failed to resolve repo root path: %w", err) + } + tempDir = filepath.Join(repoRoot, "temp", testFileName) } - tempDir := filepath.Join(repoRoot, "temp", testFileName) // Create temp directory if it doesn't exist if err := os.MkdirAll(tempDir, 0755); err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index ef2ed88..5dde700 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,7 +53,7 @@ const ( // Kubernetes operations ModuleCheckTimeout = 10 * time.Second // Timeout for checking module status NamespaceTimeout = 30 * time.Second // Timeout for creating namespace - NodeGroupTimeout = 3 * time.Second // Timeout for creating NodeGroup + NodeGroupTimeout = 2 * time.Minute // Timeout for creating NodeGroup (API can be slow right after bootstrap) SecretsWaitTimeout = 2 * time.Minute // Timeout for waiting for bootstrap secrets to appear ClusterHealthTimeout = 15 * time.Minute // Timeout for cluster health check ModuleDeployTimeout = 15 * time.Minute // Timeout for waiting for ONE module to be ready diff --git a/internal/config/env.go b/internal/config/env.go index fa51fc2..a540a5e 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -69,6 +69,8 @@ var ( // SSH credentials to deploy to VM VMSSHUser = os.Getenv("SSH_VM_USER") VMSSHUserDefaultValue = "cloud" + // VMSSHPassword when set is used to SSH from jump host to VMs (cloud@vmIP) via sshpass. Leave empty for key-based auth. + VMSSHPassword = os.Getenv("SSH_VM_PASSWORD") // KubeConfigPath is the path to a kubeconfig file. If SSH retrieval fails (e.g., sudo requires password), // this path will be used as a fallback. If not set and SSH fails, the user will be notified to download @@ -87,6 +89,11 @@ var ( TestClusterNamespace = os.Getenv("TEST_CLUSTER_NAMESPACE") TestClusterNamespaceDefaultValue = "e2e-test-cluster" + // TestClusterResume when set to "true" or "True" (only for alwaysCreateNew) tries to continue from a previous + // failed run: if state was saved after step 6 (VMs created, IPs gathered), connects to the first master and + // runs remaining steps (add nodes, enable modules). Set to "true" and re-run the test after a mid-deploy failure. + TestClusterResume = os.Getenv("TEST_CLUSTER_RESUME") + // TestClusterStorageClass specifies the storage class for DKP cluster deployment TestClusterStorageClass = os.Getenv("TEST_CLUSTER_STORAGE_CLASS") //TestClusterStorageClassDefaultValue = "rsc-test-r2-local" diff --git a/internal/kubernetes/storage/lvmvolumegroup.go b/internal/kubernetes/storage/lvmvolumegroup.go index 74fd9e8..3306160 100644 --- a/internal/kubernetes/storage/lvmvolumegroup.go +++ b/internal/kubernetes/storage/lvmvolumegroup.go @@ -76,6 +76,31 @@ func (c *LVMVolumeGroupClient) Get(ctx context.Context, name string) (*snc.LVMVo return &lvg, nil } +// CreateWithMatchLabels creates an LVMVolumeGroup bound to block devices selected by label (typical: hostname + metadata.name). +func (c *LVMVolumeGroupClient) CreateWithMatchLabels(ctx context.Context, name, nodeName, actualVGName string, matchLabels map[string]string) error { + lvg := &snc.LVMVolumeGroup{ + TypeMeta: metav1.TypeMeta{ + APIVersion: snc.SchemeGroupVersion.String(), + Kind: "LVMVolumeGroup", + }, + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: snc.LVMVolumeGroupSpec{ + Type: "Local", + Local: snc.LVMVolumeGroupLocalSpec{ + NodeName: nodeName, + }, + BlockDeviceSelector: &metav1.LabelSelector{ + MatchLabels: matchLabels, + }, + ActualVGNameOnTheNode: actualVGName, + }, + } + if err := c.client.Create(ctx, lvg); err != nil { + return fmt.Errorf("failed to create LVMVolumeGroup %s: %w", name, err) + } + return nil +} + // Create creates a new LVMVolumeGroup for a specific node func (c *LVMVolumeGroupClient) Create(ctx context.Context, name, nodeName string, blockDeviceNames []string, actualVGName string) error { lvg := &snc.LVMVolumeGroup{ diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ce64f24..d1b79e6 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -95,6 +95,91 @@ type TestClusterResources struct { SetupSSHClient ssh.SSHClient // Setup node SSH client (for cleanup) } +// resumeState is saved after step 6 (VMs created, IPs gathered) so a failed run can be continued with TEST_CLUSTER_RESUME=true. +type resumeState struct { + FirstMasterIP string `json:"first_master_ip"` + Namespace string `json:"namespace"` + VMNames []string `json:"vm_names"` + SetupVMName string `json:"setup_vm_name"` + MasterHostnames []string `json:"master_hostnames"` + WorkerHostnames []string `json:"worker_hostnames"` +} + +// getClusterStatePath returns temp//cluster-state.json +// (same dir as PrepareBootstrapConfig and cluster-state.json; used for both save and resume). +func getClusterStatePath(testFilePath string) (string, error) { + callerDir := filepath.Dir(testFilePath) + repoRoot, err := filepath.Abs(filepath.Join(callerDir, "..", "..")) + if err != nil { + return "", err + } + testFileName := strings.TrimSuffix(filepath.Base(testFilePath), filepath.Ext(testFilePath)) + return filepath.Join(repoRoot, "temp", testFileName, "cluster-state.json"), nil +} + +// getTestTempDirFromStack returns temp// by walking the call stack +// until a path containing "/tests/" is found. Used when testFilePath is not available (e.g. UseExistingCluster). +func getTestTempDirFromStack() (string, error) { + for i := 1; i <= 20; i++ { + _, file, _, ok := runtime.Caller(i) + if !ok { + break + } + if !strings.Contains(filepath.ToSlash(file), "/tests/") { + continue + } + dir := filepath.Dir(file) + for filepath.Base(dir) != "tests" { + parent := filepath.Dir(dir) + if parent == dir { + break + } + dir = parent + } + if filepath.Base(dir) != "tests" { + continue + } + repoRoot := filepath.Dir(dir) + testFileName := strings.TrimSuffix(filepath.Base(file), filepath.Ext(file)) + return filepath.Join(repoRoot, "temp", testFileName), nil + } + return "", fmt.Errorf("could not determine test temp dir from call stack") +} + +func saveClusterState(testFilePath string, state *resumeState) error { + path, err := getClusterStatePath(testFilePath) + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, data, 0600) +} + +func loadClusterState(testFilePath string) (*resumeState, error) { + path, err := getClusterStatePath(testFilePath) + if err != nil { + return nil, err + } + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + var state resumeState + if err := json.Unmarshal(data, &state); err != nil { + return nil, err + } + if state.FirstMasterIP == "" || state.Namespace == "" || len(state.VMNames) == 0 { + return nil, fmt.Errorf("invalid cluster state: missing required fields") + } + return &state, nil +} + // loadClusterConfigFromPath loads and validates a cluster configuration from a specific file path func loadClusterConfigFromPath(configPath string) (*config.ClusterDefinition, error) { // Read the YAML file @@ -156,33 +241,62 @@ func CreateTestCluster( ) (*TestClusterResources, error) { logger.Step(1, "Loading cluster configuration from %s", yamlConfigFilename) - // Get the test file's directory (the caller of CreateTestCluster, which is the test file) - // runtime.Caller(1) gets the immediate caller (the test file that called CreateTestCluster) - _, callerFile, _, ok := runtime.Caller(1) - if !ok { - return nil, fmt.Errorf("failed to determine test file path") + // Find the test package directory and test file path by walking the call stack. + // CreateTestCluster is called from CreateOrConnectToTestCluster in cluster.go, so Caller(1) is not the test file. + var testDir string + var testFilePath string + for skip := 1; skip <= 10; skip++ { + _, callerFile, _, ok := runtime.Caller(skip) + if !ok { + break + } + if strings.Contains(filepath.ToSlash(callerFile), "/tests/") { + testDir = filepath.Dir(callerFile) + testFilePath = callerFile + break + } + } + if testDir == "" { + return nil, fmt.Errorf("failed to determine test directory (no caller under tests/)") } - testDir := filepath.Dir(callerFile) yamlConfigPath := filepath.Join(testDir, yamlConfigFilename) - logger.Debug("Test file directory: %s", testDir) + logger.Debug("Test directory: %s", testDir) logger.Debug("Config file path: %s", yamlConfigPath) - // Step 1: Load cluster configuration from YAML - // LoadClusterConfig uses runtime.Caller(1) which would get this function, not the test file - // So we need to load it directly from the path + // Step 1: Load cluster configuration from YAML (from test directory, e.g. tests/sds-node-configurator/cluster_config.yml) clusterDefinition, err := loadClusterConfigFromPath(yamlConfigPath) if err != nil { return nil, fmt.Errorf("failed to load cluster configuration: %w", err) } logger.StepComplete(1, "Cluster configuration loaded successfully from %s", yamlConfigPath) - // Randomize hostnames to avoid SAN initiator collisions. - // SANs remember iSCSI initiator names keyed by hostname; reusing the same hostnames - // (master-1, worker-1, etc.) across cluster recreations causes stale initiator mappings. - // Each node gets its own unique suffix to minimize collision likelihood. - randomizeHostnames(clusterDefinition) - logger.Info("Cluster hostnames randomized: masters=%v, workers=%v", + var resumeMode bool + if config.TestClusterResume == "true" || config.TestClusterResume == "True" { + state, loadErr := loadClusterState(testFilePath) + if loadErr == nil { + resumeMode = true + logger.Info("Resume mode: restoring hostnames from saved state (first_master_ip=%s)", state.FirstMasterIP) + for i, h := range state.MasterHostnames { + if i < len(clusterDefinition.Masters) { + clusterDefinition.Masters[i].Hostname = h + } + } + for i, h := range state.WorkerHostnames { + if i < len(clusterDefinition.Workers) { + clusterDefinition.Workers[i].Hostname = h + } + } + } else { + statePath, _ := getClusterStatePath(testFilePath) + logger.Info("TEST_CLUSTER_RESUME is set but cluster state not found or invalid: %v (tried %s)", loadErr, statePath) + } + } + if !resumeMode { + // Randomize hostnames to avoid SAN initiator collisions. + randomizeHostnames(clusterDefinition) + } + logger.Info("Cluster hostnames: masters=%v, workers=%v", func() []string { names := make([]string, len(clusterDefinition.Masters)) for i, m := range clusterDefinition.Masters { @@ -206,14 +320,36 @@ func CreateTestCluster( return nil, fmt.Errorf("failed to get SSH private key path: %w", err) } + useJumpHost := config.SSHJumpHost != "" + var jumpUser, jumpHost, jumpKeyPath string + if useJumpHost { + jumpUser = config.SSHJumpUser + if jumpUser == "" { + jumpUser = sshUser + } + jumpHost = config.SSHJumpHost + jumpKeyPath = config.SSHJumpKeyPath + if jumpKeyPath == "" { + jumpKeyPath = sshKeyPath + } + logger.Info("Using jump host %s@%s to connect to base cluster %s@%s", jumpUser, jumpHost, sshUser, sshHost) + } + logger.Step(2, "Connecting to base cluster %s@%s", sshUser, sshHost) - // Step 2: Connect to base cluster - baseClusterResources, err := ConnectToCluster(ctx, ConnectClusterOptions{ - SSHUser: sshUser, - SSHHost: sshHost, - SSHKeyPath: sshKeyPath, - UseJumpHost: false, - }) + // Step 2: Connect to base cluster (kubeconfig written to test temp dir to avoid temp/cluster) + var kubeconfigDir string + if path, pathErr := getClusterStatePath(testFilePath); pathErr == nil { + kubeconfigDir = filepath.Dir(path) + } + baseConnectOpts := ConnectClusterOptions{SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, UseJumpHost: useJumpHost, KubeconfigOutputDir: kubeconfigDir} + if useJumpHost { + baseConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: sshUser, TargetHost: sshHost, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + baseClusterResources, err := ConnectToCluster(ctx, baseConnectOpts) if err != nil { return nil, fmt.Errorf("failed to connect to base cluster: %w", err) } @@ -249,6 +385,57 @@ func CreateTestCluster( } logger.StepComplete(4, "Test namespace created") + if resumeMode { + // Resume path: cluster is already fully created and ready. Only connect to it and return resources. + // No GatherVMInfo, NodeGroup, AddNodes, EnableModules — assume "should create test cluster" already completed. + state, _ := loadClusterState(testFilePath) + namespace = state.Namespace + baseKubeconfig := baseClusterResources.Kubeconfig + baseKubeconfigPath := baseClusterResources.KubeconfigPath + baseTunnelInfo := baseClusterResources.TunnelInfo + vmResources := &VMResources{VMNames: state.VMNames, SetupVMName: state.SetupVMName} + + if len(clusterDefinition.Masters) > 0 { + clusterDefinition.Masters[0].IPAddress = state.FirstMasterIP + } + firstMasterIP := state.FirstMasterIP + if firstMasterIP == "" { + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("resume: first_master_ip missing in cluster state") + } + + logger.Step(1, "Resume: connecting to existing test cluster master %s", firstMasterIP) + testConnectOpts := ConnectClusterOptions{ + SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + if useJumpHost { + testConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + testClusterResources, err := ConnectToCluster(ctx, testConnectOpts) + if err != nil { + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("resume: connect to test cluster: %w", err) + } + logger.StepComplete(1, "Resume: connected to test cluster") + + testClusterResources.ClusterDefinition = clusterDefinition + testClusterResources.VMResources = vmResources + testClusterResources.BaseClusterClient = baseClusterResources.SSHClient + testClusterResources.BaseKubeconfig = baseKubeconfig + testClusterResources.BaseKubeconfigPath = baseKubeconfigPath + testClusterResources.BaseTunnelInfo = baseTunnelInfo + testClusterResources.SetupSSHClient = nil + return testClusterResources, nil + } + logger.Step(5, "Creating virtual machines (this may take up to %v)", config.VMCreationTimeout) // Step 5: Create virtualization client and virtual machines virtCtx, cancel := context.WithTimeout(ctx, config.VMCreationTimeout) @@ -328,7 +515,7 @@ func CreateTestCluster( logger.Step(6, "Gathering VM information") // Step 6: Gather VM information gatherCtx, cancel := context.WithTimeout(ctx, config.VMInfoTimeout) - err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources) + err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources, nil) cancel() if err != nil { baseClusterResources.SSHClient.Close() @@ -337,6 +524,26 @@ func CreateTestCluster( } logger.StepComplete(6, "VM information gathered") + // Save resume state so a failed run can be continued with TEST_CLUSTER_RESUME=true + masterHostnames := make([]string, len(clusterDefinition.Masters)) + for i, m := range clusterDefinition.Masters { + masterHostnames[i] = m.Hostname + } + workerHostnames := make([]string, len(clusterDefinition.Workers)) + for i, w := range clusterDefinition.Workers { + workerHostnames[i] = w.Hostname + } + if err := saveClusterState(testFilePath, &resumeState{ + FirstMasterIP: clusterDefinition.Masters[0].IPAddress, + Namespace: namespace, + VMNames: vmNames, + SetupVMName: vmResources.SetupVMName, + MasterHostnames: masterHostnames, + WorkerHostnames: workerHostnames, + }); err != nil { + logger.Warn("Failed to save resume state: %v", err) + } + // Step 7: Get setup node IP and wait for SSH readiness setupNode, err := GetSetupNode(clusterDefinition) if err != nil { @@ -363,11 +570,12 @@ func CreateTestCluster( logger.StepComplete(7, "SSH is ready on setup node %s", setupNodeIP) logger.Step(8, "Establishing SSH connection to setup node") - // Step 8: Establish SSH connection to setup node - setupSSHClient, err := ssh.NewClientWithJumpHost( - sshUser, sshHost, sshKeyPath, // jump host - config.VMSSHUser, setupNodeIP, sshKeyPath, // target host - ) + // Step 8: Establish SSH connection to setup node (first hop: jump host or base cluster) + hopUser, hopHost, hopKey := sshUser, sshHost, sshKeyPath + if useJumpHost { + hopUser, hopHost, hopKey = jumpUser, jumpHost, jumpKeyPath + } + setupSSHClient, err := ssh.NewClientWithJumpHost(hopUser, hopHost, hopKey, config.VMSSHUser, setupNodeIP, sshKeyPath) if err != nil { baseClusterResources.SSHClient.Close() baseClusterResources.TunnelInfo.StopFunc() @@ -439,16 +647,20 @@ func CreateTestCluster( baseTunnelInfo := baseClusterResources.TunnelInfo logger.Step(14, "Connecting to test cluster master %s", firstMasterIP) - // Step 14: Connect to test cluster - testClusterResources, err := ConnectToCluster(ctx, ConnectClusterOptions{ - SSHUser: sshUser, - SSHHost: sshHost, - SSHKeyPath: sshKeyPath, - UseJumpHost: true, - TargetUser: config.VMSSHUser, - TargetHost: firstMasterIP, - TargetKeyPath: sshKeyPath, - }) + // Step 14: Connect to test cluster (first hop: jump host or base cluster) + testConnectOpts := ConnectClusterOptions{ + SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + if useJumpHost { + testConnectOpts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, + UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: kubeconfigDir, + } + } + testClusterResources, err := ConnectToCluster(ctx, testConnectOpts) if err != nil { setupSSHClient.Close() baseClusterResources.SSHClient.Close() @@ -700,6 +912,88 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { } logger.StepComplete(3, "Cluster is healthy") + // When using jump host, the jump host is the base cluster (with Deckhouse virtualization). + // Get its kubeconfig so tests can create VirtualDisks and attach to VMs. + if config.SSHJumpHost != "" { + logger.Step(4, "Getting base cluster kubeconfig from jump host (for VirtualDisk)") + jumpUser := config.SSHJumpUser + if jumpUser == "" { + jumpUser = sshUser + } + jumpKeyPath := config.SSHJumpKeyPath + if jumpKeyPath == "" { + jumpKeyPath = sshKeyPath + } + baseSSHClient, connErr := ssh.NewClient(jumpUser, config.SSHJumpHost, jumpKeyPath) + if connErr != nil { + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to connect to base cluster (jump host %s): %w", config.SSHJumpHost, connErr) + } + baseTunnel, tunnelErr := ssh.EstablishSSHTunnel(context.Background(), baseSSHClient, "6445") + if tunnelErr != nil { + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to establish base cluster tunnel: %w", tunnelErr) + } + kubeconfigDir, dirErr := getTestTempDirFromStack() + if dirErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to get test temp dir for kubeconfig: %w", dirErr) + } + _, baseKubeconfigPath, kubeErr := internalcluster.GetKubeconfig(ctx, config.SSHJumpHost, jumpUser, jumpKeyPath, baseSSHClient, kubeconfigDir) + if kubeErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to get base cluster kubeconfig from jump host: %w", kubeErr) + } + if updateErr := internalcluster.UpdateKubeconfigPort(baseKubeconfigPath, baseTunnel.LocalPort); updateErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to update base cluster kubeconfig port: %w", updateErr) + } + baseKubeconfig, buildErr := clientcmd.BuildConfigFromFlags("", baseKubeconfigPath) + if buildErr != nil { + baseTunnel.StopFunc() + baseSSHClient.Close() + _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) + if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { + clusterResources.TunnelInfo.StopFunc() + } + clusterResources.SSHClient.Close() + return nil, fmt.Errorf("failed to build base cluster rest config: %w", buildErr) + } + configureExtendedTimeouts(baseKubeconfig) + clusterResources.BaseClusterClient = baseSSHClient + clusterResources.BaseKubeconfig = baseKubeconfig + clusterResources.BaseKubeconfigPath = baseKubeconfigPath + clusterResources.BaseTunnelInfo = baseTunnel + logger.StepComplete(4, "Base cluster kubeconfig ready (VirtualDisk operations)") + } + logger.Success("Existing cluster is ready for use") return clusterResources, nil } @@ -1457,8 +1751,12 @@ func CleanupTestCluster(ctx context.Context, resources *TestClusterResources) er logger.Step(6, "Stopping base cluster tunnel and closing SSH client") // Step 6: Stop base cluster tunnel and close base cluster SSH client - if baseTunnel != nil && baseTunnel.StopFunc != nil { - if err := baseTunnel.StopFunc(); err != nil { + baseTunnelToStop := baseTunnel + if baseTunnelToStop == nil && resources.BaseTunnelInfo != nil { + baseTunnelToStop = resources.BaseTunnelInfo // e.g. from UseExistingCluster with SSH_JUMP_HOST + } + if baseTunnelToStop != nil && baseTunnelToStop.StopFunc != nil { + if err := baseTunnelToStop.StopFunc(); err != nil { errs = append(errs, fmt.Errorf("failed to stop base cluster SSH tunnel: %w", err)) logger.Error("Failed to stop base cluster SSH tunnel: %v", err) } else { @@ -1739,6 +2037,9 @@ type ConnectClusterOptions struct { TargetUser string // Required when UseJumpHost is true TargetHost string // Required when UseJumpHost is true (IP or hostname) TargetKeyPath string // Optional: defaults to SSHKeyPath if empty + + // KubeconfigOutputDir if set saves kubeconfig to this dir instead of temp// (avoids temp/cluster) + KubeconfigOutputDir string } // ConnectToCluster establishes SSH connection to a cluster (base or test), @@ -1862,7 +2163,7 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu } // Step 3: Get kubeconfig from cluster master - _, kubeconfigPath, err := internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient) + _, kubeconfigPath, err := internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient, opts.KubeconfigOutputDir) if err != nil { tunnelInfo.StopFunc() sshClient.Close() diff --git a/pkg/cluster/vms.go b/pkg/cluster/vms.go index 4ee1402..72f5cd1 100644 --- a/pkg/cluster/vms.go +++ b/pkg/cluster/vms.go @@ -639,27 +639,25 @@ type vmIPResult struct { hostname string } +// GatherVMInfoOptions optionally customizes GatherVMInfo behaviour. +// Pass nil for default (gather all VMs including setup). +type GatherVMInfoOptions struct { + // SkipSetupVM when true skips the setup/bootstrap VM. Use when resuming after Deckhouse is up: + // the bootstrap node is removed at that point and only master/worker VMs exist. + SkipSetupVM bool +} + // GatherVMInfo gathers IP addresses for all VMs in the cluster definition and fills them into ClusterDefinition. // This should be called once while connected to the base cluster, before switching to test cluster. // It modifies clusterDef in-place by setting IPAddress field for each VM node. -func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namespace string, clusterDef *config.ClusterDefinition, vmResources *VMResources) error { +// When opts.SkipSetupVM is true, the setup (bootstrap) VM is not queried and clusterDef.Setup is left unchanged. +func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namespace string, clusterDef *config.ClusterDefinition, vmResources *VMResources, opts *GatherVMInfoOptions) error { + if opts == nil { + opts = &GatherVMInfoOptions{} + } var wg sync.WaitGroup results := make(chan vmIPResult) - // Count total VMs to gather info for - totalVMs := 0 - for i := range clusterDef.Masters { - if clusterDef.Masters[i].HostType == config.HostTypeVM { - totalVMs++ - } - } - for i := range clusterDef.Workers { - if clusterDef.Workers[i].HostType == config.HostTypeVM { - totalVMs++ - } - } - totalVMs++ // setup node - // Gather info for all masters in parallel for i := range clusterDef.Masters { master := &clusterDef.Masters[i] @@ -686,14 +684,16 @@ func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namesp } } - // Gather info for setup node in parallel + // Gather info for setup node unless skipped (e.g. resume: bootstrap VM already removed) setupVMName := vmResources.SetupVMName - wg.Add(1) - go func() { - defer wg.Done() - ip, err := GetVMIPAddress(ctx, virtClient, namespace, setupVMName) - results <- vmIPResult{node: nil, ip: ip, err: err, hostname: setupVMName} - }() + if !opts.SkipSetupVM && setupVMName != "" { + wg.Add(1) + go func() { + defer wg.Done() + ip, err := GetVMIPAddress(ctx, virtClient, namespace, setupVMName) + results <- vmIPResult{node: nil, ip: ip, err: err, hostname: setupVMName} + }() + } // Close results channel when all goroutines complete go func() { @@ -716,15 +716,18 @@ func GatherVMInfo(ctx context.Context, virtClient *virtualization.Client, namesp } } + if opts.SkipSetupVM { + // Do not touch clusterDef.Setup; bootstrap VM is gone + return nil + } + // Create or update clusterDef.Setup with the generated VM info if clusterDef.Setup == nil { - // Create setup node from DefaultSetupVM template setupNode := config.DefaultSetupVM setupNode.Hostname = setupVMName setupNode.IPAddress = setupIP clusterDef.Setup = &setupNode } else { - // Update existing setup node clusterDef.Setup.Hostname = setupVMName clusterDef.Setup.IPAddress = setupIP } diff --git a/pkg/kubernetes/blockdevice_wait.go b/pkg/kubernetes/blockdevice_wait.go new file mode 100644 index 0000000..e00b035 --- /dev/null +++ b/pkg/kubernetes/blockdevice_wait.go @@ -0,0 +1,100 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "crypto/md5" + "encoding/hex" + "fmt" + "strings" + "time" + + "k8s.io/client-go/rest" + + snc "github.com/deckhouse/sds-node-configurator/api/v1alpha1" + "github.com/deckhouse/storage-e2e/internal/kubernetes/storage" + "github.com/deckhouse/storage-e2e/internal/kubernetes/virtualization" + "github.com/deckhouse/storage-e2e/internal/logger" +) + +// BlockDeviceSerialFromUID returns the hex(MD5(uid)) serial used by the sds-node-configurator agent. +func BlockDeviceSerialFromUID(uid string) string { + h := md5.Sum([]byte(uid)) + return hex.EncodeToString(h[:]) +} + +// WaitConsumableBlockDeviceForVirtualDisk polls until a consumable BlockDevice appears for the given VirtualDisk attachment on targetVM. +func WaitConsumableBlockDeviceForVirtualDisk( + ctx context.Context, + nestedKube, baseKube *rest.Config, + namespace, diskName, attachmentName, targetVM string, + timeout time.Duration, +) (*snc.BlockDevice, error) { + virtClient, err := virtualization.NewClient(ctx, baseKube) + if err != nil { + return nil, fmt.Errorf("virtualization client: %w", err) + } + vdObj, err := virtClient.VirtualDisks().Get(ctx, namespace, diskName) + if err != nil { + return nil, fmt.Errorf("get VirtualDisk %s: %w", diskName, err) + } + attObj, err := virtClient.VirtualMachineBlockDeviceAttachments().Get(ctx, namespace, attachmentName) + if err != nil { + return nil, fmt.Errorf("get VMBDA %s: %w", attachmentName, err) + } + serialVD := BlockDeviceSerialFromUID(string(vdObj.GetUID())) + serialAtt := BlockDeviceSerialFromUID(string(attObj.GetUID())) + + bdClient, err := storage.NewBlockDeviceClient(ctx, nestedKube) + if err != nil { + return nil, err + } + + deadline := time.Now().Add(timeout) + poll := 10 * time.Second + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if time.Now().After(deadline) { + return nil, fmt.Errorf("timeout waiting for consumable BlockDevice for disk %s on VM %s", diskName, targetVM) + } + + list, err := bdClient.List(ctx) + if err != nil { + logger.Warn("list BlockDevices: %v", err) + time.Sleep(poll) + continue + } + for i := range list.Items { + bd := &list.Items[i] + s := strings.TrimSpace(bd.Status.Serial) + if s != serialVD && s != serialAtt { + continue + } + if bd.Status.NodeName != targetVM { + continue + } + if !bd.Status.Consumable || bd.Status.Size.IsZero() || bd.Status.Path == "" || !strings.HasPrefix(bd.Status.Path, "/dev/") { + continue + } + return bd, nil + } + time.Sleep(poll) + } +} diff --git a/pkg/kubernetes/lvmvolumegroup.go b/pkg/kubernetes/lvmvolumegroup.go index ee3a541..357e644 100644 --- a/pkg/kubernetes/lvmvolumegroup.go +++ b/pkg/kubernetes/lvmvolumegroup.go @@ -19,6 +19,7 @@ package kubernetes import ( "context" "fmt" + "strings" "time" "k8s.io/client-go/rest" @@ -35,6 +36,85 @@ type ThinPoolSpec struct { AllocationLimit string // Allocation limit (optional) } +// CreateLVMVolumeGroupWithMatchLabels creates an LVMVolumeGroup with an explicit label selector (one BD per VG stress tests). +func CreateLVMVolumeGroupWithMatchLabels(ctx context.Context, kubeconfig *rest.Config, name, nodeName, actualVGName string, matchLabels map[string]string) error { + lvgClient, err := storage.NewLVMVolumeGroupClient(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create LVMVolumeGroup client: %w", err) + } + if err := lvgClient.CreateWithMatchLabels(ctx, name, nodeName, actualVGName, matchLabels); err != nil { + return err + } + logger.Success("LVMVolumeGroup %s created (selector %v)", name, matchLabels) + return nil +} + +// WaitForLVMBatchReady waits until every LVMVolumeGroup in names is Ready or timeout expires. +func WaitForLVMBatchReady(ctx context.Context, kubeconfig *rest.Config, names []string, timeout time.Duration) bool { + lvgClient, err := storage.NewLVMVolumeGroupClient(ctx, kubeconfig) + if err != nil { + logger.Warn("LVMVolumeGroup client: %v", err) + return false + } + deadline := time.Now().Add(timeout) + poll := 15 * time.Second + for { + allReady := true + for _, name := range names { + lvg, err := lvgClient.Get(ctx, name) + if err != nil || lvg.Status.Phase != snc.PhaseReady { + allReady = false + break + } + } + if allReady { + return true + } + if time.Now().After(deadline) { + return false + } + time.Sleep(poll) + } +} + +// CountReadyLVMVolumeGroups returns how many of the given LVMVolumeGroups are Ready. +func CountReadyLVMVolumeGroups(ctx context.Context, kubeconfig *rest.Config, names []string) (int, error) { + lvgClient, err := storage.NewLVMVolumeGroupClient(ctx, kubeconfig) + if err != nil { + return 0, err + } + n := 0 + for _, name := range names { + lvg, err := lvgClient.Get(ctx, name) + if err != nil { + continue + } + if lvg.Status.Phase == snc.PhaseReady { + n++ + } + } + return n, nil +} + +// DeleteLVMVolumeGroupsWithPrefix deletes all LVMVolumeGroups whose names start with prefix. +func DeleteLVMVolumeGroupsWithPrefix(ctx context.Context, kubeconfig *rest.Config, prefix string) error { + lvgClient, err := storage.NewLVMVolumeGroupClient(ctx, kubeconfig) + if err != nil { + return err + } + list, err := lvgClient.List(ctx) + if err != nil { + return err + } + for i := range list.Items { + if !strings.HasPrefix(list.Items[i].Name, prefix) { + continue + } + _ = lvgClient.Delete(ctx, list.Items[i].Name) + } + return nil +} + // CreateLVMVolumeGroup creates an LVMVolumeGroup resource for a specific node func CreateLVMVolumeGroup(ctx context.Context, kubeconfig *rest.Config, name, nodeName string, blockDeviceNames []string, actualVGName string) error { logger.Debug("Creating LVMVolumeGroup %s for node %s with %d block devices", name, nodeName, len(blockDeviceNames)) diff --git a/pkg/kubernetes/virtualdisk.go b/pkg/kubernetes/virtualdisk.go index 1086ac7..9846180 100644 --- a/pkg/kubernetes/virtualdisk.go +++ b/pkg/kubernetes/virtualdisk.go @@ -178,3 +178,63 @@ func WaitForVirtualDiskAttached(ctx context.Context, kubeconfig *rest.Config, na } } } + +// ListVirtualMachineNames returns names of VirtualMachines in the given namespace. +// Used to pick a VM when attaching a VirtualDisk (e.g. in alwaysUseExisting mode). +func ListVirtualMachineNames(ctx context.Context, kubeconfig *rest.Config, namespace string) ([]string, error) { + virtClient, err := virtualization.NewClient(ctx, kubeconfig) + if err != nil { + return nil, fmt.Errorf("failed to create virtualization client: %w", err) + } + list, err := virtClient.VirtualMachines().List(ctx, namespace) + if err != nil { + return nil, fmt.Errorf("failed to list VirtualMachines in %s: %w", namespace, err) + } + names := make([]string, 0, len(list)) + for i := range list { + names = append(names, list[i].Name) + } + return names, nil +} + +// GetVMIPFromBaseCluster returns the IP address of a VirtualMachine in the base cluster (namespace). +// Used to SSH to the VM (e.g. cloud@ip) from the jump host to run lsblk on nested nodes. +func GetVMIPFromBaseCluster(ctx context.Context, baseKubeconfig *rest.Config, namespace, vmName string) (string, error) { + virtClient, err := virtualization.NewClient(ctx, baseKubeconfig) + if err != nil { + return "", fmt.Errorf("create virtualization client: %w", err) + } + vm, err := virtClient.VirtualMachines().Get(ctx, namespace, vmName) + if err != nil { + return "", fmt.Errorf("get VM %s/%s: %w", namespace, vmName, err) + } + if vm.Status.IPAddress == "" { + return "", fmt.Errorf("VM %s/%s has no IP in status yet", namespace, vmName) + } + return vm.Status.IPAddress, nil +} + +// DetachAndDeleteVirtualDisk deletes the VirtualMachineBlockDeviceAttachment and then the VirtualDisk. +// Use this for cleanup after a test. Errors are logged but not returned for "not found" (idempotent). +func DetachAndDeleteVirtualDisk(ctx context.Context, kubeconfig *rest.Config, namespace, attachmentName, diskName string) error { + virtClient, err := virtualization.NewClient(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create virtualization client: %w", err) + } + + if attachmentName != "" { + if err := virtClient.VirtualMachineBlockDeviceAttachments().Delete(ctx, namespace, attachmentName); err != nil { + logger.Warn("Failed to delete VirtualMachineBlockDeviceAttachment %s/%s: %v", namespace, attachmentName, err) + } else { + logger.Success("VirtualMachineBlockDeviceAttachment %s/%s deleted", namespace, attachmentName) + } + } + if diskName != "" { + if err := virtClient.VirtualDisks().Delete(ctx, namespace, diskName); err != nil { + logger.Warn("Failed to delete VirtualDisk %s/%s: %v", namespace, diskName, err) + } else { + logger.Success("VirtualDisk %s/%s deleted", namespace, diskName) + } + } + return nil +} diff --git a/pkg/kubernetes/vmpod.go b/pkg/kubernetes/vmpod.go new file mode 100644 index 0000000..9dbb100 --- /dev/null +++ b/pkg/kubernetes/vmpod.go @@ -0,0 +1,62 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// GetVMPodNodeAndContainerID returns the base cluster node name and the first container ID +// for the Pod that runs the given VM (e.g. virt-launcher--*). +// Used to run nsenter into the VM container from the base cluster node. +func GetVMPodNodeAndContainerID(ctx context.Context, baseConfig *rest.Config, namespace, vmName string) (nodeName, containerID string, err error) { + clientset, err := kubernetes.NewForConfig(baseConfig) + if err != nil { + return "", "", fmt.Errorf("create clientset: %w", err) + } + pods, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return "", "", fmt.Errorf("list pods in %s: %w", namespace, err) + } + for i := range pods.Items { + pod := &pods.Items[i] + if pod.Status.Phase != corev1.PodRunning { + continue + } + // Match Pod that runs this VM: name often contains VM name (e.g. virt-launcher-master-1-xxx) + if !strings.Contains(pod.Name, vmName) { + continue + } + if pod.Spec.NodeName == "" { + continue + } + for _, cs := range pod.Status.ContainerStatuses { + if cs.ContainerID != "" { + return pod.Spec.NodeName, cs.ContainerID, nil + } + } + return "", "", fmt.Errorf("pod %s/%s has no container ID yet", namespace, pod.Name) + } + return "", "", fmt.Errorf("no running pod for VM %s in namespace %s", vmName, namespace) +} diff --git a/pkg/testkit/snc_max_vgs_stress.go b/pkg/testkit/snc_max_vgs_stress.go new file mode 100644 index 0000000..b53d123 --- /dev/null +++ b/pkg/testkit/snc_max_vgs_stress.go @@ -0,0 +1,351 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testkit + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/kubernetes" +) + +const ( + maxVGStressDiskPrefix = "stress-vg-d" + maxVGStressLVGPrefix = "stress-lvg-" + maxVGStressVGNamePrefix = "stress-vg-" + + defaultMaxVGTarget = 30 + defaultMaxVGBatch = 5 + defaultMaxVGDiskSize = "1Gi" + + envMaxVGTarget = "STRESS_MAX_VG_TARGET" + envMaxVGBatch = "STRESS_MAX_VG_BATCH_SIZE" + envMaxVGDiskSize = "STRESS_MAX_VG_DISK_SIZE" + envMaxVGStrict = "STRESS_MAX_VG_STRICT" + envMaxVGMinReady = "STRESS_MAX_VG_MIN_READY" + + lvgReadyTimeoutMin = 15 * time.Minute + virtualDiskAttach = 15 * time.Minute + attachMaxRetries = 3 + attachRetryWait = 1 * time.Minute + bdWaitTimeout = 10 * time.Minute +) + +// MaxVGsStressConfig controls the ramp of independent LVMVolumeGroups (1 disk = 1 VG) on one node. +type MaxVGsStressConfig struct { + Target int + BatchSize int + DiskSize string + Strict bool + MinReady int + + Namespace string + StorageClass string + RunID string +} + +// MaxVGsStressSlotReport is one ramp slot after Run. +type MaxVGsStressSlotReport struct { + Index int + DiskName string + LVGName string + VGName string + BDName string + Ready bool +} + +// MaxVGsStressResult is returned by MaxVGsStressRunner.Run. +type MaxVGsStressResult struct { + NodeName string + Target int + ReadyTotal int + BatchSize int + StoppedEarly bool + Strict bool + Slots []MaxVGsStressSlotReport + Attachments []*kubernetes.VirtualDiskAttachmentResult +} + +// DefaultMaxVGsStressConfig reads tuning from STRESS_MAX_VG_* environment variables. +func DefaultMaxVGsStressConfig(namespace, storageClass, runID string) MaxVGsStressConfig { + target := envIntPositive(envMaxVGTarget, defaultMaxVGTarget) + batch := envIntPositive(envMaxVGBatch, defaultMaxVGBatch) + if batch > target { + batch = target + } + diskSize := strings.TrimSpace(os.Getenv(envMaxVGDiskSize)) + if diskSize == "" { + diskSize = defaultMaxVGDiskSize + } + strict := envBool(envMaxVGStrict) + minReady := 1 + if strict { + minReady = target + } + if v := strings.TrimSpace(os.Getenv(envMaxVGMinReady)); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + minReady = n + } + } + return MaxVGsStressConfig{ + Target: target, BatchSize: batch, DiskSize: diskSize, Strict: strict, MinReady: minReady, + Namespace: namespace, StorageClass: storageClass, RunID: runID, + } +} + +// MaxVGsStressRunner ramps VirtualDisk → BlockDevice → LVMVolumeGroup on a single node. +type MaxVGsStressRunner struct { + Cfg MaxVGsStressConfig + NestedKube *rest.Config + BaseKube *rest.Config +} + +func batchReadyTimeout(batchLen int) time.Duration { + if batchLen <= 0 { + return lvgReadyTimeoutMin + } + t := time.Duration(batchLen) * 4 * time.Minute + if t < lvgReadyTimeoutMin { + return lvgReadyTimeoutMin + } + const maxT = 3 * time.Hour + if t > maxT { + return maxT + } + return t +} + +// Run executes the stress ramp. Caller must call Cleanup with Result.Attachments and slot LVG names. +func (r *MaxVGsStressRunner) Run(ctx context.Context) (*MaxVGsStressResult, error) { + if r.NestedKube == nil || r.BaseKube == nil { + return nil, fmt.Errorf("nested and base kubeconfig are required") + } + if r.Cfg.StorageClass == "" { + return nil, fmt.Errorf("storage class is required") + } + if r.Cfg.RunID == "" { + r.Cfg.RunID = fmt.Sprintf("%d", time.Now().Unix()) + } + + vms, err := kubernetes.ListVirtualMachineNames(ctx, r.BaseKube, r.Cfg.Namespace) + if err != nil { + return nil, err + } + targetVM := "" + for _, vm := range vms { + if strings.HasPrefix(vm, "bootstrap-node-") { + continue + } + targetVM = vm + break + } + if targetVM == "" { + return nil, fmt.Errorf("no VirtualMachine found in namespace %s", r.Cfg.Namespace) + } + + type slot struct { + index int + disk string + vgName string + lvgName string + meta string + bdName string + att *kubernetes.VirtualDiskAttachmentResult + ready bool + } + + slots := make([]slot, 0, r.Cfg.Target) + var attachments []*kubernetes.VirtualDiskAttachmentResult + nodeName := "" + nodeSafe := "" + readyTotal := 0 + stoppedEarly := false + batchNum := 0 + + logger.Info("Max-VG stress ramp: target=%d batch=%d diskSize=%s VM=%q", r.Cfg.Target, r.Cfg.BatchSize, r.Cfg.DiskSize, targetVM) + + for batchStart := 0; batchStart < r.Cfg.Target && !stoppedEarly; batchStart += r.Cfg.BatchSize { + batchEnd := batchStart + r.Cfg.BatchSize + if batchEnd > r.Cfg.Target { + batchEnd = r.Cfg.Target + } + curBatch := batchEnd - batchStart + batchNum++ + logger.Info("Batch %d: slots [%d..%d) (%d)", batchNum, batchStart, batchEnd, curBatch) + + for i := batchStart; i < batchEnd; i++ { + idx := i + 1 + slots = append(slots, slot{ + index: idx, + disk: fmt.Sprintf("%s%d-%s", maxVGStressDiskPrefix, idx, r.Cfg.RunID), + vgName: fmt.Sprintf("%s%d-%s", maxVGStressVGNamePrefix, idx, r.Cfg.RunID), + }) + } + + for i := batchStart; i < batchEnd; i++ { + att, err := attachVirtualDiskWithRetry(ctx, r.BaseKube, kubernetes.VirtualDiskAttachmentConfig{ + VMName: targetVM, Namespace: r.Cfg.Namespace, DiskName: slots[i].disk, + DiskSize: r.Cfg.DiskSize, StorageClassName: r.Cfg.StorageClass, + }) + if err != nil { + return nil, fmt.Errorf("attach %s: %w", slots[i].disk, err) + } + slots[i].att = att + attachments = append(attachments, att) + } + + for i := batchStart; i < batchEnd; i++ { + attachCtx, cancel := context.WithTimeout(ctx, virtualDiskAttach) + err := kubernetes.WaitForVirtualDiskAttached(attachCtx, r.BaseKube, r.Cfg.Namespace, slots[i].att.AttachmentName, 10*time.Second) + cancel() + if err != nil { + return nil, fmt.Errorf("wait attach %s: %w", slots[i].att.AttachmentName, err) + } + } + + metaSeen := make(map[string]struct{}) + for i := batchStart; i < batchEnd; i++ { + bd, err := kubernetes.WaitConsumableBlockDeviceForVirtualDisk(ctx, r.NestedKube, r.BaseKube, + r.Cfg.Namespace, slots[i].att.DiskName, slots[i].att.AttachmentName, targetVM, bdWaitTimeout) + if err != nil { + return nil, err + } + slots[i].bdName = bd.Name + if nodeName == "" { + nodeName = bd.Status.NodeName + nodeSafe = strings.ReplaceAll(strings.ReplaceAll(nodeName, ".", "-"), "_", "-") + } else if bd.Status.NodeName != nodeName { + return nil, fmt.Errorf("BlockDevice %s on node %q, expected %q", bd.Name, bd.Status.NodeName, nodeName) + } + meta := bd.Labels["kubernetes.io/metadata.name"] + if meta == "" { + meta = bd.Name + } + if _, dup := metaSeen[meta]; dup { + return nil, fmt.Errorf("duplicate BlockDevice selector meta %q", meta) + } + metaSeen[meta] = struct{}{} + slots[i].meta = meta + slots[i].lvgName = fmt.Sprintf("%s%d-%s-%s", maxVGStressLVGPrefix, slots[i].index, r.Cfg.RunID, nodeSafe) + } + + for i := batchStart; i < batchEnd; i++ { + labels := map[string]string{ + "kubernetes.io/hostname": nodeName, + "kubernetes.io/metadata.name": slots[i].meta, + } + if err := kubernetes.CreateLVMVolumeGroupWithMatchLabels(ctx, r.NestedKube, slots[i].lvgName, nodeName, slots[i].vgName, labels); err != nil { + return nil, fmt.Errorf("create LVMVolumeGroup %s: %w", slots[i].lvgName, err) + } + } + + batchNames := make([]string, curBatch) + for i := batchStart; i < batchEnd; i++ { + batchNames[i-batchStart] = slots[i].lvgName + } + if !kubernetes.WaitForLVMBatchReady(ctx, r.NestedKube, batchNames, batchReadyTimeout(curBatch)) { + logger.Warn("Batch %d: not all LVMVolumeGroups Ready within %v — stopping ramp", batchNum, batchReadyTimeout(curBatch)) + stoppedEarly = true + for i := batchStart; i < batchEnd; i++ { + n, _ := kubernetes.CountReadyLVMVolumeGroups(ctx, r.NestedKube, []string{slots[i].lvgName}) + if n == 1 { + slots[i].ready = true + readyTotal++ + } + } + break + } + for i := batchStart; i < batchEnd; i++ { + slots[i].ready = true + } + readyTotal += curBatch + logger.Success("Batch %d OK: cumulative Ready=%d", batchNum, readyTotal) + } + + report := make([]MaxVGsStressSlotReport, len(slots)) + for i := range slots { + report[i] = MaxVGsStressSlotReport{ + Index: slots[i].index, DiskName: slots[i].disk, LVGName: slots[i].lvgName, + VGName: slots[i].vgName, BDName: slots[i].bdName, Ready: slots[i].ready, + } + } + + logger.Info("Max-VG stress finished: %d/%d Ready on node %s (stopped early=%v)", readyTotal, r.Cfg.Target, nodeName, stoppedEarly) + + return &MaxVGsStressResult{ + NodeName: nodeName, Target: r.Cfg.Target, ReadyTotal: readyTotal, BatchSize: r.Cfg.BatchSize, + StoppedEarly: stoppedEarly, Strict: r.Cfg.Strict, Slots: report, Attachments: attachments, + }, nil +} + +// Cleanup detaches stress VirtualDisks and deletes stress LVMVolumeGroup CRs. +func CleanupMaxVGsStress(ctx context.Context, nestedKube, baseKube *rest.Config, namespace string, res *MaxVGsStressResult) { + if res == nil { + return + } + for _, att := range res.Attachments { + if att == nil { + continue + } + _ = kubernetes.DetachAndDeleteVirtualDisk(ctx, baseKube, namespace, att.AttachmentName, att.DiskName) + } + _ = kubernetes.DeleteLVMVolumeGroupsWithPrefix(ctx, nestedKube, maxVGStressLVGPrefix) +} + +func attachVirtualDiskWithRetry(ctx context.Context, kube *rest.Config, cfg kubernetes.VirtualDiskAttachmentConfig) (*kubernetes.VirtualDiskAttachmentResult, error) { + var lastErr error + for attempt := 1; attempt <= attachMaxRetries; attempt++ { + att, err := kubernetes.AttachVirtualDiskToVM(ctx, kube, cfg) + if err == nil { + return att, nil + } + lastErr = err + if attempt < attachMaxRetries { + logger.Warn("attach %s attempt %d/%d: %v; retry in %v", cfg.DiskName, attempt, attachMaxRetries, err, attachRetryWait) + time.Sleep(attachRetryWait) + } + } + return nil, lastErr +} + +func envBool(name string) bool { + switch strings.ToLower(strings.TrimSpace(os.Getenv(name))) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + +func envIntPositive(name string, def int) int { + v := strings.TrimSpace(os.Getenv(name)) + if v == "" { + return def + } + n, err := strconv.Atoi(v) + if err != nil || n <= 0 { + return def + } + return n +} diff --git a/tests/sds-node-configurator-stress-tests/cluster_config.yml b/tests/sds-node-configurator-stress-tests/cluster_config.yml new file mode 100644 index 0000000..8248f42 --- /dev/null +++ b/tests/sds-node-configurator-stress-tests/cluster_config.yml @@ -0,0 +1,43 @@ +# Nested cluster for sds-node-configurator max-VG stress (light topology: 1 master + 1 worker). +clusterDefinition: + masters: + - hostname: "master-1" + hostType: "vm" + osType: "Ubuntu 22.04 6.2.0-39-generic" + cpu: 4 + coreFraction: 50 + ram: 8 + diskSize: 60 + workers: + - hostname: "worker-1" + hostType: "vm" + osType: "Ubuntu 24.04 6.8.0-53-generic" + cpu: 4 + coreFraction: 50 + ram: 8 + diskSize: 30 + dkpParameters: + kubernetesVersion: "Automatic" + podSubnetCIDR: "10.112.0.0/16" + serviceSubnetCIDR: "10.225.0.0/16" + clusterDomain: "cluster.local" + registryRepo: "dev-registry.deckhouse.io/sys/deckhouse-oss" + devBranch: "main" + modules: + - name: "snapshot-controller" + version: 1 + enabled: true + modulePullOverride: "main" + dependencies: [] + - name: "sds-local-volume" + version: 1 + enabled: true + dependencies: + - "snapshot-controller" + - "sds-node-configurator" + - name: "sds-node-configurator" + version: 1 + enabled: true + settings: + enableThinProvisioning: true + dependencies: [] diff --git a/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_suite_test.go b/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_suite_test.go new file mode 100644 index 0000000..204ec63 --- /dev/null +++ b/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_suite_test.go @@ -0,0 +1,50 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sds_node_configurator_stress_tests + +import ( + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" +) + +var _ = BeforeSuite(func() { + err := config.ValidateEnvironment() + Expect(err).NotTo(HaveOccurred(), "Failed to validate environment") + err = logger.Initialize() + Expect(err).NotTo(HaveOccurred(), "Failed to initialize logger") +}) + +var _ = AfterSuite(func() { + if err := logger.Close(); err != nil { + GinkgoWriter.Printf("Warning: Failed to close logger: %v\n", err) + } +}) + +func TestSdsNodeConfiguratorStressTests(t *testing.T) { + RegisterFailHandler(Fail) + suiteConfig, reporterConfig := GinkgoConfiguration() + suiteConfig.Timeout = 4 * time.Hour + reporterConfig.Verbose = true + reporterConfig.ShowNodeEvents = false + RunSpecs(t, "Sds Node Configurator Stress Tests Suite", suiteConfig, reporterConfig) +} diff --git a/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_test.go b/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_test.go new file mode 100644 index 0000000..3b5da3f --- /dev/null +++ b/tests/sds-node-configurator-stress-tests/sds_node_configurator_stress_tests_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sds_node_configurator_stress_tests + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" + "github.com/deckhouse/storage-e2e/pkg/testkit" +) + +var _ = Describe("sds-node-configurator: maximum independent LVMVolumeGroups per node", Label("stress"), Ordered, func() { + var ( + testClusterResources *cluster.TestClusterResources + stressResult *testkit.MaxVGsStressResult + stressCfg testkit.MaxVGsStressConfig + ) + + AfterAll(func() { + if testClusterResources != nil { + ctx, cancel := context.WithTimeout(context.Background(), config.ClusterCleanupTimeout) + defer cancel() + _ = cluster.CleanupTestCluster(ctx, testClusterResources) + } + }) + + It("should create test cluster with sds-node-configurator", func() { + ctx, cancel := context.WithTimeout(context.Background(), config.ClusterCreationTimeout) + defer cancel() + + var err error + testClusterResources, err = cluster.CreateTestCluster(ctx, config.YAMLConfigFilename) + Expect(err).NotTo(HaveOccurred(), "test cluster should be created") + Expect(testClusterResources.BaseKubeconfig).NotTo(BeNil(), "base cluster kubeconfig required for VirtualDisk attach") + }) + + It("should ramp independent LVMVolumeGroups on one node until target or first failing batch", func() { + Expect(testClusterResources).NotTo(BeNil()) + runID := fmt.Sprintf("%d", time.Now().Unix()) + stressCfg = testkit.DefaultMaxVGsStressConfig(config.TestClusterNamespace, config.TestClusterStorageClass, runID) + + GinkgoWriter.Printf("\n Stress config: target=%d batch=%d disk=%s strict=%v minReady=%d\n", + stressCfg.Target, stressCfg.BatchSize, stressCfg.DiskSize, stressCfg.Strict, stressCfg.MinReady) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Hour) + defer cancel() + + runner := &testkit.MaxVGsStressRunner{ + Cfg: stressCfg, + NestedKube: testClusterResources.Kubeconfig, + BaseKube: testClusterResources.BaseKubeconfig, + } + var err error + stressResult, err = runner.Run(ctx) + Expect(err).NotTo(HaveOccurred()) + + printStressReport(stressResult) + + if stressCfg.Strict { + Expect(stressResult.ReadyTotal).To(Equal(stressCfg.Target), + "strict: expected %d Ready LVMVolumeGroups on %s", stressCfg.Target, stressResult.NodeName) + } else { + Expect(stressResult.ReadyTotal).To(BeNumerically(">=", stressCfg.MinReady), + "probe: at least %d Ready (got %d/%d, stopped early=%v)", + stressCfg.MinReady, stressResult.ReadyTotal, stressCfg.Target, stressResult.StoppedEarly) + } + }) + + AfterEach(func() { + if testClusterResources == nil || stressResult == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), config.ClusterCleanupTimeout) + defer cancel() + testkit.CleanupMaxVGsStress(ctx, testClusterResources.Kubeconfig, testClusterResources.BaseKubeconfig, config.TestClusterNamespace, stressResult) + stressResult = nil + }) +}) + +func printStressReport(res *testkit.MaxVGsStressResult) { + GinkgoWriter.Printf("\n========== Max independent VGs per node — report ==========\n") + GinkgoWriter.Printf(" node: %s\n", res.NodeName) + GinkgoWriter.Printf(" Ready / target: %d / %d (batch %d, stopped early=%v)\n", + res.ReadyTotal, res.Target, res.BatchSize, res.StoppedEarly) + for _, s := range res.Slots { + phase := "Pending" + if s.Ready { + phase = "Ready" + } + GinkgoWriter.Printf(" [%02d] disk=%s lvg=%s vg=%s bd=%s %s\n", + s.Index, s.DiskName, s.LVGName, s.VGName, s.BDName, phase) + } + GinkgoWriter.Println("================================================================\n") +} diff --git a/tests/sds-node-configurator-stress-tests/test_exports b/tests/sds-node-configurator-stress-tests/test_exports new file mode 100755 index 0000000..cce182b --- /dev/null +++ b/tests/sds-node-configurator-stress-tests/test_exports @@ -0,0 +1,20 @@ +#!/bin/bash + +# Required environment variables +export TEST_CLUSTER_CREATE_MODE='alwaysCreateNew' +export DKP_LICENSE_KEY='your-license-key-here' +export REGISTRY_DOCKER_CFG='your-docker-registry-cfg-here' +export SSH_USER='your-ssh-user' +export SSH_HOST='your-ssh-host' +export TEST_CLUSTER_STORAGE_CLASS='your-storage-class' +export KUBE_CONFIG_PATH='~/.kube/config' +export SSH_PASSPHRASE='' # Optional but required for non-interactive mode + +# Optional environment variables with defaults +export YAML_CONFIG_FILENAME='cluster_config.yml' +export SSH_PRIVATE_KEY='~/.ssh/id_rsa' +export SSH_PUBLIC_KEY='~/.ssh/id_rsa.pub' +export SSH_VM_USER='cloud' +export TEST_CLUSTER_NAMESPACE='e2e-test-cluster' +export TEST_CLUSTER_CLEANUP='false' # Set to 'true' to enable cleanup after tests +export LOG_LEVEL='debug' # Set to 'debug' for detailed logs, 'info' for normal logs