diff --git a/.github/workflows/sonarcloud.yml b/.github/workflows/sonarcloud.yml new file mode 100644 index 0000000..7c1521a --- /dev/null +++ b/.github/workflows/sonarcloud.yml @@ -0,0 +1,72 @@ +name: SonarCloud Quality Review + +on: + push: + branches: + - main + - master + pull_request: + types: [opened, synchronize, reopened] + +jobs: + sonarcloud: + name: Code Analysis + runs-on: ubuntu-latest + + steps: + - name: Checkout Repository + uses: actions/checkout@v6 + with: + fetch-depth: 0 + + + - name: Set up Python + uses: actions/setup-python@v6 + with: + python-version: '3.10' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + pip install pytest pytest-cov + pip install -e . + + - name: Run Tests and Generate Coverage + run: | + pytest tests/ --cov=bigfeat --cov-report=xml:coverage.xml --junitxml=pytest-report.xml -o junit_family=xunit1 + + - name: 🕵️ Debug - Verify Files and Paths + run: | + echo "=================================================" + echo "1. Current Working Directory:" + pwd + echo "=================================================" + echo "2. Listing files in current directory:" + ls -la + echo "=================================================" + echo "3. Checking if pytest-report.xml exists:" + if [ -f "pytest-report.xml" ]; then echo "✅ pytest-report.xml FOUND"; else echo "❌ pytest-report.xml MISSING"; fi + echo "=================================================" + echo "4. Checking if coverage.xml exists:" + if [ -f "coverage.xml" ]; then echo "✅ coverage.xml FOUND"; else echo "❌ coverage.xml MISSING"; fi + echo "=================================================" + echo "5. 
Content of pytest-report.xml (First 20 lines) to verify file paths:" + head -n 20 pytest-report.xml + echo "=================================================" + + - name: SonarQube Scan + uses: SonarSource/sonarqube-scan-action@v7.1.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + with: + args: > + -Dsonar.projectKey=taha2samy_bigfeat + -Dsonar.organization=taha2samy + -Dsonar.sources=bigfeat + -Dsonar.tests=tests + -Dsonar.python.version=3.10 + -Dsonar.exclusions=benchmarking/**,docs/**,metafeatures_training/** + -Dsonar.python.coverage.reportPaths=${{ github.workspace }}/coverage.xml + -Dsonar.python.xunit.reportPath=${{ github.workspace }}/pytest-report.xml + -Dsonar.python.xunit.reportPaths=${{ github.workspace }}/pytest-report.xml \ No newline at end of file diff --git a/bigfeat/__init__.py b/bigfeat/__init__.py index 9845544..afb830c 100644 --- a/bigfeat/__init__.py +++ b/bigfeat/__init__.py @@ -1,2 +1,2 @@ name = 'bigfeat' -import bigfeat.bigfeat_base \ No newline at end of file +import bigfeat.base \ No newline at end of file diff --git a/bigfeat/base.py b/bigfeat/base.py new file mode 100644 index 0000000..b558bd4 --- /dev/null +++ b/bigfeat/base.py @@ -0,0 +1,211 @@ +""" +base.py +------- +The main orchestrator class for BigFeat. 
+""" +import numpy as np +import ray +# debug comment +# 1111111111 +from sklearn.preprocessing import MinMaxScaler +import bigfeat.local_utils as local_utils +from .config import initialize_ray +from .distributed_tasks import remote_generate_batch, remote_get_importance + +# Importing from our refactored modules +from .generator import feat_with_depth, feat_with_depth_gen +from .importance import get_feature_importances +from .tree_utils import get_paths, get_split_feats +from .selection import check_correlations, fit_fanova +from .evaluation import select_estimator as eval_select_estimator + + +class BigFeat: + """Base BigFeat Class for both classification and regression tasks""" + def __init__(self, task_type='classification', options=None): + if task_type not in ['classification', 'regression']: + raise ValueError("task_type must be either 'classification' or 'regression'") + + self.task_type = task_type + self.options = options or {} + self.n_jobs = -1 + self.fanova_best = None + + self.operators = [np.multiply, np.add, np.subtract, np.abs, np.square] + self.binary_operators = [np.multiply, np.add, np.subtract] + self.unary_operators = [np.abs, np.square, local_utils.original_feat] + + def _calculate_initial_importance(self, x_ref, y_ref, random_state, split_feats): + future = remote_get_importance.remote( + x_ref, y_ref, "avg", self.task_type, random_state, self.n_jobs + ) + importance_sum, splits_sum = ray.get(future) + + ig_vector = importance_sum / (importance_sum.sum() + 1e-9) + split_vec = splits_sum / (splits_sum.sum() + 1e-9) + + if split_feats == "comb": + ig_vector = np.multiply(ig_vector, split_vec) + ig_vector /= (ig_vector.sum() + 1e-9) + elif split_feats == "splits": + ig_vector = split_vec + return ig_vector + + + def _get_initial_ig(self, x_ref, y_ref, random_state, split_feats): + """Helper to calculate initial feature importance.""" + future = remote_get_importance.remote( + x_ref, y_ref, "avg", self.task_type, random_state, self.n_jobs + ) + 
importance_sum, splits_sum = ray.get(future) + ig_vector = importance_sum / (importance_sum.sum() + 1e-9) + + if split_feats in ["comb", "splits"]: + split_vec = splits_sum / (splits_sum.sum() + 1e-9) + if split_feats == "comb": + ig_vector = np.multiply(ig_vector, split_vec) + else: + ig_vector = split_vec + ig_vector /= (ig_vector.sum() + 1e-9) + return ig_vector + + def _generate_iteration_batches(self, x_ref, iteration, num_to_gen, batch_size, random_state): + """Helper to manage Ray tasks batching.""" + gen_futures = [] + for i in range(0, num_to_gen, batch_size): + curr_size = min(batch_size, num_to_gen - i) + batch_depths = [self.rng.choice(self.depth_range, p=self.depth_weights) for _ in range(curr_size)] + gen_futures.append(remote_generate_batch.remote( + x_ref, batch_depths, random_state + i + iteration, + self.ig_vector, self.operators, self.operator_weights, + self.binary_operators, self.unary_operators)) + return gen_futures + + def _update_weights(self, selected_ops): + """Update operator weights based on their usage in selected features.""" + for i_op, op in enumerate(self.operators): + for feat in selected_ops: + if any(op == f_op[0] for f_op in feat): + self.imp_operators[i_op] += 1 + self.operator_weights = self.imp_operators / self.imp_operators.sum() + + def fit(self, x, y, gen_size=5, random_state=0, iterations=5, estimator='avg', + feat_imps=True, split_feats=None, check_corr=True, selection='stability', combine_res=True): + initialize_ray(self.options) + self.rng = np.random.default_rng(seed=random_state) + self.n_feats, self.n_rows = x.shape[1], x.shape[0] + self.selection, self.imp_operators = selection, np.ones(len(self.operators)) + self.operator_weights = self.imp_operators / self.imp_operators.sum() + self.ig_vector = np.ones(self.n_feats) / self.n_feats + self.depth_range = np.arange(3) + 1 + self.depth_weights = (1 / (2 ** self.depth_range)) / (1 / (2 ** self.depth_range)).sum() + + self.scaler = MinMaxScaler() + x_scaled = 
self.scaler.fit_transform(x) + x_ref, y_ref = ray.put(x_scaled), ray.put(y) + + if feat_imps: + self.ig_vector = self._get_initial_ig(x_ref, y_ref, random_state, split_feats) + + num_to_gen = self.n_feats * gen_size + batch_size = max(1, num_to_gen // (int(ray.cluster_resources().get("CPU", 1)) * 2)) + iters_comb = np.zeros((self.n_rows, self.n_feats * iterations)) + depths_comb = np.zeros(self.n_feats * iterations) + ids_comb, ops_comb = [None] * (self.n_feats * iterations), [None] * (self.n_feats * iterations) + + for iteration in range(iterations): + futures = self._generate_iteration_batches(x_ref, iteration, num_to_gen, batch_size, random_state) + flattened = [item for batch in ray.get(futures) for item in batch] + gen_feats_iter = np.column_stack([res[0] for res in flattened]) + + imps_iter, _ = get_feature_importances(gen_feats_iter, y, estimator, random_state, self.task_type, n_jobs=self.n_jobs) + feat_args = np.argsort(imps_iter)[-self.n_feats:] + + start, end = iteration * self.n_feats, (iteration + 1) * self.n_feats + iters_comb[:, start:end] = gen_feats_iter[:, feat_args] + depths_comb[start:end] = np.array([flattened[idx][3] for idx in feat_args]) + + for k, idx in enumerate(feat_args): + ids_comb[start + k], ops_comb[start + k] = flattened[idx][2], flattened[idx][1] + + self._update_weights([flattened[idx][1] for idx in feat_args]) + + # Final selection + if selection == 'stability' and iterations > 1 and combine_res: + imps_f, _ = get_feature_importances(iters_comb, y, estimator, random_state, self.task_type, n_jobs=self.n_jobs) + feat_args = np.argsort(imps_f)[-self.n_feats:] + gen_feats, self.tracking_ids, self.tracking_ops = iters_comb[:, feat_args], [ids_comb[i] for i in feat_args], [ops_comb[i] for i in feat_args] + self.feat_depths = depths_comb[feat_args] + else: + gen_feats, self.tracking_ids, self.tracking_ops = iters_comb[:, -self.n_feats:], ids_comb[-self.n_feats:], ops_comb[-self.n_feats:] + self.feat_depths = depths_comb[-self.n_feats:] 
+ + if selection == 'stability' and check_corr: + gen_feats, to_drop = check_correlations(gen_feats) + self.tracking_ids = [it for i, it in enumerate(self.tracking_ids) if i not in to_drop] + self.tracking_ops = [it for i, it in enumerate(self.tracking_ops) if i not in to_drop] + self.feat_depths = np.delete(self.feat_depths, to_drop) + + gen_feats = np.hstack((gen_feats, x_scaled)) + if selection == 'fAnova': + gen_feats, self.fanova_best = fit_fanova(gen_feats, y, self.task_type, self.n_feats) + + return gen_feats + + + + + + + + + def transform(self, x): + """ + Produce features from the fitted BigFeat object. + """ + x_scaled = self.scaler.transform(x) + rows_count = x_scaled.shape[0] + gen_feats = np.zeros((rows_count, len(self.tracking_ids))) + + for i in range(gen_feats.shape[1]): + dpth = self.feat_depths[i] + # Copy lists to prevent modifying the fitted state during pop() + op_ls = self.tracking_ops[i].copy() + id_ls = self.tracking_ids[i].copy() + + gen_feats[:, i] = feat_with_depth_gen( + x_scaled, dpth, op_ls, id_ls, + self.binary_operators, self.unary_operators + ) + + gen_feats = np.hstack((gen_feats, x_scaled)) + + if self.selection == 'fAnova': + gen_feats = self.fanova_best.transform(gen_feats) + + return gen_feats + + def select_estimator(self, x, y, estimators_names=None): + """ + Select the best estimator based on cross-validation. + + Parameters: + ----------- + x : array-like + Feature matrix. + y : array-like + Target vector. + estimators_names : list or None + List of estimator names to try. + + Returns: + -------- + model : estimator + Fitted best estimator. 
+ """ + return eval_select_estimator( + x, y, + task_type=self.task_type, + n_jobs=self.n_jobs, + estimators_names=estimators_names + ) \ No newline at end of file diff --git a/bigfeat/bigfeat_base.py b/bigfeat/bigfeat_base.py deleted file mode 100644 index 23cc727..0000000 --- a/bigfeat/bigfeat_base.py +++ /dev/null @@ -1,393 +0,0 @@ -import pandas as pd -import numpy as np -from sklearn.preprocessing import MinMaxScaler, StandardScaler -from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor -import bigfeat.local_utils as local_utils -from sklearn.metrics import roc_auc_score, mean_squared_error, r2_score, make_scorer -from sklearn.model_selection import train_test_split -from sklearn.tree import _tree -import lightgbm as lgb -from lightgbm.sklearn import LGBMClassifier, LGBMRegressor -from sklearn.feature_selection import SelectKBest, f_regression, f_classif -from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor -from sklearn.linear_model import LogisticRegression, LinearRegression -from sklearn.model_selection import cross_val_score -from sklearn.metrics import f1_score, make_scorer -from functools import partial - - -class BigFeat: - """Base BigFeat Class for both classification and regression tasks""" - - def __init__(self, task_type='classification'): - """ - Initialize the BigFeat object - - Parameters: - ----------- - task_type : str, default='classification' - The type of machine learning task. Either 'classification' or 'regression'. 
- """ - self.n_jobs = -1 - self.operators = [np.multiply, np.add, np.subtract, np.abs, np.square] - self.binary_operators = [np.multiply, np.add, np.subtract] - self.unary_operators = [np.abs, np.square, local_utils.original_feat] - self.task_type = task_type - - # Validate task_type input - if task_type not in ['classification', 'regression']: - raise ValueError("task_type must be either 'classification' or 'regression'") - - def fit(self, X, y, gen_size=5, random_state=0, iterations=5, estimator='avg', - feat_imps=True, split_feats=None, check_corr=True, selection='stability', combine_res=True): - """ Generated Features using test set """ - self.selection = selection - self.imp_operators = np.ones(len(self.operators)) - self.operator_weights = self.imp_operators / self.imp_operators.sum() - self.gen_steps = [] - self.n_feats = X.shape[1] - self.n_rows = X.shape[0] - self.ig_vector = np.ones(self.n_feats) / self.n_feats - self.comb_mat = np.ones((self.n_feats, self.n_feats)) - self.split_vec = np.ones(self.n_feats) - # Set RNG seed if provided for numpy - self.rng = np.random.RandomState(seed=random_state) - gen_feats = np.zeros((self.n_rows, self.n_feats * gen_size)) - iters_comb = np.zeros((self.n_rows, self.n_feats * iterations)) - depths_comb = np.zeros(self.n_feats * iterations) - ids_comb = np.zeros(self.n_feats * iterations, dtype=object) - ops_comb = np.zeros(self.n_feats * iterations, dtype=object) - self.feat_depths = np.zeros(gen_feats.shape[1]) - self.depth_range = np.arange(3) + 1 - self.depth_weights = 1 / (2 ** self.depth_range) - self.depth_weights /= self.depth_weights.sum() - self.scaler = MinMaxScaler() - self.scaler.fit(X) - X = self.scaler.transform(X) - if feat_imps: - self.ig_vector, estimators = self.get_feature_importances(X, y, estimator, random_state) - self.ig_vector /= self.ig_vector.sum() - for tree in estimators: - paths = self.get_paths(tree, np.arange(X.shape[1])) - self.get_split_feats(paths, self.split_vec) - self.split_vec /= 
self.split_vec.sum() - # self.split_vec = StandardScaler().fit_transform(self.split_vec.reshape(1, -1), {'var_':5}) - if split_feats == "comb": - self.ig_vector = np.multiply(self.ig_vector, self.split_vec) - self.ig_vector /= self.ig_vector.sum() - elif split_feats == "splits": - self.ig_vector = self.split_vec - for iteration in range(iterations): - self.tracking_ops = [] - self.tracking_ids = [] - gen_feats = np.zeros((self.n_rows, self.n_feats * gen_size)) - self.feat_depths = np.zeros(gen_feats.shape[1]) - for i in range(gen_feats.shape[1]): - dpth = self.rng.choice(self.depth_range, p=self.depth_weights) - ops = [] - ids = [] - gen_feats[:, i] = self.feat_with_depth(X, dpth, ops, ids) # ops and ids are updated - self.feat_depths[i] = dpth - self.tracking_ops.append(ops) - self.tracking_ids.append(ids) - self.tracking_ids = np.array(self.tracking_ids + [[]], dtype='object')[:-1] - self.tracking_ops = np.array(self.tracking_ops + [[]], dtype='object')[:-1] - imps, estimators = self.get_feature_importances(gen_feats, y, estimator, random_state) - total_feats = np.argsort(imps) - feat_args = total_feats[-self.n_feats:] - gen_feats = gen_feats[:, feat_args] - self.tracking_ids = self.tracking_ids[feat_args] - self.tracking_ops = self.tracking_ops[feat_args] - self.feat_depths = self.feat_depths[feat_args] - depths_comb[iteration * self.n_feats:(iteration + 1) * self.n_feats] = self.feat_depths - ids_comb[iteration * self.n_feats:(iteration + 1) * self.n_feats] = self.tracking_ids - ops_comb[iteration * self.n_feats:(iteration + 1) * self.n_feats] = self.tracking_ops - iters_comb[:, iteration * self.n_feats:(iteration + 1) * self.n_feats] = gen_feats - for i, op in enumerate(self.operators): - for feat in self.tracking_ops: - for feat_op in feat: - if op == feat_op[0]: - self.imp_operators[i] += 1 - self.operator_weights = self.imp_operators / self.imp_operators.sum() - if selection == 'stability' and iterations > 1 and combine_res: - imps, estimators = 
self.get_feature_importances(iters_comb, y, estimator, random_state) - total_feats = np.argsort(imps) - feat_args = total_feats[-self.n_feats:] - gen_feats = iters_comb[:, feat_args] - self.tracking_ids = ids_comb[feat_args] - self.tracking_ops = ops_comb[feat_args] - self.feat_depths = depths_comb[feat_args] - - if selection == 'stability' and check_corr: - gen_feats, to_drop_cor = self.check_correlations(gen_feats) - self.tracking_ids = np.delete(self.tracking_ids, to_drop_cor) - self.tracking_ops = np.delete(self.tracking_ops, to_drop_cor) - self.feat_depths = np.delete(self.feat_depths, to_drop_cor) - gen_feats = np.hstack((gen_feats, X)) - - if selection == 'fAnova': - # Use the appropriate feature selection method based on task type - if self.task_type == 'classification': - self.fAnova_best = SelectKBest(f_classif, k=self.n_feats) - else: # regression - self.fAnova_best = SelectKBest(f_regression, k=self.n_feats) - gen_feats = self.fAnova_best.fit_transform(gen_feats, y) - - return gen_feats - - def transform(self, X): - """ Produce features from the fitted BigFeat object """ - X = self.scaler.transform(X) - self.n_rows = X.shape[0] - gen_feats = np.zeros((self.n_rows, len(self.tracking_ids))) - for i in range(gen_feats.shape[1]): - dpth = self.feat_depths[i] - op_ls = self.tracking_ops[i].copy() - id_ls = self.tracking_ids[i].copy() - gen_feats[:, i] = self.feat_with_depth_gen(X, dpth, op_ls, id_ls) - gen_feats = np.hstack((gen_feats, X)) - if self.selection == 'fAnova': - gen_feats = self.fAnova_best.transform(gen_feats) - return gen_feats - - def select_estimator(self, X, y, estimators_names=None): - """ - Select the best estimator based on cross-validation - - Parameters: - ----------- - X : array-like - Feature matrix - y : array-like - Target vector - estimators_names : list or None - List of estimator names to try. If None, uses appropriate defaults. 
- - Returns: - -------- - model : estimator - Fitted best estimator - """ - # Use appropriate default estimators based on task type - if estimators_names is None: - if self.task_type == 'classification': - estimators_names = ['dt', 'lr'] - else: # regression - estimators_names = ['dt_reg', 'lr_reg'] - - # Define available estimators based on task type - estimators_dic = { - # Classification estimators - 'dt': DecisionTreeClassifier(), - 'lr': LogisticRegression(), - 'rf': RandomForestClassifier(n_jobs=self.n_jobs), - 'lgb': LGBMClassifier(), - # Regression estimators - 'dt_reg': DecisionTreeRegressor(), - 'lr_reg': LinearRegression(), - 'rf_reg': RandomForestRegressor(n_jobs=self.n_jobs), - 'lgb_reg': LGBMRegressor() - } - - models_score = {} - - for estimator in estimators_names: - model = estimators_dic[estimator] - - # Use appropriate scoring metric based on task type - if self.task_type == 'classification': - scorer = make_scorer(f1_score) - else: # regression - scorer = make_scorer(r2_score) - - models_score[estimator] = cross_val_score(model, X, y, cv=3, scoring=scorer).mean() - - best_estimator = max(models_score, key=models_score.get) - best_model = estimators_dic[best_estimator] - best_model.fit(X, y) - return best_model - - def get_feature_importances(self, X, y, estimator, random_state, sample_count=1, sample_size=3, n_jobs=1): - """Return feature importances by specified method""" - - importance_sum = np.zeros(X.shape[1]) - total_estimators = [] - for sampled in range(sample_count): - sampled_ind = np.random.choice(np.arange(self.n_rows), size=self.n_rows // sample_size, replace=False) - sampled_X = X[sampled_ind] - sampled_y = np.take(y, sampled_ind) - - # Different behavior based on task type - if estimator == "rf": - if self.task_type == 'classification': - estm = RandomForestClassifier(random_state=random_state, n_jobs=n_jobs) - else: # regression - estm = RandomForestRegressor(random_state=random_state, n_jobs=n_jobs) - - estm.fit(sampled_X, 
sampled_y) - total_importances = estm.feature_importances_ - estimators = estm.estimators_ - total_estimators += estimators - - elif estimator == "avg": - # For classification - if self.task_type == 'classification': - clf = RandomForestClassifier(random_state=random_state, n_jobs=n_jobs) - clf.fit(sampled_X, sampled_y) - rf_importances = clf.feature_importances_ - estimators = clf.estimators_ - total_estimators += estimators - - # LightGBM for classification - train_data = lgb.Dataset(sampled_X, label=sampled_y) - param = {'num_leaves': 31, 'objective': 'binary', 'verbose': -1} - param['metric'] = 'auc' - - # For regression - else: - clf = RandomForestRegressor(random_state=random_state, n_jobs=n_jobs) - clf.fit(sampled_X, sampled_y) - rf_importances = clf.feature_importances_ - estimators = clf.estimators_ - total_estimators += estimators - - # LightGBM for regression - train_data = lgb.Dataset(sampled_X, label=sampled_y) - param = {'num_leaves': 31, 'objective': 'regression', 'verbose': -1} - param['metric'] = 'rmse' - - # Common LightGBM code for both tasks - num_round = 2 - bst = lgb.train(param, train_data, num_round) - lgb_imps = bst.feature_importance(importance_type='gain') - lgb_imps /= lgb_imps.sum() - total_importances = (rf_importances + lgb_imps) / 2 - - importance_sum += total_importances - return importance_sum, total_estimators - - def get_weighted_feature_importances(self, X, y, estimator, random_state): - """Return feature importances weighted by model performance""" - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=0.2, random_state=random_state) - - # Choose appropriate model based on task type - if self.task_type == 'classification': - estm = RandomForestClassifier(random_state=random_state, n_jobs=self.n_jobs) - else: # regression - estm = RandomForestRegressor(random_state=random_state, n_jobs=self.n_jobs) - - estm.fit(X_train, y_train) - ests = estm.estimators_ - model = estm - imps = np.zeros((len(model.estimators_), 
X.shape[1])) - scores = np.zeros(len(model.estimators_)) - - for i, each in enumerate(model.estimators_): - # Different scoring metrics based on task type - if self.task_type == 'classification': - y_probas_train = each.predict_proba(X_test)[:, 1] - score = roc_auc_score(y_test, y_probas_train) - else: # regression - y_pred_train = each.predict(X_test) - score = r2_score(y_test, y_pred_train) - - imps[i] = each.feature_importances_ - scores[i] = score - - weights = scores / scores.sum() - return np.average(imps, axis=0, weights=weights) - - def feat_with_depth(self, X, depth, op_ls, feat_ls): - """ Recursively generate a new features """ - if depth == 0: - feat_ind = self.rng.choice(np.arange(len(self.ig_vector)), p=self.ig_vector) - feat_ls.append(feat_ind) - return X[:, feat_ind] - depth -= 1 - op = self.rng.choice(self.operators, p=self.operator_weights) - if op in self.binary_operators: - feat_1 = self.feat_with_depth(X, depth, op_ls, feat_ls) - feat_2 = self.feat_with_depth(X, depth, op_ls, feat_ls) - op_ls.append((op, depth)) - return op(feat_1, feat_2) - elif op in self.unary_operators: - feat_1 = self.feat_with_depth(X, depth, op_ls, feat_ls) - op_ls.append((op, depth)) - return op(feat_1) - - def feat_with_depth_gen(self, X, depth, op_ls, feat_ls): - """ Reproduce generated features with new data """ - if depth == 0: - feat_ind = feat_ls.pop() - return X[:, feat_ind] - depth -= 1 - op = op_ls.pop()[0] - if op in self.binary_operators: - feat_1 = self.feat_with_depth_gen(X, depth, op_ls, feat_ls) - feat_2 = self.feat_with_depth_gen(X, depth, op_ls, feat_ls) - return op(feat_2, feat_1) - elif op in self.unary_operators: - feat_1 = self.feat_with_depth_gen(X, depth, op_ls, feat_ls) - return op(feat_1) - - def check_correlations(self, feats): - """ Check correlations among the selected features """ - cor_thresh = 0.8 - corr_matrix = pd.DataFrame(feats).corr().abs() - mask = np.tril(np.ones_like(corr_matrix, dtype=bool)) - tri_df = corr_matrix.mask(mask) - 
to_drop = [c for c in tri_df.columns if any(tri_df[c] > cor_thresh)] - # remove the feature with lower importance if corr > cor_thresh - # to_drop = [] - # for c in tri_df.columns: - # if any(corr_matrix[c] > cor_thresh): - # for c_, cor_val in enumerate(corr_matrix[c].values): - # if cor_val > cor_thresh and c != c_: - # if self.ig_vector_gen[c_] < self.ig_vector_gen[c] and c_ not in to_drop: - # to_drop.append(c_) - - feats = pd.DataFrame(feats).drop(to_drop, axis=1) - return feats.values, to_drop - - def get_paths(self, clf, feature_names): - """ Returns every path in the decision tree""" - tree_ = clf.tree_ - feature_name = [ - feature_names[i] if i != _tree.TREE_UNDEFINED else "undefined!" - for i in tree_.feature - ] - path = [] - path_list = [] - - def recurse(node, depth, path_list): - if tree_.feature[node] == _tree.TREE_UNDEFINED: - path_list.append(path.copy()) - else: - name = feature_name[node] - path.append(name) - recurse(tree_.children_left[node], depth + 1, path_list) - recurse(tree_.children_right[node], depth + 1, path_list) - path.pop() - - recurse(0, 1, path_list) - - new_list = [] - for i in range(len(path_list)): - if path_list[i] != path_list[i - 1]: - new_list.append(path_list[i]) - return new_list - - def get_combos(self, paths, comb_mat): - """ Fills Combination matrix with values """ - for i in range(len(comb_mat)): - for pt in paths: - if i in pt: - comb_mat[i][pt] += 1 - - def get_split_feats(self, paths, split_vec): - """ Fills split vector with values """ - for i in range(len(split_vec)): - for pt in paths: - if i in pt: - split_vec[i] += 1 diff --git a/bigfeat/config.py b/bigfeat/config.py new file mode 100644 index 0000000..a70f424 --- /dev/null +++ b/bigfeat/config.py @@ -0,0 +1,37 @@ +""" +config.py +--------- +Handles Ray initialization for Local, Remote Cluster, or Kubernetes. +""" + +import os +import ray + +def initialize_ray(options=None): + """ + Initializes Ray based on the provided options or environment variables. 
+ Options can include: + - address: 'auto' for local cluster, or a specific IP/service name for K8s. + - num_cpus: number of CPUs to use. + - ray_init_kwargs: dict of additional args for ray.init(). + """ + if ray.is_initialized(): + return + + options = options or {} + # 1. Check for K8s or Remote Cluster via address + # If in K8s, address is usually 'ray://ray-head-service:10001' + # If Local, address is None + address = options.get("address", os.environ.get("RAY_ADDRESS")) + + init_args = { + "address": address, + "ignore_reinit_error": True, + "include_dashboard": False + } + + # Update with any user-provided kwargs (like num_cpus, etc.) + if "ray_init_kwargs" in options: + init_args.update(options["ray_init_kwargs"]) + + ray.init(**init_args) \ No newline at end of file diff --git a/bigfeat/distributed_tasks.py b/bigfeat/distributed_tasks.py new file mode 100644 index 0000000..683e162 --- /dev/null +++ b/bigfeat/distributed_tasks.py @@ -0,0 +1,57 @@ +""" +distributed_tasks.py +-------------------- +Wraps core functions into Ray remote tasks for distributed execution. +""" + +import ray +import numpy as np +from .generator import feat_with_depth +from .importance import get_feature_importances + + + +@ray.remote +def remote_generate_batch(x_ref, depths, rng_seed, ig_vector, operators, + op_weights, binary_ops, unary_ops): + """ + Generates a batch of features on a remote worker. + Using X_ref (Object Store reference) to save memory. 
+ """ + import numpy as np # Vital for remote workers + rng = np.random.default_rng(rng_seed) + batch_results = [] + + for dpth in depths: + ops = [] + ids = [] + # Calling our standard generator function + feat_column = feat_with_depth( + x_ref, dpth, ops, ids, rng, ig_vector, + operators, op_weights, binary_ops, unary_ops + ) + batch_results.append((feat_column, ops, ids, dpth)) + + return batch_results + +@ray.remote +def remote_get_importance(x_sample, y_sample, estimator, task_type, random_seed, n_jobs): + """ + Calculates importance on a remote worker. + """ + # Calling our standard importance function + # Note: We return importance_sum and split_vec (The vector approach!) + from .importance import get_feature_importances + from .tree_utils import get_paths, get_split_feats + + imps, estimators = get_feature_importances( + x_sample, y_sample, estimator, random_seed, task_type, n_jobs=n_jobs + ) + + # Calculate splits locally on the worker to avoid sending heavy tree objects + split_vec = np.zeros(x_sample.shape[1]) + for tree in estimators: + paths = get_paths(tree, np.arange(x_sample.shape[1])) + get_split_feats(paths, split_vec) + + return imps, split_vec \ No newline at end of file diff --git a/bigfeat/evaluation.py b/bigfeat/evaluation.py new file mode 100644 index 0000000..f4f6e25 --- /dev/null +++ b/bigfeat/evaluation.py @@ -0,0 +1,81 @@ +""" +evaluation.py +------------- +Responsible for model evaluation and selecting the best estimator. 
+""" + +from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor +from sklearn.linear_model import LogisticRegression, LinearRegression +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor +from lightgbm.sklearn import LGBMClassifier, LGBMRegressor +from sklearn.model_selection import cross_val_score +from sklearn.metrics import f1_score, r2_score, make_scorer + + +def select_estimator(x, y, task_type='classification', n_jobs=-1, estimators_names=None, random_state=42): + """ + Select the best estimator based on cross-validation + + Parameters: + ----------- + x : array-like + Feature matrix + y : array-like + Target vector + task_type : str, default='classification' + The type of machine learning task. Either 'classification' or 'regression'. + n_jobs : int, default=-1 + The number of jobs to run in parallel. + estimators_names : list or None + List of estimator names to try. If None, uses appropriate defaults. + random_state : int, default=42 + Seed used by the random number generator. 
+ + Returns: + -------- + model : estimator + Fitted best estimator + """ + if estimators_names is None: + if task_type == 'classification': + estimators_names = ['dt', 'lr'] + else: + estimators_names = ['dt_reg', 'lr_reg'] + + estimators_dic = { + 'dt': DecisionTreeClassifier( + random_state=random_state, min_samples_leaf=1, max_features=None, ccp_alpha=0.0 + ), + 'lr': LogisticRegression(random_state=random_state), + 'rf': RandomForestClassifier( + n_jobs=n_jobs, random_state=random_state, min_samples_leaf=1, max_features='sqrt' + ), + 'lgb': LGBMClassifier(random_state=random_state, verbosity=-1), + + 'dt_reg': DecisionTreeRegressor( + random_state=random_state, min_samples_leaf=1, max_features=None, ccp_alpha=0.0 + ), + 'lr_reg': LinearRegression(), + 'rf_reg': RandomForestRegressor( + n_jobs=n_jobs, random_state=random_state, min_samples_leaf=1, max_features='sqrt' + ), + 'lgb_reg': LGBMRegressor(random_state=random_state, verbosity=-1) + } + + models_score = {} + + for estimator in estimators_names: + model = estimators_dic[estimator] + + if task_type == 'classification': + scorer = make_scorer(f1_score) + else: + scorer = make_scorer(r2_score) + + models_score[estimator] = cross_val_score(model, x, y, cv=3, scoring=scorer).mean() + + best_estimator = max(models_score, key=models_score.get) + best_model = estimators_dic[best_estimator] + best_model.fit(x, y) + + return best_model \ No newline at end of file diff --git a/bigfeat/generator.py b/bigfeat/generator.py new file mode 100644 index 0000000..551f913 --- /dev/null +++ b/bigfeat/generator.py @@ -0,0 +1,52 @@ +""" +generator.py +------------ +Responsible for recursively generating new features (Feature Crossing/Engineering). 
+""" + +import numpy as np + + +def feat_with_depth(X, depth, op_ls, feat_ls, rng, ig_vector, operators, + operator_weights, binary_operators, unary_operators): + """ Recursively generate a new features """ + if depth == 0: + feat_ind = rng.choice(np.arange(len(ig_vector)), p=ig_vector) + feat_ls.append(feat_ind) + return X[:, feat_ind] + + depth -= 1 + op = rng.choice(operators, p=operator_weights) + + if op in binary_operators: + feat_1 = feat_with_depth(X, depth, op_ls, feat_ls, rng, ig_vector, + operators, operator_weights, binary_operators, unary_operators) + feat_2 = feat_with_depth(X, depth, op_ls, feat_ls, rng, ig_vector, + operators, operator_weights, binary_operators, unary_operators) + op_ls.append((op, depth)) + return op(feat_1, feat_2) + + elif op in unary_operators: + feat_1 = feat_with_depth(X, depth, op_ls, feat_ls, rng, ig_vector, + operators, operator_weights, binary_operators, unary_operators) + op_ls.append((op, depth)) + return op(feat_1) + + +def feat_with_depth_gen(X, depth, op_ls, feat_ls, binary_operators, unary_operators): + """ Reproduce generated features with new data """ + if depth == 0: + feat_ind = feat_ls.pop() + return X[:, feat_ind] + + depth -= 1 + op = op_ls.pop()[0] + + if op in binary_operators: + feat_1 = feat_with_depth_gen(X, depth, op_ls, feat_ls, binary_operators, unary_operators) + feat_2 = feat_with_depth_gen(X, depth, op_ls, feat_ls, binary_operators, unary_operators) + return op(feat_2, feat_1) + + elif op in unary_operators: + feat_1 = feat_with_depth_gen(X, depth, op_ls, feat_ls, binary_operators, unary_operators) + return op(feat_1) \ No newline at end of file diff --git a/bigfeat/importance.py b/bigfeat/importance.py new file mode 100644 index 0000000..ca62b2f --- /dev/null +++ b/bigfeat/importance.py @@ -0,0 +1,106 @@ +""" +importance.py +------------- +Responsible for training initial models to extract feature importances. 
"""
importance.py
-------------
Responsible for training initial models to extract feature importances.
"""

