diff --git a/obsidian/__init__.py b/obsidian/__init__.py index f2db016..cc9e775 100644 --- a/obsidian/__init__.py +++ b/obsidian/__init__.py @@ -1,11 +1,13 @@ """obsidian: Automated experiment design and black-box optimization""" -__version__ = '0.8.6' + +__version__ = "0.8.6" # Import key objects from obsidian.campaign import Campaign from obsidian.optimizer import BayesianOptimizer from obsidian.surrogates import SurrogateBoTorch from obsidian.parameters import ParamSpace, Target +from obsidian.rng import get_global_rng, reset_global_rng, USE_OLD_RNG_CONTROL, is_global_rng_initialized # Ensure that other subpackages are imported properly for documentation from obsidian.objectives import Objective @@ -14,3 +16,4 @@ import obsidian.exceptions as exceptions import obsidian.acquisition as acquisition import obsidian.plotting as plotting +import obsidian.rng as rng \ No newline at end of file diff --git a/obsidian/campaign/campaign.py b/obsidian/campaign/campaign.py index 98a5ecc..73b926a 100644 --- a/obsidian/campaign/campaign.py +++ b/obsidian/campaign/campaign.py @@ -7,6 +7,7 @@ from obsidian.constraints import Output_Constraint, const_class_dict from obsidian.exceptions import IncompatibleObjectiveError from obsidian.utils import tensordict_to_dict +from obsidian.rng import GlobalRNG import obsidian import pandas as pd @@ -48,15 +49,29 @@ def __init__(self, optimizer: Optimizer | None = None, designer: ExpDesigner | None = None, objective: Objective | None = None, - seed: int | None = None): + seed: int | None = None, + global_rng: GlobalRNG | None = None + ): self.set_X_space(X_space) self.data = pd.DataFrame() - - optimizer = BayesianOptimizer(X_space, seed=seed) if optimizer is None else optimizer + if obsidian.USE_OLD_RNG_CONTROL: + optimizer_seed = seed + designer_seed = seed + torch_rng = None + else: + if not global_rng: + global_rng = obsidian.get_global_rng(seed) + # optimizer_seed = global_rng.np_rng + # designer_seed = global_rng.np_rng + optimizer_seed = seed + designer_seed = seed + torch_rng = global_rng.torch_rng + self.global_rng = global_rng + optimizer = BayesianOptimizer(X_space, seed=optimizer_seed) if optimizer is None else optimizer self.set_optimizer(optimizer) - designer = ExpDesigner(X_space, seed=seed) if designer is None else designer + designer = ExpDesigner(X_space, seed=designer_seed, torch_rng=torch_rng) if designer is None else designer self.set_designer(designer) self.set_target(target) @@ -284,8 +299,8 @@ def initialize(self, **design_kwargs): Maps ExpDesigner.initialize method """ return self.designer.initialize(**design_kwargs) - - def fit(self): + + def fit(self, fit_options: dict = {}): """ Maps Optimizer.fit method @@ -296,7 +311,7 @@ def fit(self): if self.m_exp <= 0: raise ValueError('Must register data before fitting') - self.optimizer.fit(self.data, target=self.target) + self.optimizer.fit(self.data, target=self.target, fit_options=fit_options) def suggest(self, **optim_kwargs): """ diff --git a/obsidian/campaign/explainer.py b/obsidian/campaign/explainer.py index c64bed8..e5d87a3 100644 --- a/obsidian/campaign/explainer.py +++ b/obsidian/campaign/explainer.py @@ -1,8 +1,10 @@ """Explainer Class: Surrogate Model Interpretation Methods""" +import obsidian from obsidian.parameters import Param_Continuous, ParamSpace from obsidian.optimizer import Optimizer from obsidian.exceptions import UnfitError +from obsidian.rng import get_new_seed, with_tmp_seed import shap from shap import KernelExplainer, Explanation @@ -13,6 +15,7 @@ import torch import 
matplotlib.pyplot as plt from matplotlib.figure import Figure +from contextlib import nullcontext class Explainer(): @@ -102,15 +105,32 @@ def pred_func(X): y_pred = self.optimizer.predict(X, return_f_inv=True) mu = y_pred[y_name+' (pred)'].values return mu - + + self.shap['pred_func'] = pred_func self.shap['explainer'] = KernelExplainer(pred_func, X_ref) - self.shap['X_sample'] = self.X_space.unit_demap( - pd.DataFrame(torch.rand(size=(n, self.X_space.n_dim)).numpy(), - columns=X_ref.columns) + if obsidian.USE_OLD_RNG_CONTROL: + torch_rng = None + # dummy context manager + ctx_mgr = nullcontext() + else: + torch_rng = getattr(self.optimizer, "torch_rng", None) + # temporarily override global random states + tmp_seed: int = get_new_seed(1, torch_rng) # type: ignore + ctx_mgr = with_tmp_seed(tmp_seed) + + self.shap["X_sample"] = self.X_space.unit_demap( + pd.DataFrame( + torch.rand(size=(n, self.X_space.n_dim), generator=torch_rng).numpy(), + columns=X_ref.columns, + ) + ) + with ctx_mgr: + self.shap["values"] = self.shap["explainer"].shap_values( + self.shap["X_sample"], + seed=seed, + l1_reg="aic", ) - self.shap['values'] = self.shap['explainer'].shap_values(self.shap['X_sample'], - seed=seed, l1_reg='aic') self.shap['explanation'] = Explanation(self.shap['values'], feature_names=X_ref.columns) return diff --git a/obsidian/experiment/benchmark/optithon.py b/obsidian/experiment/benchmark/optithon.py index eadfad6..8c167e0 100644 --- a/obsidian/experiment/benchmark/optithon.py +++ b/obsidian/experiment/benchmark/optithon.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd from scipy.special import gamma +from numpy.random import Generator def Vm_func(X): @@ -61,19 +62,21 @@ def response_2(X): return E_curve -def OT_simulator(X, addNoise=False): +def OT_simulator(X, addNoise=False, generator=None): if isinstance(X, pd.DataFrame): X = X.to_numpy() if isinstance(X, np.ndarray) and X.ndim == 1: X = X[np.newaxis, :] - if X.shape[1] != 6: + if not addNoise: + if X.shape[1] != 5 and X.shape[1] != 6: + raise ValueError(f'Input to simulate must have 5 or 6 columns for no noise case, not {X.shape[1]}') + elif X.shape[1] != 6: raise ValueError(f'Input to simulate must have 6 columns, not {X.shape[1]}') # Break down the data into 3 parts for separate functions X123 = X[:, 0:3] X45 = X[:, 3:5] - X6 = X[:, 5] # First three cols represented in an enzyme kinetics problem y1 = response_1(X123) @@ -87,8 +90,12 @@ def OT_simulator(X, addNoise=False): # Final column has a mean 0 effect, but determines the scale of noise # Note: The overall problem is slightly heteroskedastic if addNoise: + X6 = X[:, 5] sd = 0.01+0.02*X6 - rel_error = np.random.normal(loc=1, scale=sd, size=y.shape[0]) + if isinstance(generator, Generator): + rel_error = generator.normal(loc=1, scale=sd, size=y.shape[0]) + else: + rel_error = np.random.normal(loc=1, scale=sd, size=y.shape[0]) y *= rel_error return y diff --git a/obsidian/experiment/design.py b/obsidian/experiment/design.py index eaa95f0..6c5ca1e 100644 --- a/obsidian/experiment/design.py +++ b/obsidian/experiment/design.py @@ -6,6 +6,7 @@ from obsidian.exceptions import UnsupportedError from botorch.utils.sampling import draw_sobol_samples +import numpy as np from numpy.typing import ArrayLike from scipy.stats import qmc @@ -29,12 +30,18 @@ class ExpDesigner: def __init__(self, X_space: ParamSpace, - seed: int | None = None): + seed: np.random.Generator | int | None = None, + torch_rng: torch.Generator | None = None + ): if not isinstance(X_space, ParamSpace): raise 
TypeError('X_space must be an obsidian ParamSpace object') self.X_space = X_space self.seed = seed + if torch_rng: + if not isinstance(torch_rng, torch.Generator): + raise TypeError('torch_rng must be a torch.Generator object') + self.torch_rng = torch_rng def __repr__(self): """String representation of object""" @@ -69,9 +76,10 @@ def initialize(self, seed = self.seed method_dict = { + # TODO: botorch is moving to SPEC-0007, `seed` will be phased out in favor of `rng` 'LHS': lambda d, m: torch.tensor( qmc.LatinHypercube(d=d, scramble=False, seed=seed, strength=1, optimization='random-cd').random(n=m)), - 'Random': lambda d, m: torch.rand(size=(m, d)), + 'Random': lambda d, m: torch.rand(size=(m, d), generator=self.torch_rng), 'Sobol': lambda d, m: draw_sobol_samples( bounds=torch.tensor([0.0, 1.0]).reshape(2, 1).repeat(1, d), n=m, q=1).squeeze(1), 'Custom': lambda d, m: torch.tensor(sample_custom), @@ -109,7 +117,7 @@ def initialize(self, print(f'The number of initialization experiments ({m}) exceeds the required \ number for the requested design ({m_required}). Filling with randomized experiments.') excess = m - m_required - sample_add = torch.rand(size=(excess, d)) + sample_add = torch.rand(size=(excess, d), generator=self.torch_rng) sample = torch.vstack((sample, sample_add)) sample = pd.DataFrame(sample.numpy(), columns=self.X_space.X_names) diff --git a/obsidian/experiment/simulator.py b/obsidian/experiment/simulator.py index b8dc4ff..5d2ce68 100644 --- a/obsidian/experiment/simulator.py +++ b/obsidian/experiment/simulator.py @@ -1,5 +1,6 @@ """Simulate virtual experimental data""" +from types import ModuleType, MethodType from obsidian.parameters import ParamSpace from typing import Callable @@ -7,6 +8,8 @@ import numpy as np import warnings +from numpy.random import Generator +import obsidian class Simulator: """ @@ -34,6 +37,8 @@ def __init__(self, response_function: Callable, name: str | list[str] = 'Response', eps: float | list[float] = 0.0, + rng: Generator | int | None = None, + error_function: Callable | None = None, **kwargs): if not callable(response_function): @@ -45,14 +50,32 @@ def __init__(self, self.response_function = response_function self.name = name self.eps = eps if isinstance(eps, list) else [eps] + if isinstance(rng, Generator): + # use provided numpy generator + self.rng: Generator | ModuleType = rng + else: + self._seed = rng + if obsidian.USE_OLD_RNG_CONTROL: + # no random state control here + self.rng = np.random + else: + # use new global RNG control + # note that if the global RNG is already initialized + # and the seed (rng) is different, a ValueError will be raised + global_rng = obsidian.get_global_rng(rng) + self.rng = global_rng.np_rng + if not error_function: + self.error_function = self._default_gaussian_error + else: + self.error_function = MethodType(error_function, self) self.kwargs = kwargs - + def __repr__(self): """String representation of object""" return f" obsidian Simulator(response_function={self.response_function.__name__}, eps={self.eps})" def simulate(self, - X_prop: pd.DataFrame) -> np.ndarray: + X_prop: pd.DataFrame) -> pd.DataFrame: """ Generates a response to a set of experiments. 
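The hunk above lets a Simulator draw noise from a caller-supplied numpy Generator and swap in a custom error hook, which is bound to the instance via MethodType. A minimal usage sketch follows; the import path, the toy response function, and the uniform error hook are assumptions made for illustration only, not part of the patch.

# Illustrative sketch (not part of the patch): using the new rng / error_function hooks.
import numpy as np
from obsidian.experiment import Simulator  # assumed public import path

def toy_response(X: np.ndarray) -> np.ndarray:
    # hypothetical deterministic response, for the example only
    return X.sum(axis=1)

def uniform_error(self, X, y_sim):
    # custom error hook; the MethodType binding exposes self.rng and self.eps here
    noise = self.rng.uniform(low=-self.eps[0], high=self.eps[0], size=y_sim.shape)
    return y_sim * (1.0 + noise)

sim = Simulator(toy_response, eps=0.05,
                rng=np.random.default_rng(42),   # a Generator bypasses the global RNG
                error_function=uniform_error)
noisy = sim.error_function(np.zeros((3, 2)), np.ones((3, 1)))  # multiplicative noise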
@@ -70,15 +93,14 @@ def simulate(self, y_sim = self.response_function(X) - # Apply error # Expand length of eps to match number of outputs if len(self.eps) == 1: self.eps *= y_sim.ndim if y_sim.ndim == 1: y_sim = y_sim.reshape(-1, 1) - for i in range(y_sim.shape[1]): - rel_error = np.random.normal(loc=1, scale=self.eps[i], size=y_sim.shape[0]) - y_sim[:, i] *= rel_error + + # Apply error + y_sim = self.error_function(X, y_sim) # Handle naming conventions y_dims = y_sim.shape[1] @@ -95,3 +117,7 @@ def simulate(self, df_sim = pd.DataFrame(y_sim, columns=self.name) return df_sim + + def _default_gaussian_error(self, X, y_sim: np.ndarray) -> np.ndarray: + rel_error = 1 + self.eps * self.rng.normal(size=y_sim.size).reshape(y_sim.shape) + return y_sim * rel_error \ No newline at end of file diff --git a/obsidian/experiment/utils.py b/obsidian/experiment/utils.py index b9d068b..dd003d9 100644 --- a/obsidian/experiment/utils.py +++ b/obsidian/experiment/utils.py @@ -2,13 +2,15 @@ from obsidian.exceptions import UnsupportedError import numpy as np +from numpy.random import Generator from itertools import product +from types import ModuleType def factorial_DOE(d: int, n_CP: int = 3, shuffle: bool = True, - seed: int | None = None, + seed: Generator | int | None = None, full: bool = False): """ Creates a statistically designed factorial experiment (DOE). @@ -22,7 +24,7 @@ def factorial_DOE(d: int, uncertainty and curvature. Default is ``3``. shuffle (bool, optional): Whether or not to shuffle the design or leave them in the default run order. Default is ``True``. - seed (int, optional): Randomization seed. Default is ``None``. + seed (Generator | int | None, optional): Randomization seed or generator for numpy. Default is ``None``. full (bool, optional): Whether or not to run the full DOE. Default is ``False``, which will lead to an efficient Res4+ design. 
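factorial_DOE now accepts either an integer seed or a numpy Generator, as the hunk below implements. The following sketch restates that branch in isolation so the behavior is easy to test; the helper name resolve_rng is hypothetical and not part of the patch.

# Sketch of the Generator-or-int seed handling added to factorial_DOE below.
import numpy as np
from numpy.random import Generator

def resolve_rng(seed: Generator | int | None):
    # Hypothetical helper mirroring the branch in the next hunk.
    if isinstance(seed, Generator):
        return seed              # caller-supplied generator is used directly
    if seed is not None:
        np.random.seed(seed)     # legacy path: seed the global numpy state
    return np.random             # the module itself then acts as the generator

X = np.arange(12, dtype=float).reshape(6, 2)
resolve_rng(np.random.default_rng(7)).shuffle(X)   # shuffles design rows in place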
@@ -72,10 +74,17 @@ def factorial_DOE(d: int, # Add centerpoints then shuffle X = np.vstack((X, CP)) - if seed is not None: - np.random.seed(seed) + if isinstance(seed, Generator): + # if it is actually a generator + rng: Generator | ModuleType = seed + else: + # with or without a seed, we use np.random for now + # in the future, we should instantiate a new random generator from entropy + rng = np.random + if seed is not None: + np.random.seed(seed) if shuffle: - np.random.shuffle(X) + rng.shuffle(X) # Rescale from (-1,1) to (0,0.999999) X = (X+1)/2 - 1e-6 diff --git a/obsidian/optimizer/base.py b/obsidian/optimizer/base.py index 9fbc6af..294c112 100644 --- a/obsidian/optimizer/base.py +++ b/obsidian/optimizer/base.py @@ -1,5 +1,6 @@ """Optimizer class definition""" +import obsidian from obsidian.parameters import ParamSpace, Target, Param_Observational from obsidian.exceptions import UnsupportedError @@ -13,6 +14,8 @@ import random from torch import Tensor +from obsidian.rng import GlobalRNG, dummy_decorator + class Optimizer(ABC): """ @@ -32,6 +35,7 @@ class Optimizer(ABC): def __init__(self, X_space: ParamSpace, seed: int | None = None, + global_rng: GlobalRNG | None = None, verbose: int = 1): # Verbose selection @@ -42,11 +46,26 @@ def __init__(self, # Handle randomization seed, considering all 3 sources (torch, random, numpy) self.seed = seed - if self.seed is not None: - torch.manual_seed(self.seed) - torch.use_deterministic_algorithms(True) - np.random.seed(self.seed) - random.seed(self.seed) + # TODO: this part is only for backward compatibility, we should drop it eventually + if obsidian.USE_OLD_RNG_CONTROL: + if self.seed is not None: + torch.manual_seed(self.seed) + torch.use_deterministic_algorithms(True) + np.random.seed(self.seed) + random.seed(self.seed) + self.rng_decorator = dummy_decorator + self.fit = self.rng_decorator(self.fit) + self.torch_rng = None + else: + if not global_rng: + self.global_rng = obsidian.get_global_rng(seed) + elif not isinstance(global_rng, GlobalRNG): + raise TypeError('global_rng must be an instance of GlobalRNG') + else: + self.global_rng = global_rng + self.torch_rng = self.global_rng.torch_rng + self.rng_decorator = self.global_rng.tmp_seed_override + self.fit = self.rng_decorator(self.fit) # Store the parameter space which contains useful reference properties if not isinstance(X_space, ParamSpace): @@ -217,9 +236,7 @@ def pf_distance(self, return min_distance @abstractmethod - def fit(self, - Z: pd.DataFrame, - target: Target | list[Target]): + def fit(self, Z: pd.DataFrame, target: Target | list[Target], fit_options: dict): """Fit the optimizer's surrogate models to data""" pass # pragma: no cover diff --git a/obsidian/optimizer/bayesian.py b/obsidian/optimizer/bayesian.py index a05f456..005af78 100644 --- a/obsidian/optimizer/bayesian.py +++ b/obsidian/optimizer/bayesian.py @@ -1,5 +1,8 @@ """Bayesian Optimization: Select experiments from the predicted posterior and update the prior""" +from typing import Any +import obsidian +from obsidian.rng import GlobalRNG from .base import Optimizer from obsidian.parameters import ParamSpace, Target, Task @@ -77,9 +80,10 @@ def __init__(self, X_space: ParamSpace, surrogate: str | dict | list[str] | list[dict] = 'GP', seed: int | None = None, + global_rng: GlobalRNG | None = None, verbose: int = 1): - super().__init__(X_space=X_space, seed=seed, verbose=verbose) + super().__init__(X_space=X_space, seed=seed, global_rng=global_rng, verbose=verbose) self.surrogate_type = [] # Shorthand name as str (as 
provided) self.surrogate_hps = [] # Hyperparameters @@ -121,8 +125,7 @@ def _load_surrogate_dict(surrogate_dict): if surrogate_str not in model_class_dict.keys(): raise KeyError(f'Surrogate model must be selected from one of: {model_class_dict.keys()}') - return - + @property def is_fit(self): """ @@ -167,9 +170,7 @@ def _validate_target(self, raise TypeError('Each item in target must be a Target object') return target - def fit(self, - Z: pd.DataFrame, - target: Target | list[Target]): + def fit(self, Z: pd.DataFrame, target: Target | list[Target], fit_options: dict = {}): """ Fits the BO surrogate model to data. @@ -177,6 +178,7 @@ def fit(self, Z (pd.DataFrame): Total dataset including inputs (X) and response values (y) target (Target or list of Target): The responses (y) to be used for optimization, packed into a Target object or list thereof + fit_options (dict, optional): Additional options to customize the fitting process. Refer to the model's `fit` method for details. Returns: None. Updates the model in self.surrogate @@ -224,32 +226,48 @@ def fit(self, # Instantiate and fit the model(s) self.surrogate = [] - for i in range(self.n_response): - self.surrogate.append( - SurrogateBoTorch(model_type=self.surrogate_type[i], seed=self.seed, - verbose=self.verbose >= 2, hps=self.surrogate_hps[i])) - - # Handle response NaN values on a response-by-response basis - f_train_i = self.f_train.iloc[:, i] - nan_indices = np.where(f_train_i.isna().values)[0] - X_t_train_valid = self.X_t_train.drop(nan_indices) - f_train_i_valid = f_train_i.drop(nan_indices) - if X_t_train_valid.shape[0] < 1: - raise ValueError(f'No valid data points for response {self.y_names[i]}') - if f_train_i_valid.shape[0] < 1: - raise ValueError(f'No valid response data points for response {self.y_names[i]}') - - # Fit the model for each response - self.surrogate[i].fit(X_t_train_valid, f_train_i_valid, - cat_dims=self.X_space.X_t_cat_idx, task_feature=self.X_space.X_t_task_idx) - - if self.verbose >= 1: - print(f'{self.surrogate_type[i]} model has been fit to data' - + f' with an R2-train-score of: {self.surrogate[i].r2_score:.3g}' - + (f' and a training-loss of: {self.surrogate[i].loss:.3g}' if self.verbose >= 2 else '') - + f' for response: {self.y_names[i]}') - return + # with USE_OLD_RNG_CONTROL, this decorator is dummy + # otherwise the decorator will activate a context manager to temporarily set the RNG seeds + @self.rng_decorator + def fit_models(): + """A wrapper switching between new (context manager) and old (manual) RNG behavior.""" + for i in range(self.n_response): + model = SurrogateBoTorch( + model_type=self.surrogate_type[i], + seed=self.seed, + verbose=self.verbose >= 2, + hps=self.surrogate_hps[i], + ) + + # Handle response NaN values on a response-by-response basis + f_train_i = self.f_train.iloc[:, i] + nan_indices = np.where(f_train_i.isna().values)[0] + X_t_train_valid = self.X_t_train.drop(nan_indices) + f_train_i_valid = f_train_i.drop(nan_indices) + if X_t_train_valid.shape[0] < 1: + raise ValueError(f'No valid data points for response {self.y_names[i]}') + if f_train_i_valid.shape[0] < 1: + raise ValueError(f'No valid response data points for response {self.y_names[i]}') + + # Fit the model for each response + model.fit( + X_t_train_valid, + f_train_i_valid, + cat_dims=self.X_space.X_t_cat_idx, + task_feature=self.X_space.X_t_task_idx, + **fit_options, + ) + + if self.verbose >= 1: + print(f'{self.surrogate_type[i]} model has been fit to data' + + f' with an R2-train-score of: 
{model.r2_score:.3g}' + + (f' and a training-loss of: {model.loss:.3g}' if self.verbose >= 2 else '') + + f' for response: {self.y_names[i]}') + self.surrogate.append(model) + + fit_models() + # TODO: incorporate new global RNG def save_state(self) -> dict: """ Saves the parameters of the Bayesian Optimizer so that they can be reloaded without fitting. @@ -295,6 +313,7 @@ def save_state(self) -> dict: def __repr__(self): return f'BayesianOptimizer(X_space={self.X_space}, surrogate={self.surrogate_type}, target={getattr(self, "target", None)})' + # TODO: incorporate new global RNG @classmethod def load_state(cls, config_save: dict): @@ -585,6 +604,7 @@ def suggest(self, optim_sequential: bool = True, optim_samples: int = 512, optim_restarts: int = 10, + optim_options: dict | None = None, objective: MCAcquisitionObjective | None = None, out_constraints: Output_Constraint | list[Output_Constraint] | None = None, eq_constraints: Linear_Constraint | list[Linear_Constraint] | None = None, @@ -636,6 +656,7 @@ def suggest(self, The default value is ``512``. optim_restarts (int, optional): The number of restarts to use in the global optimization of the acquisition function. The default value is ``10``. + optim_options (dict, optional): Options to pass to the optimization routine directly. Refer to BoTorch's `optimize_acqf` function family, `gen_candidates_scipy`, `gen_candidates_torch`, and `scipy.optimize.minimize` for possible options. objective (MCAcquisitionObjective, optional): The objective function to be used for optimization. The default is ``None``. out_constraints (Output_Constraint | list[Output_Constraint], optional): An output constraint, or a list @@ -730,7 +751,7 @@ def suggest(self, sampler = ListSampler(*samplers) else: sampler = SobolQMCNormalSampler(sample_shape=torch.Size([optim_samples]), seed=self.seed) - + # Calculate search bounds for optimization X_bounds = torch.tensor(self.X_space.search_space.values, dtype=TORCH_DTYPE) @@ -808,16 +829,18 @@ def suggest(self, nleq_constraints += self.X_space.nonlinear_constraints # Input constraints are used by optim_acqf and friends - optim_kwargs = {'equality_constraints': [c() for c in eq_constraints] if eq_constraints else None, - 'inequality_constraints': [c() for c in ineq_constraints] if ineq_constraints else None, - 'nonlinear_inequality_constraints': [c() for c in nleq_constraints] if nleq_constraints else None} + optim_kwargs: dict[str, Any] = { + "equality_constraints": [c() for c in eq_constraints] if eq_constraints else None, + "inequality_constraints": [c() for c in ineq_constraints] if ineq_constraints else None, + "nonlinear_inequality_constraints": [c() for c in nleq_constraints] if nleq_constraints else None, + } - optim_options = {} # Can optionally specify batch_limit or max_iter + # optim_options = {} # Can optionally specify batch_limit or max_iter # If nonlinear constraints are used, BoTorch doesn't provide an ic_generator # Must provide manual samples = just use random initialization if nleq_constraints: - X_ic = torch.ones((optim_samples, 1 if fixed_features_list else m_batch, self.X_space.n_tdim))*torch.rand(1) + X_ic = torch.ones((optim_samples, 1 if fixed_features_list else m_batch, self.X_space.n_tdim))*torch.rand(1, generator=self.torch_rng) optim_kwargs['batch_initial_conditions'] = X_ic if fixed_features_list: raise UnsupportedError('Nonlinear constraints are not supported with discrete features.') @@ -829,28 +852,35 @@ def suggest(self, Setting optim_sequential to False', UserWarning) optim_sequential = 
False + @self.rng_decorator + def optimize_acqf_wrapper(fixed_features_list): + if fixed_features_list: + # If there are any discrete values, we must used the mixed integer optimization + optim_func = optimize_acqf_mixed + optim_kwargs["fixed_features_list"] = fixed_features_list + else: + optim_func = optimize_acqf + optim_kwargs["sequential"] = optim_sequential + + candidates, _ = optim_func( + acq_function=aq_func, + bounds=X_bounds, + q=m_batch, + num_restarts=optim_restarts, + raw_samples=optim_samples, + options=optim_options, + **optim_kwargs, + ) + return candidates, _ + # If it's random search, no need to do optimization; Otherwise, initialize the aq function and optimize if aq_str == 'RS': - candidates = torch.rand((m_batch, self.X_space.n_tdim), dtype=TORCH_DTYPE) + candidates = torch.rand((m_batch, self.X_space.n_tdim), generator=self.torch_rng, dtype=TORCH_DTYPE) else: aq_func = aq_class_dict[aq_str](**aq_kwargs).to(TORCH_DTYPE) - - # If there are any discrete values, we must used the mixed integer optimization - if fixed_features_list: - candidates, _ = optimize_acqf_mixed(acq_function=aq_func, bounds=X_bounds, - fixed_features_list=fixed_features_list, - q=m_batch, # Always sequential - num_restarts=optim_restarts, raw_samples=optim_samples, - options=optim_options, - **optim_kwargs) - else: - candidates, _ = optimize_acqf(acq_function=aq_func, bounds=X_bounds, - q=m_batch, - sequential=optim_sequential, - num_restarts=optim_restarts, raw_samples=optim_samples, - options=optim_options, - **optim_kwargs) - + + candidates, _ = optimize_acqf_wrapper(fixed_features_list) + if self.verbose >= 2: print(f'Optimized {aq_str} acquisition function successfully') diff --git a/obsidian/parameters/param_space.py b/obsidian/parameters/param_space.py index 264e6dd..ca17deb 100644 --- a/obsidian/parameters/param_space.py +++ b/obsidian/parameters/param_space.py @@ -28,6 +28,8 @@ from obsidian.exceptions import UnsupportedError from obsidian.utils import tensordict_to_dict +from typing import Sequence + import torch import numpy as np import pandas as pd @@ -66,7 +68,7 @@ class ParamSpace(IParamSpace): """ def __init__(self, - params: list[Parameter]): + params: Sequence[Parameter]): # Convert to immutable dtype to presever order self.params = tuple(params) diff --git a/obsidian/rng.py b/obsidian/rng.py new file mode 100644 index 0000000..daf835a --- /dev/null +++ b/obsidian/rng.py @@ -0,0 +1,160 @@ +import random +import time +from contextlib import contextmanager +from functools import partial + +import numpy as np +import torch + + +class GlobalRNG: + """ + A class to manage global random number generation for reproducibility and statistical integrity. + """ + + def __init__(self, seed: int | None = None): + # if seed is none, get a seed from current time and save it + if seed is None: + # time.time() * 1e6 gives microseconds level accuracy + seed = int(time.time() * 1e6) % (2**32 - 1) + self.seed = seed + torch.use_deterministic_algorithms(True) + self.np_rng = np.random.default_rng(seed) + self.torch_rng = torch.Generator().manual_seed(seed) + print( + f"numpy and torch random number generators initialized with seed {seed}. " + "Please reuse this seed to reproduce the results." 
+ ) + + def tmp_seed_override(self, func): + """Decorator that samples a seed from the global RNG and temporarily overrides the global random states (numpy, torch, random) for the duration of the decorated function call.""" + seed: int = get_new_seed(1, self.torch_rng) # type: ignore + + def wrapper(*args, **kwargs): + with with_tmp_seed(seed): + return func(*args, **kwargs) + + return wrapper + + def save_state(self) -> dict: + """Saves the objective and RNG states to a state dictionary""" + obj_dict = { + "name": self.__class__.__name__, + "seed": self.seed, + # numpy bit generator state is nested dictionary with only strings and integers + "np_rng": self.np_rng.bit_generator.state, + # torch generator state is a tensor so convert it to list for serialization + "torch_rng": self.torch_rng.get_state().cpu().tolist(), + } + return obj_dict + + @classmethod + def load_state(cls, obj_dict: dict): + """Loads the RNG states from a state dictionary""" + instance = cls(seed=obj_dict["seed"]) + instance.np_rng.bit_generator.state = obj_dict["np_rng"] + # must be a ByteTensor + instance.torch_rng.set_state(torch.ByteTensor(obj_dict["torch_rng"])) + print( + "numpy and torch random number generators, and their random states " + f"restored with seed {instance.seed}. " + ) + return instance + + +USE_OLD_RNG_CONTROL = False + +_GLOBAL_RNG: GlobalRNG | None = None + + +def get_global_rng(seed: int | None = None, verbose: bool = False, reset: bool = False): + global _GLOBAL_RNG + if _GLOBAL_RNG is None or reset: + if reset: + print(f"Resetting global RNG with seed {seed}.") + # when seed is not provided, a seed will be generated based on the current time + _GLOBAL_RNG = GlobalRNG(seed=seed) + elif seed and seed != _GLOBAL_RNG.seed: + raise ValueError( + f"Global RNG has already been initialized with seed {_GLOBAL_RNG.seed}, " + f"while a different seed {seed} was provided. " + "This is not allowed for statistical integrity considerations. " + "To reset all random number generators, please restart the Python session " + "or explicitly reinitialize the global RNG through `reset_global_rng`." + ) + else: + print(f"Retrieving global RNG initialized with seed {_GLOBAL_RNG.seed}.") + if seed and verbose: + print( + "Global RNG has already been initialized with the same seed " + f"{_GLOBAL_RNG.seed}. The current global RNG will be reused. " + "If this operation is meant to reset the random state, please reset " + "the global RNG explicitly through `reset_global_rng`." + ) + return _GLOBAL_RNG + + +def is_global_rng_initialized() -> bool: + """Checks if the global RNG has been initialized.""" + return _GLOBAL_RNG is not None + + +reset_global_rng = partial(get_global_rng, reset=True) + + +def get_new_seed(num: int, generator: torch.Generator | None = None) -> int | list[int]: + """ + Generates a new random 32-bit integer seed or a list of seeds from a given torch generator. + + Args: + num (int): The number of seeds to generate. + generator (torch.Generator, optional): The generator to use. + If None, a new default generator is used. + + Returns: + int | list[int]: A new random 32-bit integer seed or a list of seeds. 
+ """ + # The default generator is used if none is provided + if generator is None: + generator = torch.Generator() + + # Generate a single 32-bit integer seed + seed = torch.randint(2**32 - 1, (num,), generator=generator) + if num > 1: + return seed.tolist() + else: + return seed.item() # type: ignore + + +@contextmanager +def with_tmp_seed(seed: int | None = None): + """ + Context manager to temporarily set and restore global seeds for torch, numpy, and + the built-in random module, in case of directly passing a generator to an external + method is not possible. + """ + if seed is None: + yield + return + + # Save original random states + original_torch_state = torch.get_rng_state() + original_numpy_state = np.random.get_state() + original_random_state = random.getstate() + + try: + # Set new seeds + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + yield + finally: + # Restore original random states + torch.set_rng_state(original_torch_state) + np.random.set_state(original_numpy_state) + random.setstate(original_random_state) + + +def dummy_decorator(func): + """A dummy decorator for backward compatibility.""" + return func diff --git a/obsidian/surrogates/base.py b/obsidian/surrogates/base.py index 64bde0e..cb2adb1 100644 --- a/obsidian/surrogates/base.py +++ b/obsidian/surrogates/base.py @@ -1,5 +1,6 @@ """Surrogate model class definition""" +import obsidian from obsidian.config import TORCH_DTYPE from abc import ABC, abstractmethod @@ -46,7 +47,8 @@ def __init__(self, # Handle randomization seed, considering all 3 sources (torch, random, numpy) self.seed = seed - if self.seed is not None: + # TODO: this part is only for backward compatibility, we should drop it eventually + if self.seed is not None and obsidian.USE_OLD_RNG_CONTROL: torch.manual_seed(self.seed) torch.use_deterministic_algorithms(True) np.random.seed(self.seed) diff --git a/obsidian/surrogates/botorch.py b/obsidian/surrogates/botorch.py index 9eed9d7..a652759 100644 --- a/obsidian/surrogates/botorch.py +++ b/obsidian/surrogates/botorch.py @@ -1,5 +1,6 @@ """Surrogate models built using BoTorch API and torch_model objects""" +from typing import Callable from .base import SurrogateModel from .config import model_class_dict from .utils import fit_pytorch @@ -13,6 +14,7 @@ from botorch.models.gpytorch import GPyTorchModel from botorch.models.ensemble import EnsembleModel from gpytorch.mlls import ExactMarginalLogLikelihood +from botorch.optim.utils import sample_all_priors import torch import torch.nn as nn @@ -20,6 +22,14 @@ import pandas as pd import warnings +# Maximum number of attempts to fit the model +# BoTorch default is 5 +MAX_ATTEMPTS = 5 +# Whether to use multiple restarts for optimization +# BoTorch default is False +MULTI_STARTS = False +# Whether to sample all parameters from prior or not +SAMPLE_INITIAL_PARAMETERS = False class SurrogateBoTorch(SurrogateModel): """ @@ -43,6 +53,7 @@ class SurrogateBoTorch(SurrogateModel): to estimate uncertainty. hps (dict): Optional surrogate function hyperparameters. + sample_initial_parameters (bool): Whether to sample initial parameters from the prior. If hyperparameters are not provided, BoTorch will by default initialize the model with some built-in heuristics. To override this behavior, use ``sample_initial_parameters`` to sample all hyperparameters from priors. mll (ExactMarginalLogLikelihood): The marginal log likelihood of the model. torch_model (torch.nn.Module): The torch model for the surrogate. loss (float): The loss of the model. 
@@ -52,20 +63,20 @@ def __init__(self, model_type: str = 'GP', seed: int | None = None, verbose: bool = False, + sample_initial_parameters: bool = SAMPLE_INITIAL_PARAMETERS, hps: dict = {}): super().__init__(model_type=model_type, seed=seed, verbose=verbose) # Optional surrogate function hyperparameters self.hps = hps - - return - + self.sample_initial_parameters = sample_initial_parameters + def init_model(self, X: pd.DataFrame, y: pd.Series, cat_dims: list[int], - task_feature: int): + task_feature: int | None = None): """ Instantiates the torch model for the surrogate. Cannot be called during __init__ normally as X,y are required @@ -75,6 +86,7 @@ def init_model(self, X (pd.DataFrame): Input parameters for the training data. y (pd.Series): Training data responses. cat_dims (list): A list of indices for categorical dimensions in the input data. + task_feature (int, optional): The index of the task feature in the input data. Returns: None. Updates surrogate attributes, including self.torch_model. @@ -107,24 +119,38 @@ def init_model(self, self.torch_model = model_class_dict[self.model_type](train_X=X_p, train_Y=y_p, **self.hps) else: self.torch_model = model_class_dict[self.model_type](train_X=X_p, train_Y=y_p, **self.hps).to(TORCH_DTYPE) + + # self.torch_model.likelihood.noise_covar.noise_prior = GammaPrior(1.1, 10.0) + if self.sample_initial_parameters: + sample_all_priors(self.torch_model) - return - def fit(self, + def fit( + self, X: pd.DataFrame, y: pd.Series, - cat_dims=None, - task_feature=None): + cat_dims: list, + task_feature: int | None = None, + optimizer: Callable | None = None, + max_attempts: int = MAX_ATTEMPTS, + multi_starts: bool = MULTI_STARTS, + **optimizer_kwargs + ) -> None: """ Fits the surrogate model to data Args: X (pd.DataFrame): Input parameters for the training data y (pd.Series): Training data responses - cat_dims (list, optional): A list of indices for categorical dimensions in the input data. Default is ``None``. + cat_dims (list): A list of indices for categorical dimensions in the input data. + task_feature (int, optional): The index of the task feature in the input data. Default is ``None``. + optimizer (callable, optional): The optimizer to use for fitting the model. Default is ``None`` and chosen dynamically based on model type. + max_attempts (int, optional): The maximum number of attempts to fit the model. Default is ``MAX_ATTEMPTS=5``. + multi_starts (bool, optional): Whether to run all ``max_attempts`` restarts with different initializations and pick the best model. Default is ``MULTI_STARTS=True``. + **optimizer_kwargs: Additional keyword arguments to pass to the optimizer. Returns: - None. Updates the surrogate model attributes, including regressed parameters. + None. Updates the surrogate model attributes in-place, including regressed parameters. 
""" # Instantiate self.torch_model @@ -143,13 +169,14 @@ def fit(self, if isinstance(self.torch_model, GPyTorchModel): self.loss_fcn = ExactMarginalLogLikelihood(self.torch_model.likelihood, self.torch_model) - if self.model_type == 'DKL': - optimizer = fit_gpytorch_mll_torch - else: - optimizer = fit_gpytorch_mll_scipy + if not optimizer: + if self.model_type == 'DKL': + optimizer = fit_gpytorch_mll_torch + else: + optimizer = fit_gpytorch_mll_scipy try: - fit_gpytorch_mll(self.loss_fcn, optimizer=optimizer) + fit_gpytorch_mll(self.loss_fcn, optimizer=optimizer, max_attempts=max_attempts, pick_best_of_all_attempts=multi_starts, **optimizer_kwargs) except Exception: try: fit_gpytorch_mll(self.loss_fcn, optimizer=fit_gpytorch_mll_torch)