From 0685e0e646002ecef19d2be049d455ac31e82102 Mon Sep 17 00:00:00 2001 From: yashikavishwakarma Date: Wed, 18 Mar 2026 20:23:44 +0530 Subject: [PATCH 1/2] Reduce memory usage in _run_task_get_arffcontent by streaming fold results and add tqdm progress bar --- openml/runs/functions.py | 404 +++++++++++++++++++++++++-------------- 1 file changed, 256 insertions(+), 148 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index d87bd3e18..a002fb955 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -463,7 +463,7 @@ def run_exists(task_id: int, setup_id: int) -> set[int]: return set() -def _run_task_get_arffcontent( # noqa: PLR0915, PLR0912, C901 +def _run_task_get_arffcontent( # noqa: PLR0913 *, model: Any, task: OpenMLTask, @@ -476,20 +476,23 @@ def _run_task_get_arffcontent( # noqa: PLR0915, PLR0912, C901 OrderedDict[str, OrderedDict], OrderedDict[str, OrderedDict], ]: - """Runs the hyperparameter optimization on the given task - and returns the arfftrace content. + """Runs the model on the given task and returns the arff content. + + Uses streaming aggregation to reduce peak memory usage - results are + aggregated as each fold completes rather than storing all results first. + A progress bar is shown to the user during execution. Parameters ---------- model : Any - The model that is to be evalauted. + The model that is to be evaluated. task : OpenMLTask The OpenMLTask to evaluate. extension : Extension The OpenML extension object. add_local_measures : bool Whether to compute additional local evaluation measures. - n_jobs : int + n_jobs : int, optional Number of jobs to run in parallel. If None, use 1 core by default. If -1, use all available cores. @@ -497,54 +500,40 @@ def _run_task_get_arffcontent( # noqa: PLR0915, PLR0912, C901 ------- Tuple[List[List], Optional[OpenMLRunTrace], OrderedDict[str, OrderedDict], OrderedDict[str, OrderedDict]] - A tuple containing the arfftrace content, - the OpenML run trace, the global and local evaluation measures. + A tuple containing the arff content, the OpenML run trace, + the fold-based and sample-based evaluation measures. """ + try: + from tqdm import tqdm + has_tqdm = True + except ImportError: + has_tqdm = False + warnings.warn( + "tqdm is not installed. Progress bar will not be shown. " + "Install it with: pip install tqdm", + ImportWarning, + stacklevel=2, + ) + arff_datacontent = [] # type: list[list] traces = [] # type: list[OpenMLRunTrace] - # stores fold-based evaluation measures. In case of a sample based task, - # this information is multiple times overwritten, but due to the ordering - # of tne loops, eventually it contains the information based on the full - # dataset size - user_defined_measures_per_fold = OrderedDict() # type: 'OrderedDict[str, OrderedDict]' - # stores sample-based evaluation measures (sublevel of fold-based) - # will also be filled on a non sample-based task, but the information - # is the same as the fold-based measures, and disregarded in that case - user_defined_measures_per_sample = OrderedDict() # type: 'OrderedDict[str, OrderedDict]' - - # TODO use different iterator to only provide a single iterator (less - # methods, less maintenance, less confusion) + user_defined_measures_per_fold = OrderedDict() # type: OrderedDict[str, OrderedDict] + user_defined_measures_per_sample = OrderedDict() # type: OrderedDict[str, OrderedDict] + num_reps, num_folds, num_samples = task.get_split_dimensions() - jobs = [] - for n_fit, (rep_no, fold_no, sample_no) in enumerate( - itertools.product( - range(num_reps), - range(num_folds), - range(num_samples), - ), - start=1, - ): - jobs.append((n_fit, rep_no, fold_no, sample_no)) - - # The forked child process may not copy the configuration state of OpenML from the parent. - # Current configuration setup needs to be copied and passed to the child processes. + # Build jobs list + jobs = list(itertools.product( + range(num_reps), + range(num_folds), + range(num_samples), + )) + total_jobs = len(jobs) + _config = openml.config.get_config_as_dict() - # Execute runs in parallel - # assuming the same number of tasks as workers (n_jobs), the total compute time for this - # statement will be similar to the slowest run - # TODO(eddiebergman): Simplify this - job_rvals: list[ - tuple[ - np.ndarray, - pd.DataFrame | None, - np.ndarray, - pd.DataFrame | None, - OpenMLRunTrace | None, - OrderedDict[str, float], - ], - ] - job_rvals = Parallel(verbose=0, n_jobs=n_jobs)( # type: ignore + + # Stream results fold-by-fold instead of storing all in memory + job_generator = Parallel(verbose=0, n_jobs=n_jobs, return_as="generator")( delayed(_run_task_get_arffcontent_parallel_helper)( extension=extension, fold_no=fold_no, @@ -554,133 +543,81 @@ def _run_task_get_arffcontent( # noqa: PLR0915, PLR0912, C901 task=task, configuration=_config, ) - for _n_fit, rep_no, fold_no, sample_no in jobs - ) # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs` + for rep_no, fold_no, sample_no in jobs + ) + + # Show progress bar if tqdm is available + if has_tqdm: + job_generator = tqdm( + job_generator, + total=total_jobs, + desc="Running folds", + unit="fold", + ) - for n_fit, rep_no, fold_no, sample_no in jobs: - pred_y, proba_y, test_indices, test_y, inner_trace, user_defined_measures_fold = job_rvals[ - n_fit - 1 - ] + # Aggregate results as each fold completes (streaming) + for (rep_no, fold_no, sample_no), result in zip(jobs, job_generator): + pred_y, proba_y, test_indices, test_y, inner_trace, user_defined_measures_fold = result if inner_trace is not None: traces.append(inner_trace) - # add client-side calculated metrics. These is used on the server as - # consistency check, only useful for supervised tasks - def _calculate_local_measure( # type: ignore - sklearn_fn, - openml_name, - _test_y=test_y, - _pred_y=pred_y, - _user_defined_measures_fold=user_defined_measures_fold, - ): - _user_defined_measures_fold[openml_name] = sklearn_fn(_test_y, _pred_y) - if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)): - if test_y is None: - raise ValueError("test_y cannot be None for classification tasks.") - if proba_y is None: - raise ValueError("proba_y cannot be None for classification tasks.") - - for i, tst_idx in enumerate(test_indices): - if task.class_labels is not None: - prediction = ( - task.class_labels[pred_y[i]] - if isinstance(pred_y[i], (int, np.integer)) - else pred_y[i] - ) - if isinstance(test_y, pd.Series): - truth = ( - task.class_labels[test_y.iloc[i]] - if isinstance(test_y.iloc[i], int) - else test_y.iloc[i] - ) - else: - truth = ( - task.class_labels[test_y[i]] - if isinstance(test_y[i], (int, np.integer)) - else test_y[i] - ) - pred_prob = proba_y.iloc[i] if isinstance(proba_y, pd.DataFrame) else proba_y[i] - - arff_line = format_prediction( - task=task, - repeat=rep_no, - fold=fold_no, - sample=sample_no, - index=tst_idx, - prediction=prediction, - truth=truth, - proba=dict(zip(task.class_labels, pred_prob, strict=False)), - ) - else: - raise ValueError("The task has no class labels") - - arff_datacontent.append(arff_line) - + arff_datacontent.extend( + _aggregate_classification_predictions( + task=task, + rep_no=rep_no, + fold_no=fold_no, + sample_no=sample_no, + test_indices=test_indices, + test_y=test_y, + pred_y=pred_y, + proba_y=proba_y, + ) + ) if add_local_measures: - _calculate_local_measure( - sklearn.metrics.accuracy_score, - "predictive_accuracy", + user_defined_measures_fold["predictive_accuracy"] = ( + sklearn.metrics.accuracy_score(test_y, pred_y) ) elif isinstance(task, OpenMLRegressionTask): - if test_y is None: - raise ValueError("test_y cannot be None for regression tasks.") - for i, _ in enumerate(test_indices): - truth = test_y.iloc[i] if isinstance(test_y, pd.Series) else test_y[i] - arff_line = format_prediction( + arff_datacontent.extend( + _aggregate_regression_predictions( task=task, - repeat=rep_no, - fold=fold_no, - index=test_indices[i], - prediction=pred_y[i], - truth=truth, + rep_no=rep_no, + fold_no=fold_no, + test_indices=test_indices, + test_y=test_y, + pred_y=pred_y, ) - - arff_datacontent.append(arff_line) - + ) if add_local_measures: - _calculate_local_measure( - sklearn.metrics.mean_absolute_error, - "mean_absolute_error", + user_defined_measures_fold["mean_absolute_error"] = ( + sklearn.metrics.mean_absolute_error(test_y, pred_y) ) elif isinstance(task, OpenMLClusteringTask): for i, _ in enumerate(test_indices): - arff_line = [test_indices[i], pred_y[i]] # row_id, cluster ID - arff_datacontent.append(arff_line) + arff_datacontent.append([test_indices[i], pred_y[i]]) else: raise TypeError(type(task)) - for measure in user_defined_measures_fold: - if measure not in user_defined_measures_per_fold: - user_defined_measures_per_fold[measure] = OrderedDict() - if rep_no not in user_defined_measures_per_fold[measure]: - user_defined_measures_per_fold[measure][rep_no] = OrderedDict() - - if measure not in user_defined_measures_per_sample: - user_defined_measures_per_sample[measure] = OrderedDict() - if rep_no not in user_defined_measures_per_sample[measure]: - user_defined_measures_per_sample[measure][rep_no] = OrderedDict() - if fold_no not in user_defined_measures_per_sample[measure][rep_no]: - user_defined_measures_per_sample[measure][rep_no][fold_no] = OrderedDict() - - user_defined_measures_per_fold[measure][rep_no][fold_no] = user_defined_measures_fold[ - measure - ] - user_defined_measures_per_sample[measure][rep_no][fold_no][sample_no] = ( - user_defined_measures_fold[measure] - ) + _update_evaluation_measures( + user_defined_measures_fold=user_defined_measures_fold, + user_defined_measures_per_fold=user_defined_measures_per_fold, + user_defined_measures_per_sample=user_defined_measures_per_sample, + rep_no=rep_no, + fold_no=fold_no, + sample_no=sample_no, + ) trace: OpenMLRunTrace | None = None if len(traces) > 0: - if len(traces) != len(jobs): + if len(traces) != total_jobs: raise ValueError( - f"Did not find enough traces (expected {len(jobs)}, found {len(traces)})", + f"Did not find enough traces (expected {total_jobs}, found {len(traces)})", ) - trace = OpenMLRunTrace.merge_traces(traces) return ( @@ -691,6 +628,177 @@ def _calculate_local_measure( # type: ignore ) +def _aggregate_classification_predictions( + task: OpenMLTask, + rep_no: int, + fold_no: int, + sample_no: int, + test_indices: np.ndarray, + test_y: pd.Series | np.ndarray | None, + pred_y: np.ndarray, + proba_y: pd.DataFrame | np.ndarray | None, +) -> list[list]: + """Aggregate predictions for classification and learning curve tasks. + + Parameters + ---------- + task : OpenMLTask + The task being evaluated. + rep_no : int + The repeat number. + fold_no : int + The fold number. + sample_no : int + The sample number. + test_indices : np.ndarray + Indices of the test set. + test_y : pd.Series or np.ndarray or None + True labels for the test set. + pred_y : np.ndarray + Predicted labels. + proba_y : pd.DataFrame or np.ndarray or None + Predicted probabilities for each class. + + Returns + ------- + list of list + Formatted prediction rows ready for arff content. + """ + if test_y is None: + raise ValueError("test_y cannot be None for classification tasks.") + if proba_y is None: + raise ValueError("proba_y cannot be None for classification tasks.") + + arff_lines = [] + for i, tst_idx in enumerate(test_indices): + if task.class_labels is not None: + prediction = ( + task.class_labels[pred_y[i]] + if isinstance(pred_y[i], (int, np.integer)) + else pred_y[i] + ) + if isinstance(test_y, pd.Series): + truth = ( + task.class_labels[test_y.iloc[i]] + if isinstance(test_y.iloc[i], int) + else test_y.iloc[i] + ) + else: + truth = ( + task.class_labels[test_y[i]] + if isinstance(test_y[i], (int, np.integer)) + else test_y[i] + ) + pred_prob = proba_y.iloc[i] if isinstance(proba_y, pd.DataFrame) else proba_y[i] + arff_line = format_prediction( + task=task, + repeat=rep_no, + fold=fold_no, + sample=sample_no, + index=tst_idx, + prediction=prediction, + truth=truth, + proba=dict(zip(task.class_labels, pred_prob, strict=False)), + ) + else: + raise ValueError("The task has no class labels") + arff_lines.append(arff_line) + return arff_lines + + +def _aggregate_regression_predictions( + task: OpenMLTask, + rep_no: int, + fold_no: int, + test_indices: np.ndarray, + test_y: pd.Series | np.ndarray | None, + pred_y: np.ndarray, +) -> list[list]: + """Aggregate predictions for regression tasks. + + Parameters + ---------- + task : OpenMLTask + The task being evaluated. + rep_no : int + The repeat number. + fold_no : int + The fold number. + test_indices : np.ndarray + Indices of the test set. + test_y : pd.Series or np.ndarray or None + True target values for the test set. + pred_y : np.ndarray + Predicted target values. + + Returns + ------- + list of list + Formatted prediction rows ready for arff content. + """ + if test_y is None: + raise ValueError("test_y cannot be None for regression tasks.") + + arff_lines = [] + for i, _ in enumerate(test_indices): + truth = test_y.iloc[i] if isinstance(test_y, pd.Series) else test_y[i] + arff_line = format_prediction( + task=task, + repeat=rep_no, + fold=fold_no, + index=test_indices[i], + prediction=pred_y[i], + truth=truth, + ) + arff_lines.append(arff_line) + return arff_lines + + +def _update_evaluation_measures( + user_defined_measures_fold: OrderedDict, + user_defined_measures_per_fold: OrderedDict, + user_defined_measures_per_sample: OrderedDict, + rep_no: int, + fold_no: int, + sample_no: int, +) -> None: + """Update fold-level and sample-level evaluation measure dicts in place. + + Parameters + ---------- + user_defined_measures_fold : OrderedDict + Measures computed for the current fold. + user_defined_measures_per_fold : OrderedDict + Accumulator for fold-level measures (modified in place). + user_defined_measures_per_sample : OrderedDict + Accumulator for sample-level measures (modified in place). + rep_no : int + The repeat number. + fold_no : int + The fold number. + sample_no : int + The sample number. + """ + for measure in user_defined_measures_fold: + if measure not in user_defined_measures_per_fold: + user_defined_measures_per_fold[measure] = OrderedDict() + if rep_no not in user_defined_measures_per_fold[measure]: + user_defined_measures_per_fold[measure][rep_no] = OrderedDict() + + if measure not in user_defined_measures_per_sample: + user_defined_measures_per_sample[measure] = OrderedDict() + if rep_no not in user_defined_measures_per_sample[measure]: + user_defined_measures_per_sample[measure][rep_no] = OrderedDict() + if fold_no not in user_defined_measures_per_sample[measure][rep_no]: + user_defined_measures_per_sample[measure][rep_no][fold_no] = OrderedDict() + + user_defined_measures_per_fold[measure][rep_no][fold_no] = ( + user_defined_measures_fold[measure] + ) + user_defined_measures_per_sample[measure][rep_no][fold_no][sample_no] = ( + user_defined_measures_fold[measure] + ) + def _run_task_get_arffcontent_parallel_helper( # noqa: PLR0913 extension: Extension, fold_no: int, From dcb59d3ba68b0e1fbf1beead1735ac1b7b2f0510 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 18 Mar 2026 14:57:03 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- openml/runs/functions.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/openml/runs/functions.py b/openml/runs/functions.py index a002fb955..57513a314 100644 --- a/openml/runs/functions.py +++ b/openml/runs/functions.py @@ -463,7 +463,7 @@ def run_exists(task_id: int, setup_id: int) -> set[int]: return set() -def _run_task_get_arffcontent( # noqa: PLR0913 +def _run_task_get_arffcontent( *, model: Any, task: OpenMLTask, @@ -505,6 +505,7 @@ def _run_task_get_arffcontent( # noqa: PLR0913 """ try: from tqdm import tqdm + has_tqdm = True except ImportError: has_tqdm = False @@ -523,11 +524,13 @@ def _run_task_get_arffcontent( # noqa: PLR0913 num_reps, num_folds, num_samples = task.get_split_dimensions() # Build jobs list - jobs = list(itertools.product( - range(num_reps), - range(num_folds), - range(num_samples), - )) + jobs = list( + itertools.product( + range(num_reps), + range(num_folds), + range(num_samples), + ) + ) total_jobs = len(jobs) _config = openml.config.get_config_as_dict() @@ -556,7 +559,7 @@ def _run_task_get_arffcontent( # noqa: PLR0913 ) # Aggregate results as each fold completes (streaming) - for (rep_no, fold_no, sample_no), result in zip(jobs, job_generator): + for (rep_no, fold_no, sample_no), result in zip(jobs, job_generator, strict=False): pred_y, proba_y, test_indices, test_y, inner_trace, user_defined_measures_fold = result if inner_trace is not None: @@ -576,8 +579,8 @@ def _run_task_get_arffcontent( # noqa: PLR0913 ) ) if add_local_measures: - user_defined_measures_fold["predictive_accuracy"] = ( - sklearn.metrics.accuracy_score(test_y, pred_y) + user_defined_measures_fold["predictive_accuracy"] = sklearn.metrics.accuracy_score( + test_y, pred_y ) elif isinstance(task, OpenMLRegressionTask): @@ -792,13 +795,14 @@ def _update_evaluation_measures( if fold_no not in user_defined_measures_per_sample[measure][rep_no]: user_defined_measures_per_sample[measure][rep_no][fold_no] = OrderedDict() - user_defined_measures_per_fold[measure][rep_no][fold_no] = ( - user_defined_measures_fold[measure] - ) + user_defined_measures_per_fold[measure][rep_no][fold_no] = user_defined_measures_fold[ + measure + ] user_defined_measures_per_sample[measure][rep_no][fold_no][sample_no] = ( user_defined_measures_fold[measure] ) + def _run_task_get_arffcontent_parallel_helper( # noqa: PLR0913 extension: Extension, fold_no: int,