import numpy as np
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, r2_score
import lightgbm as lgb


def get_feature_importances(x, y, estimator, random_state, task_type,
                            sample_count=1, sample_size=3, n_jobs=1):
    """Estimate feature importances from sub-sampled model fits.

    Parameters
    ----------
    x : ndarray of shape (n_samples, n_features)
        Feature matrix.
    y : array-like of shape (n_samples,)
        Target vector.
    estimator : {'rf', 'avg'}
        'rf'  -> random-forest importances only.
        'avg' -> mean of random-forest and LightGBM (gain) importances.
    random_state : int
        Seed for row sampling and model fitting.
    task_type : str
        'classification' or 'regression'.
    sample_count : int, default 1
        Number of sub-samples to fit and accumulate over.
    sample_size : int, default 3
        Each sub-sample holds ``n_rows // sample_size`` rows.
    n_jobs : int, default 1
        Parallelism passed to the random forests.

    Returns
    -------
    importance_sum : ndarray of shape (n_features,)
        Sum of importances over all sub-samples (not normalized).
    total_estimators : list
        Every fitted decision tree from the random forests.

    Raises
    ------
    ValueError
        If ``estimator`` is not 'rf' or 'avg'.
    """
    if estimator not in ("rf", "avg"):
        # Previously an unknown value surfaced later as a NameError on
        # total_importances; fail fast with a clear message instead.
        raise ValueError(f"estimator must be 'rf' or 'avg', got {estimator!r}")

    rng = np.random.default_rng(random_state)
    n_rows = x.shape[0]
    importance_sum = np.zeros(x.shape[1])
    total_estimators = []

    for _ in range(sample_count):
        sampled_ind = rng.choice(n_rows, size=n_rows // sample_size, replace=False)
        sampled_x = x[sampled_ind]
        sampled_y = np.take(y, sampled_ind)

        # The random-forest fit is identical for both estimator modes, so it
        # is done once here instead of being duplicated per branch.
        if task_type == 'classification':
            forest = RandomForestClassifier(random_state=random_state, n_jobs=n_jobs,
                                            min_samples_leaf=1, max_features='sqrt')
        else:  # regression
            forest = RandomForestRegressor(random_state=random_state, n_jobs=n_jobs,
                                           min_samples_leaf=1, max_features='sqrt')
        forest.fit(sampled_x, sampled_y)
        rf_importances = forest.feature_importances_
        total_estimators += forest.estimators_

        if estimator == "rf":
            total_importances = rf_importances
        else:  # 'avg': blend with a small LightGBM model
            train_data = lgb.Dataset(sampled_x, label=sampled_y)
            if task_type == 'classification':
                # NOTE(review): the 'binary' objective assumes a binary
                # target -- confirm multi-class is handled upstream.
                param = {'num_leaves': 31, 'objective': 'binary',
                         'verbose': -1, 'metric': 'auc'}
            else:
                param = {'num_leaves': 31, 'objective': 'regression',
                         'verbose': -1, 'metric': 'rmse'}

            num_round = 2
            bst = lgb.train(param, train_data, num_round)
            lgb_imps = bst.feature_importance(importance_type='gain')
            gain_total = lgb_imps.sum()
            # Guard: with only 2 boosting rounds every gain can be zero,
            # which previously produced NaNs via 0/0 in-place division.
            if gain_total > 0:
                lgb_imps = lgb_imps / gain_total
            total_importances = (rf_importances + lgb_imps) / 2

        importance_sum += total_importances

    return importance_sum, total_estimators


def get_weighted_feature_importances(x, y, random_state, task_type, n_jobs=-1):
    """Return per-feature importances averaged over forest trees, weighted
    by each tree's hold-out score.

    Scores are ROC-AUC for classification and R^2 for regression.

    NOTE(review): regression tree R^2 scores can be negative, which would
    make the weight vector ill-defined -- confirm targets are predictable
    enough upstream, or clip scores before weighting.
    """
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=0.2, random_state=random_state)

    if task_type == 'classification':
        forest = RandomForestClassifier(random_state=random_state, n_jobs=n_jobs,
                                        min_samples_leaf=1, max_features='sqrt')
    else:  # regression
        forest = RandomForestRegressor(random_state=random_state, n_jobs=n_jobs,
                                       min_samples_leaf=1, max_features='sqrt')

    forest.fit(x_train, y_train)
    n_trees = len(forest.estimators_)
    imps = np.zeros((n_trees, x.shape[1]))
    scores = np.zeros(n_trees)

    for i, tree in enumerate(forest.estimators_):
        if task_type == 'classification':
            y_probas = tree.predict_proba(x_test)[:, 1]
            scores[i] = roc_auc_score(y_test, y_probas)
        else:  # regression
            scores[i] = r2_score(y_test, tree.predict(x_test))
        imps[i] = tree.feature_importances_

    weights = scores / scores.sum()
    return np.average(imps, axis=0, weights=weights)
