Source code for regressioninc.linear.models

"""
This module implements estimators for linear complex problems

These are built on scipy as scipy.linalg.lstsq does allow and support
complex-valued inputs. However, it does not output residuals for complex-valued
variables, therefore these are calculated out explicitly.
"""
from loguru import logger
from typing import Optional, Union
import numpy as np
import scipy.linalg as linalg
import statsmodels.api as sm

from regressioninc.base import Estimate, Estimator
from regressioninc.validation import validate_X_y, validate_weights
from regressioninc.linear.robust import mad
from regressioninc.errors import EstimatorError


RobustNorm = sm.robust.norms.RobustNorm


[docs]def add_intercept(X: np.ndarray) -> np.ndarray: """Add an intercept to the regressors Parameters ---------- X : np.ndarray The regressors Returns ------- np.ndarray The regressors with an extra column of ones to represent to allow solving for the intercept term Examples -------- >>> import numpy as np >>> from regressioninc.linear.models import add_intercept >>> X = np.array([[2,3], [4,5]]) >>> X array([[2, 3], [4, 5]]) Now add the intercept >>> X = add_intercept(X) >>> X array([[2, 3, 1], [4, 5, 1]]) """ n_samples = X.shape[0] return np.hstack((X, np.ones(shape=(n_samples, 1), dtype=X.dtype)))
[docs]def complex_to_real(X: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """ Convert complex-valued linear problem to a real-valued linear regression Parameters ---------- X : np.ndarray The regressors y : np.ndarray The observations Returns ------- tuple[np.ndarray, np.ndarray] X, y converted to real-valued generalised linear regression Examples -------- >>> from regressioninc.linear.models import add_intercept, complex_to_real >>> X = np.array([3 + 4j, 2 + 8j, 6 + 4j]).reshape(3,1) >>> X = add_intercept(X) >>> y = np.array([-2 - 4j, -1 - 3j, 5 + 6j]) >>> X_real, y_real = complex_to_real(X, y) >>> X_real array([[ 3., -4., 1., -0.], [ 4., 3., 0., 1.], [ 2., -8., 1., -0.], [ 8., 2., 0., 1.], [ 6., -4., 1., -0.], [ 4., 6., 0., 1.]]) >>> y_real array([-2., -4., -1., -3., 5., 6.]) """ n_samples, n_regressors = X.shape # the obesrvations y_real = np.empty(shape=(n_samples * 2), dtype=float) y_real[0::2] = y.real y_real[1::2] = y.imag # the regressors X_real = np.empty(shape=(n_samples * 2, n_regressors * 2), dtype=float) X_real[0::2, 0::2] = X.real X_real[0::2, 1::2] = -X.imag X_real[1::2, 0::2] = X.imag X_real[1::2, 1::2] = X.real return X_real, y_real
[docs]def real_params_to_complex(params: np.ndarray) -> np.ndarray: """ Convert real-valued parameters to complex-valued ones for problems that were transformed from complex-valued to real-valued. Parameters ---------- params : np.ndarray Real-valued parameters array Returns ------- np.ndarray The complex-valued parameters Examples -------- Let's generate a complex-valued linear problem, pose it as a real-valued linear problem and then convert the returned parameters back to complex. Generate the linear problem and add an intercept column to the regressors >>> import numpy as np >>> np.set_printoptions(precision=3, suppress=True) >>> from regressioninc.testing.complex import ComplexGrid, generate_linear_grid >>> from regressioninc.linear.models import add_intercept, OLS >>> from regressioninc.linear.models import complex_to_real, real_params_to_complex >>> params = np.array([3 + 2j]) >>> grid = ComplexGrid(r1=-1, r2=1, nr=3, i1=4, i2=6, ni=3) >>> X, y = generate_linear_grid(params, [grid], intercept=10) >>> X = add_intercept(X) Convert the complex-valued problem to a real-valued problem >>> X_real, y_real = complex_to_real(X, y) Solve the real-valued linear problem >>> model = OLS() >>> model.fit(X_real, y_real) OLS... Look at the real-valued coefficients >>> model.estimate.params array([ 3., 2., 10., 0.]) Convert the coefficients back to the complex domain >>> real_params_to_complex(model.estimate.params) array([ 3.+2.j, 10.+0.j]) """ return params[0::2] + 1j * params[1::2]
[docs]def apply_weights( X: np.ndarray, y: np.ndarray, weights: np.ndarray ) -> tuple[np.ndarray, np.ndarray]: r""" Transform X and y using the weights to perform a weighted least squares .. math:: \sqrt{weights} y = \sqrt{weights} X coef , is equivalent to, .. math:: X^H weights y = X^H weights X coef , where :math:`X^H` is the hermitian transpose of X. In this method, both the observations y and the predictors X are multipled by the square root of the weights and then returned. Parameters ---------- X : np.ndarray The regressors y : np.ndarray The regressands weights : np.ndarray The weights Returns ------- tuple[np.ndarray, np.ndarray] The weighted regressors and the weights regrassands Examples -------- >>> import numpy as np >>> X = [5, 3] >>> y = [2, 8] >>> weights = [] """ weights = validate_weights(weights, X, non_negative=True) sqrt = np.sqrt(weights) y = sqrt * y X = X * sqrt[..., np.newaxis] return X, y
[docs]class LinearEstimate(Estimate): """A linear estimate""" residues: Union[float, np.ndarray] rank: int singular_values: Union[None, np.ndarray]
[docs]class LinearEstimator(Estimator): """Base class for a linear estimator"""
[docs] def predict(self, X: np.ndarray) -> np.ndarray: """ Predict regrassands given regressors using the estimated parameters Parameters ---------- X : np.ndarray The regressors Returns ------- np.ndarray The regressands Raises ------ EstimatorError If no estimate exists """ if self.estimate is None: raise EstimatorError("No estimated parameters have been calculated") return np.dot(X, self.estimate.params.T)
[docs]def lstsq( X: np.ndarray, y: np.ndarray, weights: Optional[np.ndarray] = None, cond=None, overwrite_a=False, overwrite_b=False, check_finite=True, lapack_driver=None, ): """OLS from scipy wrapped here in case support for complex is dropped""" if weights is not None: X, y = apply_weights(X, y, weights) params, residues, rank, singular_values = linalg.lstsq( X, y, cond, overwrite_a, overwrite_b, check_finite, lapack_driver ) return LinearEstimate( params=params, residues=residues, rank=rank, singular_values=singular_values )
[docs]class OLS(LinearEstimator): """Ordinary least square regression"""
[docs] def fit(self, X: np.ndarray, y: np.ndarray): """ Fit the linear problem using least squares regression Parameters ---------- X : np.ndarray The predictors y : np.ndarray The observations Returns ------- np.ndarray The coefficients """ X, y = validate_X_y(X, y) self.estimate = lstsq(X, y) return self
[docs]class WLS(LinearEstimator): """Weighted least squares"""
[docs] def fit(self, X: np.ndarray, y: np.ndarray, weights: np.ndarray) -> np.ndarray: """ Apply weights to observations and predictors and find the coefficients of the linear model Parameters ---------- X : np.ndarray The predictors y : np.ndarray The observations weights : np.ndarray The weights to apply to the samples Returns ------- np.ndarray The coefficients for the model """ X, y = validate_X_y(X, y) self.estimate = lstsq(X, y, weights=weights) return self
[docs]class MEstimate(LinearEstimate): scale: float """An estimate of scale to be used weighting the rows of the linear problem"""
[docs]class MEstimator(LinearEstimator): max_iter: int = 50 """The maximum number of iterations""" early_stopping: float = 1e-8 """Minimum change required in residues between iterations to continue""" warm_start: bool = False """Use existing solution to initialise a new call to fit"""
[docs] def fit( self, X: np.ndarray, y: np.ndarray, M: Optional[RobustNorm] = None, ): if M is None: M = sm.robust.norms.TrimmedMean() if not self.warm_start or self.estimate is None: # do an initial least squares solution ols_estimate = lstsq(X, y) self.estimate = MEstimate(**ols_estimate.model_dump(), scale=0) resids = y - np.dot(X, self.estimate.params.T) self.estimate.scale = mad(resids, center=0) prev_resids = np.absolute(resids).real iter = 0 while iter < self.max_iter: # calculate scale and next iteration of weights if np.sum(prev_resids) < self.early_stopping: logger.debug(f"{prev_resids=} residuals small, quitting") break if self.estimate.scale == 0.0: logger.debug( f"{self.estimate.scale=}, suggests near perfect fit last iteration" ) break # perform another solving iteration weights = M.weights(prev_resids / self.estimate.scale) new_estimate = lstsq(X, y, weights=weights) resids = y - np.dot(X, new_estimate.params.T) scale = mad(resids, center=0) self.estimate = MEstimate(**new_estimate.model_dump(), scale=scale) abs_resids = np.absolute(resids).real # check the change in residuals and quit if insignificant if linalg.norm(abs_resids - prev_resids) < self.early_stopping: logger.debug("Early stopping criteria met, breaking") break prev_resids = abs_resids return self