diff --git a/CATS/inputs.py b/CATS/inputs.py
index 15b233c..d7eef2d 100644
--- a/CATS/inputs.py
+++ b/CATS/inputs.py
@@ -1,64 +1,104 @@
 from collections import OrderedDict, namedtuple
 from typing import List, Literal, Union
+import torch.nn as nn
+
 DEFAULT_GROUP_NAME = "default_group"
 
 
-class SparseFeat(namedtuple('SparseFeat',
-                            ['name', 'vocabulary_size', 'embedding_dim', 'use_hash', 'dtype', 'embedding_name',
-                             'group_name'])):
+class SparseFeat(
+    namedtuple(
+        "SparseFeat",
+        [
+            "name",
+            "vocabulary_size",
+            "embedding_dim",
+            "use_hash",
+            "dtype",
+            "embedding_name",
+            "group_name",
+        ],
+    )
+):
+    """
+    Stores information about a single categorical feature.
+    :param name: feature's name
+    :param vocabulary_size: number of distinct categories in the feature
+    :param embedding_dim: dimension of the output embedding ('auto' derives it from vocabulary_size)
+    :param use_hash: whether to apply feature hashing
+    :param dtype: data's type
+    :param embedding_name: embedding's name
+    :param group_name: group's name
     """
-    Returns information about a single categorical data.
-    :param name: feature's name
-    :param vocabulary_size: input category name
-    :param embedding_dim: Converted embedding's dimension
-    :param use_hash: whether to use hash
-    :param dtype: data's type
-    :param embedding_name: embedding's name
-    :param group_name: group's name
-    """
+
     __slots__ = ()
 
-    def __new__(cls, name: str, vocabulary_size: int, embedding_dim=4, use_hash=False, dtype="int32", embedding_name=None,
-                group_name=DEFAULT_GROUP_NAME):
+    def __new__(
+        cls,
+        name: str,
+        vocabulary_size: int,
+        embedding_dim=4,
+        use_hash=False,
+        dtype="int32",
+        embedding_name=None,
+        group_name=DEFAULT_GROUP_NAME,
+    ):
         if embedding_name is None:
             embedding_name = name
-        elif embedding_dim == 'auto':
+        # 'auto' derives the embedding dimension from the vocabulary size;
+        # this must be a plain `if`, not `elif`, so it also runs when
+        # embedding_name was left as None above
+        if embedding_dim == "auto":
             embedding_dim = 6 * int(pow(vocabulary_size, 0.25))
         if use_hash:
-            raise NotImplementedError("Feature hashing is not supported in PyTorch version. "
-                                      "Please use TensorFlow or disable hashing.")
-        return super(SparseFeat, cls).__new__(cls, name, vocabulary_size, embedding_dim, use_hash, dtype,
-                                              embedding_name, group_name)
+            raise NotImplementedError(
+                "Feature hashing is not supported in the PyTorch version. "
+                "Please use TensorFlow or disable hashing."
+            )
+        return super(SparseFeat, cls).__new__(
+            cls,
+            name,
+            vocabulary_size,
+            embedding_dim,
+            use_hash,
+            dtype,
+            embedding_name,
+            group_name,
+        )
 
     def __hash__(self):
         """
-        Determines the hash value based on the name.
-        :return: self.name's hash
-        """
+        Determines the hash value based on the name.
+        :return: self.name's hash
+        """
         return self.name.__hash__()
 
 
-class VarLenSparseFeat(namedtuple('VarLenSparseFeat',
-                                  ['sparsefeat', 'maxlen', 'combiner', 'length_name'])):
+class VarLenSparseFeat(
+    namedtuple("VarLenSparseFeat", ["sparsefeat", "maxlen", "combiner", "length_name"])
+):
     __slots__ = ()
 
-    def __new__(cls, sparsefeat: SparseFeat, maxlen: int, combiner: Literal['mean', 'max', 'sum'] = 'mean',
-                length_name=None):
+    def __new__(
+        cls,
+        sparsefeat: SparseFeat,
+        maxlen: int,
+        combiner: Literal["mean", "max", "sum"] = "mean",
+        length_name=None,
+    ):
+        """
+        :param sparsefeat: the SparseFeat describing the underlying categorical feature
+        :param maxlen: maximum length of the variable-length sequence
+        :param combiner: combining method for features ('sum', 'mean', 'max')
+        :param length_name: name of the feature that holds the actual sequence length
         """
-        :param sparsefeat: a single categorical data's info namedtuple
-        :param maxlen: maximum categories length
-        :param combiner: combining method for features ('sum', 'mean', 'max')
-        :param length_name: feature length name
-        """
-        return super(VarLenSparseFeat, cls).__new__(cls, sparsefeat, maxlen, combiner, length_name)
+        return super(VarLenSparseFeat, cls).__new__(
+            cls, sparsefeat, maxlen, combiner, length_name
+        )
 
     @property
     def name(self):
         """
-        VarLenSparseFeat's name
-        :return: sparsefeat.name
-        """
+        VarLenSparseFeat's name
+        :return: sparsefeat.name
+        """
         return self.sparsefeat.name
 
     @property
@@ -72,17 +112,17 @@ def vocabulary_size(self):
     @property
     def embedding_dim(self):
         """
-        VarLenSparseFeat's embedding dimension
-        :return: sparsefeat.embedding_dim
-        """
+        VarLenSparseFeat's embedding dimension
+        :return: sparsefeat.embedding_dim
+        """
         return self.sparsefeat.embedding_dim
 
     @property
     def use_hash(self):
         """
-        whether to use hash
-        :return: sparsefeat.use_hash
-        """
+        whether to use hash
+        :return: sparsefeat.use_hash
+        """
         return self.sparsefeat.use_hash
 
     @property
@@ -111,22 +151,21 @@ def group_name(self):
 
     def __hash__(self):
         """
-        Determines the hash value based on the name.
-        :return: self.name's hash
-        """
+        Determines the hash value based on the name.
+        :return: self.name's hash
+        """
         return self.name.__hash__()
 
 
-class DenseFeat(namedtuple('Dense',
-                           ['name', 'dimension', 'dtype'])):
+class DenseFeat(namedtuple("Dense", ["name", "dimension", "dtype"])):
     __slots__ = ()
 
     def __new__(cls, name: str, dimension=1, dtype="float32"):
         """
-        Returns information about a numeric data.
-        :param name: numeric data's attribute name
-        :param dimension: dimension number
-        :param dtype: data's type
+        Returns information about a numeric feature.
+        :param name: numeric feature's attribute name
+        :param dimension: number of dimensions of the feature
+        :param dtype: data's type
         """
         if dimension < 0 and not isinstance(dimension, int):
             raise ValueError("dimension must bigger then 0 and must be integer ")
@@ -134,13 +173,15 @@ def __new__(cls, name: str, dimension=1, dtype="float32"):
 
     def __hash__(self):
         """
-        Determines the hash value based on the name.
-        :return: self.name's hash
+        Determines the hash value based on the name.
+        :return: self.name's hash
         """
         return self.name.__hash__()
 
 
-def get_feature_names(feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]) -> list:
+def get_feature_names(
+    feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]
+) -> list:
     """
     Get list of feature names
     :param feature_columns: list of feature instances (SparseFeat, DenseFeat, VarLenSparseFeat)
@@ -149,15 +190,23 @@ def get_feature_names(feature_columns: List[Union[SparseFeat, DenseFeat, VarLenS
     if feature_columns is None:
         raise ValueError("feature_columns is None. feature_columns must be list")
     if not isinstance(feature_columns, list):
-        raise ValueError(f"feature_columns is {type(feature_columns)}, feature_columns must be list.")
-    if not all(isinstance(feature, (SparseFeat, DenseFeat, VarLenSparseFeat)) for feature in feature_columns):
+        raise ValueError(
+            f"feature_columns is {type(feature_columns)}, feature_columns must be a list."
+        )
+    if not all(
+        isinstance(feature, (SparseFeat, DenseFeat, VarLenSparseFeat))
+        for feature in feature_columns
+    ):
         raise TypeError(
-            "All elements in feature_columns must be instances of SparseFeat, DenseFeat or VarLenSparseFeat.")
+            "All elements in feature_columns must be instances of SparseFeat, DenseFeat or VarLenSparseFeat."
+        )
     features = build_input_features(feature_columns)
     return list(features.keys())
 
 
-def build_input_features(feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]) -> dict:
+def build_input_features(
+    feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]
+) -> dict:
     """
     Return an input feature dictionary based on various types of features (SparseFeat, DenseFeat, VarLenSparseFeat).
     input feature dictionary stores the start and end indices of each feature, helping the model identify the location of
@@ -176,15 +225,56 @@ def build_input_features(feature_columns: List[Union[SparseFeat, DenseFeat, VarL
             features[feat_name] = (curr_features_idx, curr_features_idx + 1)
             curr_features_idx += 1
         elif isinstance(feat, DenseFeat):
-            features[feat_name] = (curr_features_idx, curr_features_idx + feat.dimension)
+            features[feat_name] = (
+                curr_features_idx,
+                curr_features_idx + feat.dimension,
+            )
             curr_features_idx += feat.dimension
         elif isinstance(feat, VarLenSparseFeat):
             features[feat_name] = (curr_features_idx, curr_features_idx + feat.maxlen)
             curr_features_idx += feat.maxlen
             if feat.length_name is not None and feat.length_name not in features:
-                features[feat.length_name] = (curr_features_idx, curr_features_idx+1)
+                features[feat.length_name] = (curr_features_idx, curr_features_idx + 1)
                 curr_features_idx += 1
         else:
             raise TypeError(f"Invalid feature column type, got {type(feat)}")
     return features
 
 
+def create_embedding_matrix(
+    feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]],
+    init_std: float = 0.0001,
+    linear: bool = False,
+    sparse: bool = False,
+    device: Literal["cpu", "cuda", "mps"] = "cpu",
+) -> nn.ModuleDict:
+    """
+    Create one embedding table per sparse or variable-length sparse feature column.
+    :param feature_columns: list of feature instances (SparseFeat, DenseFeat, VarLenSparseFeat)
+    :param init_std: standard deviation used to initialize the embedding weights
+    :param linear: if True, force the embedding dimension to 1 (for linear terms)
+    :param sparse: if True, the gradient w.r.t. the weight matrix will be a sparse tensor
+    :param device: cpu, cuda or mps
+    :return: embedding dictionary {feature's embedding_name: nn.Embedding}
+    """
+    sparse_feature_columns = [x for x in feature_columns if isinstance(x, SparseFeat)]
+
+    varlen_sparse_feature_columns = [
+        x for x in feature_columns if isinstance(x, VarLenSparseFeat)
+    ]
+
+    embedding_dict = nn.ModuleDict(
+        {
+            feat.embedding_name: nn.Embedding(
+                feat.vocabulary_size,
+                feat.embedding_dim if not linear else 1,
+                sparse=sparse,
+            )
+            for feat in sparse_feature_columns + varlen_sparse_feature_columns
+        }
+    )
+
+    for tensor in embedding_dict.values():
+        nn.init.normal_(tensor.weight, mean=0, std=init_std)
+
+    return embedding_dict.to(device)
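
For reviewers, a minimal usage sketch of the API this diff touches. The feature names, vocabulary sizes, and the import path are illustrative assumptions, not part of the change:

    from CATS.inputs import (
        SparseFeat,
        DenseFeat,
        VarLenSparseFeat,
        build_input_features,
        create_embedding_matrix,
    )

    # One plain categorical feature, one numeric feature, and one
    # variable-length categorical feature (e.g. a behaviour sequence).
    feature_columns = [
        SparseFeat("user_id", vocabulary_size=1000, embedding_dim=8),
        DenseFeat("age", dimension=1),
        VarLenSparseFeat(
            SparseFeat("hist_item_id", vocabulary_size=5000, embedding_dim=8),
            maxlen=20,
            combiner="mean",
        ),
    ]

    # Maps each feature to its (start, end) column span in the flat input:
    # user_id -> (0, 1), age -> (1, 2), hist_item_id -> (2, 22)
    features = build_input_features(feature_columns)

    # One nn.Embedding per sparse / var-len sparse feature, keyed by embedding_name.
    embedding_dict = create_embedding_matrix(feature_columns, init_std=0.0001)
    print(embedding_dict["user_id"])  # Embedding(1000, 8)

VarLenSparseFeat delegates name, embedding_dim and embedding_name to the SparseFeat it wraps, which is why create_embedding_matrix can build tables for both feature types in a single comprehension.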