"""
local_utils.py
--------------
Utility functions and unary mathematical operators for feature
transformation and grouping.
"""

import numpy as np
import pandas as pd
import scipy.stats


def unary_cube(arr):
    """Element-wise cube of the input array."""
    return np.power(arr, 3)


def unary_multinv(arr):
    """Element-wise multiplicative inverse (1/x).

    Zeros produce inf under numpy's default divide semantics; callers are
    expected to clean such values downstream.
    """
    return 1 / arr


def unary_sqrtabs(arr):
    """Element-wise square root of |x|, preserving the original sign."""
    return np.sqrt(np.abs(arr)) * np.sign(arr)


def unary_logabs(arr):
    """Element-wise natural log of |x|, preserving the original sign.

    For x == 0 this yields nan (sign(0) * -inf).
    """
    return np.log(np.abs(arr)) * np.sign(arr)


def convert_with_max(arr):
    """Clip values to the float32 range and cast to float32.

    Bug fix: the previous implementation clipped the caller's array in
    place before converting; this version works on a copy and leaves the
    input untouched.
    """
    f32 = np.finfo(np.float32)
    return np.clip(arr, f32.min, f32.max).astype(np.float32)


def mode(ar1):
    """Statistical mode of the input array."""
    return scipy.stats.mode(ar1, keepdims=True).mode[0]


