import numpy as np
import scipy.stats
from .discrete.pareto import Pareto
def EI(predictor, training, test, fmax=None):
    """
    Maximum expected improvement.

    Parameters
    ----------
    predictor: predictor object
        Base class is defined in physbo.predictor.
    training: physbo.variable
        Training dataset. If already trained, the model does not use this.
    test: physbo.variable
        Inputs
    fmax: float
        Max value of posterior probability distribution.
        If fmax is not given, the maximum posterior mean over the
        training inputs is used instead.

    Returns
    -------
    score: numpy.ndarray
    """
    mean = predictor.get_post_fmean(training, test)
    cov = predictor.get_post_fcov(training, test)
    std = np.sqrt(cov)
    if fmax is None:
        fmax = np.max(predictor.get_post_fmean(training, training))
    # Standard EI: (mu - fmax) * Phi(z) + sigma * phi(z), z = (mu - fmax) / sigma
    gain = mean - fmax
    z = gain / std
    cdf = scipy.stats.norm.cdf(z)
    pdf = scipy.stats.norm.pdf(z)
    return gain * cdf + std * pdf
def PI(predictor, training, test, fmax=None):
    """
    Maximum probability of improvement.

    Parameters
    ----------
    predictor: predictor object
        Base class is defined in physbo.predictor.
    training: physbo.variable
        Training dataset. If already trained, the model does not use this.
    test: physbo.variable
        Inputs
    fmax: float
        Max value of posterior probability distribution.
        If fmax is not given, the maximum posterior mean over the
        training inputs is used instead.

    Returns
    -------
    score: numpy.ndarray
    """
    mean = predictor.get_post_fmean(training, test)
    std = np.sqrt(predictor.get_post_fcov(training, test))
    if fmax is None:
        fmax = np.max(predictor.get_post_fmean(training, training))
    # PI = Phi((mu - fmax) / sigma)
    z = (mean - fmax) / std
    return scipy.stats.norm.cdf(z)
def TS(predictor, training, test, alpha=1):
    """
    Thompson sampling (See Sec. 2.1 in Materials Discovery Volume 4, June 2016, Pages 18-21)

    Parameters
    ----------
    predictor: predictor object
        Base class is defined in physbo.predictor.
    training: physbo.variable
        Training dataset. If already trained, the model does not use this.
    test: physbo.variable
        Inputs
    alpha: float
        noise for sampling source
        (default: 1.0)

    Returns
    -------
    score: numpy.ndarray
    """
    # A posterior sample itself serves as the acquisition score.
    samples = predictor.get_post_samples(training, test, alpha=alpha)
    return samples
def HVPI(fmean, fstd, pareto):
    """
    Calculate Hypervolume-based Probability of Improvement (HVPI).

    Reference: (Couckuyt et al., 2014) Fast calculation of multiobjective
    probability of improvement and expected improvement criteria for Pareto
    optimization

    Parameters
    ----------
    fmean: numpy.ndarray
        Posterior means; reshaped below to (N, 1, n_obj), so it must hold
        N * n_obj elements.
    fstd: numpy.ndarray
        Posterior standard deviations, same element count as fmean.
    pareto: Pareto object
        Holds the current Pareto front and its cell decomposition
        (``cells.lb`` / ``cells.ub`` index rows of the augmented front).

    Returns
    -------
    score: numpy.ndarray
        HVPI score per candidate, shape (N,).
    """
    num_candidates = fmean.shape[0]
    num_objectives = pareto.num_objectives
    if pareto.reference_min is None:
        pareto.set_reference_min()

    # Pareto front augmented with the min reference point and +inf sentinels.
    # shape: (front_size + 2, num_objectives)
    front = np.r_[
        np.array(pareto.reference_min).reshape((1, num_objectives)),
        pareto.front,
        np.full((1, num_objectives), np.inf),
    ]
    obj_ax = np.arange(num_objectives)
    num_cells = pareto.cells.lb.shape[0]

    # Negate to convert the maximization problem into minimization.
    # shape: (1, num_cells, num_objectives)
    lower = -front[pareto.cells.ub, obj_ax].reshape((1, num_cells, num_objectives))
    upper = -front[pareto.cells.lb, obj_ax].reshape((1, num_cells, num_objectives))
    neg_mean = -fmean.reshape((num_candidates, 1, num_objectives))
    std = fstd.reshape((num_candidates, 1, num_objectives))

    # Gaussian mass of each cell under the predictive distribution.
    cdf_lower = scipy.stats.norm.cdf((lower - neg_mean) / std)
    cdf_upper = scipy.stats.norm.cdf((upper - neg_mean) / std)
    # Probability of improvement, shape: (N,)
    poi = np.sum(np.prod(cdf_upper - cdf_lower, axis=2), axis=1)

    # Hypervolume contribution of the posterior-mean point.
    in_cell = np.all(neg_mean < upper, axis=2)  # shape: (N, num_cells)
    contribution = np.prod(upper - np.maximum(lower, neg_mean), axis=2)
    hypervolume = np.sum(contribution * in_cell, axis=1)  # shape: (N,)

    # HVPoI = hypervolume contribution x probability of improvement
    return hypervolume * poi
