Source code for echoflow.transformer

from typing import Optional, Tuple

import numpy as np
import pandas as pd
import torch
from scipy.special import ndtr
from scipy.stats import gaussian_kde


class KDETransformer:
    """Probability integral transform via KDE.

    A one-dimensional Gaussian kernel density estimate is fitted to the data.
    ``transform`` evaluates the cumulative distribution of that estimate
    (measured from ``self.lower``), and ``inverse_transform`` inverts it
    numerically by bisection on the interval ``[self.lower, self.upper]``.
    """

    def fit(self, X: np.ndarray):
        """Fit the KDE and record the bracket used for inversion.

        Parameters
        ----------
        X:
            One-dimensional array of samples.
        """
        self.model = gaussian_kde(X)
        # Search bounds extend three standard deviations beyond the data.
        margin = 3.0 * np.std(X)
        self.lower = np.min(X) - margin
        self.upper = np.max(X) + margin

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Map values to cumulative probabilities under the fitted KDE.

        For each ``x`` this returns the weighted sum over kernels of
        ``ndtr((x - d) / s) - ndtr((lower - d) / s)``, where ``d`` ranges over
        the training points and ``s`` is the kernel standard deviation. The
        result is not renormalised by the mass between ``lower`` and
        ``upper``.

        Parameters
        ----------
        X:
            One-dimensional array of values.

        Returns
        -------
        np.ndarray
            Array with the same length as ``X``.
        """
        X = np.asarray(X, dtype=float)
        stdev = np.sqrt(self.model.covariance[0, 0])
        # ``dataset`` has shape (1, n); ``[0]`` flattens the per-kernel offsets.
        lower = ndtr((self.lower - self.model.dataset) / stdev)[0]
        # Broadcast (m, 1) against (1, n): one row per query, one col per kernel.
        uppers = ndtr((X[:, None] - self.model.dataset) / stdev)
        return (uppers - lower).dot(self.model.weights)

    def inverse_transform(self, Z: np.ndarray, maxiter=1000, tol=1e-6) -> np.ndarray:
        """Invert ``transform`` by bisection.

        Parameters
        ----------
        Z:
            One-dimensional array of values produced by ``transform``.
        maxiter:
            Maximum number of bisection steps.
        tol:
            Stop once the widest remaining bracket is narrower than this.

        Returns
        -------
        np.ndarray
            Float64 array of the same shape as ``Z``. With ``maxiter=0`` the
            midpoint of the initial bracket is returned.
        """
        Z = np.asarray(Z, dtype=float)
        # Always bracket in float64: inheriting an integer dtype from ``Z``
        # would truncate the bounds, and a float32 bracket cannot shrink below
        # ``tol`` for larger magnitudes, forcing all ``maxiter`` iterations.
        low = np.full(Z.shape, self.lower, dtype=float)
        high = np.full(Z.shape, self.upper, dtype=float)
        guess = (low + high) / 2.0
        if Z.size == 0:
            # Nothing to solve; ``.max()`` below is undefined on empty arrays.
            return guess
        for _ in range(maxiter):
            guess = (low + high) / 2.0
            f_guess = self.transform(guess) - Z
            too_low = f_guess <= 0
            too_high = f_guess >= 0
            low[too_low] = guess[too_low]
            high[too_high] = guess[too_high]
            if (high - low).max() < tol:
                break
        return guess
class TableTransformer:
    """Transform a dataframe into a tensor."""

    # Width of the encoded tensor; recomputed on every ``fit_transform`` call.
    dims = 0

    def __init__(self, use_kde):
        # When truthy, continuous columns are encoded with a KDETransformer
        # instead of min/max scaling.
        self.use_kde = use_kde

    def fit_transform(self, df: pd.DataFrame) -> torch.Tensor:
        """Fit and transform a dataframe into a tensor.

        Float columns occupy one output column each (KDE-transformed when
        ``use_kde`` is set, otherwise min/max scaled). Object columns occupy
        one output column per distinct value.

        Parameters
        ----------
        df:
            The dataframe containing continuous and categorical values.

        Returns
        -------
        torch.Tensor
            A tensor representation of the data.

        Raises
        ------
        ValueError
            If a column is neither float nor object dtype.
        """
        self.dims = 0
        self.mappings = []
        self.columns = df.columns
        for column in df.columns:
            kind = df[column].dtype.kind
            if kind == "f":
                mapping = {
                    "type": "continuous",
                    "column": column,
                    "dst_idx": self.dims,
                    "min": df[column].min(),
                    "max": df[column].max(),
                }
                if self.use_kde:
                    transformer = KDETransformer()
                    transformer.fit(df[column].values)
                    mapping["transformer"] = transformer
                self.mappings.append(mapping)
                self.dims += 1
            elif kind == "O":
                # First-appearance order keeps the column layout reproducible;
                # iterating a set would vary between interpreter runs.
                values = list(dict.fromkeys(df[column]))
                self.mappings.append(
                    {
                        "type": "categorical",
                        "column": column,
                        "dst_idx": {
                            value: self.dims + i for i, value in enumerate(values)
                        },
                    }
                )
                self.dims += len(values)
            else:
                raise ValueError("Unsupported data type.")
        return self.transform(df)

    def transform(self, df):
        """Encode ``df`` using the mappings learned by ``fit_transform``.

        Categorical cells are set to ``1.0`` plus Gaussian noise (standard
        deviation 0.1) in the column of the matching value; one noise value is
        drawn per category value and shared by all of its rows. Values not
        seen during fitting leave their row at zero.

        Parameters
        ----------
        df:
            Dataframe containing the fitted columns.

        Returns
        -------
        torch.Tensor
            Float tensor of shape ``(len(df), self.dims)``.
        """
        X = torch.zeros(len(df), self.dims)
        for mapping in self.mappings:
            series = df[mapping["column"]]
            if mapping["type"] == "continuous":
                if "transformer" in mapping:
                    encoded = mapping["transformer"].transform(series.values)
                else:
                    span = mapping["max"] - mapping["min"]
                    if span == 0:
                        # Constant column: avoid 0/0 -> NaN. Zeros decode back
                        # to the constant value in ``inverse_transform``.
                        encoded = np.zeros(len(series))
                    else:
                        encoded = (series.values - mapping["min"]) / span
                X[:, mapping["dst_idx"]] = torch.tensor(
                    encoded, dtype=torch.float32
                )
            elif mapping["type"] == "categorical":
                for value, idx in mapping["dst_idx"].items():
                    # Explicit bool tensor rather than indexing with a Series.
                    mask = torch.tensor(
                        (series == value).to_numpy(), dtype=torch.bool
                    )
                    X[mask, idx] = 1.0 + np.random.normal(0.0, 1.0) / 10.0
        return X

    def inverse_transform(self, inputs: torch.Tensor) -> pd.DataFrame:
        """Inverse transform a tensor into a dataframe.

        Continuous columns are decoded with the fitted transformer or by
        undoing the min/max scaling. Categorical columns take the value whose
        output column holds the largest entry in each row.

        Parameters
        ----------
        inputs:
            The tensor to apply the inverse transform to.

        Returns
        -------
        pd.DataFrame
            Dataframe with the columns seen during fitting.
        """
        # ``.cpu()`` is a no-op for CPU tensors and lets device tensors through.
        X = inputs.detach().cpu().numpy()
        cols = {}
        for mapping in self.mappings:
            name = mapping["column"]
            if mapping["type"] == "continuous":
                encoded = X[:, mapping["dst_idx"]]
                if "transformer" in mapping:
                    cols[name] = mapping["transformer"].inverse_transform(encoded)
                else:
                    cols[name] = (
                        encoded * (mapping["max"] - mapping["min"]) + mapping["min"]
                    )
            elif mapping["type"] == "categorical":
                values = list(mapping["dst_idx"].keys())
                indices = list(mapping["dst_idx"].values())
                winners = np.argmax(X[:, indices], axis=1)
                cols[name] = [values[i] for i in winners]
        return pd.DataFrame(cols, columns=self.columns)
class SplitTransformer:
    """Encode a dataframe as separate continuous and categorical tensors.

    Float columns go to the continuous tensor (one column each); object
    columns go to the categorical tensor as integer codes (one column each).
    """

    def __init__(self, use_kde):
        # When truthy, continuous columns are encoded with a KDETransformer
        # instead of min/max scaling.
        self.use_kde = use_kde

    def fit_transform(
        self, df: pd.DataFrame
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]:
        """Fit on ``df`` and return its encoding (see ``transform``)."""
        self.fit(df)
        return self.transform(df)

    def fit(self, df: pd.DataFrame):
        """Learn per-column metadata from ``df``.

        Parameters
        ----------
        df:
            Dataframe whose columns are all float or object dtype.

        Raises
        ------
        ValueError
            If a column is constant or has an unsupported dtype.
        """
        self.meta = {}
        self.cardinality = []
        self.columns = df.columns
        self.continuous_dims = 0
        self.categorical_dims = 0
        for column in df.columns:
            kind = df[column].dtype.kind
            if kind == "f":
                low, high = df[column].min(), df[column].max()
                # Reject constant columns before fitting the KDE so the caller
                # sees this error rather than a failure from inside the KDE.
                if low == high:
                    raise ValueError(f"The {column} column is constant.")
                entry = {
                    "type": "continuous",
                    "idx": self.continuous_dims,
                    "min": low,
                    "max": high,
                }
                if self.use_kde:
                    transformer = KDETransformer()
                    transformer.fit(df[column].values)
                    entry["transformer"] = transformer
                self.meta[column] = entry
                self.continuous_dims += 1
            elif kind == "O":
                # First-appearance order gives reproducible codes; iterating a
                # set would vary between interpreter runs.
                v2i = {v: i for i, v in enumerate(dict.fromkeys(df[column]))}
                if len(v2i) == 1:
                    raise ValueError(f"The {column} column is constant.")
                self.meta[column] = {
                    "type": "categorical",
                    "idx": self.categorical_dims,
                    "v2i": v2i,
                    "i2v": {i: v for v, i in v2i.items()},
                }
                self.cardinality.append(len(v2i))
                self.categorical_dims += 1
            else:
                raise ValueError("Unsupported data type.")
        # A single continuous column is padded to width two; the extra column
        # stays zero in ``transform``.
        # NOTE(review): presumably a downstream consumer needs >= 2 continuous
        # dims -- the reason is not visible in this file; confirm.
        if self.continuous_dims == 1:
            self.continuous_dims += 1

    def transform(
        self, df: pd.DataFrame
    ) -> Tuple[Optional[torch.Tensor], Optional[torch.Tensor]]:
        """Encode ``df`` with the metadata learned by ``fit``.

        Returns
        -------
        tuple
            ``(continuous, categorical)``. Each is a float tensor with one row
            per dataframe row, or ``None`` when no column of that type was
            fitted. The categorical tensor stores integer codes as floats.

        Raises
        ------
        KeyError
            If a categorical value was not seen during ``fit``.
        """
        continuous, categorical = None, None
        if self.continuous_dims != 0:
            continuous = torch.zeros(len(df), self.continuous_dims)
        if self.categorical_dims != 0:
            categorical = torch.zeros(len(df), self.categorical_dims)
        for column, meta in self.meta.items():
            raw = df[column].values
            if meta["type"] == "continuous":
                assert continuous is not None
                if self.use_kde:
                    encoded = meta["transformer"].transform(raw)
                else:
                    encoded = (raw - meta["min"]) / (meta["max"] - meta["min"])
                continuous[:, meta["idx"]] = torch.tensor(
                    encoded, dtype=torch.float32
                )
            elif meta["type"] == "categorical":
                assert categorical is not None
                codes = [meta["v2i"][v] for v in raw]
                # The destination tensor is float, so codes are stored as floats.
                categorical[:, meta["idx"]] = torch.tensor(
                    codes, dtype=torch.float32
                )
        return continuous, categorical

    def inverse_transform(
        self, continuous: Optional[torch.Tensor], categorical: Optional[torch.Tensor]
    ) -> pd.DataFrame:
        """Decode tensors back into a dataframe.

        Continuous values are clipped to the ``[min, max]`` range observed
        during ``fit``. Categorical entries are looked up as codes and must
        equal one of the fitted integer codes.

        Parameters
        ----------
        continuous:
            Continuous tensor; required if any continuous column was fitted.
        categorical:
            Categorical code tensor; required if any categorical column was
            fitted.

        Returns
        -------
        pd.DataFrame
            Dataframe with the columns seen during ``fit``.
        """
        data = {}
        for column, meta in self.meta.items():
            if meta["type"] == "continuous":
                assert continuous is not None
                # ``.cpu()`` is a no-op on CPU and lets device tensors through.
                encoded = continuous[:, meta["idx"]].detach().cpu().numpy()
                if self.use_kde:
                    decoded = meta["transformer"].inverse_transform(encoded)
                else:
                    decoded = encoded * (meta["max"] - meta["min"]) + meta["min"]
                # NOTE(review): the flattened source does not show whether the
                # clip covered only the min/max branch; it is applied to both
                # here, since the KDE search bounds extend past min/max.
                data[column] = np.clip(decoded, meta["min"], meta["max"])
            elif meta["type"] == "categorical":
                assert categorical is not None
                codes = categorical[:, meta["idx"]].detach().cpu().numpy()
                data[column] = [meta["i2v"][code] for code in codes]
        return pd.DataFrame(data, columns=self.columns)