def ar_range(ar1):
    """Range (max - min) of the input array."""
    return ar1.max() - ar1.min()


def percentile_25(ar1):
    """25th percentile of the input array."""
    return np.percentile(ar1, 25)


def percentile_75(ar1):
    """75th percentile of the input array."""
    return np.percentile(ar1, 75)


def group_by(ar1, ar2):
    """Group ``ar2`` by the values of ``ar1``, apply a randomly selected
    aggregation, and broadcast the result back to the original row order.

    NOTE(review): the generator is re-seeded with 42 on every call, so the
    "random" aggregation is in fact always the same one -- confirm whether
    this determinism is intended or the seed should come from the caller.
    """
    rng = np.random.default_rng(seed=42)
    group_by_ops = [np.mean, np.std, np.max, np.min, np.sum, mode, len,
                    ar_range, np.median, percentile_25, percentile_75]
    group_by_op = rng.choice(group_by_ops)
    temp_df = pd.DataFrame({'ar1': ar1, 'ar2': ar2})
    group_res = temp_df.groupby(['ar1'])['ar2'].apply(group_by_op).to_dict()
    return temp_df['ar1'].map(group_res).values


def original_feat(ar1):
    """Identity function that returns the input array unchanged."""
    return ar1
"""
selection.py
------------
Responsible for feature filtering and selection (Correlation & fAnova).
"""

