diff --git a/docs/source/opinf/changelog.md b/docs/source/opinf/changelog.md index 1047e476..b070f8b1 100644 --- a/docs/source/opinf/changelog.md +++ b/docs/source/opinf/changelog.md @@ -5,10 +5,18 @@ New versions may introduce substantial new features or API adjustments. ::: +## Version 0.5.15 + +- Improvement to `fit_regselect_*()` so that the regularization does not have to be initialized before fitting the model. This fixes a longstanding chicken/egg problem and makes using `fit_regselect_*()` much less cumbersome. +- Ensured error computation correctly aligns training and predicted states in `fit_regselect_continuous()`. Previously, this could have been misaligned when using a time derivative estimator that truncates states. +- Time derivative estimators in `opinf.ddt` now have a `mask()` method that map states to the estimation grid. +- Added regression tests based on the tutorial notebooks. +- Added `pytest` and `pytest-cov` to the dependencies for developers. + ## Version 0.5.14 - Catch any errors in `fit_regselect*()` that occur when the model uses `refit()`. -- Tikhonov-type least-squares solvers do not require the regularizer in the constructor but will raise an `AttributeError` in `solve()` (and other methods) if the regularizer is not set. This makes using `fit_regselect_*()` much less cumbersome. +- Tikhonov-type least-squares solvers do not require the regularizer in the constructor but will raise an `AttributeError` in `solve()` (and other methods) if the regularizer is not set. - `PODBasis.fit(Q)` raises a warning when using the `"method-of-snapshots"`/`"eigh"` strategy if $n < k$ for $\mathbf{Q}\in\mathbb{R}^{n \times k}.$ In this case, calculating the $n \times k$ SVD is likely more efficient than the $k \times k$ eigenvalue problem. - Added Python 3.13 to list of tests. diff --git a/pyproject.toml b/pyproject.toml index 240a7a24..003d06fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,8 @@ dev = [ "notebook", "pandas", "pre-commit>=3.7.1", + "pytest", + "pytest-cov", "tox>=4", ] diff --git a/src/opinf/__init__.py b/src/opinf/__init__.py index 347e3d3a..28987bc6 100644 --- a/src/opinf/__init__.py +++ b/src/opinf/__init__.py @@ -7,7 +7,7 @@ https://github.com/Willcox-Research-Group/rom-operator-inference-Python3 """ -__version__ = "0.5.14" +__version__ = "0.5.15" from . import ( basis, diff --git a/src/opinf/ddt/_base.py b/src/opinf/ddt/_base.py index 66b3e8a2..be44c883 100644 --- a/src/opinf/ddt/_base.py +++ b/src/opinf/ddt/_base.py @@ -104,7 +104,7 @@ def _check_dimensions(self, states, inputs, check_against_time=True): raise errors.DimensionalityError("states must be two-dimensional") if check_against_time and states.shape[-1] != self.time_domain.size: raise errors.DimensionalityError( - "states not aligned with time_domain" + "states and time_domain not aligned" ) if inputs is not None: if inputs.ndim == 1: @@ -135,6 +135,41 @@ def estimate(self, states, inputs=None): _inputs : (m, k') ndarray or None Inputs corresponding to ``_states``, if applicable. **Only returned** if ``inputs`` is provided. + + Raises + ------ + opinf.errors.DimensionalityError + If the ``states`` or ``inputs`` are not aligned with the + :attr:`time_domain`. + """ + raise NotImplementedError # pragma: no cover + + @abc.abstractmethod + def mask(self, arr): + """Map an array from the training time domain to the domain of the + estimated time derivatives. + + This method is used in post-hoc regularization selection routines. + + Parameters + ---------- + arr : (..., k) ndarray + Array (states, inputs, etc.) aligned with the training time domain. + + Returns + ------- + _arr : (..., k') ndarray + Array mapped to the domain of the estimated time derivatives. + + Examples + -------- + >>> Q, dQ = estimator.esimate(states) + >>> Q2 = estimator.mask(states) + >>> np.all(Q2 == Q) + True + >>> Q3 = estimator.mask(other_states_on_same_time_grid) + >>> Q3.shape == Q.shape + True """ raise NotImplementedError # pragma: no cover diff --git a/src/opinf/ddt/_finite_difference.py b/src/opinf/ddt/_finite_difference.py index c7b9a9bf..d7491c2a 100644 --- a/src/opinf/ddt/_finite_difference.py +++ b/src/opinf/ddt/_finite_difference.py @@ -774,9 +774,8 @@ class UniformFiniteDifferencer(DerivativeEstimatorTemplate): Time domain corresponding to the snapshot data. This class requires uniformly spaced time domains, see :class:`NonuniformFiniteDifferencer` for non-uniform domains. - scheme : str or callable - Finite difference scheme to use. - **Options:** + scheme : str + Finite difference scheme to use. **Options:** * ``'fwd1'``: first-order forward differences, see :func:`fwd1`. * ``'fwd2'``: second-order forward differences, see :func:`fwd2`. @@ -796,17 +795,6 @@ class UniformFiniteDifferencer(DerivativeEstimatorTemplate): * ``'ord2'``: second-order differences, see :func:`ord2`. * ``'ord4'``: fourth-order differences, see :func:`ord4`. * ``'ord6'``: sixth-order differences, see :func:`ord6`. - - If ``scheme`` is a callable function, its signature must match the - following syntax. - - .. code-block:: python - - _states, ddts = scheme(states, dt) - _states, ddts, _inputs = scheme(states, dt, inputs) - - Here ``dt`` is a positive float, the uniform time step. - Each output should have the same number of columns. """ _schemes = types.MappingProxyType( @@ -832,7 +820,7 @@ class UniformFiniteDifferencer(DerivativeEstimatorTemplate): } ) - def __init__(self, time_domain, scheme="ord4"): + def __init__(self, time_domain, scheme: str = "ord4"): """Store the time domain and set the finite difference scheme.""" DerivativeEstimatorTemplate.__init__(self, time_domain) @@ -842,13 +830,9 @@ def __init__(self, time_domain, scheme="ord4"): raise ValueError("time domain must be uniformly spaced") # Set the finite difference scheme. - if not callable(scheme): - if scheme not in self._schemes: - raise ValueError( - f"invalid finite difference scheme '{scheme}'" - ) - scheme = self._schemes[scheme] - self.__scheme = scheme + if scheme not in self._schemes: + raise ValueError(f"invalid finite difference scheme '{scheme}'") + self.__scheme = self._schemes[scheme] # Properties -------------------------------------------------------------- @property @@ -899,6 +883,47 @@ def estimate(self, states, inputs=None): states, inputs = self._check_dimensions(states, inputs, False) return self.scheme(states, self.dt, inputs) + def mask(self, arr): + """Map an array from the training time domain to the domain of the + estimated time derivatives. + + This method is used in post-hoc regularization selection routines. + + Parameters + ---------- + arr : (..., k) ndarray + Array (states, inputs, etc.) aligned with the training time domain. + + Returns + ------- + _arr : (..., k') ndarray + Array mapped to the domain of the estimated time derivatives. + + Examples + -------- + >>> Q, dQ = estimator.esimate(states) + >>> Q2 = estimator.mask(states) + >>> np.all(Q2 == Q) + True + >>> Q3 = estimator.mask(other_states_on_same_time_grid) + >>> Q3.shape == Q.shape + True + """ + schema = self.scheme.__name__ + mode, order = schema[:3], int(schema[3]) + if mode == "ord": + return arr + elif mode == "fwd": + cols = slice(0, -order) + elif mode == "bwd": + cols = slice(order, None) + elif mode == "ctr": + margin = order // 2 + cols = slice(margin, -margin) + else: # pragma: no cover + raise RuntimeError("invalid scheme name!") + return arr[..., cols] + class NonuniformFiniteDifferencer(DerivativeEstimatorTemplate): """Time derivative estimation with finite differences for state snapshots @@ -967,6 +992,35 @@ def estimate(self, states, inputs=None): return states, ddts, inputs return states, ddts + def mask(self, arr): + """Map an array from the training time domain to the domain of the + estimated time derivatives. Since this class provides an estimate at + every time step, this method simply returns ``arr``. + + This method is used in post-hoc regularization selection routines. + + Parameters + ---------- + arr : (..., k) ndarray + Array (states, inputs, etc.) aligned with the training time domain. + + Returns + ------- + _arr : (..., k') ndarray + Array mapped to the domain of the estimated time derivatives. + + Examples + -------- + >>> Q, dQ = estimator.esimate(states) + >>> Q2 = estimator.mask(states) + >>> np.all(Q2 == Q) + True + >>> Q3 = estimator.mask(other_states_on_same_time_grid) + >>> Q3.shape == Q.shape + True + """ + return arr + # Old API ===================================================================== def ddt_uniform(states, dt, order=2): diff --git a/src/opinf/ddt/_interpolation.py b/src/opinf/ddt/_interpolation.py index 85198e3d..a6177c1f 100644 --- a/src/opinf/ddt/_interpolation.py +++ b/src/opinf/ddt/_interpolation.py @@ -189,3 +189,55 @@ def estimate(self, states, inputs=None): if inputs is None: return states, ddts return states, ddts, inputs + + def mask(self, arr): + """Map an array from the training time domain to the domain of the + estimated time derivatives. + + + This method is used in post-hoc regularization selection routines. + + Parameters + ---------- + arr : (..., k) ndarray + Array (states, inputs, etc.) aligned with the training time domain. + + Returns + ------- + _arr : (..., k) ndarray + Array mapped to the domain of the estimated time derivatives. + + Notes + ----- + If :attr:`new_time_domain` is not the same as :attr:`time_domain`, + this method interpolates the ``arr`` and evaluates the interpolant + (not its derivative) over the :attr:`new_time_domain`. + + Examples + -------- + >>> Q, dQ = estimator.esimate(states) + >>> Q2 = estimator.mask(states) + >>> np.all(Q2 == Q) + True + >>> Q3 = estimator.mask(other_states_on_same_time_grid) + >>> Q3.shape == Q.shape + True + """ + if self.new_time_domain is None: + return arr + + # Interpolate to the new time domain. + statespline = self.InterpolatorClass( + self.time_domain, + arr, + **self.options, + ) + return statespline(self.new_time_domain) + + # Verification ------------------------------------------------------------ + def verify(self, plot=False, return_errors=False): + new_time_domain = self.__t2 + self.__t2 = None + out = super().verify(plot, return_errors) + self.__t2 = new_time_domain + return out diff --git a/src/opinf/models/mono/_base.py b/src/opinf/models/mono/_base.py index 9f127850..0cf9b3db 100644 --- a/src/opinf/models/mono/_base.py +++ b/src/opinf/models/mono/_base.py @@ -235,50 +235,6 @@ def input_dimension(self, m): ) self.__m = int(m) - # Properties: solver ------------------------------------------------------ - @property - def solver(self): - """Solver for the least-squares regression, see :mod:`opinf.lstsq`.""" - return self.__solver - - @solver.setter - def solver(self, solver): - """Set the solver, including default options.""" - if self._fully_intrusive: - if solver is not None: - warnings.warn( - "all operators initialized explicity, setting solver=None", - errors.OpInfWarning, - ) - self.__solver = None - return - - # Defaults and shortcuts. - if solver is None: - # No regularization. - solver = lstsq.PlainSolver() - elif np.isscalar(solver): - if solver == 0: - # Also no regularization. - solver = lstsq.PlainSolver() - elif solver > 0: - # Scalar Tikhonov (L2) regularization. - solver = lstsq.L2Solver(solver) - else: - raise ValueError("if a scalar, solver must be nonnegative") - - # Light validation: must be instance w/ fit(), solve(). - if isinstance(solver, type): - raise TypeError("solver must be an instance, not a class") - for mtd in "fit", "solve": - if not hasattr(solver, mtd) or not callable(getattr(solver, mtd)): - warnings.warn( - f"solver should have a '{mtd}()' method", - errors.OpInfWarning, - ) - - self.__solver = solver - # Dimensionality reduction ------------------------------------------------ def galerkin(self, Vr, Wr=None): r"""Construct a reduced-order model by taking the (Petrov-)Galerkin @@ -335,28 +291,6 @@ def galerkin(self, Vr, Wr=None): ] ) - # Validation methods ------------------------------------------------------ - def _check_inputargs(self, u, argname): - """Check that the model structure agrees with input arguments.""" - if self._has_inputs and u is None: - raise ValueError(f"argument '{argname}' required") - - if not self._has_inputs and u is not None: - warnings.warn( - f"argument '{argname}' should be None, " - "argument will be ignored", - errors.OpInfWarning, - ) - - def _check_is_trained(self): - """Ensure that the model is trained and ready for prediction.""" - if self.state_dimension is None: - raise AttributeError("no state_dimension (call fit())") - if self._has_inputs and (self.input_dimension is None): - raise AttributeError("no input_dimension (call fit())") - if any(oputils.is_uncalibrated(op) for op in self.operators): - raise AttributeError("model not trained (call fit())") - def __eq__(self, other): """Two models are equal if they have equivalent operators.""" if not isinstance(other, self.__class__): @@ -384,3 +318,104 @@ def copy(self): """Make a copy of the model.""" sol = self.solver.copy() if self.solver is not None else None return self.__class__([op.copy() for op in self.operators], solver=sol) + + +class _OpInfModel(_Model): + """Base class for monolithic models with Operator Inference learning.""" + + # Properties: solver ------------------------------------------------------ + @property + def solver(self): + """Solver for the least-squares regression, see :mod:`opinf.lstsq`.""" + return self.__solver + + @solver.setter + def solver(self, solver): + """Set the solver, including default options.""" + if self._fully_intrusive: + if solver is not None: + warnings.warn( + "all operators initialized explicity, setting solver=None", + errors.OpInfWarning, + ) + self.__solver = None + return + + # Defaults and shortcuts. + if solver is None: + # No regularization. + solver = lstsq.PlainSolver() + elif np.isscalar(solver): + if solver == 0: + # Also no regularization. + solver = lstsq.PlainSolver() + elif solver > 0: + # Scalar Tikhonov (L2) regularization. + solver = lstsq.L2Solver(solver) + else: + raise ValueError("if a scalar, solver must be nonnegative") + + # Light validation: must be instance w/ fit(), solve(). + if isinstance(solver, type): + raise TypeError("solver must be an instance, not a class") + for mtd in "fit", "solve": + if not hasattr(solver, mtd) or not callable(getattr(solver, mtd)): + warnings.warn( + f"solver should have a '{mtd}()' method", + errors.OpInfWarning, + ) + + self.__solver = solver + + # Validation methods ------------------------------------------------------ + def _check_inputargs(self, u, argname): + """Check that the model structure agrees with input arguments.""" + if self._has_inputs and u is None: + raise ValueError(f"argument '{argname}' required") + + if not self._has_inputs and u is not None: + warnings.warn( + f"argument '{argname}' should be None, " + "argument will be ignored", + errors.OpInfWarning, + ) + + def _check_is_trained(self): + """Ensure that the model is trained and ready for prediction.""" + if self.state_dimension is None: + raise AttributeError("no state_dimension (call fit())") + if self._has_inputs and (self.input_dimension is None): + raise AttributeError("no input_dimension (call fit())") + if any(oputils.is_uncalibrated(op) for op in self.operators): + raise AttributeError("model not trained (call fit())") + + # Fitting ----------------------------------------------------------------- + @abc.abstractmethod + def _process_fit_arguments(self, *args, **kwargs): + """Prepare training data, validate and set dimensions, etc.""" + pass + + @abc.abstractmethod + def _assemble_data_matrix(self, *args, **kwargs): + """Construct the Operator Inference data matrix.""" + pass + + @abc.abstractmethod + def _extract_operators(self, *args, **kwargs): + """Unpack the Operator Inference solution, the operator matrix.""" + pass + + @abc.abstractmethod + def _fit_solver(self, *args, **kwargs): + """Initialize the regression solver.""" + pass + + @abc.abstractmethod + def refit(self, *args, **kwargs): + """Solve the regression and unpack the results.""" + pass + + @abc.abstractmethod + def fit(self, *args, **kwargs): + """Learn model operators from data.""" + pass diff --git a/src/opinf/models/mono/_nonparametric.py b/src/opinf/models/mono/_nonparametric.py index e91d120e..e86c37da 100644 --- a/src/opinf/models/mono/_nonparametric.py +++ b/src/opinf/models/mono/_nonparametric.py @@ -13,16 +13,16 @@ import scipy.integrate as spintegrate import scipy.interpolate as spinterpolate -from ._base import _Model +from ._base import _OpInfModel from ... import errors, utils, operators as _operators from ...operators import _utils as oputils # Base class ================================================================== -class _NonparametricModel(_Model): +class _NonparametricModel(_OpInfModel): """Base class for nonparametric monolithic models. - Parent class: :class:`opinf.models.mono._base._Model` + Parent class: :class:`opinf.models.mono._base._OpInfModel` Child classes: @@ -249,6 +249,7 @@ def refit(self): # Execute non-intrusive learning. self._extract_operators(self.solver.solve()) + return self def fit(self, states, lhs, inputs=None): r"""Learn the model operators from data. @@ -311,8 +312,7 @@ def fit(self, states, lhs, inputs=None): return self self._fit_solver(states, lhs, inputs) - self.refit() - return self + return self.refit() # Model evaluation -------------------------------------------------------- def rhs(self, state, input_=None): diff --git a/src/opinf/models/mono/_parametric.py b/src/opinf/models/mono/_parametric.py index 712417ce..9e109a10 100644 --- a/src/opinf/models/mono/_parametric.py +++ b/src/opinf/models/mono/_parametric.py @@ -15,7 +15,7 @@ import numpy as np import scipy.interpolate as spinterpolate -from ._base import _Model +from ._base import _OpInfModel from ._nonparametric import ( _FrozenDiscreteModel, _FrozenContinuousModel, @@ -25,7 +25,7 @@ # Base classes ================================================================ -class _ParametricModel(_Model): +class _ParametricModel(_OpInfModel): """Base class for parametric monolithic models.""" _ModelClass = NotImplemented # Must be specified by child classes. @@ -89,12 +89,12 @@ def _get_operator_of_type(self, OpClass): @property def operators(self): """Operators comprising the terms of the model.""" - return _Model.operators.fget(self) + return _OpInfModel.operators.fget(self) @operators.setter def operators(self, ops): """Set the operators.""" - _Model.operators.fset(self, ops) + _OpInfModel.operators.fset(self, ops) # Check at least one operator is parametric. parametric_operators = [ diff --git a/src/opinf/roms/_base.py b/src/opinf/roms/_base.py index 7fdc9484..ded43ca0 100644 --- a/src/opinf/roms/_base.py +++ b/src/opinf/roms/_base.py @@ -466,7 +466,7 @@ def _get_stability_limits(self, states, stability_margin): limits[ell] = np.inf return shifts, limits - def _fit_and_return_training_data( + def _fit_solver( self, parameters, states, @@ -475,8 +475,9 @@ def _fit_and_return_training_data( fit_transformer, fit_basis, ): - """Process the training data, fit the model, and return the processed - training data. + """Process the training data and fit the model solver. + Returns the processed training data. + """ self._check_fit_args(lhs=lhs, inputs=inputs) if parameters is None: @@ -494,9 +495,11 @@ def _fit_and_return_training_data( if parameters is None: inputdata = None if inputs is None else np.hstack(inputs) - self.model.fit(np.hstack(states), np.hstack(lhs), inputdata) + self.model._fit_solver( + np.hstack(states), np.hstack(lhs), inputdata + ) else: - self.model.fit(parameters, states, lhs, inputs) + self.model._fit_solver(parameters, states, lhs, inputs) return states @@ -651,29 +654,18 @@ def fit_regselect_continuous( ) # Fit the model for the first time. - initialized = False - for reg in candidates: - self.model.solver.regularizer = regularizer_factory(reg) - try: - states = self._fit_and_return_training_data( - parameters=parameters, - states=states, - lhs=ddts, - inputs=inputs, - fit_transformer=fit_transformer, - fit_basis=fit_basis, - ) - initialized = True - break - except Exception: # pragma: no cover - pass - if not initialized: # pragma: no cover - raise RuntimeError( - "fit() failed with all regularization candidates" - ) + self._fit_solver( + parameters=parameters, + states=states, + lhs=ddts, + inputs=inputs, + fit_transformer=fit_transformer, + fit_basis=fit_basis, + ) # Set up the regularization selection. - shifts, limits = self._get_stability_limits(states, stability_margin) + states_ = [self.encode(Q) for Q in states] + shifts, limits = self._get_stability_limits(states_, stability_margin) def unstable(_Q, ell, size): """Return ``True`` if the solution is unstable.""" @@ -696,8 +688,8 @@ def unstable(_Q, ell, size): time_domains = train_time_domains if input_functions is None: - input_functions = [None] * len(states) - loop_collections = [states, input_functions, time_domains] + input_functions = [None] * len(states_) + loop_collections = [states_, input_functions, time_domains] if is_parametric := parameters is not None: loop_collections.insert(0, parameters) @@ -740,16 +732,10 @@ def training_error(reg_params): if unstable(solution, ell, t.size): return np.inf trainsize = Q.shape[-1] - solution_train = solution[:, :trainsize] - error += post.Lp_error(Q, solution_train, t[:trainsize])[1] - return error / len(states) - - # BUG: training states may not align with the first `trainsize` - # entries of `solution` because `Q` (from `states`) may have been - # cropped by the time derivative estimator. - # This is not an issue for fully discrete models. - # Potential fix: implement some kind of mask in ddt_estimator and - # replace `solution[:, :trainsize]` -> `solution[:, the_ddt_mask]`. + error += post.Lp_error( + Q, solution[:, :trainsize], t[:trainsize] + )[1] + return error / len(states_) best_regularization = utils.gridsearch( training_error, @@ -879,30 +865,18 @@ def fit_regselect_discrete( test_cases, utils.DiscreteRegTest ) - # Fit the model for the first time. - initialized = False - for reg in candidates: - self.model.solver.regularizer = regularizer_factory(candidates[0]) - try: - states = self._fit_and_return_training_data( - parameters=parameters, - states=states, - lhs=None, - inputs=inputs, - fit_transformer=fit_transformer, - fit_basis=fit_basis, - ) - initialized = True - break - except Exception: # pragma: no cover - pass - if not initialized: # pragma: no cover - raise RuntimeError( - "fit() failed with all regularization candidates" - ) + # Fit the model for the first time and get compressed training data. + states_ = self._fit_solver( + parameters=parameters, + states=states, + lhs=None, + inputs=inputs, + fit_transformer=fit_transformer, + fit_basis=fit_basis, + ) # Set up the regularization selection. - shifts, limits = self._get_stability_limits(states, stability_margin) + shifts, limits = self._get_stability_limits(states_, stability_margin) def unstable(_Q, ell): """Return ``True`` if the solution is unstable.""" @@ -911,13 +885,13 @@ def unstable(_Q, ell): return np.any(np.abs(_Q - shifts[ell]).max() > limits[ell]) # Extend the iteration counts by the number of testing iterations. - num_iters = [Q.shape[-1] for Q in states] + num_iters = [Q.shape[-1] for Q in states_] if num_test_iters > 0: num_iters = [n + num_test_iters for n in num_iters] if inputs is None: - inputs = [None] * len(states) - loop_collections = [states, inputs, num_iters] + inputs = [None] * len(states_) + loop_collections = [states_, inputs, num_iters] if is_parametric := parameters is not None: loop_collections.insert(0, parameters) @@ -958,7 +932,7 @@ def training_error(reg_params): if unstable(solution, ell): return np.inf error += post.frobenius_error(Q, solution[:, : Q.shape[-1]])[1] - return error / len(states) + return error / len(states_) best_regularization = utils.gridsearch( training_error, diff --git a/src/opinf/roms/_bayes.py b/src/opinf/roms/_bayes.py index 038c9ab8..60232d8d 100644 --- a/src/opinf/roms/_bayes.py +++ b/src/opinf/roms/_bayes.py @@ -297,7 +297,7 @@ def fit_regselect_continuous( ) # Fit the model for the first time. - states = self._fit_and_return_training_data( + self._fit_solver( parameters=parameters, states=states, lhs=ddts, @@ -307,7 +307,8 @@ def fit_regselect_continuous( ) # Set up the regularization selection. - shifts, limits = self._get_stability_limits(states, stability_margin) + states_ = [self.encode(Q) for Q in states] + shifts, limits = self._get_stability_limits(states_, stability_margin) def unstable(_Q, ell, size): """Return ``True`` if the solution is unstable.""" @@ -330,8 +331,8 @@ def unstable(_Q, ell, size): time_domains = train_time_domains if input_functions is None: - input_functions = [None] * len(states) - loop_collections = [states, input_functions, time_domains] + input_functions = [None] * len(states_) + loop_collections = [states_, input_functions, time_domains] if is_parametric := parameters is not None: loop_collections.insert(0, parameters) @@ -443,7 +444,7 @@ def fit_regselect_discrete( ) # Fit the model for the first time. - states = self._fit_and_return_training_data( + states_ = self._fit_solver( parameters=parameters, states=states, lhs=None, @@ -453,7 +454,7 @@ def fit_regselect_discrete( ) # Set up the regularization selection. - shifts, limits = self._get_stability_limits(states, stability_margin) + shifts, limits = self._get_stability_limits(states_, stability_margin) def unstable(_Q, ell): """Return ``True`` if the solution is unstable.""" @@ -462,13 +463,13 @@ def unstable(_Q, ell): return np.any(np.abs(_Q - shifts[ell]).max() > limits[ell]) # Extend the iteration counts by the number of testing iterations. - num_iters = [Q.shape[-1] for Q in states] + num_iters = [Q.shape[-1] for Q in states_] if num_test_iters > 0: num_iters = [n + num_test_iters for n in num_iters] if inputs is None: - inputs = [None] * len(states) - loop_collections = [states, inputs, num_iters] + inputs = [None] * len(states_) + loop_collections = [states_, inputs, num_iters] if is_parametric := parameters is not None: loop_collections.insert(0, parameters) @@ -512,7 +513,7 @@ def training_error(reg_params): return np.inf draws.append(solution[:, : Q.shape[-1]]) error += post.frobenius_error(Q, np.mean(draws, axis=0))[1] - return error / len(states) + return error / len(states_) best_regularization = utils.gridsearch( training_error, @@ -624,6 +625,7 @@ def fit( fit_basis=fit_basis, ) self._initialize_posterior() + return self def fit_regselect_continuous( self, diff --git a/src/opinf/roms/_nonparametric.py b/src/opinf/roms/_nonparametric.py index ab45e2b4..7dfdbea4 100644 --- a/src/opinf/roms/_nonparametric.py +++ b/src/opinf/roms/_nonparametric.py @@ -106,7 +106,7 @@ def fit( ------- self """ - self._fit_and_return_training_data( + self._fit_solver( parameters=None, states=states, lhs=lhs, @@ -114,6 +114,7 @@ def fit( fit_transformer=fit_transformer, fit_basis=fit_basis, ) + self.model.refit() return self def fit_regselect_continuous( diff --git a/src/opinf/roms/_parametric.py b/src/opinf/roms/_parametric.py index dad8e73e..33d7903f 100644 --- a/src/opinf/roms/_parametric.py +++ b/src/opinf/roms/_parametric.py @@ -105,7 +105,7 @@ def fit( ------- self """ - self._fit_and_return_training_data( + self._fit_solver( parameters=parameters, states=states, lhs=lhs, @@ -113,6 +113,7 @@ def fit( fit_transformer=fit_transformer, fit_basis=fit_basis, ) + self.model.refit() return self # Evaluation -------------------------------------------------------------- diff --git a/tests/ddt/test_base.py b/tests/ddt/test_base.py index e30d0cb7..895c9ab2 100644 --- a/tests/ddt/test_base.py +++ b/tests/ddt/test_base.py @@ -1,6 +1,7 @@ # ddt/test_base.py """Tests for ddt._base.""" +import abc import pytest import numpy as np import matplotlib.pyplot as plt @@ -8,148 +9,97 @@ import opinf -_module = opinf.ddt._base +class _TestDerivativeEstimatorTemplate: + """Base class for classes that test time derivative estimators.""" + Estimator = NotImplemented # Class to test. -class TestDerivativeEstimatorTemplate: - """Test _base.DerivativeEstimatorTemplate.""" - - Base = _module.DerivativeEstimatorTemplate - - class Dummy(Base): - """Instantiable version of DerivativeEstimatorTemplate.""" - - def estimate(self, states, inputs=None): - if inputs is not None: - return states, states, inputs - return states, states + @abc.abstractmethod + def get_estimators(self): + """Yield estimators to test.""" + raise NotImplementedError def test_init(self, k=100): - """Test __init__() and time_domain.""" - t = np.linspace(0, 1, k) - dummy = self.Dummy(t) - assert dummy.time_domain is t - - def test_verify_shapes(self, k=100): - """Test verify_shapes().""" + """Test __init__(), time_domain, __str__(), and __repr__().""" t = np.linspace(0, 1, k) + t2D = np.zeros((2, 3, 3)) - # Bad number of outputs with inputs=None. - - class Dummy1(self.Dummy): - def estimate(self, states, inputs=None): - return states, states, inputs - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy1(t).verify_shapes() - assert ex.value.args[0] == "len(estimate(states, inputs=None)) != 2" - - # Misaligned output shapes with inputs=None. - - class Dummy2(self.Dummy): - def estimate(self, states, inputs=None): - return states[1:], states - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy2(t).verify_shapes() - assert ex.value.args[0] == ( - "estimate(states)[0].shape[0] != states.shape[0]" - ) - - class Dummy3(self.Dummy): - def estimate(self, states, inputs=None): - return states, states[1:] - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy3(t).verify_shapes() - assert ex.value.args[0] == ( - "estimate(states)[1].shape[0] != states.shape[0]" - ) - - class Dummy4(self.Dummy): - def estimate(self, states, inputs=None): - return states[:, :-1], states - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy4(t).verify_shapes() - assert ex.value.args[0] == ( - "Q.shape[1] != dQdt.shape[1] " - "where Q, dQdt = estimate(states, inputs=None)" - ) - - # Bad number of outputs with inputs != None - - class Dummy5(self.Dummy): - def estimate(self, states, inputs=None): - return states, states - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy5(t).verify_shapes() - assert ex.value.args[0] == "len(estimate(states, inputs)) != 3" - - # Misaligned output shapes with inputs != None. - - class Dummy6(self.Dummy): - def estimate(self, states, inputs=None): - if inputs is None: - return states, states - return states, states, None - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy6(t).verify_shapes() - assert ex.value.args[0] == ( - "estimates(states, inputs)[2] should not be None" - ) - - class Dummy7(self.Dummy): - def estimate(self, states, inputs=None): - if inputs is None: - return states, states - return states, states, inputs[1:] - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy7(t).verify_shapes() - assert ex.value.args[0] == ( - "estimate(states, inputs)[2].shape[0] != inputs.shape[0]" - ) - - class Dummy8(self.Dummy): - def estimate(self, states, inputs=None): - if inputs is None: - return states, states - return states, states, inputs[:, :-1] - - with pytest.raises(opinf.errors.VerificationError) as ex: - Dummy8(t).verify_shapes() + with pytest.raises(ValueError) as ex: + self.Estimator(t2D) assert ex.value.args[0] == ( - "Q.shape[1] != U.shape[1] where Q, _, U = estimate(states, inputs)" + "time_domain must be a one-dimensional array" ) - def test_verify(self, k=100): - """Lilghtly test verify().""" - t = np.sort(np.random.random(k)) - dummy = self.Dummy(t) - - # No plotting. - errors = dummy.verify(plot=False, return_errors=True) - num_tests = len(errors["dts"]) - for dataset in errors.values(): - assert len(dataset) == num_tests - - # Plotting. - interactive = plt.isinteractive() - plt.ion() - errors = dummy.verify(plot=True) - assert errors is None - fig = plt.gcf() - assert len(fig.axes) == 1 - plt.close(fig) - - if not interactive: - plt.ioff() - - # Check that the original time domain was restored. - assert dummy.time_domain is t + estimator = self.Estimator(t) + assert estimator.time_domain is t + + repr(estimator) + + def test_estimate(self, check_against_time: bool = True): + """Use verify() to test estimate().""" + for estimator in self.get_estimators(): + + t_original = estimator.time_domain + k = t_original.size + + # states must be two-dimensional. + Q = np.random.random(k) + with pytest.raises(opinf.errors.DimensionalityError) as ex: + estimator.estimate(Q) + assert ex.value.args[0] == "states must be two-dimensional" + + Q = np.random.random((2, k)) + if check_against_time: + # states and time_domain must be aligned. + with pytest.raises(opinf.errors.DimensionalityError) as ex: + estimator.estimate(Q[:, :-1]) + assert ex.value.args[0] == "states and time_domain not aligned" + + # states and inputs must be aligned. + U = np.random.random((2, k)) + with pytest.raises(opinf.errors.DimensionalityError) as ex: + estimator.estimate(Q, U[:, :-1]) + assert ex.value.args[0] == "states and inputs not aligned" + + # One-dimensional inputs. + estimator.estimate(Q, U[0]) + + # Test with verify(). + errors = estimator.verify(plot=False, return_errors=True) + for label, results in errors.items(): + if label == "dts": + continue + assert ( + np.min(results) < 5e-7 + ), f"test '{label}' failed for estimator\n{estimator}" + assert estimator.time_domain is t_original + + interactive = plt.isinteractive() + plt.ion() + errors = estimator.verify(plot=True) + assert errors is None + fig = plt.gcf() + assert len(fig.axes) == 1 + plt.close(fig) + + if not interactive: + plt.ioff() + + assert estimator.time_domain is t_original + + def test_mask(self): + """Test mask().""" + for estimator in self.get_estimators(): + k = estimator.time_domain.size + Q1 = np.random.random((2, k)) + Q2 = np.random.random((2, k)) + + Q1new, dQ = estimator.estimate(Q1) + Q1mask = estimator.mask(Q1) + assert np.all(Q1mask == Q1new) + + Q2mask = estimator.mask(Q2) + assert Q2mask.shape == Q1new.shape if __name__ == "__main__": diff --git a/tests/ddt/test_finite_difference.py b/tests/ddt/test_finite_difference.py index e3b93792..ece74286 100644 --- a/tests/ddt/test_finite_difference.py +++ b/tests/ddt/test_finite_difference.py @@ -2,10 +2,16 @@ """Tests for ddt._finite_difference.py""" import pytest +import warnings import numpy as np import opinf +try: + from .test_base import _TestDerivativeEstimatorTemplate +except ImportError: + from test_base import _TestDerivativeEstimatorTemplate + _module = opinf.ddt._finite_difference @@ -34,141 +40,69 @@ def test_finite_difference(r=10, k=20, m=3): assert U1d_.shape == (k - 3,) -class TestUniformFiniteDifferencer: +class TestUniformFiniteDifferencer(_TestDerivativeEstimatorTemplate): """Test ddt.UniformFiniteDifferencer.""" - Diff = _module.UniformFiniteDifferencer + Estimator = _module.UniformFiniteDifferencer + + def get_estimators(self): + t = np.linspace(0, 1, 100) + for name in self.Estimator._schemes.keys(): + yield self.Estimator(t, scheme=name) def test_init(self, k=100): - """Test __init__(), time_domain, and scheme.""" + """Test __init__(), time_domain, scheme, __str__(), and __repr__().""" t = np.linspace(0, 1, k) - differ = self.Diff(t) + differ = self.Estimator(t) assert differ.time_domain is t assert differ.dt == (t[1] - t[0]) # Try with non-uniform spacing. t2 = t**2 with pytest.raises(ValueError) as ex: - self.Diff(t2) + self.Estimator(t2) assert ex.value.args[0] == "time domain must be uniformly spaced" - # Too many dimensions. - t3 = np.sort(np.random.random((3, 3, 3))) - with pytest.raises(ValueError) as ex: - self.Diff(t3) - assert ex.value.args[0] == ( - "time_domain must be a one-dimensional array" - ) - # Bad scheme. with pytest.raises(ValueError) as ex: - self.Diff(t, scheme="best") + self.Estimator(t, scheme="best") assert ex.value.args[0] == "invalid finite difference scheme 'best'" - # Custom scheme. - - def myscheme(): - pass - - differ = self.Diff(t, myscheme) - assert differ.scheme is myscheme - - # Registered schemes. - for name, func in self.Diff._schemes.items(): + # Valid schemes. + for name, func in self.Estimator._schemes.items(): assert callable(func) - differ = self.Diff(t, scheme=name) + differ = self.Estimator(t, scheme=name) assert differ.scheme is func - repr(differ) + return super().test_init(k) - def test_estimate(self, r=3, k=100, m=2): - """Use verify() to validate estimate() - for all registered difference schemes. - """ - t = np.linspace(0, 1, k) + def test_estimate(self): + super().test_estimate(check_against_time=False) - differ = self.Diff(t) - Q = np.random.random(k) - with pytest.raises(opinf.errors.DimensionalityError) as ex: - differ.estimate(Q) - assert ex.value.args[0] == "states must be two-dimensional" - - Q = np.random.random((r, k)) - U = np.random.random((m, k)) - with pytest.raises(opinf.errors.DimensionalityError) as ex: - differ.estimate(Q, U[:, :-1]) - assert ex.value.args[0] == "states and inputs not aligned" - - # One-dimensional inputs. - differ.estimate(Q, U[0]) - - # Test all schemes. - for name in self.Diff._schemes.keys(): - differ = self.Diff(t, scheme=name) - errors = differ.verify(plot=False, return_errors=True) - for label, results in errors.items(): - if label == "dts": - continue - assert ( - np.min(results) < 5e-7 - ), f"problem with scheme '{name}', test '{label}'" - - -class TestNonuniformFiniteDifferencer: + +class TestNonuniformFiniteDifferencer(_TestDerivativeEstimatorTemplate): """Test ddt.NonuniformFiniteDifferencer.""" - Diff = _module.NonuniformFiniteDifferencer + Estimator = _module.NonuniformFiniteDifferencer + + def get_estimators(self): + t = np.linspace(0, 1, 100) ** 2 + yield self.Estimator(t) def test_init(self, k=100): """Test __init__(), time_domain, and order.""" t = np.linspace(0, 1, k) with pytest.warns(opinf.errors.OpInfWarning) as wn: - self.Diff(t) + self.Estimator(t) assert wn[0].message.args[0] == ( "time_domain is uniformly spaced, consider using " "UniformFiniteDifferencer" ) - t = t**2 - differ = self.Diff(t) - assert differ.time_domain is t - repr(differ) - - # Too many dimensions. - t3 = np.sort(np.random.random((3, 3, 3))) - with pytest.raises(ValueError) as ex: - self.Diff(t3) - assert ex.value.args[0] == ( - "time_domain must be a one-dimensional array" - ) - - def test_estimate(self, r=3, k=100, m=4): - """Use verify() to test estimate().""" - t = np.linspace(0, 1, k) ** 2 - differ = self.Diff(t) - - # Coverage for error messages. - Q = np.random.random(k) - with pytest.raises(opinf.errors.DimensionalityError) as ex: - differ.estimate(Q) - assert ex.value.args[0] == "states must be two-dimensional" - - Q = np.random.random((r, k)) - with pytest.raises(opinf.errors.DimensionalityError) as ex: - differ.estimate(Q[:, :-1]) - assert ex.value.args[0] == "states not aligned with time_domain" - - U = np.random.random((m, k)) - with pytest.raises(opinf.errors.DimensionalityError) as ex: - differ.estimate(Q, U[:, :-1]) - assert ex.value.args[0] == "states and inputs not aligned" - - # One-dimensional inputs. - differ.estimate(Q, U[0]) - - # Test the implementation. - differ.verify() + with warnings.catch_warnings(): + warnings.simplefilter("ignore", opinf.errors.OpInfWarning) + return super().test_init() # Old API ===================================================================== diff --git a/tests/ddt/test_interpolation.py b/tests/ddt/test_interpolation.py index 5a90c970..37c9bae4 100644 --- a/tests/ddt/test_interpolation.py +++ b/tests/ddt/test_interpolation.py @@ -7,15 +7,28 @@ import opinf +try: + from .test_base import _TestDerivativeEstimatorTemplate +except ImportError: + from test_base import _TestDerivativeEstimatorTemplate + _module = opinf.ddt._interpolation -class TestInterpDerivativeEstimator: +class TestInterpDerivativeEstimator(_TestDerivativeEstimatorTemplate): """Test opinf.ddt.InterpDerivativeEstimator.""" Estimator = _module.InterpDerivativeEstimator + def get_estimators(self): + t = np.linspace(0, 1, 100) + t2 = np.linspace(0.1, 0.9, 40) + + for name in self.Estimator._interpolators: + yield self.Estimator(t, InterpolatorClass=name) + yield self.Estimator(t, InterpolatorClass=name, new_time_domain=t2) + def test_init(self, k=100): """Test __init__() and properties.""" t = np.linspace(0, 1, k) @@ -43,30 +56,7 @@ def test_init(self, k=100): assert est.new_time_domain.shape == t.shape repr(est) - def test_estimate(self, r=5, m=3, k=20): - """Use verify() to test estimate().""" - t = np.linspace(0, 1, k) - t2 = (t[1:-2] / 1.5) + (1 / 6) - Q = np.random.random((r, k)) - U = np.random.random((m, k)) - - for name in self.Estimator._interpolators: - est = self.Estimator(t, InterpolatorClass=name) - errors = est.verify(plot=False, return_errors=True) - for label, results in errors.items(): - if label == "dts": - continue - assert ( - np.min(results) < 5e-7 - ), f"problem with InterpolatorClass '{name}', test '{label}'" - - est = self.Estimator(t, InterpolatorClass=name, new_time_domain=t2) - Q_, Qdot_, U_ = est.estimate(Q, U) - assert Q_.shape == (r, t2.size) == Qdot_.shape - assert U_.shape == (m, t2.size) - - # One-dimensional inputs. - est.estimate(Q, U[0]) + return super().test_init() if __name__ == "__main__": diff --git a/tests/regression/test_basics.py b/tests/regression/test_basics.py new file mode 100644 index 00000000..d40b65a7 --- /dev/null +++ b/tests/regression/test_basics.py @@ -0,0 +1,112 @@ +# test_basics.py +"""Regression test: linear heat equation, extrapolation to new initial = +conditions. +""" +import os +import pytest +import numpy as np +import scipy.sparse +import scipy.integrate + +import opinf + + +DATAFILE = f"{__file__[:-3]}-data.npz" + + +def generate_data(): + L = 1 + n = 2**10 - 1 + x_all = np.linspace(0, L, n + 2) + x = x_all[1:-1] + dx = x[1] - x[0] + + t0, tf = 0, 1 + k = 401 + t = np.linspace(t0, tf, k) + + diags = np.array([1, -2, 1]) / (dx**2) + A = scipy.sparse.diags(diags, [-1, 0, 1], (n, n)) + + # Initial conditions for the training data. + q0s = [ + x * (1 - x), + 10 * x * (1 - x), + 5 * x**2 * (1 - x) ** 2, + 50 * x**4 * (1 - x) ** 4, + 0.5 * np.sqrt(x * (1 - x)), + 0.25 * np.sqrt(np.sqrt(x * (1 - x))), + np.sin(np.pi * x) / 3 + np.sin(5 * np.pi * x) / 5, + ] + + def full_order_solve(initial_condition, time_domain): + """Solve the full-order model with SciPy.""" + return scipy.integrate.solve_ivp( + fun=lambda t, q: A @ q, + t_span=[time_domain[0], time_domain[-1]], + y0=initial_condition, + t_eval=time_domain, + method="BDF", + ).y + + Qs = np.array([full_order_solve(q0, t) for q0 in q0s]) + np.savez(DATAFILE, t=t, Qs=Qs) + + +@pytest.fixture +def data(): + if not os.path.isfile(DATAFILE): + generate_data() + return np.load(DATAFILE) + + +def test_01(data, whichinit: int = 0): + """OpInf ROM, reproduce training data, no extrapolation.""" + t, Qs = data["t"], data["Qs"] + Q = Qs[whichinit] + + rom = opinf.ROM( + basis=opinf.basis.PODBasis(cumulative_energy=0.9999), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer(t, "ord6"), + model=opinf.models.ContinuousModel( + operators="A", + solver=opinf.lstsq.L2Solver(regularizer=1e-8), + ), + ).fit(Q) + + Q_ROM = rom.predict(Q[:, 0], t, method="BDF") + error = opinf.post.frobenius_error(Q, Q_ROM)[1] + assert error < 0.00105 + + +def test_02(data): + """OpInf ROM, training data from one trajectory, basis enriched with a few + initial conditions. + """ + t, Qs = data["t"], data["Qs"] + Qtrain = Qs[0] + q0_new = [Q[:, 0] for Q in Qs[1:]] + + Q_and_new_q0s = np.column_stack((Qtrain, *q0_new)) + + # Initialize a ROM with the new basis. + rom = opinf.ROM( + basis=opinf.basis.PODBasis(num_vectors=5).fit(Q_and_new_q0s), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer(t, "ord6"), + model=opinf.models.ContinuousModel( + operators="A", + solver=opinf.lstsq.L2Solver(regularizer=1e-8), + ), + ).fit(Qtrain, fit_basis=False) + + maxerrors = [0.00041, 0.00085, 0.00335, 0.00340, 0.009500, 0.02610] + for Q, maxerr in zip(Qs, maxerrors): + Q_ROM = rom.predict(Q[:, 0], t, method="BDF") + error = opinf.post.frobenius_error(Q, Q_ROM)[1] + assert error < maxerr + + +if __name__ == "__main__": + import pytest + + pytest.main([__file__]) diff --git a/tests/regression/test_inputs.py b/tests/regression/test_inputs.py new file mode 100644 index 00000000..dc3922b7 --- /dev/null +++ b/tests/regression/test_inputs.py @@ -0,0 +1,146 @@ +# test_inputs.py +"""Heat equation with inputs.""" +import os +import pytest +import numpy as np +import scipy.sparse + +import opinf + + +DATAFILE = f"{__file__[:-3]}-data.npz" + + +input_functions = [ + lambda t: np.ones_like(t) + np.sin(4 * np.pi * t) / 4, + lambda t: np.exp(-t), + lambda t: 1 + t**2 / 2, + lambda t: 1 - np.sin(np.pi * t) / 2, + lambda t: 1 - np.sin(3 * np.pi * t) / 3, + lambda t: 1 + 25 * (t * (t - 1)) ** 3, + lambda t: 1 + np.exp(-2 * t) * np.sin(np.pi * t), +] + + +def generate_data(): + + # Construct the spatial domain. + L = 1 + n = 2**10 - 1 + x_all = np.linspace(0, L, n + 2) + x = x_all[1:-1] + dx = x[1] - x[0] + + # Construct the temporal domain. + T = 1 + K = 10**3 + 1 + t_all = np.linspace(0, T, K) + + # Construct the full-order state matrix A. + dx2inv = 1 / dx**2 + diags = np.array([1, -2, 1]) * dx2inv + A = scipy.sparse.diags(diags, [-1, 0, 1], (n, n)) + + # Construct the full-order input matrix B. + B = np.zeros_like(x) + B[0], B[-1] = dx2inv, dx2inv + + # Define the full-order model with an opinf.models class. + fom = opinf.models.ContinuousModel( + operators=[ + opinf.operators.LinearOperator(A), + opinf.operators.InputOperator(B), + ] + ) + + # Construct the part of the initial condition not dependent on u(t). + alpha = 100 + q0 = np.exp(alpha * (x - 1)) + np.exp(-alpha * x) - np.exp(-alpha) + + def full_order_solve(time_domain, u): + """Solve the full-order model with SciPy. + Here, u is a callable function. + """ + return fom.predict(q0 * u(0), time_domain, u, method="BDF") + + # Solve the full-order model with the training input. + Qs = np.array([full_order_solve(t_all, u) for u in input_functions]) + + np.savez(DATAFILE, t_all=t_all, Qs=Qs, A=A.toarray(), B=B) + + +@pytest.fixture +def data(): + if not os.path.isfile(DATAFILE): + generate_data() + return np.load(DATAFILE) + + +def test_01(data, k: int = 200, whichinput: int = 0): + """Intrusive ROM, single trajectory, prediction in time.""" + t_all, Qs, A, B = data["t_all"], data["Qs"], data["A"], data["B"] + Q = Qs[whichinput] + basis = opinf.basis.PODBasis(residual_energy=1e-6).fit(Q[:, :k]) + + rom_intrusive = opinf.ROM( + basis=basis, + model=opinf.models.ContinuousModel( + operators=[ + opinf.operators.LinearOperator(A), + opinf.operators.InputOperator(B), + ] + ).galerkin( + basis.entries + ), # Explicitly project FOM operators. + ) + Q_ROM_intrusive = rom_intrusive.predict( + Q[:, 0], t_all, input_func=input_functions[whichinput], method="BDF" + ) + + error = opinf.post.frobenius_error(Q, Q_ROM_intrusive)[1] + assert error < 0.0009 + + +def test_02(data, k: int = 200, whichinput: int = 0): + """OpInf ROM, single trajectory, prediction in time.""" + t_all, Qs = data["t_all"], data["Qs"] + t = t_all[:k] + Q = Qs[whichinput] + input_func = input_functions[whichinput] + + rom = opinf.ROM( + basis=opinf.basis.PODBasis(residual_energy=1e-6), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer(t, "ord6"), + model=opinf.models.ContinuousModel("AB"), + ).fit(Q[:, :k], inputs=input_func(t)) + + Q_ROM = rom.predict(Q[:, 0], t_all, input_func=input_func, method="BDF") + + error = opinf.post.frobenius_error(Q, Q_ROM)[1] + assert error < 0.0009 + + +def test_03(data, k: int = 200, ntrain: int = 4): + """OpInf ROM, multiple trajectories, prediction in time""" + t_all, Qs = data["t_all"], data["Qs"] + t = t_all[:k] + Q_train = [Q[:, :k] for Q in Qs[:ntrain]] + Q_test = Qs[ntrain:] + U_train = [u(t) for u in input_functions[:ntrain]] + + rom = opinf.ROM( + basis=opinf.basis.PODBasis(residual_energy=1e-6), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer(t, "ord6"), + model=opinf.models.ContinuousModel("AB"), + ).fit(Q_train, inputs=U_train) + + for i, u in enumerate(input_functions[ntrain:]): + Q_ROM = rom.predict(Q_test[i][:, 0], t_all, u, method="BDF") + error = opinf.post.frobenius_error(Q_test[i], Q_ROM)[1] + assert error < 0.001 + + +if __name__ == "__main__": + import pytest + + pytest.main([__file__]) diff --git a/tests/regression/test_lifting.py b/tests/regression/test_lifting.py new file mode 100644 index 00000000..f404fab7 --- /dev/null +++ b/tests/regression/test_lifting.py @@ -0,0 +1,306 @@ +# test_lifting.py +"""Regression test: Lifting, quadratic ROM, regularization selection.""" +import os +import pytest +import itertools +import numpy as np +import scipy.interpolate + +import opinf + + +DATAFILE = f"{__file__[:-3]}-data.npz" + + +v_bar = 1e2 +p_bar = 1e5 +z_bar = 1e-1 + + +class EulerFiniteDifferenceModel: + n_var = 3 + gamma = 1.4 + + def __init__(self, L: float = 2.0, n_x: int = 200): + self.x = np.linspace(0, L, n_x + 1)[:-1] + self.dx = self.x[1] - self.x[0] + self.L = L + + def spline_initial_conditions(self, init_params): + x, L = self.x, self.L + rho0s, v0s = init_params[0:3], init_params[3:6] + v0s = np.concatenate((v0s, [v0s[0]])) + rho0s = np.concatenate((rho0s, [rho0s[0]])) + + nodes = np.array([0, L / 3, 2 * L / 3, L]) + x[0] + rho = scipy.interpolate.CubicSpline(nodes, rho0s, bc_type="periodic")( + x + ) + v = scipy.interpolate.CubicSpline(nodes, v0s, bc_type="periodic")(x) + p = 1e5 * np.ones_like(x) + rho_e = p / (self.gamma - 1) + 0.5 * rho * v**2 + + return np.concatenate((rho, rho * v, rho_e)) + + def _ddx(self, variable): + return (variable - np.roll(variable, 1, axis=0)) / self.dx + + def derivative(self, tt, state): + rho, rho_v, rho_e = np.split(state, self.n_var) + v = rho_v / rho + p = (self.gamma - 1) * (rho_e - 0.5 * rho * v**2) + + return -np.concatenate( + [ + self._ddx(rho_v), + self._ddx(rho * v**2 + p), + self._ddx((rho_e + p) * v), + ] + ) + + def solve(self, q_init, time_domain): + return scipy.integrate.solve_ivp( + fun=self.derivative, + t_span=[time_domain[0], time_domain[-1]], + y0=q_init, + method="RK45", + t_eval=time_domain, + rtol=1e-6, + atol=1e-9, + ).y + + +# Experiment data ============================================================= +def generate_data(): + fom = EulerFiniteDifferenceModel(L=2, n_x=200) + + t_final = 0.15 + n_t = 501 + t_all = np.linspace(0, t_final, n_t) + + t_obs = 0.06 + t_train = t_all[t_all <= t_obs] + + q0s = [ + fom.spline_initial_conditions(icparams) + for icparams in itertools.product( + [20, 24], + [22], + [20, 24], + [95, 105], + [100], + [95, 105], + ) + ] + + Q_fom = np.array([fom.solve(q0, t_train) for q0 in q0s]) + test_init = fom.spline_initial_conditions([22, 21, 25, 100, 98, 102]) + test_solution = fom.solve(test_init, t_all) + + np.savez( + DATAFILE, + x=fom.x, + t_train=t_train, + Qs=Q_fom, + t_all=t_all, + Qtest=test_solution, + ) + + +@pytest.fixture +def data(): + if not os.path.isfile(DATAFILE): + generate_data() + return np.load(DATAFILE) + + +class EulerLifter(opinf.lift.LifterTemplate): + num_original_variables = 3 + num_lifted_variables = 3 + + def lift(self, original_state): + rho, rho_v, rho_e = np.split( + original_state, + self.num_original_variables, + axis=0, + ) + + v = rho_v / rho + p = (EulerFiniteDifferenceModel.gamma - 1) * ( + rho_e - 0.5 * rho_v * v + ) # From the ideal gas law. + zeta = 1 / rho + + return np.concatenate((v, p, zeta)) + + def unlift(self, lifted_state): + v, p, zeta = np.split( + lifted_state, + self.num_lifted_variables, + axis=0, + ) + + rho = 1 / zeta + rho_v = rho * v + rho_e = ( + p / (EulerFiniteDifferenceModel.gamma - 1) + 0.5 * rho_v * v + ) # From the ideal gas law. + + return np.concatenate((rho, rho_v, rho_e)) + + +def rom_test_error(rom, maxerr: float): + thedata = np.load(DATAFILE) + t, Qtest = thedata["t_all"], thedata["Qtest"] + + Qrom = rom.predict(Qtest[:, 0], t) + error = opinf.post.Lp_error(Qtest, Qrom, t)[1] + + assert error < maxerr + + +def test_01(data): + """Lifting, scaling, fixed regularization (no centering).""" + t_train, Qs = data["t_train"], data["Qs"] + + rom = opinf.ROM( + lifter=EulerLifter(), + transformer=opinf.pre.TransformerMulti( + [ + opinf.pre.ScaleTransformer(1 / v_bar), + opinf.pre.ScaleTransformer(1 / p_bar), + opinf.pre.ScaleTransformer(1 / z_bar), + ] + ), + basis=opinf.basis.PODBasis(num_vectors=9), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer( + t_train, scheme="fwd4" + ), + model=opinf.models.ContinuousModel( + operators=[opinf.operators.QuadraticOperator()], + solver=opinf.lstsq.L2Solver(regularizer=1e-8), + ), + ).fit(Qs) + + rom_test_error(rom, 0.0032) + + +def test_02(data): + """Lifting, scaling, regularization selection (no centering).""" + t_train, Qs = data["t_train"], data["Qs"] + + rom = opinf.ROM( + lifter=EulerLifter(), + transformer=opinf.pre.TransformerMulti( + [ + opinf.pre.ScaleTransformer(1 / v_bar), + opinf.pre.ScaleTransformer(1 / p_bar), + opinf.pre.ScaleTransformer(1 / z_bar), + ] + ), + basis=opinf.basis.PODBasis(num_vectors=9), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer( + t_train, scheme="fwd4" + ), + model=opinf.models.ContinuousModel( + operators=[opinf.operators.QuadraticOperator()], + solver=opinf.lstsq.L2Solver(), + ), + ).fit_regselect_continuous( + candidates=np.logspace(-12, 2, 15), + train_time_domains=t_train, + states=Qs, + verbose=False, + ) + + rom_test_error(rom, 0.0023) + + +def test_03(data): + """Lifting, centering, data-driven scaling, regularization selection.""" + t_train, Qs = data["t_train"], data["Qs"] + + rom = opinf.ROM( + lifter=EulerLifter(), + transformer=opinf.pre.TransformerMulti( # Variable-specific scaling + [ + opinf.pre.ShiftScaleTransformer( + centering=True, scaling="minmax" + ), + opinf.pre.ShiftScaleTransformer( + centering=True, scaling="minmax" + ), + opinf.pre.ShiftScaleTransformer( + centering=True, scaling="minmax" + ), + ] + ), + basis=opinf.basis.PODBasis(num_vectors=9), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer( + t_train, scheme="fwd4" + ), + model=opinf.models.ContinuousModel( + operators=[ + opinf.operators.ConstantOperator(), # c + opinf.operators.LinearOperator(), # Aq(t) + opinf.operators.QuadraticOperator(), # H[q(t) ⊗ q(t)] + ], + solver=opinf.lstsq.L2Solver(), + ), + ) + + rom.fit_regselect_continuous( + candidates=np.logspace(-12, 2, 15), + train_time_domains=t_train, + states=Qs, + verbose=False, + ) + + rom_test_error(rom, 0.0031) + + +def test_04(data): + """Lifting, centering, exact scaling, regularization selection.""" + t_train, Qs = data["t_train"], data["Qs"] + + rom = opinf.ROM( + lifter=EulerLifter(), + transformer=opinf.pre.TransformerPipeline( + [ + opinf.pre.ShiftScaleTransformer(centering=True), # Center. + opinf.pre.TransformerMulti( + transformers=[ + opinf.pre.ScaleTransformer(1 / v_bar), + opinf.pre.ScaleTransformer(1 / p_bar), + opinf.pre.ScaleTransformer(1 / z_bar), + ] + ), + ], + ), + basis=opinf.basis.PODBasis(num_vectors=8), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer( + t_train, scheme="fwd4" + ), + model=opinf.models.ContinuousModel( + operators=[ + opinf.operators.ConstantOperator(), + opinf.operators.LinearOperator(), + opinf.operators.QuadraticOperator(), + ], + solver=opinf.lstsq.L2Solver(), + ), + ) + + rom.fit_regselect_continuous( + candidates=np.logspace(-12, 2, 15), + train_time_domains=t_train, + states=Qs, + ) + + rom_test_error(rom, 0.0021) + + +if __name__ == "__main__": + import pytest + + pytest.main([__file__]) diff --git a/tests/regression/test_parametric.py b/tests/regression/test_parametric.py new file mode 100644 index 00000000..fc890315 --- /dev/null +++ b/tests/regression/test_parametric.py @@ -0,0 +1,132 @@ +# test_parametric.py +"""Regression test: parametric problems.""" + +import os +import pytest +import numpy as np +import scipy.sparse + +import opinf + + +DATAFILE = f"{__file__[:-3]}-data.npz" + + +def generate_data(): + s = 10 + training_parameters = np.logspace(-1, 1, s) + testing_parameters = np.sqrt( + training_parameters[:-1] * training_parameters[1:] + ) + + L = 1 + n = 2**10 - 1 + x_all = np.linspace(0, L, n + 2) + x = x_all[1:-1] + dx = x[1] - x[0] + + T = 1 + K = 401 + t_all = np.linspace(0, T, K) + + dx2inv = 1 / dx**2 + diags = np.array([1, -2, 1]) * dx2inv + A0 = scipy.sparse.diags(diags, [-1, 0, 1], (n, n)) + + c0 = np.zeros_like(x) + c0[0], c0[-1] = dx2inv, dx2inv + + alpha = 100 + q0 = np.exp(alpha * (x - 1)) + np.exp(-alpha * x) - np.exp(-alpha) + + def full_order_solve(mu, time_domain): + """Solve the full-order model with SciPy. + Here, u is a callable function. + """ + return scipy.integrate.solve_ivp( + fun=lambda t, q: mu * (c0 + A0 @ q), + y0=q0, + t_span=[time_domain[0], time_domain[-1]], + t_eval=time_domain, + method="BDF", + ).y + + Qtrain = [full_order_solve(mu, t_all) for mu in training_parameters] + Qtest = [full_order_solve(mu, t_all) for mu in testing_parameters] + + np.savez( + DATAFILE, + t_all=t_all, + training_parameters=training_parameters, + testing_parameters=testing_parameters, + Qtrain=Qtrain, + Qtest=Qtest, + ) + + +@pytest.fixture +def data(): + if not os.path.isfile(DATAFILE): + generate_data() + return np.load(DATAFILE) + + +def test_01(data): + t_all, training_parameters, testing_parameters, Qtrain, Qtest = ( + data["t_all"], + data["training_parameters"], + data["testing_parameters"], + data["Qtrain"], + data["Qtest"], + ) + + rom = opinf.ParametricROM( + basis=opinf.basis.PODBasis(projection_error=1e-6), + ddt_estimator=opinf.ddt.UniformFiniteDifferencer(t_all, "ord6"), + model=opinf.models.ParametricContinuousModel( + operators=[ + opinf.operators.AffineConstantOperator(1), + opinf.operators.AffineLinearOperator(1), + ], + solver=opinf.lstsq.L2Solver(1e-6), + ), + ).fit(training_parameters, Qtrain) + + maxerrors = [ + 0.0528, + 0.0344, + 0.0228, + 0.0156, + 0.0111, + 0.0078, + 0.0051, + 0.0028, + 0.0010, + 0.0002, + ] + for mu, Q, maxerr in zip(training_parameters, Qtrain, maxerrors): + Q_ROM = rom.predict(mu, Q[:, 0], t_all, method="BDF") + error = opinf.post.frobenius_error(Q, Q_ROM)[1] + assert error < maxerr + + maxerrors = [ + 0.0426, + 0.0279, + 0.0188, + 0.0131, + 0.0093, + 0.0064, + 0.0039, + 0.0018, + 0.0004, + ] + for mu, Q, maxerr in zip(testing_parameters, Qtest, maxerrors): + Q_ROM = rom.predict(mu, Q[:, 0], t_all, method="BDF") + error = opinf.post.frobenius_error(Q, Q_ROM)[1] + assert error < maxerr + + +if __name__ == "__main__": + import pytest + + pytest.main([__file__])