from datetime import datetime
from itertools import cycle
from typing import Any, Dict, List, Optional, Union
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from bokeh.layouts import column # type: ignore
from bokeh.models import Range1d # type: ignore
from bokeh.plotting import figure # type: ignore
from scipy.stats import norm # type: ignore
from statsmodels.stats.power import tt_ind_solve_power # type: ignore
from metricx.metric import Metric
# Qualitative palette (matplotlib "tab10" hex colors) cycled over models
# when drawing per-model distribution lines in Task.to_bokeh.
_colors = [
    "#1f77b4",
    "#ff7f0e",
    "#2ca02c",
    "#d62728",
    "#9467bd",
    "#8c564b",
    "#e377c2",
    "#7f7f7f",
    "#bcbd22",
    "#17becf",
]
def _is_unique(s):
a = s.to_numpy()
return (a[0] == a).all()
class Task:
    """This class represents a task which is used to evaluate models.

    Results reported per model are aggregated per metric so that models
    can be ranked, compared for statistical power, and visualized.

    Attributes:
        name: The name of the task.
        metrics: A dict mapping metric names to the Metrics which are
            produced by models executed on the task. The first metric
            passed to the constructor is considered the default metric.
        results: A dict mapping model names to lists of reported results.
        default_metric: The Metric used when callers do not specify one.
    """

    def __init__(
        self,
        name: str,
        metrics: List[Metric],
    ):
        """Initialize the task.

        Args:
            name: The name of the task.
            metrics: The metrics produced on this task; the first one
                becomes the default metric.

        Raises:
            ValueError: If two metrics share the same name.
        """
        self.name = name
        self.metrics: Dict[str, Metric] = {}
        self.results: Dict[str, List[Dict[str, float]]] = {}
        for metric in metrics:
            if metric.name in self.metrics:
                raise ValueError("Metric names must be unique.")
            self.metrics[metric.name] = metric
        self.default_metric = metrics[0]

    def report(self, model: str, result: Dict[Union[str, Metric], float]):
        """Report a result for the specified model.

        Args:
            model: A string identifying the model.
            result: A dictionary mapping each metric to its value. Keys may
                be Metric objects or metric names; every metric of this
                task must be present.

        Raises:
            KeyError: If the result dictionary is missing metrics.
        """
        if model not in self.results:
            self.results[model] = []
        result_: Dict[str, Any] = {}
        # Timestamp each run so exports retain when results were reported.
        result_["timestamp"] = datetime.now()
        for metric in self.metrics.values():
            # Accept either the Metric object or its name as the key.
            if metric in result:
                result_[metric.name] = result[metric]
            else:
                result_[metric.name] = result[metric.name]
        self.results[model].append(result_)

    def rank(self, metric: Optional[Union[str, Metric]] = None) -> List[str]:
        """Rank the models.

        Args:
            metric: The target metric to sort by.

        Returns:
            A list of models, sorted from best to worst. Empty if no
            results have been reported.
        """
        metric = self.get_metric(metric)
        scores = []
        for model, (mu, var, _) in self.model_to_mu_var_n(metric).items():
            # Negate so "best" sorts first regardless of metric direction.
            if metric.is_higher_better:
                mu = -mu
            scores.append((mu, var, model))
        # Guard: zip(*[]) would raise ValueError when nothing was reported.
        if not scores:
            return []
        scores = sorted(scores)
        _, _, models = zip(*scores)
        return list(models)

    def likelihood(
        self,
        rank: List[str],
        metric: Optional[Union[str, Metric]] = None,
        N: int = 1000,
    ):
        """Estimate the likelihood of the given ranking.

        Each model's score is modeled as a Gaussian from its reported
        results; the ranking probability is estimated by Monte Carlo
        sampling.

        Args:
            rank: A proposed list of models sorted from best to worst.
            metric: The target metric.
            N: The number of samples to use for the estimation.

        Returns:
            A probability of the given ranking being the true ranking.
        """
        metric = self.get_metric(metric)
        model_to_mu_var_n = self.model_to_mu_var_n(metric)
        model_to_values = {}
        for model, (mu, var, _) in model_to_mu_var_n.items():
            model_to_values[model] = np.random.normal(
                loc=mu, scale=np.sqrt(var), size=N
            )
        is_valid = np.ones(N).astype(bool)
        # The ranking holds iff every adjacent pair is ordered correctly.
        for a, b in zip(rank, rank[1:]):
            if metric.is_higher_better:
                is_valid &= model_to_values[a] > model_to_values[b]
            else:
                is_valid &= model_to_values[a] < model_to_values[b]
        return np.mean(is_valid)

    def best(self, metric: Union[str, Metric]) -> str:
        """Get the best model.

        Args:
            metric: The target metric to sort by.

        Returns:
            The best model on this task.
        """
        return self.rank(metric)[0]

    def samples_to_achieve_power(
        self, modelA: str, modelB: str, metric: Optional[Union[str, Metric]] = None
    ):
        """Number of samples needed to achieve power.

        This method estimates the number of samples needed - for each
        model - to achieve 50% statistical power. This corresponds to the
        probability of detecting a statistically significant effect
        (p-value=0.1) if there is one.

        Args:
            modelA: The first model.
            modelB: The second model.
            metric: The target metric.

        Returns:
            A tuple of additional samples needed for modelA and modelB
            (0 when enough samples already exist).

        Raises:
            ValueError: If either model has fewer than 3 reported samples.
        """
        model_to_mu_var_n = self.model_to_mu_var_n(metric)
        muA, varA, nA = model_to_mu_var_n[modelA]
        muB, varB, nB = model_to_mu_var_n[modelB]
        if nA < 3 or nB < 3:
            raise ValueError("Not enough samples to estimate the power.")
        # Detectable effect: a 10% shift of the average of the two means,
        # standardized by the combined spread of the two models.
        pct_change = abs(0.1 * (muA + muB) / 2.0)
        std_dev = np.sqrt(varA + varB)
        effect_size = pct_change / std_dev
        nA_needed = tt_ind_solve_power(
            effect_size=effect_size, nobs1=None, alpha=0.1, power=0.5, ratio=nB / nA
        )
        # Preserve the observed ratio between the two sample sizes.
        nB_needed = nA_needed * nB / nA
        return max(0, nA_needed - nA), max(0, nB_needed - nB)

    def get_metric(self, metric: Optional[Union[str, Metric]]) -> Metric:
        """Resolve *metric* to a Metric instance.

        Args:
            metric: A Metric, a metric name, or None for the default.

        Returns:
            The resolved Metric.
        """
        if metric is None:
            return self.default_metric
        elif isinstance(metric, str):
            return self.metrics[metric]
        return metric

    def model_to_mu_var_n(self, metric: Optional[Union[str, Metric]] = None):
        """Compute mean, variance, and count.

        Args:
            metric: The target metric (defaults to the default metric).

        Returns:
            A dict mapping each model to a (mean, variance, count) tuple.
        """
        metric = self.get_metric(metric)
        model_to_mu_var_n = {}
        for model, results in self.results.items():
            values = np.array([result[metric.name] for result in results])
            # The 1e-5 floor keeps downstream sqrt/sampling numerically
            # stable when all reported values coincide.
            mu, var = np.mean(values), np.var(values) + 1e-5  # type: ignore
            if len(values) <= 1:
                # A single observation carries no variance information.
                var = 0.0
            model_to_mu_var_n[model] = (mu, var, len(values))
        return model_to_mu_var_n

    def to_csv(self, path_to_csv):
        """Export to CSV.

        Args:
            path_to_csv: The path to write the csv.
        """
        self.to_df().to_csv(path_to_csv, index=False)

    def to_df(self) -> pd.DataFrame:
        """Export to DataFrame.

        Returns:
            A DataFrame where each row corresponds to a single run.
        """
        rows = []
        for model, results in self.results.items():
            for result in results:
                obj: Dict[str, Any] = {}
                obj["model"] = model
                obj.update(result)
                rows.append(obj)
        return pd.DataFrame(rows)

    def to_bokeh(self):
        """Export to bokeh Figure.

        Returns:
            A bokeh column layout with one density plot per metric; the
            legend is only shown on the first plot.
        """
        figures = []
        df = self.to_df()
        for i, metric in enumerate(self.metrics.values()):
            fig = figure(
                plot_width=650,
                plot_height=200,
                title=metric.name,
                toolbar_location=None,
            )
            colors = cycle(_colors)
            ymax = 0.0
            xmin = df[metric.name].min() - df[metric.name].std() * 3
            xmax = df[metric.name].max() + df[metric.name].std() * 3
            for model, grp in df.groupby("model"):
                mu = grp[metric.name].mean()
                if _is_unique(grp[metric.name]):
                    # Degenerate (constant) distribution: draw a tall
                    # vertical marker at the mean instead of a pdf.
                    x = [mu, mu]
                    y = [0, 100.0]
                    fig.line(x, y, line_color=next(colors), legend_label=model)
                else:
                    sigma = grp[metric.name].std()
                    x = np.linspace(xmin, xmax, 1000)
                    y = norm.pdf(x, loc=mu, scale=sigma)
                    fig.line(x, y, line_color=next(colors), legend_label=model)
                    ymax = max(ymax, np.max(y))
            if ymax == 0.0:
                # Every model was degenerate: avoid an empty (0, 0) y-range.
                ymax = 1.0
            fig.x_range = Range1d(xmin, xmax)
            fig.y_range = Range1d(0.0, ymax * 1.1)
            figures.append(fig)
            fig.legend.visible = i == 0
        return column(*figures)