import pandas as pd
import numpy as np
from sklearn.feature_selection import SelectKBest, f_regression, f_classif


def check_correlations(feats, cor_thresh=0.8):
    """Drop features that are highly correlated with an earlier feature.

    Parameters
    ----------
    feats : array-like of shape (n_samples, n_features)
        Candidate feature matrix.
    cor_thresh : float, default 0.8
        Absolute Pearson correlation above which the later member of a
        correlated pair is dropped.  (Previously a hard-coded constant;
        the default preserves the old behavior.)

    Returns
    -------
    kept : ndarray
        Feature matrix with the correlated columns removed.
    to_drop : list
        Column labels that were removed.
    """
    frame = pd.DataFrame(feats)
    corr_matrix = frame.corr().abs()
    # Mask the lower triangle (incl. diagonal) so each pair is seen once;
    # a column is dropped when any earlier column correlates above threshold.
    upper = corr_matrix.mask(np.tril(np.ones_like(corr_matrix, dtype=bool)))
    to_drop = [col for col in upper.columns if (upper[col] > cor_thresh).any()]
    kept = frame.drop(to_drop, axis=1)
    return kept.values, to_drop


def fit_fanova(X, y, task_type, n_feats):
    """Select the top ``n_feats`` features with an F-test (fAnova).

    Parameters
    ----------
    X : array-like
        Feature matrix.
    y : array-like
        Target vector.
    task_type : str
        'classification' (uses f_classif) or 'regression' (f_regression).
    n_feats : int
        Number of top features to select.

    Returns
    -------
    transformed_feats : array-like
        The reduced feature matrix.
    selector : SelectKBest
        The fitted selector, reusable for transform on new data.
    """
    score_func = f_classif if task_type == 'classification' else f_regression
    selector = SelectKBest(score_func, k=n_feats)
    transformed_feats = selector.fit_transform(X, y)
    return transformed_feats, selector
