
[feat] Implement embedding generation code for each feature #11


Merged: 3 commits, Nov 26, 2024

Changes from 2 commits
214 changes: 155 additions & 59 deletions CATS/inputs.py
@@ -1,64 +1,104 @@
from collections import OrderedDict, namedtuple
from typing import List, Literal, Union

import torch.nn as nn

DEFAULT_GROUP_NAME = "default_group"


class SparseFeat(
    namedtuple(
        "SparseFeat",
        [
            "name",
            "vocabulary_size",
            "embedding_dim",
            "use_hash",
            "dtype",
            "embedding_name",
            "group_name",
        ],
    )
):
"""
Returns information about a single categorical data.
:param name: feature's name
:param vocabulary_size: input category name
:param embedding_dim: Converted embedding's dimension
:param use_hash: whether to use hash
:param dtype: data's type
:param embedding_name: embedding's name
:param group_name: group's name
"""
Returns information about a single categorical data.
:param name: feature's name
:param vocabulary_size: input category name
:param embedding_dim: Converted embedding's dimension
:param use_hash: whether to use hash
:param dtype: data's type
:param embedding_name: embedding's name
:param group_name: group's name
"""

__slots__ = ()

    def __new__(
        cls,
        name: str,
        vocabulary_size: int,
        embedding_dim=4,
        use_hash=False,
        dtype="int32",
        embedding_name=None,
        group_name=DEFAULT_GROUP_NAME,
    ):
        if embedding_name is None:
            embedding_name = name
        # Resolve the "auto" dimension independently of whether embedding_name was given.
        if embedding_dim == "auto":
            embedding_dim = 6 * int(pow(vocabulary_size, 0.25))
        if use_hash:
            raise NotImplementedError(
                "Feature hashing is not supported in PyTorch version. "
                "Please use TensorFlow or disable hashing."
            )
        return super(SparseFeat, cls).__new__(
            cls,
            name,
            vocabulary_size,
            embedding_dim,
            use_hash,
            dtype,
            embedding_name,
            group_name,
        )

def __hash__(self):
"""
Determines the hash value based on the name.
:return: self.name's hash
"""
Determines the hash value based on the name.
:return: self.name's hash
"""
return self.name.__hash__()


class VarLenSparseFeat(
    namedtuple("VarLenSparseFeat", ["sparsefeat", "maxlen", "combiner", "length_name"])
):
__slots__ = ()

    def __new__(
        cls,
        sparsefeat: SparseFeat,
        maxlen: int,
        combiner: Literal["mean", "max", "sum"] = "mean",
        length_name=None,
    ):
"""
:param sparsefeat: a single categorical data's info namedtuple
:param maxlen: maximum categories length
:param combiner: combining method for features ('sum', 'mean', 'max')
:param length_name: feature length name
"""
:param sparsefeat: a single categorical data's info namedtuple
:param maxlen: maximum categories length
:param combiner: combining method for features ('sum', 'mean', 'max')
:param length_name: feature length name
"""
        return super(VarLenSparseFeat, cls).__new__(
            cls, sparsefeat, maxlen, combiner, length_name
        )

@property
def name(self):
"""
VarLenSparseFeat's name
:return: sparsefeat.name
"""
VarLenSparseFeat's name
:return: sparsefeat.name
"""
return self.sparsefeat.name

@property
@@ -72,17 +112,17 @@ def vocabulary_size(self):
@property
def embedding_dim(self):
"""
VarLenSparseFeat's embedding dimension
:return: sparsefeat.embedding_dim
"""
VarLenSparseFeat's embedding dimension
:return: sparsefeat.embedding_dim
"""
return self.sparsefeat.embedding_dim

@property
def use_hash(self):
"""
whether to use hash
:return: sparsefeat.use_hash
"""
whether to use hash
:return: sparsefeat.use_hash
"""
return self.sparsefeat.use_hash

@property
@@ -111,36 +151,37 @@ def group_name(self):

def __hash__(self):
"""
Determines the hash value based on the name.
:return: self.name's hash
"""
Determines the hash value based on the name.
:return: self.name's hash
"""
return self.name.__hash__()


class DenseFeat(namedtuple("Dense", ["name", "dimension", "dtype"])):
__slots__ = ()

def __new__(cls, name: str, dimension=1, dtype="float32"):
"""
Returns information about a numeric data.
:param name: numeric data's attribute name
:param dimension: dimension number
:param dtype: data's type
Returns information about a numeric data.
:param name: numeric data's attribute name
:param dimension: dimension number
:param dtype: data's type
"""
if dimension < 0 and not isinstance(dimension, int):
raise ValueError("dimension must bigger then 0 and must be integer ")
return super(DenseFeat, cls).__new__(cls, name, dimension, dtype)

def __hash__(self):
"""
Determines the hash value based on the name.
:return: self.name's hash
Determines the hash value based on the name.
:return: self.name's hash
"""
return self.name.__hash__()


def get_feature_names(
    feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]
) -> list:
"""
Get list of feature names
    :param feature_columns: list of feature instances (SparseFeat, DenseFeat, VarLenSparseFeat)
@@ -149,15 +190,23 @@ def get_feature_names(feature_columns: List[Union[SparseFeat, DenseFeat, VarLenS
if feature_columns is None:
raise ValueError("feature_columns is None. feature_columns must be list")
if not isinstance(feature_columns, list):
raise ValueError(f"feature_columns is {type(feature_columns)}, feature_columns must be list.")
if not all(isinstance(feature, (SparseFeat, DenseFeat, VarLenSparseFeat)) for feature in feature_columns):
raise ValueError(
f"feature_columns is {type(feature_columns)}, feature_columns must be list."
)
if not all(
isinstance(feature, (SparseFeat, DenseFeat, VarLenSparseFeat))
for feature in feature_columns
):
raise TypeError(
"All elements in feature_columns must be instances of SparseFeat, DenseFeat or VarLenSparseFeat.")
"All elements in feature_columns must be instances of SparseFeat, DenseFeat or VarLenSparseFeat."
)
features = build_input_features(feature_columns)
return list(features.keys())


def build_input_features(
    feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]]
) -> dict:
"""
Return an input feature dictionary based on various types of features (SparseFeat, DenseFeat, VarLenSparseFeat).
    input feature dictionary stores the start and end indices of each feature, helping the model identify the location of
@@ -176,15 +225,62 @@ def build_input_features(feature_columns: List[Union[SparseFeat, DenseFeat, VarL
features[feat_name] = (curr_features_idx, curr_features_idx + 1)
curr_features_idx += 1
elif isinstance(feat, DenseFeat):
            features[feat_name] = (
                curr_features_idx,
                curr_features_idx + feat.dimension,
            )
curr_features_idx += feat.dimension
elif isinstance(feat, VarLenSparseFeat):
features[feat_name] = (curr_features_idx, curr_features_idx + feat.maxlen)
curr_features_idx += feat.maxlen
if feat.length_name is not None and feat.length_name not in features:
                features[feat.length_name] = (curr_features_idx, curr_features_idx + 1)
curr_features_idx += 1
else:
raise TypeError(f"Invalid feature column type, got {type(feat)}")
return features


def create_embedding_matrix(
feature_columns: List[Union[SparseFeat, DenseFeat, VarLenSparseFeat]],
init_std: float = 0.0001,
linear: bool = False,
sparse: bool = False,
device: Literal["cuda", "gpu", "mps"] = "cpu",
) -> nn.ModuleDict:
"""
    Create an embedding matrix and return it as {feature embedding_name: nn.Embedding}.
    :param feature_columns: list of feature instances (SparseFeat, DenseFeat, VarLenSparseFeat)
    :param init_std: initial standard deviation for the embedding weights
    :param linear: if True, force the embedding dimension to 1
    :param sparse: If True, the gradient w.r.t. the weight matrix will be a sparse tensor.
    :param device: cpu, cuda or mps
    :return: embedding dictionary {feature embedding_name: nn.Embedding}
"""
sparse_feature_columns = (
list(filter(lambda x: isinstance(x, SparseFeat), feature_columns))
if len(feature_columns)
else []
)

varlen_sparse_feature_columns = (
list(filter(lambda x: isinstance(x, VarLenSparseFeat), feature_columns))
if len(feature_columns)
else []
)

embedding_dict = nn.ModuleDict(
{
feat.embedding_name: nn.Embedding(
feat.vocabulary_size,
                feat.embedding_dim if not linear else 1,
sparse=sparse,
)
for feat in sparse_feature_columns + varlen_sparse_feature_columns
}
)

for tensor in embedding_dict.values():
nn.init.normal_(tensor.weight, mean=0, std=init_std)

return embedding_dict.to(device)
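
For context on how these utilities fit together, below is a minimal usage sketch (not part of this PR). The feature names, vocabulary sizes, and dimensions are made up for illustration, and it assumes this module is importable as CATS.inputs with PyTorch installed.

import torch

from CATS.inputs import (
    SparseFeat,
    DenseFeat,
    VarLenSparseFeat,
    build_input_features,
    create_embedding_matrix,
)

# Hypothetical feature definitions.
feature_columns = [
    SparseFeat("user_id", vocabulary_size=1000, embedding_dim=8),
    DenseFeat("age", dimension=1),
    VarLenSparseFeat(
        SparseFeat("hist_item_id", vocabulary_size=500, embedding_dim=8),
        maxlen=10,
        combiner="mean",
    ),
]

# Maps each feature name to its (start, end) column slice in the flat input tensor:
# {"user_id": (0, 1), "age": (1, 2), "hist_item_id": (2, 12)}
feature_index = build_input_features(feature_columns)

# One nn.Embedding per sparse / variable-length sparse feature, keyed by embedding_name.
embedding_dict = create_embedding_matrix(feature_columns, init_std=0.0001, device="cpu")

# Look up the user_id embedding for a toy batch of 4 rows and 12 input columns.
X = torch.zeros(4, 12, dtype=torch.long)
start, end = feature_index["user_id"]
user_emb = embedding_dict["user_id"](X[:, start:end])  # shape: (4, 1, 8)

The (start, end) pairs produced by build_input_features are what the model later uses to slice each feature out of the single concatenated input tensor before looking it up in embedding_dict.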