def EHVI(fmean, fstd, pareto):
    """
    Calculate Expected Hyper-Volume Improvement (EHVI).

    Reference: (Couckuyt et al., 2014) Fast calculation of multiobjective
    probability of improvement and expected improvement criteria for Pareto
    optimization

    Parameters
    ----------
    fmean: numpy.ndarray
        Posterior means; reshaped below to (1, N, 1, n_obj), so it must hold
        N * n_obj elements.
    fstd: numpy.ndarray
        Posterior standard deviations, same element count as fmean.
    pareto: Pareto object
        Holds the current Pareto front, its cell decomposition
        (``cells.lb`` / ``cells.ub``), and the min/max reference points.

    Returns
    -------
    score: numpy.ndarray
        EHVI score per candidate, shape (N,).
    """
    N = fmean.shape[0]
    n_obj = pareto.num_objectives
    # Make sure both reference points exist before building the front.
    if pareto.reference_min is None:
        pareto.set_reference_min()
    if pareto.reference_max is None:
        pareto.set_reference_max()
    reference_min = pareto.reference_min
    reference_max = pareto.reference_max
    # Pareto front with reference points prepended/appended
    # shape: (front_size + 2, n_obj)
    front = np.r_[
        np.array(reference_min).reshape((1, n_obj)),
        pareto.front,
        np.array(reference_max).reshape((1, n_obj)),
    ]
    ax = np.arange(n_obj)
    # convert to minimization problem (negate; cells index rows of `front`)
    l = front[pareto.cells.ub, ax] * -1
    u = front[pareto.cells.lb, ax] * -1
    n_cell = pareto.cells.lb.shape[0]
    # shape: (n_cell, 1, n_cell, n_obj)
    l = np.tile(l, (n_cell, 1, 1, 1))
    u = np.tile(u, (n_cell, 1, 1, 1))
    # (a, b): every cell paired against every (l, u) cell via transpose
    a = l.transpose((2, 1, 0, 3))
    b = u.transpose((2, 1, 0, 3))
    # convert to minimization problem
    fmean = fmean.reshape((1, N, 1, n_obj)) * -1
    fstd = fstd.reshape((1, N, 1, n_obj))
    # calculate pdf, cdf at the cell boundaries
    phi_min_bu = scipy.stats.norm.pdf((np.minimum(b, u) - fmean) / fstd)
    phi_max_al = scipy.stats.norm.pdf((np.maximum(a, l) - fmean) / fstd)
    Phi_l = scipy.stats.norm.cdf((l - fmean) / fstd)
    Phi_u = scipy.stats.norm.cdf((u - fmean) / fstd)
    Phi_a = scipy.stats.norm.cdf((a - fmean) / fstd)
    Phi_b = scipy.stats.norm.cdf((b - fmean) / fstd)
    ## calculate G
    # Cell-pair classification — appears to follow Couckuyt et al. (2014):
    # type A: the (a, b) slab overlaps the (l, u) cell; type B: the cell lies
    # entirely past the slab (u <= a). TODO(review): confirm against paper.
    is_type_A = np.logical_and(a < u, l < b)
    is_type_B = u <= a
    # note: Phi[max_or_min(x,y)] = max_or_min(Phi[x], Phi[y])
    EI_A = (
        (b - a) * (np.maximum(Phi_a, Phi_l) - Phi_l)
        + (b - fmean) * (np.minimum(Phi_b, Phi_u) - np.maximum(Phi_a, Phi_l))
        + fstd * (phi_min_bu - phi_max_al)
    )
    EI_B = (b - a) * (Phi_u - Phi_l)
    G = EI_A * is_type_A + EI_B * is_type_B
    # G shape: (n_cell, N, n_cell, n_obj); product over objectives, then sum
    # over both cell axes -> final shape (N,), one score per candidate.
    score = np.sum(np.sum(np.prod(G, axis=3), axis=0), axis=1)
    return score
def TS_MO(predictor_list, training_list, test, alpha=1, reduced_candidate_num=None):
    """
    Multi-objective Thompson sampling.

    Draws one posterior sample per objective, computes the Pareto front of the
    sampled objective values, and returns a 0/1 score vector in which exactly
    one randomly chosen Pareto-optimal candidate is marked with 1.

    Parameters
    ----------
    predictor_list: list of predictor objects
        One predictor per objective (base class in physbo.predictor).
    training_list: list of physbo.variable
        Training dataset per objective, aligned with predictor_list.
    test: physbo.variable
        Inputs; test.X.shape[0] is the number of candidates.
    alpha: float
        noise for sampling source
        (default: 1.0)
    reduced_candidate_num: int, optional
        If set and smaller than the number of candidates, the Pareto front is
        computed on a random subset of this size to reduce cost.

    Returns
    -------
    score: numpy.ndarray
        Indicator vector of length test.X.shape[0]; one entry is 1, rest are 0.
    """
    score = [
        predictor.get_post_samples(training, test, alpha=alpha)
        for predictor, training in zip(predictor_list, training_list)
    ]
    # shape: (num_candidates, num_objectives)
    score = np.array(score).reshape((len(predictor_list), test.X.shape[0])).T
    num_candidates = score.shape[0]
    if reduced_candidate_num is None or num_candidates <= reduced_candidate_num:
        use_idx = np.arange(num_candidates)
    else:
        # BUGFIX: sample the subset from ALL candidates. The previous code
        # permuted np.arange(reduced_candidate_num), i.e. it only ever looked
        # at the first reduced_candidate_num candidates.
        use_idx = np.random.choice(num_candidates, reduced_candidate_num, replace=False)
    pareto = Pareto(num_objectives=len(predictor_list))
    pareto.update_front(score[use_idx, :])
    # randomly choose one candidate from the Pareto frontier
    # (front_num semantics come from the Pareto class; usage kept as before)
    chosen_idx = np.random.choice(pareto.front_num)
    score_res = np.zeros(num_candidates)
    score_res[use_idx[chosen_idx]] = 1  # only the chosen candidate scores 1
    return score_res