"""
tree_utils.py
-------------
Responsible for extracting paths and feature combinations from decision trees.
"""


def get_paths(clf, feature_names):
    """Return every root-to-leaf feature path of a fitted decision tree.

    Consecutive duplicate paths (sibling leaves share the same feature
    sequence, since the path does not record branch direction) are
    collapsed to a single entry.

    Bug fix: the old dedup loop compared element 0 against element -1
    (the last path), which dropped the first path whenever it happened to
    equal the last one -- and returned an empty list for single-path trees.
    """
    # Imported lazily so the pure numpy helpers below remain usable when
    # sklearn is not installed.
    from sklearn.tree import _tree

    tree_ = clf.tree_
    feature_name = [
        feature_names[i] if i != _tree.TREE_UNDEFINED else "undefined!"
        for i in tree_.feature
    ]
    path = []
    path_list = []

    def recurse(node, depth, path_list):
        # Leaf nodes carry TREE_UNDEFINED as their split feature.
        if tree_.feature[node] == _tree.TREE_UNDEFINED:
            path_list.append(path.copy())
        else:
            name = feature_name[node]
            path.append(name)
            recurse(tree_.children_left[node], depth + 1, path_list)
            recurse(tree_.children_right[node], depth + 1, path_list)
            path.pop()

    recurse(0, 1, path_list)

    # Collapse runs of identical consecutive paths.
    deduped = []
    for pt in path_list:
        if not deduped or pt != deduped[-1]:
            deduped.append(pt)
    return deduped


