from collections import OrderedDict
import gc
import pandas as pd
import numpy as np
import torch
from tqdm import tqdm
from .dataframe import EncoderDataFrame
# from logging import BasicLogger, IpynbLogger, TensorboardXLogger
from .scalers import StandardScaler, NullScaler, GaussRankScaler
[docs]def ohe(input_vector, dim, device="cpu"):
"""Does one-hot encoding of input vector."""
batch_size = len(input_vector)
nb_digits = dim
y = input_vector.reshape(-1, 1)
y_onehot = torch.FloatTensor(batch_size, nb_digits).to(device)
y_onehot.zero_()
y_onehot.scatter_(1, y, 1)
return y_onehot
[docs]def compute_embedding_size(n_categories):
"""
Applies a standard formula to choose the number of feature embeddings
to use in a given embedding layers.
n_categories is the number of unique categories in a column.
"""
val = min(600, round(1.6 * n_categories**0.56))
return int(val)
[docs]class CompleteLayer(torch.nn.Module):
"""
Impliments a layer with linear transformation
and optional activation and dropout."""
def __init__(
self,
in_dim,
out_dim,
activation=None,
dropout=None,
*args,
**kwargs
):
super(CompleteLayer, self).__init__(*args, **kwargs)
self.layers = []
linear = torch.nn.Linear(in_dim, out_dim)
self.layers.append(linear)
self.add_module('linear_layer', linear)
if activation is not None:
act = self.interpret_activation(activation)
self.layers.append(act)
if dropout is not None:
dropout_layer = torch.nn.Dropout(dropout)
self.layers.append(dropout_layer)
self.add_module('dropout', dropout_layer)
[docs] def interpret_activation(self, act=None):
if act is None:
act = self.activation
activations = {
'leaky_relu':torch.nn.functional.leaky_relu,
'relu':torch.relu,
'sigmoid':torch.sigmoid,
'tanh':torch.tanh,
'selu':torch.selu,
'hardtanh':torch.nn.functional.hardtanh,
'relu6':torch.nn.functional.relu6,
'elu':torch.nn.functional.elu,
'celu':torch.nn.functional.celu,
'rrelu':torch.nn.functional.rrelu,
'hardshrink':torch.nn.functional.hardshrink,
'tanhshrink':torch.nn.functional.tanhshrink,
'softsign':torch.nn.functional.softsign
}
try:
return activations[act]
except:
msg = f'activation {act} not understood. \n'
msg += 'please use one of: \n'
msg += str(list(activations.keys()))
raise Exception(msg)
[docs] def forward(self, x):
for layer in self.layers:
x = layer(x)
return x
[docs]class AutoEncoder(torch.nn.Module):
def __init__(
self,
encoder_layers=None,
decoder_layers=None,
encoder_dropout=None,
decoder_dropout=None,
encoder_activations=None,
decoder_activations=None,
activation='relu',
min_cats=10,
swap_p=.15,
lr=0.01,
batch_size=256,
eval_batch_size=1024,
optimizer='adam',
amsgrad=False,
momentum=0,
betas=(0.9, 0.999),
dampening=0,
weight_decay=0,
lr_decay=None,
nesterov=False,
verbose=False,
device=None,
logger='basic',
logdir='logdir/',
project_embeddings=True,
run=None,
progress_bar=True,
n_megabatches=1,
scaler='standard',
eps = 1e-6,
*args,
**kwargs
):
super(AutoEncoder, self).__init__(*args, **kwargs)
self.numeric_fts = OrderedDict()
self.binary_fts = OrderedDict()
self.categorical_fts = OrderedDict()
self.encoder_layers = encoder_layers
self.decoder_layers = decoder_layers
self.encoder_activations = encoder_activations
self.decoder_activations = decoder_activations
self.encoder_dropout = encoder_dropout
self.decoder_dropout = decoder_dropout
self.min_cats = min_cats
self.encoder = []
self.decoder = []
self.train_mode = self.train
self.swap_p = swap_p
self.batch_size = batch_size
self.eval_batch_size = eval_batch_size
self.numeric_output = None
self.binary_output = None
self.num_names = []
self.bin_names = []
self.activation = activation
self.optimizer = optimizer
self.lr = lr
self.lr_decay = lr_decay
self.amsgrad=amsgrad
self.momentum=momentum
self.betas=betas
self.dampening=dampening
self.weight_decay=weight_decay
self.nesterov=nesterov
self.optim = None
self.progress_bar = progress_bar
self.mse = torch.nn.modules.loss.MSELoss(reduction='none')
self.bce = torch.nn.modules.loss.BCELoss(reduction='none')
self.cce = torch.nn.modules.loss.CrossEntropyLoss(reduction='none')
self.verbose = verbose
if device is None:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
else:
self.device = device
self.logger = logger
self.logdir = logdir
self.run = run
self.project_embeddings = project_embeddings
self.scaler = scaler
self.n_megabatches = n_megabatches
self.eps = eps
[docs] def get_scaler(self, name):
scalers = {
'standard':StandardScaler,
'gauss_rank':GaussRankScaler,
None:NullScaler,
'none':NullScaler
}
return scalers[name]
[docs] def init_numeric(self, df):
dt = df.dtypes
numeric = []
numeric += list(dt[dt==int].index)
numeric += list(dt[dt==float].index)
# print("Numeric ", numeric)
if isinstance(self.scaler, str):
scalers = {ft:self.scaler for ft in numeric}
elif isinstance(self.scaler, dict):
scalers = self.scaler
for ft in numeric:
Scaler = self.get_scaler(scalers.get(ft, 'gauss_rank'))
feature = {
'mean':df[ft].mean(),
'std':df[ft].std(),
'scaler':Scaler()
}
feature['scaler'].fit(df[ft][~df[ft].isna()].values)
self.numeric_fts[ft] = feature
self.num_names = list(self.numeric_fts.keys())
[docs] def init_cats(self, df):
dt = df.dtypes
# objects = list(dt[dt==pd.Categorical].index)
objects = list(dt[dt=='category'].index)
# print("Category ", objects)
for ft in objects:
feature = {}
vl = df[ft].value_counts()
if len(vl) < 3:
feature['cats'] = list(vl.index)
self.binary_fts[ft] = feature
continue
cats = list(vl[vl >= self.min_cats].index)
feature['cats'] = cats
self.categorical_fts[ft] = feature
[docs] def init_binary(self, df):
dt = df.dtypes
binaries = list(dt[dt==bool].index)
# print("Binary ", binaries)
for ft in self.binary_fts:
feature = self.binary_fts[ft]
for i, cat in enumerate(feature['cats']):
feature[cat] = bool(i)
for ft in binaries:
feature = dict()
feature['cats'] = [True, False]
feature[True] = True
feature[False] = False
self.binary_fts[ft] = feature
self.bin_names = list(self.binary_fts.keys())
[docs] def init_features(self, df):
self.init_numeric(df)
self.init_cats(df)
self.init_binary(df)
[docs] def build_outputs(self, dim):
self.numeric_output = torch.nn.Linear(dim, len(self.numeric_fts))
self.binary_output = torch.nn.Linear(dim, len(self.binary_fts))
for ft in self.categorical_fts:
feature = self.categorical_fts[ft]
cats = feature['cats']
layer = torch.nn.Linear(dim, len(cats)+1)
feature['output_layer'] = layer
self.add_module(f'{ft} output', layer)
[docs] def prepare_df(self, df):
"""
Does data preparation on copy of input dataframe.
Returns copy.
"""
output_df = EncoderDataFrame()
for ft in self.numeric_fts:
feature = self.numeric_fts[ft]
col = df[ft].fillna(feature['mean'])
trans_col = feature['scaler'].transform(col.values)
trans_col = pd.Series(index=df.index, data=trans_col)
output_df[ft] = trans_col
for ft in self.binary_fts:
feature = self.binary_fts[ft]
output_df[ft] = df[ft].apply(lambda x: feature.get(x, False))
for ft in self.categorical_fts:
feature = self.categorical_fts[ft]
col = pd.Categorical(df[ft], categories=feature['cats']+['_other'])
col = col.fillna('_other')
output_df[ft] = col
return output_df
[docs] def build_optimizer(self):
lr = self.lr
params = self.parameters()
if self.optimizer == 'adam':
return torch.optim.Adam(
params,
lr=self.lr,
amsgrad=self.amsgrad,
weight_decay=self.weight_decay,
betas=self.betas
)
elif self.optimizer == 'sgd':
return torch.optim.SGD(
params,
lr,
momentum=self.momentum,
nesterov=self.nesterov,
dampening=self.dampening,
weight_decay=self.weight_decay,
)
elif self.optimizer == 'adamW':
return torch.optim.AdamW(
params,
lr,
betas = self.betas,
eps = self.eps)
[docs] def build_model(self, df):
"""
Takes a pandas dataframe as input.
Builds autoencoder model.
Returns the dataframe after making changes.
"""
if self.verbose:
print('Building model...')
#get metadata from features
self.init_features(df)
input_dim = self.build_inputs()
#construct a canned denoising autoencoder architecture
if self.encoder_layers is None:
self.encoder_layers = [int(4*input_dim) for _ in range(3)]
if self.decoder_layers is None:
self.decoder_layers = []
if self.encoder_activations is None:
self.encoder_activations = [self.activation for _ in self.encoder_layers]
if self.decoder_activations is None:
self.decoder_activations = [self.activation for _ in self.decoder_layers]
if self.encoder_dropout is None or type(self.encoder_dropout) == float:
drp = self.encoder_dropout
self.encoder_dropout = [drp for _ in self.encoder_layers]
if self.decoder_dropout is None or type(self.decoder_dropout) == float:
drp = self.decoder_dropout
self.decoder_dropout = [drp for _ in self.decoder_layers]
for i, dim in enumerate(self.encoder_layers):
activation = self.encoder_activations[i]
layer = CompleteLayer(
input_dim,
dim,
activation = activation,
dropout = self.encoder_dropout[i]
)
input_dim = dim
self.encoder.append(layer)
self.add_module(f'encoder_{i}', layer)
for i, dim in enumerate(self.decoder_layers):
activation = self.decoder_activations[i]
layer = CompleteLayer(
input_dim,
dim,
activation = activation,
dropout = self.decoder_dropout[i]
)
input_dim = dim
self.decoder.append(layer)
self.add_module(f'decoder_{i}', layer)
#set up predictive outputs
self.build_outputs(dim)
#get optimizer
self.optim = self.build_optimizer()
if self.lr_decay is not None:
self.lr_decay = torch.optim.lr_scheduler.ExponentialLR(self.optim, self.lr_decay)
cat_names = list(self.categorical_fts.keys())
fts = self.num_names + self.bin_names + cat_names
# if self.logger == 'basic':
# self.logger = BasicLogger(fts=fts)
# elif self.logger == 'ipynb':
# self.logger = IpynbLogger(fts=fts)
# elif self.logger == 'tensorboard':
# self.logger = TensorboardXLogger(logdir=self.logdir, run=self.run, fts=fts)
#returns a copy of preprocessed dataframe.
self.to(self.device)
if self.verbose:
print('done!')
[docs] def compute_targets(self, df):
num = torch.tensor(df[self.num_names].values).float().to(self.device)
bin = torch.tensor(df[self.bin_names].astype(int).values).float().to(self.device)
codes = []
for ft in self.categorical_fts:
# print("Ft ", ft)
feature = self.categorical_fts[ft]
code = torch.tensor(df[ft].cat.codes.astype(int).values).to(self.device)
# print("Code ", code, len(code))
codes.append(code)
# print("Categorical target ")
# print(codes)
# print(len(codes))
return num, bin, codes
[docs] def compute_outputs(self, x):
num = self.numeric_output(x)
bin = self.binary_output(x)
bin = torch.sigmoid(bin)
cat = []
for ft in self.categorical_fts:
feature = self.categorical_fts[ft]
out = feature['output_layer'](x)
cat.append(out)
return num, bin, cat
[docs] def encode(self, x, layers=None):
if layers is None:
layers = len(self.encoder)
for i in range(layers):
layer = self.encoder[i]
x = layer(x)
return x
[docs] def decode(self, x, layers=None):
if layers is None:
layers = len(self.decoder)
for i in range(layers):
layer = self.decoder[i]
x = layer(x)
num, bin, cat = self.compute_outputs(x)
return num, bin, cat
[docs] def forward(self, df):
"""We do the thang. Takes pandas dataframe as input."""
num, bin, embeddings = self.encode_input(df)
x = torch.cat(num + bin + embeddings, dim=1)
encoding = self.encode(x)
num, bin, cat = self.decode(encoding)
return num, bin, cat
[docs] def compute_loss(self, num, bin, cat, target_df, logging=False, _id=False):
if logging:
if self.logger is not None:
logging = True
else:
logging = False
net_loss = []
num_target, bin_target, codes = self.compute_targets(target_df)
mse_loss = self.mse(num, num_target)
net_loss += list(mse_loss.mean(dim=0).cpu().detach().numpy())
mse_loss = mse_loss.mean()
bce_loss = self.bce(bin, bin_target)
net_loss += list(bce_loss.mean(dim=0).cpu().detach().numpy())
bce_loss = bce_loss.mean()
cce_loss = []
for i, ft in enumerate(self.categorical_fts):
loss = self.cce(cat[i], codes[i])
loss = loss.mean()
cce_loss.append(loss)
val = loss.cpu().item()
net_loss += [val]
if logging:
if self.training:
self.logger.training_step(net_loss)
elif _id:
self.logger.id_val_step(net_loss)
elif not self.training:
self.logger.val_step(net_loss)
net_loss = np.array(net_loss).mean()
return mse_loss, bce_loss, cce_loss, net_loss
[docs] def do_backward(self, mse, bce, cce):
mse.backward(retain_graph=True)
bce.backward(retain_graph=True)
for i, ls in enumerate(cce):
if i == len(cce)-1:
ls.backward(retain_graph=False)
else:
ls.backward(retain_graph=True)
[docs] def fit(self, df, epochs=1, val=None):
"""Does training."""
if self.optim is None:
self.build_model(df)
if self.n_megabatches==1:
df = self.prepare_df(df)
if val is not None:
val_df = self.prepare_df(val)
val_in = val_df.swap(likelihood=self.swap_p)
msg = "Validating during training.\n"
msg += "Computing baseline performance..."
baseline = self.compute_baseline_performance(val_in, val_df)
if self.verbose:
print(msg)
result = []
val_batches = len(val_df)//self.eval_batch_size
if len(val_df) % self.eval_batch_size != 0:
val_batches += 1
n_updates = len(df)//self.batch_size
if len(df) % self.batch_size > 0:
n_updates += 1
for i in tqdm(range(epochs)):
self.train()
if self.verbose:
print(f'training epoch {i+1}...')
df = df.sample(frac=1.0)
df = EncoderDataFrame(df)
if self.n_megabatches > 1:
self.train_megabatch_epoch(n_updates, df)
else:
input_df = df.swap(likelihood=self.swap_p)
self.train_epoch(n_updates, input_df, df)
if self.lr_decay is not None:
self.lr_decay.step()
if val is not None:
self.eval()
with torch.no_grad():
swapped_loss = []
id_loss = []
for i in range(val_batches):
start = i * self.eval_batch_size
stop = (i+1) * self.eval_batch_size
slc_in = val_in.iloc[start:stop]
slc_out = val_df.iloc[start:stop]
num, bin, cat = self.forward(slc_in)
_, _, _, net_loss = self.compute_loss(num, bin, cat, slc_out)
swapped_loss.append(net_loss)
num, bin, cat = self.forward(slc_out)
_, _, _, net_loss = self.compute_loss(num, bin, cat, slc_out, _id=True)
id_loss.append(net_loss)
# self.logger.end_epoch()
# if self.project_embeddings:
# self.logger.show_embeddings(self.categorical_fts)
if self.verbose:
swapped_loss = np.array(swapped_loss).mean()
id_loss = np.array(id_loss).mean()
msg = '\n'
msg += 'net validation loss, swapped input: \n'
msg += f"{round(swapped_loss, 4)} \n\n"
msg += 'baseline validation loss: '
msg += f"{round(baseline, 4)} \n\n"
msg += 'net validation loss, unaltered input: \n'
msg += f"{round(id_loss, 4)} \n\n\n"
print(msg)
[docs] def train_epoch(self, n_updates, input_df, df, pbar=None):
"""Run regular epoch."""
# if pbar is None and self.progress_bar:
# close = True
# pbar = tqdm.tqdm(total=n_updates)
# else:
# close = False
for j in range(n_updates):
start = j * self.batch_size
stop = (j+1) * self.batch_size
in_sample = input_df.iloc[start:stop]
target_sample = df.iloc[start:stop]
num, bin, cat = self.forward(in_sample)
mse, bce, cce, net_loss = self.compute_loss(
num, bin, cat, target_sample,
logging=False
)
self.do_backward(mse, bce, cce)
self.optim.step()
self.optim.zero_grad()
# if self.progress_bar:
# pbar.update(1)
# if close:
# pbar.close()
[docs] def train_megabatch_epoch(self, n_updates, df):
"""
Run epoch doing 'megabatch' updates, preprocessing data in large
chunks.
"""
# if self.progress_bar:
# pbar = tqdm.tqdm(total=n_updates)
# else:
# pbar = None
n_rows = len(df)
n_megabatches = self.n_megabatches
batch_size = self.batch_size
res = n_rows/n_megabatches
batches_per_megabatch = (res // batch_size) + 1
megabatch_size = batches_per_megabatch * batch_size
final_batch_size = n_rows - (n_megabatches - 1) * megabatch_size
for i in range(n_megabatches):
megabatch_start = int(i * megabatch_size)
megabatch_stop = int((i+1) * megabatch_size)
megabatch = df.iloc[megabatch_start:megabatch_stop]
megabatch = self.prepare_df(megabatch)
input_df = megabatch.swap(self.swap_p)
if i == (n_megabatches-1):
n_updates = int(final_batch_size//batch_size)
if final_batch_size % batch_size > 0:
n_updates += 1
else:
n_updates = int(batches_per_megabatch)
self.train_epoch(n_updates, input_df, megabatch, pbar=None)
del megabatch
del input_df
gc.collect()
[docs] def get_representation(self, df, layer=0):
"""
Computes latent feature vector from hidden layer
given input dataframe.
argument layer (int) specifies which layer to get.
by default (layer=0), returns the "encoding" layer.
layer < 0 counts layers back from encoding layer.
layer > 0 counts layers forward from encoding layer.
"""
result = []
n_batches = len(df)//self.eval_batch_size
if len(df) % self.eval_batch_size != 0:
n_batches += 1
self.eval()
if self.optim is None:
self.build_model(df)
df = self.prepare_df(df)
with torch.no_grad():
for i in range(n_batches):
start = i * self.eval_batch_size
stop = (i+1) * self.eval_batch_size
num, bin, embeddings = self.encode_input(df.iloc[start:stop])
x = torch.cat(num + bin + embeddings, dim=1)
if layer <= 0:
layers = len(self.encoder) + layer
x = self.encode(x, layers=layers)
else:
x = self.encode(x)
x = self.decode(x, layers=layer)
result.append(x)
z = torch.cat(result, dim=0)
return z
[docs] def get_deep_stack_features(self, df):
"""
records and outputs all internal representations
of input df as row-wise vectors.
Output is 2-d array with len() == len(df)
"""
result = []
n_batches = len(df)//self.eval_batch_size
if len(df) % self.eval_batch_size != 0:
n_batches += 1
self.eval()
if self.optim is None:
self.build_model(df)
df = self.prepare_df(df)
with torch.no_grad():
for i in range(n_batches):
this_batch = []
start = i * self.eval_batch_size
stop = (i+1) * self.eval_batch_size
num, bin, embeddings = self.encode_input(df.iloc[start:stop])
x = torch.cat(num + bin + embeddings, dim=1)
for layer in self.encoder:
x = layer(x)
this_batch.append(x)
for layer in self.decoder:
x = layer(x)
this_batch.append(x)
z = torch.cat(this_batch, dim=1)
result.append(z)
result = torch.cat(result, dim=0)
return result
[docs] def get_anomaly_score(self, df):
"""
Returns a per-row loss of the input dataframe.
Does not corrupt inputs.
"""
self.eval()
data = self.prepare_df(df)
num_target, bin_target, codes = self.compute_targets(data)
with torch.no_grad():
num, bin, cat = self.forward(data)
mse_loss = self.mse(num, num_target)
net_loss = [mse_loss.data]
bce_loss = self.bce(bin, bin_target)
net_loss += [bce_loss.data]
cce_loss = []
for i, ft in enumerate(self.categorical_fts):
loss = self.cce(cat[i], codes[i])
cce_loss.append(loss)
net_loss += [loss.data.reshape(-1, 1)]
net_loss = torch.cat(net_loss, dim=1).mean(dim=1)
return net_loss.cpu().numpy()
[docs] def decode_to_df(self, x, df=None):
"""
Runs input embeddings through decoder
and converts outputs into a dataframe.
"""
if df is None:
cols = [x for x in self.binary_fts.keys()]
cols += [x for x in self.numeric_fts.keys()]
cols += [x for x in self.categorical_fts.keys()]
df = pd.DataFrame(index=range(len(x)), columns=cols)
num, bin, cat = self.decode(x)
num_cols = [x for x in self.numeric_fts.keys()]
num_df = pd.DataFrame(data=num.detach().cpu().numpy(), index=df.index)
num_df.columns = num_cols
for ft in num_df.columns:
feature = self.numeric_fts[ft]
col = num_df[ft]
trans_col = feature['scaler'].inverse_transform(col.values)
result = pd.Series(index=df.index, data=trans_col)
num_df[ft] = result
bin_cols = [x for x in self.binary_fts.keys()]
bin_df = pd.DataFrame(data=bin.detach().cpu().numpy(), index=df.index)
bin_df.columns = bin_cols
bin_df = bin_df.apply(lambda x: round(x)).astype(bool)
for ft in bin_df.columns:
feature = self.binary_fts[ft]
map = {
False:feature['cats'][0],
True:feature['cats'][1]
}
bin_df[ft] = bin_df[ft].apply(lambda x: map[x])
cat_df = pd.DataFrame(index=df.index)
for i, ft in enumerate(self.categorical_fts):
feature = self.categorical_fts[ft]
#get argmax excluding NaN column (impute with next-best guess)
codes = torch.argmax(cat[i][:, :-1], dim=1).cpu().numpy()
cat_df[ft] = codes
cats = feature['cats']
cat_df[ft] = cat_df[ft].apply(lambda x: cats[x])
#concat
output_df = pd.concat([num_df, bin_df, cat_df], axis=1)
return output_df[df.columns]
[docs] def df_predict(self, df):
"""
Runs end-to-end model.
Interprets output and creates a dataframe.
Outputs dataframe with same shape as input
containing model predictions.
"""
self.eval()
data = self.prepare_df(df)
with torch.no_grad():
num, bin, embeddings = self.encode_input(data)
x = torch.cat(num + bin + embeddings, dim=1)
x = self.encode(x)
output_df = self.decode_to_df(x, df=df)
return output_df