# -*- coding: utf-8 -*-
"""
utils
~~~~~
`utils` module of the `mrtool` package.
"""
from typing import Union, List, Any, Tuple
import numpy as np
import pandas as pd
try:
from cdd import Matrix, RepType, Polyhedron
except:
Warning("no cdd module installed, create fake classes.")
[docs] class RepType:
INEQUALITY = None
[docs] class Polyhedron:
[docs] def get_generators(self):
pass
[docs]def get_cols(df, cols):
"""Return the columns of the given data frame.
Args:
df (pandas.DataFrame):
Given data frame.
cols (str | list{str} | None):
Given column name(s), if is `None`, will return a empty data frame.
Returns:
pandas.DataFrame | pandas.Series:
The data frame contains the columns.
"""
assert isinstance(df, pd.DataFrame)
if isinstance(cols, list):
assert all([isinstance(col, str) and col in df
for col in cols])
else:
assert (cols is None) or (isinstance(cols, str) and cols in df)
if cols is None:
return df[[]]
else:
return df[cols]
[docs]def is_cols(cols):
"""Check variable type fall into the column name category.
Args:
cols (str | list{str} | None):
Column names candidate.
Returns:
bool:
if `col` is either str, list{str} or None
"""
ok = isinstance(cols, (str, list)) or cols is None
if isinstance(cols, list):
ok = ok and all([isinstance(col, str)
for col in cols])
return ok
[docs]def combine_cols(cols):
"""Combine column names into one list of names.
Args:
cols (list{str | list{str}}):
A list of names of columns or list of column names.
Return:
list{str}:
Combined names of columns.
"""
combined_cols = []
for col in cols:
if isinstance(col, str):
combined_cols.append(col)
else:
combined_cols += col
return combined_cols
[docs]def sizes_to_indices(sizes):
"""Converting sizes to corresponding indices.
Args:
sizes (numpy.dnarray):
An array consist of non-negative number.
Returns:
list{range}:
List the indices.
"""
indices = []
a = 0
b = 0
for i, size in enumerate(sizes):
b += size
indices.append(np.arange(a, b))
a += size
return indices
[docs]def is_gaussian_prior(prior, size=None):
"""Check if variable satisfy Gaussian prior format
Args:
prior (numpy.ndarray):
Either one or two dimensional array, with first group refer to mean
and second group refer to standard deviation.
Keyword Args:
size (int | None, optional):
Size the variable, prior related to.
Returns:
bool:
True if satisfy condition.
"""
# check type
ok = isinstance(prior, np.ndarray) or prior is None
if prior is not None:
# check dimension
ok = ok and (prior.ndim == 1 or prior.ndim == 2) and len(prior) == 2
if size is not None and prior.ndim == 2:
ok = ok and (prior.shape[1] == size)
# check value
ok = ok and np.all(prior[1] > 0.0)
return ok
is_laplace_prior = is_gaussian_prior
input_laplace_prior = input_gaussian_prior
[docs]def avg_integral(mat, spline=None, use_spline_intercept=False):
"""Compute average integral.
Args:
mat (numpy.ndarray):
Matrix that contains the starting and ending points of the integral
or a single column represents the mid-points.
spline (xspline.XSpline | None, optional):
Spline integrate over with, when `None` treat the function as
linear.
use_spline_intercept (bool, optional):
If `True` use all bases from spline, otherwise remove the first bases.
Returns:
numpy.ndarray:
Design matrix when spline is not `None`, otherwise the mid-points.
"""
assert mat.ndim == 2
if mat.size == 0:
return mat.reshape(mat.shape[0], 0)
index = 0 if use_spline_intercept else 1
if mat.shape[1] == 1:
return mat if spline is None else spline.design_mat(
mat.ravel(), l_extra=True, r_extra=True)[:, index:]
else:
if spline is None:
return mat.mean(axis=1)[:, None]
else:
x0 = mat[:, 0]
x1 = mat[:, 1]
dx = x1 - x0
val_idx = (dx == 0.0)
int_idx = (dx != 0.0)
mat = np.zeros((dx.size, spline.num_spline_bases))
if np.any(val_idx):
mat[val_idx, :] = spline.design_mat(x0[val_idx],
l_extra=True,
r_extra=True)
if np.any(int_idx):
mat[int_idx, :] = spline.design_imat(
x0[int_idx], x1[int_idx], 1,
l_extra=True,
r_extra=True)/(dx[int_idx][:, None])
return mat[:, index:]
# random knots
[docs]def sample_knots(num_intervals: int,
knot_bounds: Union[np.ndarray, None] = None,
interval_sizes: Union[np.ndarray, None] = None,
num_samples: int = 1) -> Union[np.ndarray, None]:
"""Sample knots given a set of rules.
Args:
num_intervals
Number of intervals (number of knots minus 1).
knot_bounds
Bounds for the interior knots. Here we assume the domain span 0 to 1,
bound for a knot should be between 0 and 1, e.g. ``[0.1, 0.2]``.
``knot_bounds`` should have number of interior knots of rows, and each row
is a bound for corresponding knot, e.g.
``knot_bounds=np.array([[0.0, 0.2], [0.3, 0.4], [0.3, 1.0]])``,
for when we have three interior knots.
interval_sizes
Bounds for the distances between knots. For the same reason, we assume
elements in `interval_sizes` to be between 0 and 1. For example,
``interval_distances=np.array([[0.1, 0.2], [0.1, 0.3], [0.1, 0.5], [0.1, 0.5]])``
means that the distance between first (0) and second knot has to be between 0.1 and 0.2, etc.
And the number of rows for ``interval_sizes`` has to be same with ``num_intervals``.
num_samples
Number of knots samples.
Returns:
np.ndarray: Return knots sample as array, with `num_samples` rows and number of knots columns.
"""
# rename variables
k = num_intervals
b = knot_bounds
d = interval_sizes
N = num_samples
t0 = 0.0
tk = 1.0
# check input
assert t0 <= tk
assert k >= 2
if d is not None:
assert d.shape == (k, 2) and sum(d[:, 0]) <= 1.0 and\
np.all(d >= 0.0) and np.all(d <= 1.0)
else:
d = np.repeat(np.array([[0.0, 1.0]]), k, axis=0)
if b is not None:
assert b.shape == (k - 1, 2) and\
np.all(b[:, 0] <= b[:, 1]) and\
np.all(b[:-1, 1] <= b[1:, 1]) and\
np.all(b >= 0.0) and np.all(b <= 1.0)
else:
b = np.repeat(np.array([[0.0, 1.0]]), k - 1, axis=0)
d = d*(tk - t0)
b = b*(tk - t0) + t0
d[0] += t0
d[-1] -= tk
# find vertices of the polyhedron
D = -col_diff_mat(k - 1)
I = np.identity(k - 1)
A1 = np.vstack((-D, D))
A2 = np.vstack((-I, I))
b1 = np.hstack((-d[:, 0], d[:, 1]))
b2 = np.hstack((-b[:, 0], b[:, 1]))
A = np.vstack((A1, A2))
b = np.hstack((b1, b2))
mat = np.insert(-A, 0, b, axis=1)
mat = Matrix(mat)
mat.rep_type = RepType.INEQUALITY
poly = Polyhedron(mat)
ext = poly.get_generators()
vertices_and_rays = np.array(ext)
if vertices_and_rays.size == 0:
print('there is no feasible knots')
return None
if np.any(vertices_and_rays[:, 0] == 0.0):
print('polyhedron is not closed, something is wrong.')
return None
else:
vertices = vertices_and_rays[:, 1:]
# sample from the convex combination of the vertices
n = vertices.shape[0]
s_simplex = sample_simplex(n, N=N)
s = s_simplex.dot(vertices)
s = np.insert(s, 0, t0, axis=1)
s = np.insert(s, k, tk, axis=1)
return s
[docs]def sample_simplex(n, N=1):
"""sample from n dimensional simplex"""
assert n >= 1
# special case when n == 1
if n == 1:
return np.ones((N, n))
# other cases
s = np.random.rand(N, n - 1)
s.sort(axis=1)
s = np.insert(s, 0, 0.0, axis=1)
s = np.insert(s, n, 1.0, axis=1)
w = np.zeros((n + 1, n))
id_d0 = np.diag_indices(n)
id_d1 = (id_d0[0] + 1, id_d0[1])
w[id_d0] = -1.0
w[id_d1] = 1.0
return s.dot(w)
[docs]def col_diff_mat(n):
"""column difference matrix"""
D = np.zeros((n + 1, n))
id_d0 = np.diag_indices(n)
id_d1 = (id_d0[0] + 1, id_d0[1])
D[id_d0] = -1.0
D[id_d1] = 1.0
return D
[docs]def nonlinear_trans(score, slope=6.0, quantile=0.7):
score_min = np.min(score)
score_max = np.max(score)
if score_max == score_min:
return np.ones(len(score))
else:
weight = (score - score_min)/(score_max - score_min)
sorted_weight = np.sort(weight)
x = sorted_weight[int(0.8*weight.size)]
y = 1.0 - x
# calculate the transformation coefficient
c = np.zeros(4)
c[1] = slope*x**2/quantile
c[0] = quantile*np.exp(c[1]/x)
c[3] = slope*y**2/(1.0 - quantile)
c[2] = (1.0 - quantile)*np.exp(c[3]/y)
weight_trans = np.zeros(weight.size)
for i in range(weight.size):
w = weight[i]
if w == 0.0:
weight_trans[i] = 0.0
elif w < x:
weight_trans[i] = c[0]*np.exp(-c[1]/w)
elif w < 1.0:
weight_trans[i] = 1.0 - c[2]*np.exp(-c[3]/(1.0 - w))
else:
weight_trans[i] = 1.0
weight_trans = (weight_trans - np.min(weight_trans))/\
(np.max(weight_trans) - np.min(weight_trans))
return weight_trans
[docs]def mat_to_fun(alt_mat, ref_mat=None):
alt_mat = np.array(alt_mat)
assert alt_mat.ndim == 2
if ref_mat is not None:
ref_mat = np.array(ref_mat)
assert ref_mat.ndim == 2
if alt_mat.size == 0:
fun = None
jac_fun = None
else:
if ref_mat is None or ref_mat.size == 0:
mat = alt_mat
else:
mat = alt_mat - ref_mat
def fun(x, mat=mat):
return mat.dot(x)
def jac_fun(x, mat=mat):
return mat
return fun, jac_fun
[docs]def mat_to_log_fun(alt_mat, ref_mat=None, add_one=True):
alt_mat = np.array(alt_mat)
shift = 1.0 if add_one else 0.0
assert alt_mat.ndim == 2
if ref_mat is not None:
ref_mat = np.array(ref_mat)
assert ref_mat.ndim == 2
if alt_mat.size == 0:
fun = None
jac_fun = None
else:
if ref_mat is None or ref_mat.size == 0:
def fun(beta):
return np.log(shift + alt_mat.dot(beta))
def jac_fun(beta):
return alt_mat/(shift + alt_mat.dot(beta)[:, None])
else:
def fun(beta):
return np.log(shift + alt_mat.dot(beta)) - \
np.log(shift + ref_mat.dot(beta))
def jac_fun(beta):
return alt_mat/(shift + alt_mat.dot(beta)[:, None]) - \
ref_mat/(shift + ref_mat.dot(beta)[:, None])
return fun, jac_fun
[docs]def empty_array():
return np.array(list())
[docs]def to_list(obj: Any) -> List[Any]:
"""Convert objective to list of object.
Args:
obj (Any): Object need to be convert.
Returns:
List[Any]:
If `obj` already is a list object, return `obj` itself,
otherwise wrap `obj` with a list and return it.
"""
if isinstance(obj, list):
return obj
else:
return [obj]
[docs]def is_numeric_array(array: np.ndarray) -> bool:
"""Check if an array is numeric.
Args:
array (np.ndarray): Array need to be checked.
Returns:
bool: True if the array is numeric.
"""
numerical_dtype_kinds = {'b', # boolean
'u', # unsigned integer
'i', # signed integer
'f', # floats
'c'} # complex
try:
return array.dtype.kind in numerical_dtype_kinds
except AttributeError:
# in case it's not a numpy array it will probably have no dtype.
return np.asarray(array).dtype.kind in numerical_dtype_kinds
[docs]def expand_array(array: np.ndarray,
shape: Tuple[int],
value: Any,
name: str) -> np.ndarray:
"""Expand array when it is empty.
Args:
array (np.ndarray):
Target array. If array is empty, fill in the ``value``. And
When it is not empty assert the ``shape`` agrees and return the original array.
shape (Tuple[int]): The expected shape of the array.
value (Any): The expected value in final array.
name (str): Variable name of the array (for error message).
Returns:
np.ndarray: Expanded array.
"""
array = np.array(array)
if len(array) == 0:
if hasattr(value, '__iter__') and not isinstance(value, str):
value = np.array(value)
assert value.shape == shape, f"{name}, alternative value inconsistent shape."
array = value
else:
array = np.full(shape, value)
else:
assert array.shape == shape, f"{name}, inconsistent shape."
return array
[docs]def ravel_dict(x: dict) -> dict:
"""Ravel dictionary.
"""
assert all([isinstance(k, str) for k in x.keys()])
assert all([isinstance(v, np.ndarray) for v in x.values()])
new_x = {}
for k, v in x.items():
if v.size == 1:
new_x[k] = v
else:
for i in range(v.size):
new_x[f'{k}_{i}'] = v[i]
return new_x