Source code for mrtool.cov_selection.covfinder

# -*- coding: utf-8 -*-
"""
    Cov Finder
    ~~~~~~~~~~
"""
from typing import List, Dict, Tuple, Union
import warnings
from copy import deepcopy
import numpy as np
from mrtool import MRData, LinearCovModel, MRBRT
from mrtool.core.other_sampling import sample_simple_lme_beta

[docs]class CovFinder: """Class in charge of the covariate selection. """ zero_gamma_uprior = np.array([0.0, 0.0]) loose_gamma_uprior = np.array([1.0, 1.0]) def __init__(self, data: MRData, covs: List[str], pre_selected_covs: Union[List[str], None] = None, normalized_covs: bool = True, num_samples: int = 1000, laplace_threshold: float = 1e-5, power_range: Tuple[float, float] = (-8, 8), power_step_size: float = 0.5, inlier_pct: float = 1.0, alpha: float = 0.05, beta_gprior_std: float = 1.0, bias_zero: bool = False, use_re: Union[Dict, None] = None): """Covariate Finder. Args: data (MRData): Data object used for variable selection. covs (List[str]): Candidate covariates. normalized_covs (bool): If true, will normalize the covariates. pre_selected_covs (List[str] | None, optional): Pre-selected covaraites, will always be in the selected list. num_samples (int, optional): Number of samples used for judging if a variable is significance. laplace_threshold (float, optional): When coefficients from the Laplace regression is above this value, we consider it as the potential useful covariate. power_range (Tuple[float, float], optional): Power range for the Laplace prior standard deviation. Laplace prior standard deviation will go from `10**power_range[0]` to `10**power_range[1]`. power_step_size (float, optional): Step size of the swiping across the power range. inlier_pct (float, optional): Trimming option inlier percentage. Default to 1. alpha (float, optional): Significance threshold. Default to 0.05. beta_gprior_std (float, optional): Loose beta Gaussian prior standard deviation. Default to 1. bias_zero (bool, optional): If `True`, fit when specify the Gaussian prior it will be mean zero. Default to `False`. use_re (Union[Dict, None], optional): A dictionary of use_re for each covariate. When `None` we have an uninformative prior for the random effects variance. Default to `None`. """ self.data = data self.covs = covs self.pre_selected_covs = [] if pre_selected_covs is None else pre_selected_covs assert len(set(self.pre_selected_covs) & set(self.covs)) == 0, \ "covs and pre_selected_covs should be mutually exclusive." self.normalize_covs = normalized_covs self.inlier_pct = inlier_pct self.beta_gprior_std = beta_gprior_std assert self.beta_gprior_std > 0.0, f"beta_gprior_std={self.beta_gprior_std} has to be positive." self.bias_zero = bias_zero self.alpha = alpha if self.normalize_covs: self.data = deepcopy(data) self.data.normalize_covs(self.covs) self.selected_covs = self.pre_selected_covs.copy() self.beta_gprior = dict() self.all_covs = self.pre_selected_covs + self.covs self.stop = False self.use_re = {} if use_re is None else use_re for cov in self.all_covs: if cov not in self.use_re: self.use_re[cov] = False self.num_samples = num_samples self.laplace_threshold = laplace_threshold self.power_range = power_range self.power_step_size = power_step_size self.powers = np.arange(*self.power_range, self.power_step_size) self.num_covs = len(pre_selected_covs) + len(covs) if len(covs) == 0: warnings.warn("There is no covariates to select, will return the pre-selected covariates.") self.stop = True
[docs] def create_model(self, covs: List[str], prior_type: str = 'Laplace', laplace_std: float = None) -> MRBRT: """Create Gaussian or Laplace model. Args: covs (List[str]): A list of covariates need to be included in the model. prior_type (str): Indicate if use ``Gaussian`` or ``Laplace`` model. laplace_std (float): Standard deviation of the Laplace prior. Default to None. Return: MRBRT: Created model object. """ assert prior_type in ['Laplace', 'Gaussian'], "Prior type can only 'Laplace' or 'Gaussian'." if prior_type == 'Laplace': assert laplace_std is not None, "Use Laplace prior must provide standard deviation." if prior_type == 'Laplace': cov_models = [ LinearCovModel(cov, use_re=True, prior_beta_laplace=np.array([0.0, laplace_std]) if cov not in self.selected_covs else None, prior_beta_gaussian=None if cov not in self.selected_covs else self.beta_gprior[cov], prior_gamma_uniform=self.loose_gamma_uprior if self.use_re[cov] else \ self.zero_gamma_uprior) for cov in covs ] else: cov_models = [ LinearCovModel(cov, use_re=True, prior_beta_gaussian=None if cov not in self.beta_gprior else self.beta_gprior[cov], prior_gamma_uniform=self.loose_gamma_uprior if self.use_re[cov] else \ self.zero_gamma_uprior) for cov in covs ] model = MRBRT(self.data, cov_models=cov_models, inlier_pct=self.inlier_pct) return model
[docs] def fit_gaussian_model(self, covs: List[str]) -> MRBRT: """Fit Gaussian model. Args: covs (List[str]): A list of covariates need to be included in the model. Returns: MRBRT: the fitted model object. """ gaussian_model = self.create_model(covs, prior_type='Gaussian') gaussian_model.fit_model(x0=np.zeros(gaussian_model.num_vars), inner_print_level=5, inner_max_iter=1000) empirical_gamma = np.var(gaussian_model.u_soln, axis=0) gaussian_model.gamma_soln = empirical_gamma gaussian_model.lt.gamma = empirical_gamma return gaussian_model
[docs] def fit_laplace_model(self, covs: List[str], laplace_std: float) -> MRBRT: """Fit Laplace model. Args: covs (List[str]): A list of covariates need to be included in the model. laplace_std (float): The Laplace prior std. Returns: MRBRT: the fitted model object. """ laplace_model = self.create_model(covs, prior_type='Laplace', laplace_std=laplace_std) lprior = laplace_model.create_lprior() scale = 1 if np.isinf(lprior[1]).all() else 2 laplace_model.fit_model(x0=np.zeros(scale*laplace_model.num_vars), inner_print_level=5, inner_max_iter=1000) return laplace_model
[docs] def summary_gaussian_model(self, gaussian_model: MRBRT) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Summary the gaussian model. Return the mean standard deviation and the significance indicator of beta. Args: gaussian_model (MRBRT): Gaussian model object. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray]: Mean, standard deviation and indicator of the significance of beta solution. """ beta_samples = sample_simple_lme_beta(sample_size=self.num_samples, model=gaussian_model) beta_mean = gaussian_model.beta_soln beta_std = np.std(beta_samples, axis=0) beta_sig = self.is_significance(beta_samples, var_type='beta', alpha=self.alpha) return beta_mean, beta_std, beta_sig
[docs] def fit_pre_selected_covs(self): """Fit the pre-selected covariates. """ if len(self.pre_selected_covs) != 0: gaussian_model = self.fit_gaussian_model(self.pre_selected_covs) beta_mean, beta_std, _ = self.summary_gaussian_model(gaussian_model) beta_std.fill(self.beta_gprior_std) self.update_beta_gprior(self.pre_selected_covs, beta_mean, beta_std)
[docs] def select_covs_by_laplace(self, laplace_std: float, verbose: bool = False): # fit laplace model and select the potential additional covariates laplace_model = self.fit_laplace_model(self.all_covs, laplace_std) additional_covs = [] for i, cov in enumerate(self.all_covs): if np.abs(laplace_model.beta_soln[i]) > self.laplace_threshold and cov not in self.selected_covs: additional_covs.append(cov) if verbose: print(f'Laplace std: {laplace_std}') print(' potential additional covariates', additional_covs) if len(additional_covs) > 0: candidate_covs = self.selected_covs + additional_covs gaussian_model = self.fit_gaussian_model(candidate_covs) beta_soln_mean, beta_soln_std, beta_soln_sig = self.summary_gaussian_model(gaussian_model) if verbose: print(' Gaussian model fe_mean:', beta_soln_mean) print(' Gaussiam model fe_std: ', beta_soln_std) print(' Gaussian model re_var: ', gaussian_model.gamma_soln) print(' significance:', beta_soln_sig) # update the selected covs i_start = len(self.selected_covs) i_end = len(candidate_covs) for i in range(i_start, i_end): if beta_soln_sig[i]: self.selected_covs.append(candidate_covs[i]) self.update_beta_gprior([candidate_covs[i]], np.array([0.0 if self.bias_zero else beta_soln_mean[i]]), np.array([self.beta_gprior_std])) if verbose: print(' selected covariates:', self.selected_covs) # update the stop self.stop = not all(beta_soln_sig) # other stop criterion if len(self.selected_covs) == self.num_covs: self.stop = True
[docs] def update_beta_gprior(self, covs: List[str], mean: np.ndarray, std: np.ndarray): """Update the beta Gaussian prior. Args: covs (List[str]): Name of the covariates. mean (np.ndarray): Mean of the priors. std (np.ndarray): Standard deviation of the priors. """ for i, cov in enumerate(covs): if cov not in self.all_covs: raise ValueError(f"Unrecognized covaraite {cov} for beta Gaussian prior update.") if cov not in self.beta_gprior: self.beta_gprior[cov] = np.array([mean[i], std[i]])
[docs] def select_covs(self, verbose: bool = False): if len(self.covs) != 0: self.fit_pre_selected_covs() for power in self.powers: if not self.stop: laplace_std = 10**power self.select_covs_by_laplace(laplace_std, verbose=verbose) self.stop = True
[docs] @staticmethod def is_significance(var_samples: np.ndarray, var_type: str = 'beta', alpha: float = 0.05) -> np.ndarray: assert var_type == 'beta', "Only support variable type beta." assert 0.0 < alpha < 1.0, "Significance threshold has to be between 0 and 1." var_uis = np.quantile(var_samples, (0.5*alpha, 1 - 0.5*alpha), axis=0) var_sig = var_uis.prod(axis=0) > 0 return var_sig