# -*- coding: utf-8 -*-
from . import cosmic_params
import numpy as np
try:
from scipy.ndimage import gaussian_filter
except ImportError:
gaussian_filter = None
'''
This file should be updated !!!
'''
#%% obtain best-fit parameters from chains
#!!! to be updated, build a class for this part, move this function to plc and refer plc
def _quantile(x, q, weights=None):
"""
Compute sample quantiles with support for weighted samples (a function in corner)
This is a copy of quantile in corner (https://github.com/dfm/corner.py). Copyright (c) 2013-2015 Daniel Foreman-Mackey.
Note
----
When ``weights`` is ``None``, this method simply calls numpy's percentile
function with the values of ``q`` multiplied by 100.
Parameters
----------
x : array-like[nsamples,]
The samples.
q : array-like[nquantiles,]
The list of quantiles to compute. These should all be in the range
``[0, 1]``.
weights : Optional[array-like[nsamples,]]
An optional weight corresponding to each sample. These
Returns
-------
quantiles : array-like[nquantiles,]
The sample quantiles computed at ``q``.
Raises
------
ValueError
For invalid quantiles; ``q`` not in ``[0, 1]`` or dimension mismatch
between ``x`` and ``weights``.
"""
x = np.atleast_1d(x)
q = np.atleast_1d(q)
if np.any(q < 0.0) or np.any(q > 1.0):
raise ValueError("Quantiles must be between 0 and 1")
if weights is None:
return np.percentile(x, 100.0 * q)
else:
weights = np.atleast_1d(weights)
if len(x) != len(weights):
raise ValueError("Dimension mismatch: len(weights) != len(x)")
idx = np.argsort(x)
sw = weights[idx]
cdf = np.cumsum(sw)[:-1]
cdf /= cdf[-1]
cdf = np.append(0, cdf)
return np.interp(q, cdf, x[idx]).tolist()
[docs]def pdf_1(X, bins=100, smooth=5):
"""Estimate the probability density function for the given data.
"""
hist = np.histogram(X, bins, density=True) #normed=True --> density=True
P, x = hist[0], hist[1]
x = (x[:-1]+x[1:])/2.0
P = gaussian_filter(P, smooth)
return x, P
[docs]def get_cov(X, Y, mean=None):
"""Calculate covariance or variance.
Parameters
----------
X : array-like
Random variable X.
Y : array-like
Random variable Y.
mean : array-like or list, optional
The mean values of X and Y. Default: None
Returns
-------
float
Covariance or variance.
"""
#https://en.wikipedia.org/wiki/Covariance
if mean is None:
mu_X = np.mean(X)
mu_Y = np.mean(Y)
else:
mu_X, mu_Y = mean[0], mean[1]
return np.mean((X-mu_X)*(Y-mu_Y))
[docs]def get_CovMatrix(chain, params_n, best_values=None):
"""Calculate covariance matrix from a chain.
Parameters
----------
chain : array-like
The ANN or MCMC chain with shape (N, M), where N is the number of chain and M is the number of parameters.
params_n : array-like
The number of parameters.
best_values : array-like
The best-fit values.
Returns
-------
cov_matrix : array-like
Covariance matrix.
"""
if best_values is None:
best_values = np.mean(chain, axis=0)
cov_matrix = np.ones((params_n, params_n))
for i in range(params_n):
for j in range(params_n):
cov_matrix[i, j] = get_cov(chain[:,i], chain[:,j], mean=[best_values[i], best_values[j]])
return cov_matrix
[docs]class Chains(object):
[docs] @staticmethod
def reshape_chain(chain):
if len(chain.shape)==1:
chain = chain.reshape(-1, 1)
return chain
[docs] @staticmethod
def params_n(chain):
if len(chain.shape)==1:
return 1
elif len(chain.shape)==2:
return chain.shape[1]
[docs] @staticmethod
def best_mode(chain, bins=100, smooth=5):
"""Take the mode as the best value.
"""
chain = Chains.reshape_chain(chain)
params_n = Chains.params_n(chain)
best = np.zeros(params_n)
for i in range(params_n):
x, prob = pdf_1(chain[:,i], bins, smooth)
best_idx = np.where(prob==np.max(prob))
best[i] = x[best_idx]
return best
[docs] @staticmethod
def sigma(chain, best_values, out_sigma=1):
"""Calculate the standard deviations.
Parameters
----------
chain : array-like
The ANN chain.
best_values : 1-dimension array
The best values of parameters.
out_sigma : int
The output sigma, which can be 1, 2, or 3. Default: 1
Returns
-------
sigma_1l, sigma_2l, sigma_3l : 1-dimension array
The left 1 sigma, 2 sigma, or 3sigma deviations.
sigma_1r, sigma_2r, sigma_3r : 1-dimension array
The right 1 sigma, 2 sigma, or 3sigma deviations.
"""
chain = Chains.reshape_chain(chain)
params_n = Chains.params_n(chain)
sigma1_left = np.zeros(params_n)
sigma2_left = np.zeros(params_n)
sigma3_left = np.zeros(params_n)
sigma1_right = np.zeros(params_n)
sigma2_right = np.zeros(params_n)
sigma3_right = np.zeros(params_n)
for i in range(params_n):
sigma1_left_edge = _quantile(chain[:,i][np.where(chain[:,i]<best_values[i])], 0.3174)
sigma2_left_edge = _quantile(chain[:,i][np.where(chain[:,i]<best_values[i])], 0.0456)
sigma3_left_edge = _quantile(chain[:,i][np.where(chain[:,i]<best_values[i])], 0.0026)
sigma1_right_edge = _quantile(chain[:,i][np.where(chain[:,i]>best_values[i])], 0.6826)
sigma2_right_edge = _quantile(chain[:,i][np.where(chain[:,i]>best_values[i])], 0.9544)
sigma3_right_edge = _quantile(chain[:,i][np.where(chain[:,i]>best_values[i])], 0.9974)
sigma1_left[i] = best_values[i] - sigma1_left_edge
sigma2_left[i] = best_values[i] - sigma2_left_edge
sigma3_left[i] = best_values[i] - sigma3_left_edge
sigma1_right[i] = sigma1_right_edge - best_values[i]
sigma2_right[i] = sigma2_right_edge - best_values[i]
sigma3_right[i] = sigma3_right_edge - best_values[i]
if out_sigma==1:
return sigma1_left, sigma1_right
elif out_sigma==2:
return sigma1_left, sigma1_right, sigma2_left, sigma2_right
elif out_sigma==3:
return sigma1_left, sigma1_right, sigma2_left, sigma2_right, sigma3_left, sigma3_right
[docs] @staticmethod
def bestFit(chain, best_type='mode', out_sigma=1, symmetry_error=True):
"""Get the best-fit parameters from the chain.
Parameters
----------
chain : array-like
The ANN chain.
best_type : str, optional
The type of the best values of parameters, 'mode' or 'median'. If 'mode', it will take the mode as the best value.
If 'median', it will take the median as the best value. Default: 'mode'
out_sigma : int
The output sigma, which can be 1, 2, or 3. Default: 1
symmetry_error : bool, optional
If True, obtain symmetrical errors, otherwise, obtain unsymmetrical errors. Default: True
"""
if best_type=='mode':
best_values = Chains.best_mode(chain, bins=100, smooth=5)
elif best_type=='median':
best_values = Chains.best_median(chain)
sigma1_left, sigma1_right, sigma2_left, sigma2_right, sigma3_left, sigma3_right = Chains.sigma(chain, best_values, out_sigma=3) #1sigma, 2sigma, 3sigma left/right error
if out_sigma==1:
best_fit = np.c_[best_values, sigma1_left, sigma1_right]
best_fit_symError = np.c_[best_values, (sigma1_left+sigma1_right)/2.0]
elif out_sigma==2:
best_fit = np.c_[best_values, sigma1_left, sigma1_right, sigma2_left, sigma2_right]
best_fit_symError = np.c_[best_values, (sigma1_left+sigma1_right)/2.0, (sigma2_left+sigma2_right)/2.0]
elif out_sigma==3:
best_fit = np.c_[best_values, sigma1_left, sigma1_right, sigma2_left, sigma2_right, sigma3_left, sigma3_right]
best_fit_symError = np.c_[best_values, (sigma1_left+sigma1_right)/2.0, (sigma2_left+sigma2_right)/2.0, (sigma3_left+sigma3_right)/2.0]
if symmetry_error:
return best_fit_symError
else:
return best_fit
[docs] @staticmethod
def param_devs(chain_1, chain_2):
"""Get deviations of parameters obtained from two chains.
"""
best_fit_1 = Chains.bestFit(chain_1, best_type='mode', symmetry_error=True)
best_fit_2 = Chains.bestFit(chain_2, best_type='mode', symmetry_error=True)
dev = abs(best_fit_1[:,0]-best_fit_2[:,0]) / np.sqrt(best_fit_1[:,1]**2+best_fit_2[:,1]**2)
dev = [round(i, 4) for i in dev]
return dev
[docs] @staticmethod
def error_devs(chain_1, chain_true):
"""Get the absolute values of the relative deviations of error of parameters obtained from two chains.
"""
err_1 = Chains.bestFit(chain_1, best_type='mode', symmetry_error=True)[:,1]
err_true = Chains.bestFit(chain_true, best_type='mode', symmetry_error=True)[:,1]
dev = abs( (err_1 - err_true)/err_true )
dev = [round(i, 4) for i in dev]
return dev
[docs] @staticmethod
def cov_matrix(chain, max_error=True, expand_factor=0):
"""Get the covariance matrix.
Parameters
----------
chain : array-like
The ANN chain.
max_error : bool, optional
If True, the diagonal elements of the covariance matrix will be replaced by the estimated maximum errors,
which is useful for non-Gaussian distribution. Default: True
expand_factor : float, optional
The expansion factor that is used to expand the error (the standard deviation) of each cosmological parameter.
For example, if expand_factor=0.05, the error will has 5% expansion. It only works when max_error is True. Default: 0
Returns
-------
cov : array-like
The covariance matrix.
"""
#remove expand_factor?
best_fit = Chains.bestFit(chain, best_type='mode', symmetry_error=False)
cov = get_CovMatrix(chain, chain.shape[1], best_values=best_fit[:,0])
if max_error:
sigma_max = np.max(best_fit[:,1:], axis=1)
# this will ensure the Pearson correlation coefficient is in the range of [-1,1], and also ensure matrix is positive definite
idx_big = np.where(sigma_max/np.sqrt(cov.diagonal()) > 1.0)[0]
cov[idx_big, idx_big] = sigma_max[idx_big]**2
diag_idx = np.diag_indices_from(cov)
cov[diag_idx] = cov[diag_idx] * (1+expand_factor)**2
#test
# print(cov.shape)
# cov[1,0] = cov[1,0] * 1.5
# cov[0,1] = cov[0,1] * 1.5
#test 2
# factor_sigma = 0.1
# error_factor = np.abs(np.random.normal(1, factor_sigma, len(cov)))
# # error_factor = np.abs(np.random.normal(0, factor_sigma, len(cov))) + 1
# print(error_factor)
# idx = np.where(cov)
# cov[idx] = cov[idx] * error_factor[idx[0]] * error_factor[idx[1]]
#test 3
# factor_sigma = 0.3
# error_factor = np.abs(np.random.normal(1, factor_sigma, 1))
# # error_factor = np.abs(np.random.normal(0, factor_sigma, 1)) + 1
# print(error_factor)
# cov = cov * error_factor * error_factor
return cov
#%% update parameter space
[docs]class CheckParameterSpace(object):
[docs] @staticmethod
def check_limit(p_space, limit_space):
"""Check the parameter space to ensure that the parameter space does not exceed its limit range.
Parameters
----------
p_space : array-like
The parameter space to be checked.
limit_space : array-like
The limit range of parameter space.
Returns
-------
array-like
A parameter space being limited by its limit range.
"""
params_lower = p_space[:,0]
params_upper = p_space[:,1]
params_min = limit_space[:,0]
params_max = limit_space[:,1]
for i in range(len(params_lower)):
if not np.isnan(params_min[i]):
if params_lower[i] < params_min[i]:
params_lower[i] = params_min[i]
if not np.isnan(params_max[i]):
if params_upper[i] > params_max[i]:
params_upper[i] = params_max[i]
return np.c_[params_lower, params_upper]
[docs]class UpdateParameterSpace(CheckParameterSpace):
"""Update parameter space.
Parameters
----------
step : int
The number of step in the training process.
param_names : list
A list that contains parameter names.
chain_1 : array-like
The ANN chain of the i-th step, where :math:`i\geq2`.
chain_0 : None or array-like, optional
The ANN chain of the (i-1)-th step, where :math:`i\geq2`, if step :math:`\leq2`, ``chain_0`` should be set to None,
otherwise, ``chain_0`` should be an array. Default: None
init_params : None or array-like
The initial settings of the parameter space. If ``chain_0`` is given, ``init_params`` will be ignored. Default: None
spaceSigma : int or array-like, optional
The size of parameter space to be learned. It is a int or a numpy array with shape of (n,), where n is the number of parameters,
e.g. for spaceSigma=5, the parameter space to be learned is :math:`[-5\sigma, +5\sigma]`. Default: 5
params_dict : dict or None, optional
Information of cosmological parameters that include the labels, the minimum values,
and the maximum values. See :func:`~.cosmic_params.params_dict_zoo`. Default: None
"""
def __init__(self, step, param_names, chain_1, chain_0=None, init_params=None, spaceSigma=5, params_dict=None):
self.step = self._step(step)
self.param_names = param_names
self.params_limit = cosmic_params.ParamsProperty(param_names, params_dict=params_dict).params_limit
self.chain_1 = self._chain_1(chain_1)
self.chain_0 = chain_0
self.init_params = init_params
if type(spaceSigma) is int:
self.spaceSigma = np.array([spaceSigma for i in range(len(param_names))])
else:
self.spaceSigma = spaceSigma
def _step(self, step):
if step>=2:
return step
else:
raise ValueError('Parameter space can only be updated after the first step (step>=2)')
def _chain_1(self, chain):
chain = Chains.reshape_chain(chain)
self.best_fit_1 = Chains.bestFit(chain, best_type='mode', symmetry_error=False)
self.best_1 = self.best_fit_1[:,0]
self.sigma_max_1 = np.max(self.best_fit_1[:,1:], axis=1)
self.sigma_mean_1 = (self.best_fit_1[:,1] + self.best_fit_1[:,2])/2.0
return chain
@property
def param_devs(self):
if self.step==2:
if self.chain_0 is None:
dev = abs((np.mean(self.init_params, axis=1) - self.best_1) / self.best_1)
dev = [round(i, 4) for i in dev]
return dev
else:
return Chains.param_devs(self.chain_1, self.chain_0)
elif self.step>=3:
return Chains.param_devs(self.chain_1, self.chain_0)
@property
def error_devs(self):
if self.step==2:
if self.chain_0 is None:
#here we assume initial parameter space is [-5\sigma, +5\sigma]
err_0 = (np.mean(self.init_params, axis=1) - self.init_params[:,0])/5.0
dev = abs((self.sigma_mean_1 - err_0) / err_0)
dev = [round(i, 4) for i in dev]
return dev
else:
return Chains.error_devs(self.chain_1, self.chain_0)
elif self.step>=3:
return Chains.error_devs(self.chain_1, self.chain_0)
#this method should be updated, deviation can not fully indecate the similarity between two chains, other method should be used
#this method may should be removed ?, by adding the number of chains to be obtained?
[docs] def small_dev(self, limit_dev=0.01):
"""A small value of deviation of parameters between two steps used to end the training process.
Parameters
----------
limit_dev : float, optional
If the deviation of parameters between two steps smaller than this value, the training process will be over. Default: 0.01 (the deviation < 1%)
Returns
-------
bool
True (dev :math:`\leq` limit_dev) or False (dev>limit_dev)
"""
if max(self.param_devs) <= limit_dev:
if self.step==2:
print('\nThe maximum deviation of parameters between the first step and the initial parameters is: {:.2f}\\sigma'.format(max(self.param_devs)))
else:
print('\nThe maximum deviation of parameters between the above two steps is: {:.2f}\\sigma'.format(max(self.param_devs)))
print('The parameters are well constrained based on the deviation smaller than %s\\sigma\n'%limit_dev)
return True
else:
if self.step==2:
print('\nThe minimum/maximum deviations of parameters (relative deviations of errors) between the first step and the initial parameters are: {:.2f}/{:.2f}\\sigma; ({:.2f}/{:.2f})'.format(min(self.param_devs), max(self.param_devs), min(self.error_devs), max(self.error_devs)))
else:
print('\nThe minimum/maximum deviations of parameters (relative deviations of errors) between the above two steps are: {:.2f}/{:.2f}\\sigma; ({:.2f}/{:.2f})'.format(min(self.param_devs), max(self.param_devs), min(self.error_devs), max(self.error_devs)))
return False
def _auto_sigma(self, space_sigma, param_dev, critical_5=2/np.sqrt(2), critical_10=7/np.sqrt(2)):
# the values of critical_* here are based on a hypothesis
# critical_10: int, default: (10-3)/2**0.5, if max(self.param_dev)>critical_10, 'sigma' will be set to 20,
# if critical_5<max(self.param_dev)<=critical_10, 'sigma' will be set to 10
# critical_5: int, default: (5-3)/2**0.5, if max(self.param_dev)<=critical_5, 'sigma' will be set to 5
# we can also use the following settings: critical_5=(5-4)/2**0.5, critical_10=(10-4)/2**0.5
if param_dev<critical_5:
# sigma = 5
sigma = space_sigma #also use error_dev???
elif critical_5<=param_dev<critical_10:
sigma = 10
elif param_dev>=critical_10:
sigma = 15
return sigma
@property
def spaceSigma_all(self):
_sigmas = []
for i in range(len(self.param_names)):
# _sigmas.append(self._auto_sigma(self.spaceSigma[i], self.param_devs[i]))
_sigmas.append(self._auto_sigma(self.spaceSigma[i], self.param_devs[i], critical_5=1/np.sqrt(2), critical_10=6/np.sqrt(2)))
return np.array(_sigmas)
[docs] def params_space(self):
"""Obtain the parameter space to be learned from chain.
Returns
-------
array-like
Limited parameter space.
"""
p_space = np.c_[self.best_1-self.sigma_max_1*self.spaceSigma_all, self.best_1+self.sigma_max_1*self.spaceSigma_all]
return self.check_limit(p_space, self.params_limit)
[docs] def limited_spaceSigma_all(self):
p_space_limited = self.params_space()
p_space_lower = p_space_limited[:,0]
p_space_upper = p_space_limited[:,1]
left_sigmas = (self.best_1 - p_space_lower) / self.sigma_max_1
right_sigmas = (p_space_upper - self.best_1) / self.sigma_max_1
lim_sigmas = np.c_[left_sigmas, right_sigmas]
return lim_sigmas
@property
def spaceSigma_max(self):
return round(np.max(self.limited_spaceSigma_all()), 1)
@property
def spaceSigma_min(self):
return round(np.min(self.limited_spaceSigma_all()), 1)
[docs] def print_learningRange(self):
print('Updating parameter space to be learned ...')
p_space_limited = self.params_space()
lim_sigmas = self.limited_spaceSigma_all()
for i in range(len(self.param_names)):
print('Learning range of %s: [%.6f, %.6f] ~ [-%.1f\sigma, +%.1f\sigma]'%(self.param_names[i], p_space_limited[i][0], p_space_limited[i][1], lim_sigmas[i][0], lim_sigmas[i][1]))
# print('Note: if min(sigma)<3, this parameter will not be estimated well!!!') #???
print('')