def get_combos(paths, comb_mat):
    """Accumulate co-occurrence counts: for each feature ``i``, every path
    containing ``i`` increments the columns named by that path in row ``i``.

    NOTE(review): with a numpy ``comb_mat``, fancy-index += counts a
    feature repeated within one path only once (buffered update) --
    confirm that is the intended semantics.
    """
    for i in range(len(comb_mat)):
        for pt in paths:
            if i in pt:
                comb_mat[i][pt] += 1


def get_split_feats(paths, split_vec):
    """Count, for each feature ``i``, how many paths contain ``i``."""
    for i in range(len(split_vec)):
        for pt in paths:
            if i in pt:
                split_vec[i] += 1
"""
conftest.py
-----------
Pytest fixtures: a session-scoped local Ray cluster plus small synthetic
datasets shared by the test suite.
"""

import pytest
import ray
from sklearn.datasets import make_classification, make_regression


@pytest.fixture(scope="session", autouse=True)
def init_ray_for_tests():
    """Spin up a minimal local Ray cluster once per test session.

    autouse=True makes this run before any test without being requested;
    session scope pays the cluster start-up cost only once.
    """
    ray.init(
        num_cpus=2,                # keep the cluster small and fast
        ignore_reinit_error=True,
        include_dashboard=False,
        logging_level="ERROR",
    )

    yield

    if ray.is_initialized():
        ray.shutdown()


