diff --git a/e2e/stress_test/continuous_failover_ha_multi_client.py b/e2e/stress_test/continuous_failover_ha_multi_client.py
index a2869482d..62cefb819 100644
--- a/e2e/stress_test/continuous_failover_ha_multi_client.py
+++ b/e2e/stress_test/continuous_failover_ha_multi_client.py
@@ -42,6 +42,7 @@ def __init__(self, **kwargs):
         self.sn_nodes = []
         self.current_outage_node = None
         self.snapshot_names = []
+        self.current_outage_nodes = []
        self.disconnect_thread = None
         self.outage_start_time = None
         self.outage_end_time = None
@@ -60,8 +61,7 @@ def __init__(self, **kwargs):
         # self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt",
         #                      "interface_partial_network_interrupt",
         #                      "partial_nw"]
-        self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt",
-                             "interface_partial_network_interrupt"]
+        self.outage_types = ["graceful_shutdown", "container_stop", "interface_full_network_interrupt"]
         # self.outage_types = ["partial_nw"]
         self.blocked_ports = None
         self.outage_log_file = os.path.join("logs", f"outage_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
@@ -111,7 +111,26 @@ def create_lvols_with_fio(self, count):
             lvol_name = f"{self.lvol_name}_{i}" if not is_crypto else f"c{self.lvol_name}_{i}"
             self.logger.info(f"Creating lvol with Name: {lvol_name}, fs type: {fs_type}, crypto: {is_crypto}")
             try:
-                if self.current_outage_node:
+                self.logger.info(f"Current Outage Nodes: {self.current_outage_nodes}")
+                if self.current_outage_nodes:
+                    self.logger.info(f"Primary vs secondary: {self.sn_primary_secondary_map}")
+                    skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] in self.current_outage_nodes]
+                    self.logger.info(f"Skip Nodes: {skip_nodes}")
+                    for node in self.current_outage_nodes:
+                        skip_nodes.append(node)
+                    self.logger.info(f"Skip Nodes: {skip_nodes}")
+                    self.logger.info(f"Storage Nodes with sec: {self.sn_nodes_with_sec}")
+                    host_id = [node for node in self.sn_nodes_with_sec if node not in skip_nodes]
+                    self.sbcli_utils.add_lvol(
+                        lvol_name=lvol_name,
+                        pool_name=self.pool_name,
+                        size=self.lvol_size,
+                        crypto=is_crypto,
+                        key1=self.lvol_crypt_keys[0],
+                        key2=self.lvol_crypt_keys[1],
+                        host_id=host_id[0]
+                    )
+                elif self.current_outage_node:
                     skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] == self.current_outage_node]
                     skip_nodes.append(self.current_outage_node)
                     skip_nodes.append(self.sn_primary_secondary_map[self.current_outage_node])
@@ -276,7 +295,7 @@ def create_lvols_with_fio(self, count):
                     "iodepth": 1,
                     "numjobs": 5,
                     "time_based": True,
-                    "runtime": 2000,
+                    "runtime": 3000,
                     "log_avg_msec": 1000,
                     "iolog_file": self.lvol_mount_details[lvol_name]["iolog_base_path"],
                 },
@@ -308,9 +327,9 @@ def perform_random_outage(self):
         sleep_n_sec(120)

         for node in self.sn_nodes_with_sec:
-            self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
-                                      storage_node_id=node)
-
+            # self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
+            #                           storage_node_id=node)
+            self.logger.info("Skipping lvstore dump.")
         for node in self.sn_nodes_with_sec:
             cur_node_details = self.sbcli_utils.get_storage_node_details(node)
             cur_node_ip = cur_node_details[0]["mgmt_ip"]
@@ -417,7 +436,7 @@ def perform_random_outage(self):
                 self.disconnect_thread = threading.Thread(
                     target=self.ssh_obj.disconnect_all_active_interfaces,
-                    args=(node_ip, active_interfaces, 600),
+                    args=(node_ip, active_interfaces, 300),
                 )
                 self.disconnect_thread.start()
             elif outage_type == "interface_partial_network_interrupt":
@@ -430,7 +449,7 @@ def perform_random_outage(self):
                 self.disconnect_thread = threading.Thread(
                     target=self.ssh_obj.disconnect_all_active_interfaces,
-                    args=(node_ip, active_interfaces, 600),
+                    args=(node_ip, active_interfaces, 300),
                 )
                 self.disconnect_thread.start()
             elif outage_type == "partial_nw":
@@ -483,7 +502,7 @@ def perform_random_outage(self):
         return outage_type

-    def restart_nodes_after_failover(self, outage_type):
+    def restart_nodes_after_failover(self, outage_type, restart=False):
         """Perform steps for node restart."""
         node_details = self.sbcli_utils.get_storage_node_details(self.current_outage_node)
         node_ip = node_details[0]["mgmt_ip"]
@@ -543,14 +562,48 @@ def restart_nodes_after_failover(self, outage_type, restart=False):
                 self.ssh_obj.exec_command(node=self.lvol_mount_details[lvol]["Client"],
                                           command=connect)
         elif outage_type == "container_stop":
-            self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
-            # Log the restart event
-            self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=1)
+            if restart:
+                max_retries = 10
+                retry_delay = 10  # seconds
+
+                # Retry mechanism for restarting the node
+                for attempt in range(max_retries):
+                    try:
+                        force = False
+                        if attempt == max_retries - 1:
+                            force = True
+                            self.logger.info("[CHECK] Restarting node via CLI with the force flag because the API call failed.")
+                        else:
+                            self.logger.info("[CHECK] Restarting node via CLI because the API call failed.")
+                        self.ssh_obj.restart_node(node=self.mgmt_nodes[0],
+                                                  node_id=self.current_outage_node,
+                                                  force=force)
+                        # else:
+                        #     self.sbcli_utils.restart_node(node_uuid=self.current_outage_node, expected_error_code=[503])
+                        self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
+                        break  # Exit loop if successful
+                    except Exception as _:
+                        if attempt < max_retries - 2:
+                            self.logger.info(f"Attempt {attempt + 1} failed to restart node. Retrying in {retry_delay} seconds...")
+                            sleep_n_sec(retry_delay)
+                        elif attempt < max_retries - 1:
+                            self.logger.info(f"Attempt {attempt + 1} failed to restart node via API. Retrying in {retry_delay} seconds via CMD...")
+                            sleep_n_sec(retry_delay)
+                        else:
+                            self.logger.info("Max retries reached. Failed to restart node.")
+                            raise  # Re-raise the last exception
+                self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
+                # Log the restart event
+                self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=0)
+            else:
+                self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
+                # Log the restart event
+                self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=2)
         elif "network_interrupt" in outage_type:
             self.sbcli_utils.wait_for_storage_node_status(self.current_outage_node, "online", timeout=1000)
             # Log the restart event
-            self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=11)
+            self.log_outage_event(self.current_outage_node, outage_type, "Node restarted", outage_time=6)

         if not self.k8s_test:
             for node in self.storage_nodes:
@@ -608,9 +661,9 @@ def restart_nodes_after_failover(self, outage_type, restart=False):
             # sleep_n_sec(30)

         for node in self.sn_nodes_with_sec:
-            self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
-                                      storage_node_id=node)
-
+            # self.ssh_obj.dump_lvstore(node_ip=self.mgmt_nodes[0],
+            #                           storage_node_id=node)
+            self.logger.info("Skipping lvstore dump.")

     def create_snapshots_and_clones(self):
         """Create snapshots and clones during an outage."""
@@ -777,7 +830,7 @@ def create_snapshots_and_clones(self):
                     "iodepth": 1,
                     "numjobs": 5,
                     "time_based": True,
-                    "runtime": 2000,
+                    "runtime": 3000,
                     "log_avg_msec": 1000,
                     "iolog_file": self.clone_mount_details[clone_name]["iolog_base_path"],
                 },
@@ -786,22 +839,23 @@ def create_snapshots_and_clones(self):
             fio_thread.start()
             self.fio_threads.append(fio_thread)
             self.logger.info(f"Created snapshot {snapshot_name} and clone {clone_name}.")
-            self.sbcli_utils.resize_lvol(lvol_id=self.lvol_mount_details[lvol]["ID"],
-                                         new_size=f"{self.int_lvol_size}G")
+            if self.lvol_mount_details[lvol]["ID"]:
+                self.sbcli_utils.resize_lvol(lvol_id=self.lvol_mount_details[lvol]["ID"],
+                                             new_size=f"{self.int_lvol_size}G")
             sleep_n_sec(10)
-            self.sbcli_utils.resize_lvol(lvol_id=self.clone_mount_details[clone_name]["ID"],
-                                         new_size=f"{self.int_lvol_size}G")
-
+            if self.clone_mount_details[clone_name]["ID"]:
+                self.sbcli_utils.resize_lvol(lvol_id=self.clone_mount_details[clone_name]["ID"],
+                                             new_size=f"{self.int_lvol_size}G")
+
     def delete_random_lvols(self, count):
         """Delete random lvols during an outage."""
         skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] == self.current_outage_node]
         skip_nodes.append(self.current_outage_node)
         skip_nodes.append(self.sn_primary_secondary_map[self.current_outage_node])
-        skip_nodes_lvol = []
-        self.logger.info(f"Skipping Nodes: {skip_nodes_lvol}")
+        self.logger.info(f"Skipping Nodes: {skip_nodes}")
         available_lvols = [
-            lvol for node, lvols in self.node_vs_lvol.items() if node not in skip_nodes_lvol for lvol in lvols
+            lvol for node, lvols in self.node_vs_lvol.items() if node not in skip_nodes for lvol in lvols
         ]
         self.logger.info(f"Available Lvols: {available_lvols}")
         if len(available_lvols) < count:
@@ -922,7 +976,7 @@ def perform_failover_during_outage(self):
                         storage_node_id=node,
                         logs_path=self.docker_logs_path
                     )
-        self.create_lvols_with_fio(3)
+        self.create_lvols_with_fio(5)
         if not self.k8s_test:
             for node in self.storage_nodes:
                 self.ssh_obj.restart_docker_logging(
@@ -1041,7 +1095,7 @@ def restart_fio(self, iteration):
                     "iodepth": 1,
                     "numjobs": 5,
                     "time_based": True,
-                    "runtime": 2000,
+                    "runtime": 3000,
                     "log_avg_msec": 1000,
                     "iolog_file": self.lvol_mount_details[lvol]["iolog_base_path"],
                 },
@@ -1150,7 +1204,7 @@ def run(self):
                         storage_node_id=node,
                         logs_path=self.docker_logs_path
                     )
-        self.create_lvols_with_fio(5)
+        self.create_lvols_with_fio(3)
         if not self.k8s_test:
             for node in self.storage_nodes:
                 self.ssh_obj.restart_docker_logging(
@@ -1175,7 +1229,7 @@ def run(self):
             else:
                 self.logger.info(f"Current outage node: {self.current_outage_node} is secondary node. Skipping delete and create")
             if outage_type != "partial_nw" or outage_type != "partial_nw_single_port":
-                sleep_n_sec(280)
+                sleep_n_sec(100)
             for node in self.sn_nodes_with_sec:
                 cur_node_details = self.sbcli_utils.get_storage_node_details(node)
                 cur_node_ip = cur_node_details[0]["mgmt_ip"]
@@ -1229,7 +1283,7 @@ def run(self):
             # Perform failover and manage resources during outage
             outage_type = self.perform_failover_during_outage()
             if outage_type != "partial_nw" or outage_type != "partial_nw_single_port":
-                sleep_n_sec(100)
+                sleep_n_sec(280)
             time_duration = self.common_utils.calculate_time_duration(
                 start_timestamp=self.outage_start_time,
                 end_timestamp=self.outage_end_time
diff --git a/e2e/stress_test/continuous_failover_ha_multi_outage.py b/e2e/stress_test/continuous_failover_ha_multi_outage.py
old mode 100644
new mode 100755
index fb5f6d507..dd10cfb52
--- a/e2e/stress_test/continuous_failover_ha_multi_outage.py
+++ b/e2e/stress_test/continuous_failover_ha_multi_outage.py
@@ -1,5 +1,6 @@
 from utils.common_utils import sleep_n_sec
 from datetime import datetime
+from collections import defaultdict
 from stress_test.continuous_failover_ha_multi_client import RandomMultiClientFailoverTest
 from exceptions.custom_exception import LvolNotConnectException
 import threading
@@ -8,13 +9,20 @@
 import os


+generated_sequences = set()
+
 def generate_random_sequence(length):
     letters = string.ascii_uppercase
     numbers = string.digits
     all_chars = letters + numbers
-    first_char = random.choice(letters)
-    remaining_chars = ''.join(random.choices(all_chars, k=length - 1))
-    return first_char + remaining_chars
+
+    while True:
+        first_char = random.choice(letters)
+        remaining_chars = ''.join(random.choices(all_chars, k=length-1))
+        result = first_char + remaining_chars
+        if result not in generated_sequences:
+            generated_sequences.add(result)
+            return result


 class RandomMultiClientMultiFailoverTest(RandomMultiClientFailoverTest):
@@ -25,7 +33,7 @@ class RandomMultiClientMultiFailoverTest(RandomMultiClientFailoverTest):

     def __init__(self, **kwargs):
         super().__init__(**kwargs)
-        self.total_lvols = 20
+        self.total_lvols = 40
         self.lvol_name = f"lvl{generate_random_sequence(15)}"
         self.clone_name = f"cln{generate_random_sequence(15)}"
         self.snapshot_name = f"snap{generate_random_sequence(15)}"
@@ -49,9 +57,7 @@ def __init__(self, **kwargs):
         self.test_name = "n_plus_k_failover_multi_client_ha"
         self.outage_types = [
             "container_stop",
-            "graceful_shutdown",
-            "interface_partial_network_interrupt",
-            "interface_full_network_interrupt"
+            "graceful_shutdown"
         ]
         self.blocked_ports = None
         self.outage_log_file = os.path.join("logs", f"outage_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
@@ -61,22 +67,81 @@ def _initialize_outage_log(self):
         with open(self.outage_log_file, 'w') as log:
             log.write("Timestamp,Node,Outage_Type,Event\n")

-    def log_outage_event(self, node, outage_type, event):
-        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    def log_outage_event(self, node, outage_type, event, outage_time=0):
+        """Log an outage event to the outage log file.
+
+        Args:
+            node (str): Node UUID or IP where the event occurred.
+            outage_type (str): Type of outage (e.g., port_network_interrupt, container_stop, graceful_shutdown).
+            event (str): Event description (e.g., 'Outage started', 'Node restarted').
+            outage_time (int): Minutes to add to self.outage_start_time. If 0/None, use the current time.
+        """
+        # Compute the timestamp
+        if outage_time:
+            # Uses self.outage_start_time (epoch seconds) + outage_time (minutes)
+            base_epoch = getattr(self, "outage_start_time", None)
+            if isinstance(base_epoch, (int, float)) and base_epoch > 0:
+                ts_dt = datetime.fromtimestamp(int(base_epoch) + int(outage_time) * 60)
+            else:
+                # Fall back to the current time if outage_start_time is missing/invalid
+                ts_dt = datetime.now()
+        else:
+            ts_dt = datetime.now()
+
+        timestamp = ts_dt.strftime('%Y-%m-%d %H:%M:%S')
+
+        # Write the log line
         with open(self.outage_log_file, 'a') as log:
             log.write(f"{timestamp},{node},{outage_type},{event}\n")
+
+    def _build_reverse_secondary_map(self):
+        rev = defaultdict(set)  # secondary -> {primary, ...}
+        for p, s in self.sn_primary_secondary_map.items():
+            if s:
+                rev[s].add(p)
+        return rev
+
+    def _pick_outage_nodes(self, primary_candidates, k):
+        rev = self._build_reverse_secondary_map()
+        order = primary_candidates[:]  # or random.shuffle(order) for randomness
+
+        chosen, blocked = [], set()
+        for node in order:
+            if node in blocked:
+                continue
+
+            chosen.append(node)
+            blocked.add(node)  # itself
+            sec = self.sn_primary_secondary_map.get(node)
+            if sec:
+                blocked.add(sec)  # its secondary
+            blocked.update(rev.get(node, ()))  # any primary whose secondary == node
+
+            if len(chosen) == k:
+                break
+
+        if len(chosen) < k:
+            raise Exception(
+                f"Cannot pick {k} nodes without primary/secondary conflicts; only {len(chosen)} possible with current topology."
+            )
+        return chosen
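+
+    # Worked example (hypothetical node names): with sn_primary_secondary_map =
+    # {n1: n2, n2: n3, n3: n1}, picking n1 blocks n1 itself, n2 (its secondary)
+    # and n3 (whose secondary is n1), so _pick_outage_nodes(..., k=2) raises;
+    # with {n1: n3, n2: n3} it returns [n1, n2], since neither chosen node is
+    # the other's primary or secondary.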
+
     def perform_n_plus_k_outages(self):
         """
-        Perform K (self.npcs) parallel outages as part of N+K configuration.
-        Ensure only primary nodes are selected for outage.
+        Select K outage nodes such that no two are in a primary/secondary
+        relationship (in either direction). Candidates = keys of the map.
         """
-        primary_nodes = [node for node in self.sn_nodes if not self.sbcli_utils.is_secondary_node(node)]
+        # Candidates are nodes that are primary *for someone* (map keys)
+        primary_candidates = list(self.sn_primary_secondary_map.keys())
+        self.current_outage_nodes = []

-        if len(primary_nodes) < self.npcs:
-            raise Exception(f"Not enough primary nodes to perform {self.npcs} outages. Found only {len(primary_nodes)}.")
+        if len(primary_candidates) < self.npcs:
+            raise Exception(
+                f"Need {self.npcs} outage nodes, but only {len(primary_candidates)} primary-role nodes exist."
+            )

-        outage_nodes = random.sample(primary_nodes, k=self.npcs)
+        outage_nodes = self._pick_outage_nodes(primary_candidates, self.npcs)
+        self.logger.info(f"Selected outage nodes: {outage_nodes}")

         outage_combinations = []
         for node in outage_nodes:
@@ -85,6 +150,12 @@ def perform_n_plus_k_outages(self):
             node_details = self.sbcli_utils.get_storage_node_details(node)
             node_ip = node_details[0]["mgmt_ip"]
             node_rpc_port = node_details[0]["rpc_port"]

+            self.ssh_obj.fetch_distrib_logs(
+                storage_node_ip=node_ip,
+                storage_node_id=node,
+                logs_path=self.docker_logs_path
+            )
+
             self.logger.info(f"Performing {outage_type} on primary node {node}.")
             self.log_outage_event(node, outage_type, "Outage started")
@@ -105,26 +176,74 @@ def perform_n_plus_k_outages(self):
     def _graceful_shutdown_node(self, node):
         try:
-            self.sbcli_utils.suspend_node(node_uuid=node, expected_error_code=[503])
-            self.sbcli_utils.wait_for_storage_node_status(node, "suspended", timeout=1000)
-            self.sbcli_utils.shutdown_node(node_uuid=node, expected_error_code=[503])
-            self.sbcli_utils.wait_for_storage_node_status(node, "offline", timeout=1000)
+            sleep_n_sec(10)
+            max_retries = 10
+            retry_delay = 10  # seconds
+            # Retry mechanism for suspending the node
+            for attempt in range(max_retries):
+                try:
+                    if attempt == max_retries - 1:
+                        self.logger.info("[CHECK] Suspending node via CLI because the API call failed.")
+                        self.ssh_obj.suspend_node(node=self.mgmt_nodes[0],
+                                                  node_id=node)
+                    else:
+                        self.sbcli_utils.suspend_node(node_uuid=node, expected_error_code=[503])
+                    self.sbcli_utils.wait_for_storage_node_status(node, "suspended", timeout=1000)
+                    break  # Exit loop if successful
+                except Exception as _:
+                    if attempt < max_retries - 2:
+                        self.logger.info(f"Attempt {attempt + 1} failed to suspend node. Retrying in {retry_delay} seconds...")
+                        sleep_n_sec(retry_delay)
+                    elif attempt < max_retries - 1:
+                        self.logger.info(f"Attempt {attempt + 1} failed to suspend node via API. Retrying in {retry_delay} seconds via CMD...")
+                        sleep_n_sec(retry_delay)
+                    else:
+                        self.logger.info("Max retries reached. Failed to suspend node.")
+                        raise  # Re-raise the last exception
+
+            sleep_n_sec(10)  # Wait before shutting down
+
+            # Retry mechanism for shutting down the node
+            for attempt in range(max_retries):
+                try:
+                    if attempt == max_retries - 1:
+                        self.logger.info("[CHECK] Shutting down node via CLI because the API call failed.")
+                        self.ssh_obj.shutdown_node(node=self.mgmt_nodes[0],
+                                                   node_id=node,
+                                                   force=True)
+                    else:
+                        self.sbcli_utils.shutdown_node(node_uuid=node, force=True,
+                                                       expected_error_code=[503])
+                    self.sbcli_utils.wait_for_storage_node_status(node, "offline", timeout=1000)
+                    break  # Exit loop if successful
+                except Exception as _:
+                    if attempt < max_retries - 2:
+                        self.logger.info(f"Attempt {attempt + 1} failed to shut down node. Retrying in {retry_delay} seconds...")
+                        sleep_n_sec(retry_delay)
+                    elif attempt < max_retries - 1:
+                        self.logger.info(f"Attempt {attempt + 1} failed to shut down node via API. Retrying in {retry_delay} seconds via CMD...")
+                        sleep_n_sec(retry_delay)
+                    else:
+                        self.logger.info("Max retries reached. Failed to shut down node.")
+                        raise  # Re-raise the last exception
         except Exception as e:
             self.logger.error(f"Failed graceful shutdown for node {node}: {str(e)}")

     def _disconnect_partial_interface(self, node, node_ip):
         active_interfaces = [nic["if_name"] for nic in self.sbcli_utils.get_storage_node_details(node)[0]["data_nics"]]
+        active_interfaces = ['eth1']  # overrides the detected NICs; only eth1 is interrupted
         self.disconnect_thread = threading.Thread(
             target=self.ssh_obj.disconnect_all_active_interfaces,
-            args=(node_ip, active_interfaces, 600)
+            args=(node_ip, active_interfaces, 300)
         )
         self.disconnect_thread.start()

     def _disconnect_full_interface(self, node, node_ip):
+        self.logger.info("Handling full interface based network interruption...")
         active_interfaces = self.ssh_obj.get_active_interfaces(node_ip)
         self.disconnect_thread = threading.Thread(
             target=self.ssh_obj.disconnect_all_active_interfaces,
-            args=(node_ip, active_interfaces, 600)
+            args=(node_ip, active_interfaces, 300)
         )
         self.disconnect_thread.start()
@@ -134,50 +253,81 @@ def delete_random_lvols(self, count):
         available_lvols = [
             lvol for node, lvols in self.node_vs_lvol.items()
             if node not in self.current_outage_nodes for lvol in lvols
         ]
+
+        self.logger.info(f"Available Lvols: {available_lvols}")
         if len(available_lvols) < count:
             self.logger.warning("Not enough lvols available to delete the requested count.")
             count = len(available_lvols)

         for lvol in random.sample(available_lvols, count):
-            self.logger.info(f"Deleting lvol {lvol}")
+            self.logger.info(f"Deleting lvol {lvol}.")
             snapshots = self.lvol_mount_details[lvol]["snapshots"]
             to_delete = []
-
-            # Handle dependent clones
             for clone_name, clone_details in self.clone_mount_details.items():
                 if clone_details["snapshot"] in snapshots:
-                    self.common_utils.validate_fio_test(clone_details["Client"], clone_details["Log"])
+                    self.common_utils.validate_fio_test(clone_details["Client"],
+                                                        log_file=clone_details["Log"])
                     self.ssh_obj.find_process_name(clone_details["Client"], f"{clone_name}_fio", return_pid=False)
                     fio_pids = self.ssh_obj.find_process_name(clone_details["Client"], f"{clone_name}_fio", return_pid=True)
+                    sleep_n_sec(10)
                     for pid in fio_pids:
                         self.ssh_obj.kill_processes(clone_details["Client"], pid=pid)
+                    attempt = 1
+                    while len(fio_pids) > 2:
+                        self.ssh_obj.find_process_name(clone_details["Client"], f"{clone_name}_fio", return_pid=False)
+                        fio_pids = self.ssh_obj.find_process_name(clone_details["Client"], f"{clone_name}_fio", return_pid=True)
+                        if attempt >= 30:
+                            raise Exception("FIO not killed on clone")
+                        attempt += 1
+                        sleep_n_sec(20)
+
+                    sleep_n_sec(10)
                     self.ssh_obj.unmount_path(clone_details["Client"], f"/mnt/{clone_name}")
                     self.ssh_obj.remove_dir(clone_details["Client"], dir_path=f"/mnt/{clone_name}")
                     self.disconnect_lvol(clone_details['ID'])
-                    self.sbcli_utils.delete_lvol(clone_name)
+                    self.sbcli_utils.delete_lvol(clone_name, max_attempt=20, skip_error=True)
+                    sleep_n_sec(30)
                     if clone_name in self.lvols_without_sec_connect:
                         self.lvols_without_sec_connect.remove(clone_name)
                     to_delete.append(clone_name)
-
+                    self.ssh_obj.delete_files(clone_details["Client"], [f"{self.log_path}/local-{clone_name}_fio*"])
+                    self.ssh_obj.delete_files(clone_details["Client"], [f"{self.log_path}/{clone_name}_fio_iolog*"])
+                    self.ssh_obj.delete_files(clone_details["Client"], [f"/mnt/{clone_name}/*"])
+                    # self.ssh_obj.delete_files(clone_details["Client"], [f"{self.log_path}/{clone_name}*.log"])
             for del_key in to_delete:
                 del self.clone_mount_details[del_key]
-
-            # Delete snapshots
             for snapshot in snapshots:
                 snapshot_id = self.ssh_obj.get_snapshot_id(self.mgmt_nodes[0], snapshot)
+                # snapshot_node = self.snap_vs_node[snapshot]
+                # if snapshot_node not in skip_nodes:
                 self.ssh_obj.delete_snapshot(self.mgmt_nodes[0], snapshot_id=snapshot_id)
                 self.snapshot_names.remove(snapshot)

-            # Stop FIO and cleanup lvol
-            self.common_utils.validate_fio_test(self.lvol_mount_details[lvol]["Client"], self.lvol_mount_details[lvol]["Log"])
+            self.common_utils.validate_fio_test(self.lvol_mount_details[lvol]["Client"],
+                                                log_file=self.lvol_mount_details[lvol]["Log"])
             self.ssh_obj.find_process_name(self.lvol_mount_details[lvol]["Client"], f"{lvol}_fio", return_pid=False)
+            sleep_n_sec(10)
             fio_pids = self.ssh_obj.find_process_name(self.lvol_mount_details[lvol]["Client"], f"{lvol}_fio", return_pid=True)
             for pid in fio_pids:
                 self.ssh_obj.kill_processes(self.lvol_mount_details[lvol]["Client"], pid=pid)
+            attempt = 1
+            while len(fio_pids) > 2:
+                self.ssh_obj.find_process_name(self.lvol_mount_details[lvol]["Client"], f"{lvol}_fio", return_pid=False)
+                fio_pids = self.ssh_obj.find_process_name(self.lvol_mount_details[lvol]["Client"], f"{lvol}_fio", return_pid=True)
+                if attempt >= 30:
+                    raise Exception("FIO not killed on lvols")
+                attempt += 1
+                sleep_n_sec(20)
+
+            sleep_n_sec(10)
             self.ssh_obj.unmount_path(self.lvol_mount_details[lvol]["Client"], f"/mnt/{lvol}")
             self.ssh_obj.remove_dir(self.lvol_mount_details[lvol]["Client"], dir_path=f"/mnt/{lvol}")
             self.disconnect_lvol(self.lvol_mount_details[lvol]['ID'])
-            self.sbcli_utils.delete_lvol(lvol)
+            self.sbcli_utils.delete_lvol(lvol, max_attempt=20, skip_error=True)
+            self.ssh_obj.delete_files(self.lvol_mount_details[lvol]["Client"], [f"{self.log_path}/local-{lvol}_fio*"])
+            self.ssh_obj.delete_files(self.lvol_mount_details[lvol]["Client"], [f"{self.log_path}/{lvol}_fio_iolog*"])
+            self.ssh_obj.delete_files(self.lvol_mount_details[lvol]["Client"], [f"/mnt/{lvol}/*"])
+            # self.ssh_obj.delete_files(self.lvol_mount_details[lvol]["Client"], [f"{self.log_path}/{lvol}*.log"])
             if lvol in self.lvols_without_sec_connect:
                 self.lvols_without_sec_connect.remove(lvol)
             del self.lvol_mount_details[lvol]
@@ -190,14 +340,19 @@ def create_snapshots_and_clones(self):
         """Create snapshots and clones during an outage, avoiding lvols on outage nodes."""
         self.int_lvol_size += 1
+        skip_nodes = [node for node in self.sn_primary_secondary_map if self.sn_primary_secondary_map[node] in self.current_outage_nodes]
+        self.logger.info(f"Skip Nodes: {skip_nodes}")
+        for node in self.current_outage_nodes:
+            skip_nodes.append(node)
+        self.logger.info(f"Skip Nodes: {skip_nodes}")
         available_lvols = [
             lvol for node, lvols in self.node_vs_lvol.items()
-            if node not in self.current_outage_nodes for lvol in lvols
+            if node not in skip_nodes for lvol in lvols
         ]
         if not available_lvols:
             self.logger.warning("No available lvols to create snapshots and clones.")
             return
-
+        self.logger.info(f"Available lvols: {available_lvols}")
         for _ in range(3):
             random.shuffle(available_lvols)
             lvol = available_lvols[0]
@@ -205,69 +360,140 @@ def create_snapshots_and_clones(self):
             temp_name = generate_random_sequence(5)
             if snapshot_name in self.snapshot_names:
                 snapshot_name = f"{snapshot_name}_{temp_name}"
-
             try:
                 output, error = self.ssh_obj.add_snapshot(self.mgmt_nodes[0], self.lvol_mount_details[lvol]["ID"], snapshot_name)
-                if "(False," in output or "(False," in error:
-                    raise Exception(output or error)
+                if "(False," in output:
+                    raise Exception(output)
+                if "(False," in error:
+                    raise Exception(error)
             except Exception as e:
-                self.logger.warning(f"Snapshot creation failed: {e}")
-                continue
-
+                self.logger.warning(f"Snapshot creation failed with {str(e)}. Retrying with a different name.")
+                try:
+                    snapshot_name = f"snap_{lvol}"
+                    temp_name = generate_random_sequence(5)
+                    snapshot_name = f"{snapshot_name}_{temp_name}"
+                    self.ssh_obj.add_snapshot(self.mgmt_nodes[0], self.lvol_mount_details[lvol]["ID"], snapshot_name)
+                except Exception as exp:
+                    self.logger.warning(f"Snapshot creation retry failed with {str(exp)}.")
+                    continue
+
             self.snapshot_names.append(snapshot_name)
+            lvol_node_id = self.sbcli_utils.get_lvol_details(
+                lvol_id=self.lvol_mount_details[lvol]["ID"])[0]["node_id"]
+            self.snap_vs_node[snapshot_name] = lvol_node_id
             self.lvol_mount_details[lvol]["snapshots"].append(snapshot_name)

             clone_name = f"clone_{generate_random_sequence(15)}"
+            if clone_name in list(self.clone_mount_details):
+                clone_name = f"{clone_name}_{temp_name}"
             sleep_n_sec(30)
             snapshot_id = self.ssh_obj.get_snapshot_id(self.mgmt_nodes[0], snapshot_name)
             try:
                 self.ssh_obj.add_clone(self.mgmt_nodes[0], snapshot_id, clone_name)
             except Exception as e:
-                self.logger.warning(f"Clone creation failed: {e}")
-                continue
-
+                self.logger.warning(f"Clone creation failed with {str(e)}. Retrying with a different name.")
+                try:
+                    clone_name = f"clone_{generate_random_sequence(15)}"
+                    temp_name = generate_random_sequence(5)
+                    clone_name = f"{clone_name}_{temp_name}"
+                    self.ssh_obj.add_clone(self.mgmt_nodes[0], snapshot_id, clone_name)
+                except Exception as exp:
+                    self.logger.warning(f"Clone creation retry failed with {str(exp)}.")
+                    continue
             fs_type = self.lvol_mount_details[lvol]["FS"]
             client = self.lvol_mount_details[lvol]["Client"]
             self.clone_mount_details[clone_name] = {
-                "ID": self.sbcli_utils.get_lvol_id(clone_name),
-                "Command": None,
-                "Mount": None,
-                "Device": None,
-                "MD5": None,
-                "FS": fs_type,
-                "Log": f"{self.log_path}/{clone_name}.log",
-                "snapshot": snapshot_name,
-                "Client": client
+                "ID": self.sbcli_utils.get_lvol_id(clone_name),
+                "Command": None,
+                "Mount": None,
+                "Device": None,
+                "MD5": None,
+                "FS": fs_type,
+                "Log": f"{self.log_path}/{clone_name}.log",
+                "snapshot": snapshot_name,
+                "Client": client,
+                "iolog_base_path": f"{self.log_path}/{clone_name}_fio_iolog"
             }
+            self.logger.info(f"Created clone {clone_name}.")
+
+            sleep_n_sec(3)
+
+            self.ssh_obj.exec_command(node=self.mgmt_nodes[0],
+                                      command=f"{self.base_cmd} lvol list")
+
             connect_ls = self.sbcli_utils.get_lvol_connect_str(lvol_name=clone_name)
             self.clone_mount_details[clone_name]["Command"] = connect_ls
+
+            # if self.secondary_outage:
+            #     connect_ls = [connect_ls[0]]
+            #     self.lvols_without_sec_connect.append(clone_name)
+
             initial_devices = self.ssh_obj.get_devices(node=client)
             for connect_str in connect_ls:
                 _, error = self.ssh_obj.exec_command(node=client, command=connect_str)
             if error:
-                self.logger.warning(f"Clone connect failed: {error}")
+                lvol_details = self.sbcli_utils.get_lvol_details(lvol_id=self.clone_mount_details[clone_name]["ID"])
+                nqn = lvol_details[0]["nqn"]
+                self.ssh_obj.disconnect_nvme(node=client, nqn_grep=nqn)
+                self.logger.info(f"Connecting clone {clone_name} failed with error: {error}. Disconnecting all connections for that clone.")
+                self.sbcli_utils.delete_lvol(lvol_name=clone_name, max_attempt=20, skip_error=True)
+                sleep_n_sec(30)
+                del self.clone_mount_details[clone_name]
                 continue
+            sleep_n_sec(3)
             final_devices = self.ssh_obj.get_devices(node=client)
-            lvol_device = next((f"/dev/{d.strip()}" for d in final_devices if d not in initial_devices), None)
+            lvol_device = None
+            for device in final_devices:
+                if device not in initial_devices:
+                    lvol_device = f"/dev/{device.strip()}"
+                    break
             if not lvol_device:
-                raise LvolNotConnectException("Clone device not found")
+                raise LvolNotConnectException("LVOL did not connect")
             self.clone_mount_details[clone_name]["Device"] = lvol_device

+            # Mount and run FIO
             if fs_type == "xfs":
                 self.ssh_obj.clone_mount_gen_uuid(client, lvol_device)
-
             mount_point = f"{self.mount_path}/{clone_name}"
             self.ssh_obj.mount_path(node=client, device=lvol_device, mount_path=mount_point)
             self.clone_mount_details[clone_name]["Mount"] = mount_point

+            # clone_node_id = self.sbcli_utils.get_lvol_details(
+            #     lvol_id=self.lvol_mount_details[clone_name]["ID"])[0]["node_id"]

+            # self.node_vs_lvol[clone_node_id].append(clone_name)
+
+            sleep_n_sec(10)
+
             self.ssh_obj.delete_files(client, [f"{mount_point}/*fio*"])
             self.ssh_obj.delete_files(client, [f"{self.log_path}/local-{clone_name}_fio*"])
-
+            self.ssh_obj.delete_files(client, [f"{self.log_path}/{clone_name}_fio_iolog*"])
+
+            sleep_n_sec(5)
+
+            # Start FIO
+            # fio_thread = threading.Thread(
+            #     target=self.ssh_obj.run_fio_test,
+            #     args=(client, None, self.clone_mount_details[clone_name]["Mount"], self.clone_mount_details[clone_name]["Log"]),
+            #     kwargs={
+            #         "size": self.fio_size,
+            #         "name": f"{clone_name}_fio",
+            #         "rw": "randrw",
+            #         "bs": f"{2 ** random.randint(2, 7)}K",
+            #         "nrfiles": 16,
+            #         "iodepth": 1,
+            #         "numjobs": 5,
+            #         "time_based": True,
+            #         "runtime": 2000,
+            #         "log_avg_msec": 1000,
+            #         "iolog_file": self.clone_mount_details[clone_name]["iolog_base_path"],
+            #         "debug": True,
+            #     },
+            # )
             fio_thread = threading.Thread(
                 target=self.ssh_obj.run_fio_test,
-                args=(client, None, mount_point, self.clone_mount_details[clone_name]["Log"]),
+                args=(client, None, self.clone_mount_details[clone_name]["Mount"], self.clone_mount_details[clone_name]["Log"]),
                 kwargs={
                     "size": self.fio_size,
                     "name": f"{clone_name}_fio",
@@ -278,15 +504,21 @@ def create_snapshots_and_clones(self):
                     "numjobs": 5,
                     "time_based": True,
                     "runtime": 2000,
+                    "log_avg_msec": 1000,
+                    "iolog_file": self.clone_mount_details[clone_name]["iolog_base_path"],
                 },
             )
             fio_thread.start()
             self.fio_threads.append(fio_thread)
+            self.logger.info(f"Created snapshot {snapshot_name} and clone {clone_name}.")

-            self.logger.info(f"Created snapshot {snapshot_name} and clone {clone_name}")
-            self.sbcli_utils.resize_lvol(self.lvol_mount_details[lvol]["ID"], f"{self.int_lvol_size}G")
+            if self.lvol_mount_details[lvol]["ID"]:
+                self.sbcli_utils.resize_lvol(lvol_id=self.lvol_mount_details[lvol]["ID"],
+                                             new_size=f"{self.int_lvol_size}G")
             sleep_n_sec(10)
-            self.sbcli_utils.resize_lvol(self.clone_mount_details[clone_name]["ID"], f"{self.int_lvol_size}G")
+            if self.clone_mount_details[clone_name]["ID"]:
+                self.sbcli_utils.resize_lvol(lvol_id=self.clone_mount_details[clone_name]["ID"],
+                                             new_size=f"{self.int_lvol_size}G")

     def run(self):
@@ -301,6 +533,8 @@ def run(self):
         for result in storage_nodes['results']:
             self.sn_nodes.append(result["uuid"])
             self.sn_nodes_with_sec.append(result["uuid"])
+            self.sn_primary_secondary_map[result["uuid"]] = result["secondary_node_id"]
+        self.logger.info(f"Secondary node map: {self.sn_primary_secondary_map}")
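+        # Map shape (illustrative): {"<primary node uuid>": "<secondary node uuid or None>", ...}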
self.logger.info(f"Secondary node map: {self.sn_primary_secondary_map}") sleep_n_sec(30) @@ -320,11 +554,23 @@ def run(self): for node, outage_type in outage_events: self.current_outage_node = node - self.restart_nodes_after_failover(outage_type) + if outage_type == "container_stop" and self.npcs > 1: + self.restart_nodes_after_failover(outage_type, True) + else: + self.restart_nodes_after_failover(outage_type) self.logger.info("Waiting for fallback recovery.") sleep_n_sec(100) + for node in self.sn_nodes_with_sec: + cur_node_details = self.sbcli_utils.get_storage_node_details(node) + cur_node_ip = cur_node_details[0]["mgmt_ip"] + self.ssh_obj.fetch_distrib_logs( + storage_node_ip=cur_node_ip, + storage_node_id=node, + logs_path=self.docker_logs_path + ) + time_duration = self.common_utils.calculate_time_duration( start_timestamp=self.outage_start_time, end_timestamp=self.outage_end_time @@ -343,12 +589,27 @@ def run(self): # for node, outage_type in outage_events: # if not self.sbcli_utils.is_secondary_node(node): self.validate_migration_for_node(self.outage_start_time, 2000, None, 60, no_task_ok=no_task_ok) + self.common_utils.manage_fio_threads(self.fio_node, self.fio_threads, timeout=20000) for clone, clone_details in self.clone_mount_details.items(): self.common_utils.validate_fio_test(clone_details["Client"], clone_details["Log"]) + self.ssh_obj.delete_files(clone_details["Client"], [f"{self.log_path}/local-{clone}_fio*"]) + self.ssh_obj.delete_files(clone_details["Client"], [f"{self.log_path}/{clone}_fio_iolog*"]) for lvol, lvol_details in self.lvol_mount_details.items(): self.common_utils.validate_fio_test(lvol_details["Client"], lvol_details["Log"]) + self.ssh_obj.delete_files(lvol_details["Client"], [f"{self.log_path}/local-{lvol}_fio*"]) + self.ssh_obj.delete_files(lvol_details["Client"], [f"{self.log_path}/{lvol}_fio_iolog*"]) self.logger.info(f"N+K failover iteration {iteration} complete.") + + for node in self.sn_nodes_with_sec: + cur_node_details = self.sbcli_utils.get_storage_node_details(node) + cur_node_ip = cur_node_details[0]["mgmt_ip"] + self.ssh_obj.fetch_distrib_logs( + storage_node_ip=cur_node_ip, + storage_node_id=node, + logs_path=self.docker_logs_path + ) iteration += 1 + diff --git a/e2e/utils/ssh_utils.py b/e2e/utils/ssh_utils.py index bd06f06f7..48f56dfc3 100644 --- a/e2e/utils/ssh_utils.py +++ b/e2e/utils/ssh_utils.py @@ -401,7 +401,7 @@ def run_fio_test(self, node, device=None, directory=None, log_file=None, **kwarg command = ( f"sudo fio --name={name} {location} --ioengine={ioengine} --direct=1 --iodepth={iodepth} " - f"{time_based} --runtime={runtime} --rw={rw} --max_latency=30s --bs={bs} --size={size} --rwmixread={rwmixread} " + f"{time_based} --runtime={runtime} --rw={rw} --max_latency=20s --bs={bs} --size={size} --rwmixread={rwmixread} " f"{verify_md5} --verify_dump=1 --verify_fatal=1 --numjobs={numjobs} --nrfiles={nrfiles} " f"{log_avg_msec_opt} {iolog_opt} " f"{output_format}{output_file}" @@ -708,6 +708,25 @@ def get_snapshot_id(self, node, snapshot_name): def add_snapshot(self, node, lvol_id, snapshot_name): cmd = f"{self.base_cmd} -d snapshot add {lvol_id} {snapshot_name}" output, error = self.exec_command(node=node, command=cmd) + + # ---- Wait up to 10 minutes for the snapshot to appear ---- + start = time.time() + deadline = start + 600 # 10 minutes + wait_interval = 10 # seconds between checks + snapshot_id = "" + + while time.time() < deadline: + snapshot_id = self.get_snapshot_id(node=node, snapshot_name=snapshot_name) + if snapshot_id: + if 
hasattr(self, "logger"): + self.logger.info(f"Snapshot '{snapshot_name}' is visible with ID: {snapshot_id}") + break + time.sleep(wait_interval) + + if not snapshot_id: + if hasattr(self, "logger"): + self.logger.error(f"Timed out waiting for snapshot '{snapshot_name}' to appear within 10 minutes.") + return output, error def add_clone(self, node, snapshot_id, clone_name): @@ -1435,6 +1454,7 @@ def fetch_distrib_logs(self, storage_node_ip, storage_node_id, logs_path): find_container_cmd = "sudo docker ps --format '{{.Names}}' | grep -E '^spdk_[0-9]+$'" container_name_output, _ = self.exec_command(storage_node_ip, find_container_cmd) container_name = container_name_output.strip() + container_name = False if not container_name: self.logger.warning(f"No SPDK container found on {storage_node_ip}")