diff --git a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
index ed0eb5abdee..0a129c8eb46 100644
--- a/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
+++ b/src/main/python/systemds/scuro/drsearch/hyperparameter_tuner.py
@@ -19,27 +19,85 @@
 #
 # -------------------------------------------------------------
 from typing import Dict, List, Tuple, Any, Optional
-from skopt import gp_minimize
 from skopt.space import Real, Integer, Categorical
-from skopt.utils import use_named_args
-import json
+import numpy as np
 import logging
 from dataclasses import dataclass
 import time
 import copy
-
+from joblib import Parallel, delayed
+from skopt import Optimizer
+from systemds.scuro.drsearch.representation_dag import (
+    RepresentationDAGBuilder,
+)
 from systemds.scuro.modality.modality import Modality
+from systemds.scuro.drsearch.task import PerformanceMeasure
+import pickle
+
+
+def get_params_for_node(node_id, params):
+    return {
+        k.split("-")[-1]: v for k, v in params.items() if k.startswith(node_id + "-")
+    }
 
 
 @dataclass
 class HyperparamResult:
     representation_name: str
     best_params: Dict[str, Any]
     best_score: float
     all_results: List[Tuple[Dict[str, Any], float]]
     tuning_time: float
     modality_id: int
+    task_name: str
+    dag: Any
+    mm_opt: bool = False
+
+
+class HyperparamResults:
+    def __init__(self, tasks, modalities):
+        self.tasks = tasks
+        self.modalities = modalities
+        self.results = {}
+        for task in tasks:
+            self.results[task.model.name] = {
+                modality.modality_id: [] for modality in modalities
+            }
+
+    def add_result(self, results):
+        # TODO: Check if order of best results matters (deterministic)
+        for result in results:
+            if result.mm_opt:
+                self.results[result.task_name]["mm_results"].append(result)
+            else:
+                self.results[result.task_name][result.modality_id].append(result)
+
+    def setup_mm(self, optimize_unimodal):
+        if not optimize_unimodal:
+            self.results = {}
+            for task in self.tasks:
+                self.results[task.model.name] = {"mm_results": []}
+
+    def get_k_best_results(self, modality, task, performance_metric_name):
+        results = self.results[task.model.name][modality.modality_id]
+        dags = []
+        for result in results:
+            dag_with_best_params = RepresentationDAGBuilder()
+            prev_node_id = None
+            for node in result.dag.nodes:
+                if node.operation is not None and node.parameters:
+                    params = get_params_for_node(node.node_id, result.best_params)
+                    prev_node_id = dag_with_best_params.create_operation_node(
+                        node.operation, [prev_node_id], params
+                    )
+                else:  # it is a leaf node
+                    prev_node_id = dag_with_best_params.create_leaf_node(
+                        node.modality_id
+                    )
+
+            dags.append(dag_with_best_params.build(prev_node_id))
+        representations = [list(dag.execute([modality]).values())[-1] for dag in dags]
+        return results, representations
 
 
 class HyperparameterTuner:
@@ -57,16 +115,17 @@ def __init__(
         debug: bool = False,
     ):
         self.tasks = tasks
-        self.optimization_results = optimization_results
+        self.unimodal_optimization_results = optimization_results
+        self.optimization_results = HyperparamResults(tasks, modalities)
         self.n_jobs = n_jobs
         self.scoring_metric = scoring_metric
         self.maximize_metric = maximize_metric
         self.save_results = save_results
-        self.results = {}
         self.k = k
         self.modalities = modalities
         self.representations = None
         self.k_best_cache = None
+        self.k_best_cache_by_modality = None
         self.k_best_representations = None
         self.extract_k_best_modalities_per_task()
         self.debug = debug
@@ -96,43 +155,45 @@ def get_modality_by_id_and_instance_id(self, modality_id, instance_id):
     def extract_k_best_modalities_per_task(self):
         self.k_best_representations = {}
         self.k_best_cache = {}
+        self.k_best_cache_by_modality = {}
         representations = {}
         for task in self.tasks:
             self.k_best_representations[task.model.name] = []
             self.k_best_cache[task.model.name] = []
+            self.k_best_cache_by_modality[task.model.name] = {}
             representations[task.model.name] = {}
             for modality in self.modalities:
                 k_best_results, cached_data = (
-                    self.optimization_results.get_k_best_results(
+                    self.unimodal_optimization_results.get_k_best_results(
                         modality, task, self.scoring_metric
                     )
                 )
                 representations[task.model.name][modality.modality_id] = k_best_results
+                self.k_best_cache_by_modality[task.model.name][
+                    modality.modality_id
+                ] = cached_data
                 self.k_best_representations[task.model.name].extend(k_best_results)
                 self.k_best_cache[task.model.name].extend(cached_data)
 
         self.representations = representations
 
     def tune_unimodal_representations(self, max_eval_per_rep: Optional[int] = None):
-        results = {}
         for task in self.tasks:
-            results[task.model.name] = []
-            for representation in self.k_best_representations[task.model.name]:
-                result = self.tune_dag_representation(
-                    representation.dag,
-                    representation.dag.root_node_id,
-                    task,
-                    max_eval_per_rep,
+            reps = self.k_best_representations[task.model.name]
+            self.optimization_results.add_result(
+                Parallel(n_jobs=self.n_jobs)(
+                    delayed(self.tune_dag_representation)(
+                        rep.dag, rep.dag.root_node_id, task, max_eval_per_rep
+                    )
+                    for rep in reps
                 )
-                results[task.model.name].append(result)
-
-        self.results = results
+            )
 
         if self.save_results:
             self.save_tuning_results()
 
-        return results
-
-    def tune_dag_representation(self, dag, root_node_id, task, max_evals=None):
+    def tune_dag_representation(
+        self, dag, root_node_id, task, max_evals=None, mm_opt=False
+    ):
         hyperparams = {}
         reps = []
         modality_ids = []
@@ -149,7 +210,7 @@ def visit_node(node_id):
             visited.add(node_id)
             if node.operation is not None:
                 if node.operation().parameters:
-                    hyperparams.update(node.operation().parameters)
+                    hyperparams[node_id] = node.operation().parameters
                 reps.append(node.operation)
                 node_order.append(node_id)
             if node.modality_id is not None:
@@ -161,99 +222,136 @@ def visit_node(node_id):
             return None
 
         start_time = time.time()
-        rep_name = "_".join([rep.__name__ for rep in reps])
+        rep_name = "-".join([rep.__name__ for rep in reps])
 
         search_space = []
         param_names = []
-        for param_name, param_values in hyperparams.items():
-            param_names.append(param_name)
-            if isinstance(param_values, list):
-                if all(isinstance(v, (int, float)) for v in param_values):
-                    if all(isinstance(v, int) for v in param_values):
+        for op_id, op_params in hyperparams.items():
+            for param_name, param_values in op_params.items():
+                param_names.append(op_id + "-" + param_name)
+                if isinstance(param_values, list):
+                    search_space.append(
+                        Categorical(param_values, name=op_id + "-" + param_name)
+                    )
+                elif isinstance(param_values, tuple) and len(param_values) == 2:
+                    if isinstance(param_values[0], int) and isinstance(
+                        param_values[1], int
+                    ):
                         search_space.append(
                             Integer(
-                                min(param_values), max(param_values), name=param_name
+                                param_values[0],
+                                param_values[1],
+                                name=op_id + "-" + param_name,
                             )
                         )
                     else:
                         search_space.append(
-                            Real(min(param_values), max(param_values), name=param_name)
+                            Real(
+                                param_values[0],
+                                param_values[1],
+                                name=op_id + "-" + param_name,
+                            )
                         )
                 else:
-                    search_space.append(Categorical(param_values, name=param_name))
-            elif isinstance(param_values, tuple) and len(param_values) == 2:
-                if isinstance(param_values[0], int) and isinstance(
-                    param_values[1], int
-                ):
                     search_space.append(
-                        Integer(param_values[0], param_values[1], name=param_name)
+                        Categorical([param_values], name=op_id + "-" + param_name)
                     )
-                else:
-                    search_space.append(
-                        Real(param_values[0], param_values[1], name=param_name)
-                    )
-            else:
-                search_space.append(Categorical([param_values], name=param_name))
 
         n_calls = max_evals if max_evals else 50
         all_results = []
 
-        @use_named_args(search_space)
-        def objective(**params):
+        def evaluate_point(point):
+            params = dict(zip(param_names, point))
             result = self.evaluate_dag_config(
-                dag, params, node_order, modality_ids, task
+                dag,
+                params,
+                node_order,
+                modality_ids,
+                task,
+                modalities_override=(
+                    self._get_cached_modalities_for_task(task, modality_ids)
+                    if mm_opt
+                    else None
+                ),
             )
-            all_results.append(result)
-
-            score = result[1].average_scores[self.scoring_metric]
+            score = result[1]
+            if isinstance(score, PerformanceMeasure):
+                score = score.average_scores[self.scoring_metric]
 
             if self.maximize_metric:
-                return -score
+                objective_value = -score
             else:
-                return score
+                objective_value = score
+            if not np.isfinite(objective_value):
+                # a failed evaluation comes back as np.nan; substitute a large
+                # finite penalty so the surrogate fit in opt.tell() cannot break
+                objective_value = 1e9
+            return objective_value, result
 
-        result = gp_minimize(
-            objective,
-            search_space,
-            n_calls=n_calls,
-            random_state=42,
-            verbose=self.debug,
-            n_initial_points=min(10, n_calls // 2),
+        opt = Optimizer(
+            search_space, random_state=42, n_initial_points=min(10, n_calls // 2)
        )
 
-        if self.maximize_metric:
-            best_params, best_score = max(
-                all_results, key=lambda x: x[1].average_scores[self.scoring_metric]
+        n_batch = min(abs(self.n_jobs), n_calls) if self.n_jobs != 0 else 1
+        for _ in range(0, n_calls, n_batch):
+            points = opt.ask(n_points=n_batch)
+            results = Parallel(n_jobs=self.n_jobs)(
+                delayed(evaluate_point)(p) for p in points
             )
+            objective_values = [result[0] for result in results]
+            all_results.extend(result[1] for result in results)
+            opt.tell(points, objective_values)
+
+        def get_score(result):
+            score = result[1]
+            if isinstance(score, PerformanceMeasure):
+                return score.average_scores[self.scoring_metric]
+            if not np.isfinite(score):
+                # never select a failed evaluation as the best configuration
+                return float("-inf") if self.maximize_metric else float("inf")
+            return score
+
+        if self.maximize_metric:
+            best_params, best_score = max(all_results, key=get_score)
         else:
-            best_params, best_score = min(
-                all_results, key=lambda x: x[1].average_scores[self.scoring_metric]
-            )
+            best_params, best_score = min(all_results, key=get_score)
 
         tuning_time = time.time() - start_time
 
-        return HyperparamResult(
+        best_result = HyperparamResult(
             representation_name=rep_name,
             best_params=best_params,
             best_score=best_score,
             all_results=all_results,
             tuning_time=tuning_time,
             modality_id=modality_ids[0] if modality_ids else None,
+            task_name=task.model.name,
+            dag=dag,
+            mm_opt=mm_opt,
         )
 
-    def evaluate_dag_config(self, dag, params, node_order, modality_ids, task):
+        return best_result
+
+    def _get_cached_modalities_for_task(self, task, modality_ids):
+        if not self.k_best_cache_by_modality:
+            return self.get_modalities_by_id(modality_ids)
+        unique_modality_ids = list(dict.fromkeys(modality_ids))
+        cached_modalities = []
+        for modality_id in unique_modality_ids:
+            cached_modalities.extend(
+                self.k_best_cache_by_modality[task.model.name].get(modality_id, [])
+            )
+        return cached_modalities
+
+    def evaluate_dag_config(
+        self, dag, params, node_order, modality_ids, task, modalities_override=None
+    ):
         try:
             dag_copy = copy.deepcopy(dag)
             for node_id in node_order:
                 node = dag_copy.get_node_by_id(node_id)
                 if node.operation is not None and node.parameters:
-                    node_params = {
-                        k: v for k, v in params.items() if k in node.parameters
-                    }
-                    node.parameters = node_params
+                    node.parameters = get_params_for_node(node_id, params)
 
-            modalities = self.get_modalities_by_id(modality_ids)
+            modalities = (
+                modalities_override
+                if modalities_override is not None
+                else self.get_modalities_by_id(modality_ids)
+            )
             modified_modality = dag_copy.execute(modalities, task)
 
             score = task.run(
                 modified_modality[list(modified_modality.keys())[-1]].data
@@ -262,7 +360,7 @@ def evaluate_dag_config(self, dag, params, node_order, modality_ids, task):
             return params, score
         except Exception as e:
             self.logger.error(f"Error evaluating DAG with params {params}: {e}")
-            return params, float("-inf") if self.maximize_metric else float("inf")
+            return params, np.nan
 
     def tune_multimodal_representations(
         self,
@@ -271,14 +369,25 @@ def tune_multimodal_representations(
         optimize_unimodal: bool = True,
         max_eval_per_rep: Optional[int] = None,
     ):
-        results = {}
+        self.optimization_results.setup_mm(optimize_unimodal)
         for task in self.tasks:
+
+            def _get_metric_value(result):
+                score = result.val_score
+                if isinstance(score, PerformanceMeasure):
+                    score = score.average_scores
+                if isinstance(score, dict):
+                    return score.get(
+                        self.scoring_metric,
+                        float("-inf") if self.maximize_metric else float("inf"),
+                    )
+                return score
+
             best_results = sorted(
                 optimization_results[task.model.name],
-                key=lambda x: x.val_score,
-                reverse=True,
+                key=_get_metric_value,
+                reverse=self.maximize_metric,
             )[:k]
-            results[task.model.name] = []
 
             best_optimization_results = best_results
             for representation in best_optimization_results:
@@ -314,32 +423,18 @@ def tune_multimodal_representations(
                     representation.dag.root_node_id,
                     task,
                     max_eval_per_rep,
+                    mm_opt=True,
                 )
-                results[task.model.name].append(result)
-
-        self.results = results
-
+                self.optimization_results.add_result([result])
 
         if self.save_results:
             self.save_tuning_results()
 
-        return results
-
     def save_tuning_results(self, filepath: str = None):
         if not filepath:
             filepath = f"hyperparameter_results_{int(time.time())}.json"
 
-        json_results = {}
-        for task in self.results.keys():
-            for result in self.results[task]:
-                json_results[result.representation_name] = {
-                    "best_params": result.best_params,
-                    "best_score": result.best_score,
-                    "tuning_time": result.tuning_time,
-                    "num_evaluations": len(result.all_results),
-                }
-
-        with open(filepath, "w") as f:
-            json.dump(json_results, f, indent=2)
+        with open(filepath, "wb") as f:
+            pickle.dump(self.optimization_results.results, f)
 
         if self.debug:
             self.logger.info(f"Results saved to {filepath}")
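
The change above swaps the blocking gp_minimize call for skopt's ask/tell Optimizer so that each batch of candidate configurations can be evaluated in parallel with joblib. Below is a minimal, self-contained sketch of that pattern; the search space, the parameter names (node1-window_size, node2-dropout), the batch size, and the toy objective are illustrative stand-ins for the real DAG evaluation, not part of the patch.

# Sketch: batched ask/tell Bayesian optimization with parallel evaluation.
import numpy as np
from joblib import Parallel, delayed
from skopt import Optimizer
from skopt.space import Integer, Real

search_space = [
    Integer(2, 64, name="node1-window_size"),  # hypothetical tunable parameters
    Real(0.0, 1.0, name="node2-dropout"),
]

def objective(point):
    window_size, dropout = point
    # stand-in for evaluate_dag_config(); smaller is better
    return (window_size - 10) ** 2 + (dropout - 0.3) ** 2

opt = Optimizer(search_space, random_state=42, n_initial_points=5)

n_calls, n_batch = 20, 4
for _ in range(0, n_calls, n_batch):
    points = opt.ask(n_points=n_batch)  # propose a batch of candidates
    scores = Parallel(n_jobs=n_batch)(  # evaluate the batch in parallel
        delayed(objective)(p) for p in points
    )
    opt.tell(points, scores)  # update the surrogate model

best = int(np.argmin(opt.yi))
print("best score:", opt.yi[best], "at", opt.Xi[best])

Compared to a single gp_minimize call, the ask/tell loop trades some surrogate-model freshness (skopt fills a batch with its constant-liar strategy by default) for a roughly n_batch-fold reduction in wall-clock time per round of evaluations.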
diff --git a/src/main/python/systemds/scuro/representations/window_aggregation.py b/src/main/python/systemds/scuro/representations/window_aggregation.py
index 4d4ec19c5b9..039387eb015 100644
--- a/src/main/python/systemds/scuro/representations/window_aggregation.py
+++ b/src/main/python/systemds/scuro/representations/window_aggregation.py
@@ -59,7 +59,9 @@ def aggregation_function(self, value):
         self._aggregation_function = Aggregation(value)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class WindowAggregation(Window):
     def __init__(self, aggregation_function="mean", window_size=10, pad=False):
         super().__init__("WindowAggregation", aggregation_function)
@@ -167,7 +169,9 @@ def window_aggregate_nested_level(self, instance, new_length):
         return np.array(result)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class StaticWindow(Window):
     def __init__(self, aggregation_function="mean", num_windows=100):
         super().__init__("StaticWindow", aggregation_function)
@@ -198,7 +202,9 @@ def execute(self, modality):
         return np.array(windowed_data)
 
 
-@register_context_operator([ModalityType.TIMESERIES, ModalityType.AUDIO])
+@register_context_operator(
+    [ModalityType.TIMESERIES, ModalityType.AUDIO, ModalityType.EMBEDDING]
+)
 class DynamicWindow(Window):
     def __init__(self, aggregation_function="mean", num_windows=100):
         super().__init__("DynamicWindow", aggregation_function)
diff --git a/src/main/python/tests/scuro/data_generator.py b/src/main/python/tests/scuro/data_generator.py
index ae78c50b8aa..cfa77b1dd69 100644
--- a/src/main/python/tests/scuro/data_generator.py
+++ b/src/main/python/tests/scuro/data_generator.py
@@ -62,6 +62,7 @@ def extract(self, file, indices):
 
 class ModalityRandomDataGenerator:
     def __init__(self):
+        np.random.seed(4)
         self.modality_id = 0
         self.modality_type = None
         self.metadata = {}
diff --git a/src/main/python/tests/scuro/test_hp_tuner.py b/src/main/python/tests/scuro/test_hp_tuner.py
index 73c498e2360..de7c8f02174 100644
--- a/src/main/python/tests/scuro/test_hp_tuner.py
+++ b/src/main/python/tests/scuro/test_hp_tuner.py
@@ -78,31 +78,31 @@ def test_hp_tuner_for_audio_modality(self):
 
         self.run_hp_for_modality([audio])
 
-    # def test_multimodal_hp_tuning(self):
-    #     audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
-    #         self.num_instances, 3000
-    #     )
-    #     audio = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
-    #         )
-    #     )
-    #
-    #     text_data, text_md = ModalityRandomDataGenerator().create_text_data(
-    #         self.num_instances
-    #     )
-    #     text = UnimodalModality(
-    #         TestDataLoader(
-    #             self.indices, None, ModalityType.TEXT, text_data, str, text_md
-    #         )
-    #     )
-    #
-    #     self.run_hp_for_modality(
-    #         [audio, text], multimodal=True, tune_unimodal_representations=True
-    #     )
-    #     self.run_hp_for_modality(
-    #         [audio, text], multimodal=True, tune_unimodal_representations=False
-    #     )
+    def test_multimodal_hp_tuning(self):
+        audio_data, audio_md = ModalityRandomDataGenerator().create_audio_data(
+            self.num_instances, 3000
+        )
+        audio = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.AUDIO, audio_data, np.float32, audio_md
+            )
+        )
+
+        text_data, text_md = ModalityRandomDataGenerator().create_text_data(
+            self.num_instances
+        )
+        text = UnimodalModality(
+            TestDataLoader(
+                self.indices, None, ModalityType.TEXT, text_data, str, text_md
+            )
+        )
+
+        # self.run_hp_for_modality(
+        #     [audio, text], multimodal=True, tune_unimodal_representations=True
+        # )
+        self.run_hp_for_modality(
+            [audio, text], multimodal=True, tune_unimodal_representations=False
+        )
 
     def test_hp_tuner_for_text_modality(self):
         text_data, text_md = ModalityRandomDataGenerator().create_text_data(
@@ -130,7 +130,7 @@ def run_hp_for_modality(
         },
     ):
         registry = Registry()
-        registry._fusion_operators = [Average, Concatenation, LSTM]
+        registry._fusion_operators = [LSTM]
 
         unimodal_optimizer = UnimodalOptimizer(modalities, self.tasks, False)
         unimodal_optimizer.optimize()
@@ -159,8 +159,32 @@ def run_hp_for_modality(
         else:
             hp.tune_unimodal_representations(max_eval_per_rep=10)
 
-        assert len(hp.results) == len(self.tasks)
-        assert len(hp.results[self.tasks[0].model.name]) == 2
+        assert len(hp.optimization_results.results) == len(self.tasks)
+        if multimodal:
+            if tune_unimodal_representations:
+                assert (
+                    len(
+                        hp.optimization_results.results[self.tasks[0].model.name][0]
+                    )
+                    == 1
+                )
+            else:
+                assert (
+                    len(
+                        hp.optimization_results.results[self.tasks[0].model.name][
+                            "mm_results"
+                        ]
+                    )
+                    == 1
+                )
+        else:
+            assert (
+                len(hp.optimization_results.results[self.tasks[0].model.name]) == 1
+            )
+            assert (
+                len(hp.optimization_results.results[self.tasks[0].model.name][0])
+                == 2
+            )
 
 
 if __name__ == "__main__":
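
Because several nodes in a representation DAG can expose a parameter of the same name, the tuner namespaces each skopt dimension as <node_id>-<parameter>, and get_params_for_node strips that prefix again when a sampled configuration is written back into a node. Below is a small round-trip illustration; the node ids and parameter names are invented for the example. Note the design caveat that k.split("-")[-1] assumes parameter names themselves contain no "-".

# Round trip of the node-namespaced parameter keys used by the tuner.
def get_params_for_node(node_id, params):
    # as in the patch: keep this node's keys, strip the "<node_id>-" prefix
    return {
        k.split("-")[-1]: v for k, v in params.items() if k.startswith(node_id + "-")
    }

# a flat configuration, as produced by dict(zip(param_names, point))
params = {
    "node1-window_size": 16,
    "node1-aggregation": "mean",
    "node2-num_windows": 50,
}

assert get_params_for_node("node1", params) == {
    "window_size": 16,
    "aggregation": "mean",
}
assert get_params_for_node("node2", params) == {"num_windows": 50}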