@pytest.fixture(scope="session")
def classification_data():
    """Small deterministic classification dataset, returned as (X, y)."""
    return make_classification(
        n_samples=100,
        n_features=5,
        n_informative=3,
        random_state=42,
    )


@pytest.fixture(scope="session")
def regression_data():
    """Small deterministic regression dataset, returned as (X, y)."""
    return make_regression(
        n_samples=100,
        n_features=5,
        n_informative=3,
        random_state=42,
    )
"""
test_utils.py
-------------
Unit tests for mathematical operators and utility functions in local_utils.py.
"""

import numpy as np
import pytest
from bigfeat.local_utils import (
    unary_cube,
    unary_multinv,
    unary_sqrtabs,
    unary_logabs,
    convert_with_max,
    original_feat
)


def test_unary_cube():
    """Cubing preserves sign and handles zero."""
    values = np.array([-2, 0, 3])
    np.testing.assert_array_equal(unary_cube(values), np.array([-8, 0, 27]))


def test_unary_multinv():
    """1/x on exactly representable reciprocals."""
    values = np.array([2.0, -0.5, 4.0])
    np.testing.assert_array_equal(unary_multinv(values), np.array([0.5, -2.0, 0.25]))


def test_unary_multinv_divide_by_zero():
    """Division by zero should yield inf (numpy warning), not crash."""
    with np.errstate(divide='ignore'):
        out = unary_multinv(np.array([0.0]))
    assert np.isinf(out[0])


def test_unary_sqrtabs():
    """sqrt(|x|) keeps the original sign."""
    values = np.array([-4.0, 0.0, 9.0])
    np.testing.assert_array_equal(unary_sqrtabs(values), np.array([-2.0, 0.0, 3.0]))


def test_unary_logabs():
    """log(|x|) keeps the original sign; powers of e give clean outputs."""
    values = np.array([-np.exp(1), np.exp(2)])
    np.testing.assert_allclose(unary_logabs(values), np.array([-1.0, 2.0]), rtol=1e-5)


def test_unary_logabs_zero():
    """At zero, sign(0) * log(0) is 0 * -inf = nan; must not raise."""
    with np.errstate(divide='ignore'):
        out = unary_logabs(np.array([0.0]))
    assert np.isnan(out[0])


def test_convert_with_max():
    """Values beyond the float32 range are clipped to its limits."""
    clipped_pos = convert_with_max(np.array([1e40]))
    clipped_neg = convert_with_max(np.array([-1e40]))

    assert clipped_pos[0] == np.finfo(np.float32).max
    assert clipped_neg[0] == np.finfo(np.float32).min
    assert clipped_pos.dtype == np.float32


def test_original_feat():
    """Identity transform must return the input unchanged."""
    data = np.array([1, 2, 3])
    np.testing.assert_array_equal(original_feat(data), data)