"""GKLR main module."""
from __future__ import annotations
from typing import Optional, Any, Dict, List
import sys
import gc
import time
import numpy as np
from pympler import asizeof
import pandas as pd
from .logger import *
from .config import Config
from .kernel_utils import *
from .kernel_estimator import KernelEstimator
from .kernel_calcs import KernelCalcs
from .kernel_matrix import KernelMatrix
valid_gklr_params = ["n_jobs", "nystrom", "compression", "ridge_leverage_lambda", "nystrom_sampling"]
class KernelModel:
"""Main class for GKLR models."""
def __init__(self, model_params: Optional[Dict[str, Any]] = None) -> None:
"""Constructor.
Args:
            model_params: A dict that maps each parameter of the kernel model to its value.
Default: None.
"""
self._X = None
self.choice_column = None
self.attributes = None
self._Z = None
self._K = None
self._K_test = None
self._alpha = None
self.alpha_shape = None
self.n_parameters = 0
self.results = None
        if model_params is None:
self._model_params = None
else:
# TODO: Check parameters
self._model_params = model_params
self.config = Config()
logger_debug("KernelModel initialized.")
def _set_kernel_params(self, hyperparams: Dict[str, Any]) -> None:
"""Set the kernel parameters.
Store the hyperparameters of the GKLR model in a config object and
a dict with the parameters to be passed to the kernel function.
Args:
            hyperparams: A dict that maps each hyperparameter passed to the
                GKLR object to its value.
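
        Example:
            A minimal sketch of how a hyperparameter dict would be split,
            assuming "rbf" appears in ``valid_kernel_list`` and "gamma" in
            ``valid_kernel_params`` (both are assumptions, not guaranteed here):

                hyperparams = {"kernel": "rbf", "nystrom": True, "gamma": 0.5}
                # "kernel" and "nystrom" are stored as Config hyperparameters;
                # "gamma" stays in the dict stored under "kernel_params".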
"""
kernel_params = hyperparams.copy()
list_kernel_params = list(kernel_params.keys())
if "kernel" in kernel_params and kernel_params["kernel"] in valid_kernel_list:
self.config.set_hyperparameter("kernel", kernel_params.pop("kernel"))
list_kernel_params.remove("kernel")
for param in list_kernel_params:
if param in valid_gklr_params:
self.config.set_hyperparameter(param, kernel_params.pop(param))
elif param in valid_kernel_params:
# Valid parameter for the kernel function
pass
else:
raise ValueError(f"Parameter {param} is not a valid KernelModel",
"parameter.")
# Store the kernel function parameters
self.config.set_hyperparameter("kernel_params", kernel_params)
def _create_kernel_matrix(self,
X: pd.DataFrame,
choice_column: str,
attributes: Dict[int, List[str]],
config: Config,
Z: Optional[pd.DataFrame] = None,
train: bool = True,
) -> bool:
"""Creates a KernelMatrix object.
        Creates the KernelMatrix object and stores it in a private attribute.
Args:
X: Train dataset stored in a pandas DataFrame. Shape: (n_samples, n_features)
            choice_column: Name of the column of DataFrame `X` that contains the ID of the chosen alternative.
attributes: A dict that contains the columns of DataFrame `X` that are considered for each alternative.
                This dict is indexed by the ID of the available alternatives in the dataset and the values are lists
containing the names of all the columns considered for that alternative.
config: A Config object that contains the hyperparameters of the GKLR model.
Z: Test dataset stored in a pandas DataFrame. Shape: (n_samples, n_features).
Default: None
train: A boolean that indicates if the kernel matrix to be created is for train or test data.
Default: True.
Returns:
            A boolean that indicates whether the kernel matrix was successfully created.
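
        Example:
            An illustrative ``attributes`` dict for a dataset with two alternatives
            (the column names are hypothetical, not taken from any bundled dataset):

                attributes = {
                    1: ["cost_1", "time_1"],  # columns considered for alternative 1
                    2: ["cost_2", "time_2"],  # columns considered for alternative 2
                }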
"""
success = True # TODO: Check conditions before create kernel, if not satisfied, then success is False
# TODO: ensure_columns_are_in_dataframe
# TODO: ensure_valid_variables_passed_to_kernel_matrix
config.check_values()
if train:
self._K = KernelMatrix(X, choice_column, attributes, config, Z)
self.n_parameters = self._K.get_num_cols() * self._K.get_num_alternatives() # One alpha vector per alternative
self.alpha_shape = (self._K.get_num_cols(), self._K.get_num_alternatives())
else:
self._K_test = KernelMatrix(X, choice_column, attributes, config, Z)
return success
    def get_kernel(self, dataset: str = "train") -> KernelMatrix | None:
"""Returns the train and/or test KernelMatrix object.
Args:
            dataset: The kernel matrix to be retrieved. It can take the values: "train" or "test".
Default: "train".
Returns:
The KernelMatrix object.
"""
if dataset == "train":
return self._K
elif dataset == "test":
return self._K_test
else:
msg = "Dataset must be a value in: ['train', 'test', 'both']"
logger_error(msg)
raise ValueError(msg)
    def clear_kernel(self, dataset: str = "train") -> None:
"""Clear the kernel matrices previously computed.
Removes the train and test kernel matrices and frees the memory.
Args:
dataset: The kernel matrix to be deleted. It can take the values: "train", "test" or "both".
Default: "train".
"""
if dataset == "train":
self._K = None
elif dataset == "test":
self._K_test = None
self._Z = None
elif dataset == "both":
self._K = None
self._K_test = None
self._X = None
self._Z = None
else:
msg = "Dataset must be a value in: ['train', 'test', 'both']"
logger_error(msg)
raise ValueError(msg)
gc.collect()
return None
    def set_kernel_train(self,
X: pd.DataFrame,
choice_column: str,
attributes: Dict[int, List[str]],
hyperparams: Dict[str, Any],
verbose: int = 1,
) -> None:
"""Computes the kernel matrix for the train dataset.
Processes the train dataset and creates the corresponding kernel matrix. The kernel matrix is encapsulated and
stored using the KernelMatrix class.
Args:
X: Train dataset stored in a pandas DataFrame. Shape: (n_samples, n_features)
            choice_column: Name of the column of DataFrame `X` that contains the ID of the chosen alternative.
attributes: A dict that contains the columns of DataFrame `X` that are considered for each alternative.
                This dict is indexed by the ID of the available alternatives in the dataset and the values are lists
containing the names of all the columns considered for that alternative.
            hyperparams: A dict that maps each hyperparameter (GKLR model parameter or kernel function
                parameter) to its value.
verbose: Indicates the level of verbosity of the function. If 0, no output will be printed. If 1, basic
information about the time spent and the size of the matrix will be displayed. Default: 1.
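
        Example:
            A minimal usage sketch. The DataFrame, column names, and hyperparameters
            below are illustrative; "rbf" and "gamma" are assumed to be accepted by
            ``valid_kernel_list`` and ``valid_kernel_params``, respectively:

                import pandas as pd

                df = pd.DataFrame({
                    "choice": [1, 2, 1],
                    "cost_1": [1.0, 2.0, 1.5],
                    "cost_2": [2.0, 1.0, 1.2],
                })
                model = KernelModel()
                model.set_kernel_train(
                    X=df,
                    choice_column="choice",
                    attributes={1: ["cost_1"], 2: ["cost_2"]},
                    hyperparams={"kernel": "rbf", "gamma": 0.5},
                )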
"""
self.clear_kernel(dataset="both")
self._set_kernel_params(hyperparams)
start_time = time.time()
success = self._create_kernel_matrix(X, choice_column, attributes, self.config, train=True)
elapsed_time_sec = time.time() - start_time
        if not success:
            self.clear_kernel(dataset="train")
            msg = "The kernel matrix for the train set has NOT been created."
logger_error(msg)
raise RuntimeError(msg)
else:
self._X = X
self.choice_column = choice_column
self.attributes = attributes
elapsed_time_str = elapsed_time_to_str(elapsed_time_sec)
K_size, K_size_u = convert_size_bytes_to_human_readable(asizeof.asizeof(self._K))
if verbose >= 1:
print(f"The kernel matrix for the train set have been correctly created in {elapsed_time_str}. "
f"Size of the matrix object: {K_size} {K_size_u}")
sys.stdout.flush()
logger_debug(f"Kernel matrix for train dataset estimated in {elapsed_time_str}. Size: {K_size} {K_size_u}")
return None
    def set_kernel_test(self,
Z: pd.DataFrame,
choice_column: Optional[str] = None,
attributes: Optional[Dict[int, List[str]]] = None,
verbose: int = 1,
) -> None:
"""Computes the kernel matrix test dataset.
Processes the test dataset and creates the corresponding kernel matrix. The kernel matrix is encapsulated and
stored using the KernelMatrix class.
Args:
Z: Test dataset stored in a pandas DataFrame. Shape: (n_samples, n_features)
            choice_column: Name of the column of DataFrame `Z` that contains the ID of the chosen alternative.
attributes: A dict that contains the columns of DataFrame `Z` that are considered for each alternative.
                This dict is indexed by the ID of the available alternatives in the dataset and the values are lists
containing the names of all the columns considered for that alternative.
verbose: Indicates the level of verbosity of the function. If 0, no output will be printed. If 1, basic
information about the time spent and the size of the matrix will be displayed. Default: 1.
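
        Example:
            A minimal sketch, where ``df_test`` is a hypothetical DataFrame with the
            same columns as the train dataset passed to set_kernel_train():

                model.set_kernel_test(Z=df_test)
                # choice_column and attributes default to the values used for
                # the train kernel.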
"""
if self._X is None or self._K is None or self.choice_column is None or self.attributes is None:
msg = "First you must compute the kernel for the train dataset using set_kernel_train()."
logger_error(msg)
raise RuntimeError(msg)
self.clear_kernel(dataset="test")
# Set default values for the input parameters
choice_column = self.choice_column if choice_column is None else choice_column
attributes = self.attributes if attributes is None else attributes
start_time = time.time()
success = self._create_kernel_matrix(self._X, choice_column, attributes, self.config,
Z=Z, train=False)
elapsed_time_sec = time.time() - start_time
        if not success:
            msg = "The kernel matrix for the test set has not been created."
logger_error(msg)
raise RuntimeError(msg)
else:
self._Z = Z
elapsed_time_str = elapsed_time_to_str(elapsed_time_sec)
K_size, K_size_u = convert_size_bytes_to_human_readable(asizeof.asizeof(self._K_test))
if verbose >= 1:
print(f"The kernel matrix for the test set have been correctly created in {elapsed_time_str}. "
f"Size of the matrix object: {K_size} {K_size_u}")
sys.stdout.flush()
logger_debug(f"Kernel matrix for test dataset estimated in {elapsed_time_str}. Size: {K_size} {K_size_u}")
return None
    def fit(self,
init_parms: Optional[np.ndarray] = None,
pmle: str = "Tikhonov",
pmle_lambda: float = 0,
method: str = "L-BFGS-B",
options: Optional[Dict[str, Any]] = None,
verbose: int = 1,
) -> None:
"""Fit the kernel model.
Perform the estimation of the kernel model and store post-estimation results.
Args:
init_parms: Initial value of the parameters to be optimized.
                Shape: (num_cols_kernel_matrix, n_alternatives). Default: None.
            pmle: Penalization method. Default: "Tikhonov".
            pmle_lambda: Parameter for the penalization method. Default: 0.
method: Optimization method. Default: "L-BFGS-B".
options: Options for the optimization method. Default: None.
verbose: Indicates the level of verbosity of the function. If 0, no output will be printed. If 1, basic
information about the time spent and the Log-likelihood value will be displayed. Default: 1.
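
        Example:
            A minimal sketch of an estimation run, assuming the train kernel has
            already been computed with set_kernel_train(); the penalty value and
            optimizer options are illustrative:

                model.fit(pmle="Tikhonov", pmle_lambda=1e-3,
                          method="L-BFGS-B", options={"maxiter": 500})
                model.summary()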
"""
if self._K is None or self.alpha_shape is None:
msg = "First you must compute the kernel for the train dataset using set_kernel_train()."
logger_error(msg)
raise RuntimeError(msg)
if init_parms is None:
init_parms = np.zeros(self.alpha_shape, dtype=DEFAULT_DTYPE)
else:
pass # TODO: check that there are self.n_parameters and then make a cast to self.alpha_shape
# Create the Calcs instance
calcs = KernelCalcs(K=self._K)
# Create the estimator instance
estimator = KernelEstimator(calcs=calcs, pmle=pmle, pmle_lambda=pmle_lambda, method=method, verbose=verbose)
# Log-likelihood at zero
alpha_at_0 = np.zeros(self.alpha_shape, dtype=DEFAULT_DTYPE)
log_likelihood_at_zero = calcs.log_likelihood(alpha_at_0)
# Initial log-likelihood
initial_log_likelihood = calcs.log_likelihood(init_parms)
if verbose >= 1:
print("The estimation is going to start...\n"
"Log-likelihood at zero: {ll_zero:,.4f}\n"
"Initial log-likelihood: {i_ll:,.4f}".format(ll_zero=log_likelihood_at_zero, i_ll=initial_log_likelihood))
sys.stdout.flush()
if verbose >= 2:
print("Number of parameters to be estimated: {n_parameters:,d}".format(n_parameters=self.n_parameters))
sys.stdout.flush()
# Perform the estimation
start_time = time.time()
self.results = estimator.minimize(init_parms.reshape(self.n_parameters), options=options)
elapsed_time_sec = time.time() - start_time
elapsed_time_str = elapsed_time_to_str(elapsed_time_sec)
final_log_likelihood = calcs.log_likelihood(self.results["alpha"])
mcfadden_r2 = 1 - final_log_likelihood / log_likelihood_at_zero # TODO: Implement a method to compute metrics
# Store post-estimation information
self.results["initial_log_likelihood"] = initial_log_likelihood
self.results["final_log_likelihood"] = final_log_likelihood
self.results["elapsed_time"] = elapsed_time_sec
self.results["mcfadden_r2"] = mcfadden_r2
self.results["pmle"] = pmle
self.results["pmle_lamda"] = pmle_lambda
self.results["method"] = method
self.results["history"] = estimator.history
if verbose >= 1:
print("-------------------------------------------------------------------------\n"
"The kernel model has been estimated. Elapsed time: {elapsed_time}.\n"
"Final log-likelihood value: {final_log_likelihood:,.4f}\n"
"McFadden R^2: {r2:.4f}".format(elapsed_time=elapsed_time_str,
final_log_likelihood=final_log_likelihood,
r2 = mcfadden_r2))
sys.stdout.flush()
return None
    def summary(self) -> None:
"""Print a summary of the estimation results."""
if self.results is None:
msg = "The model has not been estimated yet. Use fit() to estimate it."
logger_error(msg)
raise RuntimeError(msg)
print("-------------------------------------------------------------------------\n"
"GKLR Kernel Model summary\n"
"-------------------------------------------------------------------------\n"
"Optimization method: {method}\n"
"optimization success: {success}\n"
"Optimization message: {message}\n"
"Penalization: {pmle}\n"
"Penalization parameter: {pmle_lambda}\n"
"Initial log-likelihood: {initial_log_likelihood:,.4f}\n"
"Final log-likelihood: {final_log_likelihood:,.4f}\n"
"McFadden R^2: {r2:.4f}\n"
"Elapsed time: {elapsed_time}\n"
"-------------------------------------------------------------------------".format(
method=self.results["method"],
success=self.results["success"],
message=self.results["message"],
pmle=self.results["pmle"],
pmle_lambda=self.results["pmle_lamda"],
initial_log_likelihood=self.results["initial_log_likelihood"],
final_log_likelihood=self.results["final_log_likelihood"],
r2=self.results["mcfadden_r2"],
elapsed_time=elapsed_time_to_str(self.results["elapsed_time"])))
sys.stdout.flush()
return None
    def predict_proba(self, train: bool = False) -> np.ndarray:
"""Predict class probabilities for the train or test kernel.
Args:
            train: If True, the probability estimates are computed on the training set; if False, they are
                computed on the test set, which requires a test kernel matrix computed with set_kernel_test().
                Default: False.
Returns:
Probability of the sample for each class in the model.
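
        Example:
            A minimal sketch, assuming the model has been fitted with fit() and a
            test kernel has been computed with set_kernel_test():

                proba_test = model.predict_proba()             # test-set probabilities
                proba_train = model.predict_proba(train=True)  # train-set probabilities
                # Each row should sum to 1 across the available alternatives.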
"""
if self._K is None:
msg = "Training kernel not found or not correctly defined. Use set_kernel_test() to compute it."
logger_error(msg)
raise RuntimeError(msg)
if train:
# Create the Calcs instance
calcs = KernelCalcs(K=self._K)
else:
if self._K_test is None:
msg = "First you must compute the kernel for the test dataset using set_kernel_test()."
logger_error(msg)
raise RuntimeError(msg)
# Create the Calcs instance
calcs = KernelCalcs(K=self._K_test)
if self.results is None:
msg = "First you must estimate the model using fit()."
logger_error(msg)
raise RuntimeError(msg)
proba = calcs.calc_probabilities(self.results["alpha"])
return proba
    def predict_log_proba(self, train: bool = False) -> np.ndarray:
"""Predict the natural logarithm of the class probabilities for the train or test kernel.
Args:
            train: If True, the log-probability estimates are computed on the training set; if False, they are
                computed on the test set, which requires a test kernel matrix computed with set_kernel_test().
                Default: False.
Returns:
Log-probability of the sample for each class in the model.
"""
proba = self.predict_proba(train)
return np.log(proba)
    def predict(self, train: bool = False) -> np.ndarray:
"""Predict class for the train or test kernel.
Args:
            train: If True, the predictions are computed on the training set; if False, they are computed on
                the test set, which requires a test kernel matrix computed with set_kernel_test().
                Default: False.
Returns:
Vector containing the class labels of the sample.
"""
if self._K is None or self._K.alternatives is None:
msg = "Training kernel not found or not correctly defined. Use set_kernel_test() to compute it."
logger_error(msg)
raise RuntimeError(msg)
proba = self.predict_proba(train)
encoded_labels = np.argmax(proba, axis=1)
return self._K.get_alternatives().take(encoded_labels)
    def score(self) -> float | np.float64:
"""Predict the mean accuracy on the test kernel.
Returns:
Mean accuracy of `self.predict()`.
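
        Example:
            A minimal sketch, assuming a fitted model and ``df_test``, a hypothetical
            test DataFrame with the same columns as the train dataset:

                model.set_kernel_test(Z=df_test)
                accuracy = model.score()  # fraction of correctly predicted choices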
"""
if self.choice_column is None:
msg = "First you must compute the kernel for the train dataset using set_kernel_train()."
logger_error(msg)
raise RuntimeError(msg)
if self._K_test is None or self._Z is None:
msg = "First you must compute the kernel for the test dataset using set_kernel_test()."
logger_error(msg)
raise RuntimeError(msg)
y_true = self._Z[self.choice_column]
y_predict = self.predict()
score = np.average(y_true == y_predict)
return score