From 86bbb9d416a2fc328f9b94713dd09aad9b593f4b Mon Sep 17 00:00:00 2001
From: Zihang Xu <1099498275@qq.com>
Date: Mon, 18 Nov 2024 22:56:07 -0600
Subject: [PATCH] anomaly detection

---
 ltsm/data_pipeline/__init__.py                |   4 +-
 ltsm/data_pipeline/anormly_pipeline.py        | 208 +++++++++++++
 ltsm/data_pipeline/data_pipeline.py           |   2 +
 ltsm/data_pipeline/tokenizer_pipeline.py      | 276 ++++++++++++++++++
 ltsm/data_provider/data_factory.py            |  68 ++++-
 ltsm/data_provider/data_splitter.py           |  10 +-
 ltsm/data_provider/dataset.py                 | 112 +++++--
 .../tokenizer/standard_scaler.py              |  12 +-
 .../stat_prompt/prompt_generate_split.py      |  39 +--
 tests/data_provider/data_factory_test.py      |   9 +-
 tests/test_scripts/anomaly_main_ltsm.py       |   7 +
 tests/test_scripts/main_tokenizer.py          | 271 +----------------
 tests/test_scripts/prompt_generation_norm.sh  |   3 +-
 tests/test_scripts/train_anomaly_main_ltsm.sh |  33 +++
 tests/test_scripts/train_ltsm_csv.sh          |  26 +-
 .../test_scripts/train_ltsm_tokenizer_csv.sh  |   4 +-
 16 files changed, 726 insertions(+), 358 deletions(-)
 create mode 100644 ltsm/data_pipeline/anormly_pipeline.py
 create mode 100644 ltsm/data_pipeline/tokenizer_pipeline.py
 create mode 100644 tests/test_scripts/anomaly_main_ltsm.py
 create mode 100644 tests/test_scripts/train_anomaly_main_ltsm.sh

diff --git a/ltsm/data_pipeline/__init__.py b/ltsm/data_pipeline/__init__.py
index 775faaf..b0409a6 100644
--- a/ltsm/data_pipeline/__init__.py
+++ b/ltsm/data_pipeline/__init__.py
@@ -1,2 +1,4 @@
 from .data_pipeline import TrainingPipeline, get_args, seed_all
-from .model_manager import ModelManager
\ No newline at end of file
+from .model_manager import ModelManager
+from .anormly_pipeline import AnomalyTrainingPipeline, anomaly_get_args, anomaly_seed_all
+from .tokenizer_pipeline import TokenizerTrainingPipeline, tokenizer_get_args, tokenizer_seed_all
\ No newline at end of file
diff --git a/ltsm/data_pipeline/anormly_pipeline.py b/ltsm/data_pipeline/anormly_pipeline.py
new file mode 100644
index 0000000..124a140
--- /dev/null
+++ b/ltsm/data_pipeline/anormly_pipeline.py
@@ -0,0 +1,208 @@
+"""Pipeline for anomaly detection.
+    Main differences from the LTSM forecasting pipeline:
+    - pred_len == seq_len
+    - the label is the per-step anomaly label of the input seq_len window
+    - the loss is CE/BCE
+
+"""
+
+import numpy as np
+import torch
+import argparse
+import random
+import ipdb
+from torch import nn
+
+from ltsm.data_provider.data_factory import get_datasets
+from ltsm.data_provider.data_loader import HF_Dataset
+from ltsm.data_pipeline.model_manager import ModelManager
+
+import logging
+from transformers import (
+    Trainer,
+    TrainingArguments
+)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+)
+
+class AnomalyModelManager(ModelManager):
+    def compute_loss(self, model, inputs, return_outputs=False):
+        """
+        Computes the loss for model training.
+
+        Args:
+            model (torch.nn.Module): The model used for predictions.
+            inputs (dict): Input data and labels.
+            return_outputs (bool): If True, returns both loss and model outputs.
+
+        Returns:
+            torch.Tensor or tuple: The computed loss, and optionally the outputs.
+ """ + outputs = model(inputs["input_data"]) # output should be B, L, M + labels = inputs["labels"] + #print(outputs.shape, labels.shape) + #B, L, M, _ = outputs.shape + loss = nn.functional.cross_entropy(outputs, labels) + #loss = nn.functional.cross_entropy(outputs.reshape(B*L,-1), inputs["labels"][:,1:].long().reshape(B*L)) + return (loss, outputs) if return_outputs else loss + +class AnomalyTrainingPipeline(): + """ + A pipeline for managing the training and evaluation process of a machine learning model. + + Attributes: + args (argparse.Namespace): Arguments containing training configuration and hyperparameters. + model_manager (ModelManager): An instance responsible for creating, managing, and optimizing the model. + """ + def __init__(self, args: argparse.Namespace): + """ + Initializes the TrainingPipeline with given arguments and a model manager. + + Args: + args (argparse.Namespace): Contains training settings such as output directory, batch size, + learning rate, and other hyperparameters. + """ + self.args = args + self.model_manager = AnomalyModelManager(args) + + def run(self): + """ + Runs the training and evaluation process for the model. + + The process includes: + - Logging configuration and training arguments. + - Creating a model with the model manager. + - Setting up training and evaluation parameters. + - Loading and formatting training and evaluation datasets. + - Training the model and saving metrics and state. + - Evaluating the model on test datasets and logging metrics. + """ + logging.info(self.args) + + model = self.model_manager.create_model() + + # Training settings + training_args = TrainingArguments( + output_dir=self.args.output_dir, + per_device_train_batch_size=self.args.batch_size, + per_device_eval_batch_size=self.args.batch_size, + evaluation_strategy="steps", + num_train_epochs=self.args.train_epochs, + fp16=False, + save_steps=100, + eval_steps=25, + logging_steps=5, + learning_rate=self.args.learning_rate, + gradient_accumulation_steps=self.args.gradient_accumulation_steps, + save_total_limit=10, + remove_unused_columns=False, + push_to_hub=False, + load_best_model_at_end=True, + ) + + train_dataset, eval_dataset, test_datasets, _ = get_datasets(self.args) + train_dataset, eval_dataset= HF_Dataset(train_dataset), HF_Dataset(eval_dataset) + + trainer = Trainer( + model=model, + args=training_args, + data_collator=self.model_manager.collate_fn, + compute_metrics=self.model_manager.compute_metrics, + train_dataset=train_dataset, + eval_dataset=eval_dataset, + tokenizer=None, + optimizers=(self.model_manager.optimizer, self.model_manager.scheduler), + ) + + # Overload the trainer API + if not self.args.eval: + trainer.compute_loss = self.model_manager.compute_loss + trainer.prediction_step = self.model_manager.prediction_step + train_results = trainer.train() + trainer.save_model() + trainer.log_metrics("train", train_results.metrics) + trainer.save_metrics("train", train_results.metrics) + trainer.save_state() + + # Testing settings + for test_dataset in test_datasets: + trainer.compute_loss = self.model_manager.compute_loss + trainer.prediction_step = self.model_manager.prediction_step + test_dataset = HF_Dataset(test_dataset) + + metrics = trainer.evaluate(test_dataset) + trainer.log_metrics("Test", metrics) + trainer.save_metrics("Test", metrics) + +def anomaly_get_args(): + parser = argparse.ArgumentParser(description='LTSM') + + # Basic Config + parser.add_argument('--model_id', type=str, default='test_run', help='model id') + 
parser.add_argument('--model_name_or_path', type=str, default="gpt2-medium", help='model name') + parser.add_argument('--seed', type=int, default=2024, help='random seed') + parser.add_argument('--device', type=str, default="cuda:0") + parser.add_argument('--checkpoints', type=str, default='./checkpoints/') + + # Data Settings + parser.add_argument('--data_path', nargs='+', default='dataset/weather.csv', help='data files') + parser.add_argument('--test_data_path_list', nargs='+', required=True, help='test data file') + parser.add_argument('--prompt_data_path', type=str, default='./weather.csv', help='prompt data file') + parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method') + parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio') + parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio') + parser.add_argument('--do_anomaly', type=bool, default=True, help='do anomaly detection') + + # Forecasting Settings + parser.add_argument('--seq_len', type=int, default=113, help='input sequence length') + parser.add_argument('--pred_len', type=int, default=None, help='prediction sequence length') + parser.add_argument('--prompt_len', type=int, default=133, help='prompt sequence length') + + # Model Settings + parser.add_argument('--lora', action="store_true", help='use lora') + parser.add_argument('--lora_dim', type=int, default=128, help='dimension of lora') + parser.add_argument('--gpt_layers', type=int, default=3, help='number of gpt layers') + parser.add_argument('--d_model', type=int, default=1024, help='dimension of model') + parser.add_argument('--n_heads', type=int, default=16, help='number of heads') + parser.add_argument('--d_ff', type=int, default=512, help='dimension of fcn') + parser.add_argument('--dropout', type=float, default=0.2, help='dropout') + parser.add_argument('--enc_in', type=int, default=1, help='encoder input size') + parser.add_argument('--c_out', type=int, default=862, help='output size') + parser.add_argument('--patch_size', type=int, default=16, help='patch size') + parser.add_argument('--pretrain', type=int, default=1, help='is pretrain') + parser.add_argument('--local_pretrain', type=str, default="None", help='local pretrain weight') + parser.add_argument('--freeze', type=int, default=1, help='is model weight frozen') + parser.add_argument('--model', type=str, default='model', help='model name, , options:[LTSM, LTSM_WordPrompt, LTSM_Tokenizer]') + parser.add_argument('--stride', type=int, default=8, help='stride') + parser.add_argument('--tmax', type=int, default=10, help='tmax') + + # Training Settings + parser.add_argument('--eval', type=int, default=0, help='evaluation') + parser.add_argument('--itr', type=int, default=1, help='experiments times') + parser.add_argument('--output_dir', type=str, default='output/ltsm_train_lr0005/', help='output directory') + parser.add_argument('--downsample_rate', type=int, default=100, help='downsample rate') + parser.add_argument('--llm_layers', type=int, default=32) + parser.add_argument('--decay_fac', type=float, default=0.75, help='decay factor') + parser.add_argument('--learning_rate', type=float, default=0.0001, help='learning rate') + parser.add_argument('--batch_size', type=int, default=512, help='batch size') + parser.add_argument('--num_workers', type=int, default=10, help='number of workers') + parser.add_argument('--train_epochs', type=int, default=1, help='number of epochs') + 
parser.add_argument('--lradj', type=str, default='type1', help='learning rate adjustment type') + parser.add_argument('--patience', type=int, default=3, help='early stopping patience') + parser.add_argument('--gradient_accumulation_steps', type=int, default=64, help='gradient accumulation steps') + args, unknown = parser.parse_known_args() + + if args.pred_len is None: + logging.info(f"Anomaly Mode, Set pred_len to seq_len") + args.pred_len = args.seq_len + + return args + + +def anomaly_seed_all(fixed_seed): + random.seed(fixed_seed) + torch.manual_seed(fixed_seed) + np.random.seed(fixed_seed) \ No newline at end of file diff --git a/ltsm/data_pipeline/data_pipeline.py b/ltsm/data_pipeline/data_pipeline.py index 1eef3bf..0a7a24a 100644 --- a/ltsm/data_pipeline/data_pipeline.py +++ b/ltsm/data_pipeline/data_pipeline.py @@ -101,6 +101,7 @@ def run(self): for test_dataset in test_datasets: trainer.compute_loss = self.model_manager.compute_loss trainer.prediction_step = self.model_manager.prediction_step + test_dataset = HF_Dataset(test_dataset) metrics = trainer.evaluate(test_dataset) trainer.log_metrics("Test", metrics) @@ -123,6 +124,7 @@ def get_args(): parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method') parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio') parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio') + parser.add_argument('--do_anomaly', type=bool, default=False, help='do anomaly detection') # Forecasting Settings parser.add_argument('--seq_len', type=int, default=336, help='input sequence length') diff --git a/ltsm/data_pipeline/tokenizer_pipeline.py b/ltsm/data_pipeline/tokenizer_pipeline.py new file mode 100644 index 0000000..356f49a --- /dev/null +++ b/ltsm/data_pipeline/tokenizer_pipeline.py @@ -0,0 +1,276 @@ +"""Pipeline for tokenizer-ltsm. + Task: Time Series Forecasting. +""" + +import numpy as np +import torch +import argparse +import random +import ipdb +from torch import nn + +from ltsm.data_provider.data_factory import get_datasets +from ltsm.data_provider.data_loader import HF_Dataset +from ltsm.data_pipeline.model_manager import ModelManager +from ltsm.data_provider.tokenizer.tokenizer_processor import TokenizerConfig +from ltsm.models import get_model, LTSMConfig +from peft import get_peft_model, LoraConfig + +import logging +from transformers import ( + Trainer, + TrainingArguments +) + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', +) + +class TokenizerModelManager(ModelManager): + def __init__(self, args: argparse.Namespace): + """ + Initializes the ModelManager with provided arguments and default values for model, optimizer, and scheduler. + + Args: + args (argparse.Namespace): Training configurations and hyperparameters. 
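The loss in AnomalyModelManager.compute_loss above treats every position of the window as a classification target. A minimal, self-contained sketch of that shape convention (illustrative only, not part of the patch; it assumes the backbone emits one two-way logit vector per time step, which the patch leaves to the model):

    # Illustrative sketch, not part of the patch: per-step anomaly
    # classification with pred_len == seq_len. Assumes logits of shape
    # (B, L, 2) and integer labels of shape (B, L); the real model's
    # output layout may differ.
    import torch
    from torch import nn

    B, L, num_classes = 4, 113, 2
    logits = torch.randn(B, L, num_classes)   # stand-in for model output
    labels = torch.randint(0, 2, (B, L))      # 0 = normal, 1 = anomaly

    # cross_entropy expects (N, C) logits, so flatten batch and time:
    loss = nn.functional.cross_entropy(
        logits.reshape(B * L, num_classes),
        labels.reshape(B * L),
    )
    print(loss.item())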
+ """ + super().__init__(args) + self.tokenizer = None + def create_tokenizer(self): + context_length = self.args.seq_len+self.args.pred_len + prediction_length = self.args.pred_len + n_tokens = 1024 + n_special_tokens = 2 + config = TokenizerConfig( + tokenizer_class="MeanScaleUniformBins", + tokenizer_kwargs=dict(low_limit=-3.0, high_limit=3.0), + n_tokens=n_tokens, + n_special_tokens=n_special_tokens, + pad_token_id=0, + eos_token_id=1, + use_eos_token=0, + model_type="causal", + context_length=context_length, + prediction_length=prediction_length, + num_samples=20, + temperature=1.0, + top_k=50, + top_p=1.0, + ) + + self.tokenizer = config.create_tokenizer() + def compute_loss(self, model, inputs, return_outputs=False): + """ + Computes the loss for model training. + + Args: + model (torch.nn.Module): The model used for predictions. + inputs (dict): Input data and labels. + return_outputs (bool): If True, returns both loss and model outputs. + + Returns: + torch.Tensor or tuple: The computed loss, and optionally the outputs. + """ + outputs = model(inputs["input_data"]) + B, L, M, _ = outputs.shape + loss = nn.functional.cross_entropy(outputs.reshape(B*L,-1), inputs["labels"][:,1:].long().reshape(B*L)) + return (loss, outputs) if return_outputs else loss + + @torch.no_grad() + def prediction_step(self, model, inputs, prediction_loss_only=False, ignore_keys=None): + input_data = inputs["input_data"].to(model.module.device) + labels = inputs["labels"].to(model.module.device) + scale = labels[:,0] + labels = labels[:,1:] + outputs = model(input_data) + indices = torch.max(outputs, dim=-1).indices + + output_value = self.tokenizer.output_transform(indices, scale) + label_value = self.tokenizer.output_transform(labels.unsqueeze(-1).long(), scale) + loss = nn.functional.mse_loss(output_value, label_value) + return (loss, output_value, label_value) + + def create_model(self): + """ + Initializes and configures the model based on specified arguments, including options for + freezing parameters or applying LoRA (Low-Rank Adaptation). + + Returns: + torch.nn.Module: The configured model ready for training. + """ + model_config = LTSMConfig(**vars(self.args)) + self.model = get_model(model_config) + + if self.args.lora: + peft_config = LoraConfig( + target_modules=["c_attn"], + inference_mode=False, + r=self.args.lora_dim, + lora_alpha=32, + lora_dropout=0.1 + ) + self.model = get_peft_model(self.model, peft_config) + self.model.print_trainable_parameters() + + elif self.args.freeze: + self.freeze_parameters() + + self.print_trainable_parameters() + self.create_tokenizer() + + # Optimizer settings + return self.model + +class TokenizerTrainingPipeline(): + """ + A pipeline for managing the training and evaluation process of a machine learning model. + + Attributes: + args (argparse.Namespace): Arguments containing training configuration and hyperparameters. + model_manager (ModelManager): An instance responsible for creating, managing, and optimizing the model. + """ + def __init__(self, args: argparse.Namespace): + """ + Initializes the TrainingPipeline with given arguments and a model manager. + + Args: + args (argparse.Namespace): Contains training settings such as output directory, batch size, + learning rate, and other hyperparameters. + """ + self.args = args + self.model_manager = TokenizerModelManager(args) + + def run(self): + """ + Runs the training and evaluation process for the model. + + The process includes: + - Logging configuration and training arguments. 
+        - Creating a model with the model manager.
+        - Setting up training and evaluation parameters.
+        - Loading and formatting training and evaluation datasets.
+        - Training the model and saving metrics and state.
+        - Evaluating the model on test datasets and logging metrics.
+        """
+        logging.info(self.args)
+
+        model = self.model_manager.create_model()
+
+        # Training settings
+        training_args = TrainingArguments(
+            output_dir=self.args.output_dir,
+            per_device_train_batch_size=self.args.batch_size,
+            per_device_eval_batch_size=self.args.batch_size,
+            evaluation_strategy="steps",
+            num_train_epochs=self.args.train_epochs,
+            fp16=False,
+            save_steps=100,
+            eval_steps=25,
+            logging_steps=5,
+            learning_rate=self.args.learning_rate,
+            gradient_accumulation_steps=self.args.gradient_accumulation_steps,
+            save_total_limit=10,
+            remove_unused_columns=False,
+            push_to_hub=False,
+            load_best_model_at_end=True,
+        )
+
+        train_dataset, eval_dataset, test_datasets, _ = get_datasets(self.args)
+        train_dataset, eval_dataset = HF_Dataset(train_dataset), HF_Dataset(eval_dataset)
+
+        trainer = Trainer(
+            model=model,
+            args=training_args,
+            data_collator=self.model_manager.collate_fn,
+            compute_metrics=self.model_manager.compute_metrics,
+            train_dataset=train_dataset,
+            eval_dataset=eval_dataset,
+            tokenizer=None,
+            optimizers=(self.model_manager.optimizer, self.model_manager.scheduler),
+        )
+
+        # Overload the trainer API
+        if not self.args.eval:
+            trainer.compute_loss = self.model_manager.compute_loss
+            trainer.prediction_step = self.model_manager.prediction_step
+            train_results = trainer.train()
+            trainer.save_model()
+            trainer.log_metrics("train", train_results.metrics)
+            trainer.save_metrics("train", train_results.metrics)
+            trainer.save_state()
+
+        # Testing settings
+        for test_dataset in test_datasets:
+            trainer.compute_loss = self.model_manager.compute_loss
+            trainer.prediction_step = self.model_manager.prediction_step
+            test_dataset = HF_Dataset(test_dataset)
+
+            metrics = trainer.evaluate(test_dataset)
+            trainer.log_metrics("Test", metrics)
+            trainer.save_metrics("Test", metrics)
+
+def tokenizer_get_args():
+    parser = argparse.ArgumentParser(description='LTSM')
+
+    # Basic Config
+    parser.add_argument('--model_id', type=str, default='test_run', help='model id')
+    parser.add_argument('--model_name_or_path', type=str, default="gpt2-medium", help='model name')
+    parser.add_argument('--seed', type=int, default=2024, help='random seed')
+    parser.add_argument('--device', type=str, default="cuda:0")
+    parser.add_argument('--checkpoints', type=str, default='./checkpoints/')
+
+    # Data Settings
+    parser.add_argument('--data_path', nargs='+', default='dataset/weather.csv', help='data files')
+    parser.add_argument('--test_data_path_list', nargs='+', required=True, help='test data file')
+    parser.add_argument('--prompt_data_path', type=str, default='./weather.csv', help='prompt data file')
+    parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method')
+    parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio')
+    parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio')
+    parser.add_argument('--do_anomaly', type=bool, default=False, help='do anomaly detection')
+
+    # Forecasting Settings
+    parser.add_argument('--seq_len', type=int, default=336, help='input sequence length')
+    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
+    parser.add_argument('--prompt_len', type=int, default=133, help='prompt sequence length')
+
+    # Model Settings
+    parser.add_argument('--lora', action="store_true", help='use lora')
+    parser.add_argument('--lora_dim', type=int, default=128, help='dimension of lora')
+    parser.add_argument('--gpt_layers', type=int, default=3, help='number of gpt layers')
+    parser.add_argument('--d_model', type=int, default=1024, help='dimension of model')
+    parser.add_argument('--n_heads', type=int, default=16, help='number of heads')
+    parser.add_argument('--d_ff', type=int, default=512, help='dimension of fcn')
+    parser.add_argument('--dropout', type=float, default=0.2, help='dropout')
+    parser.add_argument('--enc_in', type=int, default=1, help='encoder input size')
+    parser.add_argument('--c_out', type=int, default=862, help='output size')
+    parser.add_argument('--patch_size', type=int, default=16, help='patch size')
+    parser.add_argument('--pretrain', type=int, default=1, help='is pretrain')
+    parser.add_argument('--local_pretrain', type=str, default="None", help='local pretrain weight')
+    parser.add_argument('--freeze', type=int, default=1, help='is model weight frozen')
+    parser.add_argument('--model', type=str, default='model', help='model name, options: [LTSM, LTSM_WordPrompt, LTSM_Tokenizer]')
+    parser.add_argument('--stride', type=int, default=8, help='stride')
+    parser.add_argument('--tmax', type=int, default=10, help='tmax')
+
+    # Training Settings
+    parser.add_argument('--eval', type=int, default=0, help='evaluation')
+    parser.add_argument('--itr', type=int, default=1, help='number of experiment runs')
+    parser.add_argument('--output_dir', type=str, default='output/ltsm_train_lr0005/', help='output directory')
+    parser.add_argument('--downsample_rate', type=int, default=100, help='downsample rate')
+    parser.add_argument('--llm_layers', type=int, default=32)
+    parser.add_argument('--decay_fac', type=float, default=0.75, help='decay factor')
+    parser.add_argument('--learning_rate', type=float, default=0.0001, help='learning rate')
+    parser.add_argument('--batch_size', type=int, default=512, help='batch size')
+    parser.add_argument('--num_workers', type=int, default=10, help='number of workers')
+    parser.add_argument('--train_epochs', type=int, default=1, help='number of epochs')
+    parser.add_argument('--lradj', type=str, default='type1', help='learning rate adjustment type')
+    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
+    parser.add_argument('--gradient_accumulation_steps', type=int, default=64, help='gradient accumulation steps')
+    args, unknown = parser.parse_known_args()
+
+    return args
+
+def tokenizer_seed_all(fixed_seed):
+    random.seed(fixed_seed)
+    torch.manual_seed(fixed_seed)
+    np.random.seed(fixed_seed)
\ No newline at end of file
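create_tokenizer above configures a "MeanScaleUniformBins" tokenizer (the removed main_tokenizer.py credits https://github.com/amazon-science/chronos-forecasting for this setup). The idea, sketched below with hypothetical parameters rather than the library's actual implementation: scale each series by its mean absolute value, then quantize into uniform bins between low_limit and high_limit.

    # Illustrative sketch of the mean-scale + uniform-bin idea; the real
    # tokenizer lives in ltsm's tokenizer_processor and differs in detail
    # (special tokens, EOS handling, sampling parameters, etc.).
    import torch

    def mean_scale_uniform_bins(series, low_limit=-3.0, high_limit=3.0,
                                n_bins=1022):
        scale = series.abs().mean().clamp_min(1e-10)   # per-series scale
        centers = torch.linspace(low_limit, high_limit, n_bins)
        token_ids = torch.bucketize(series / scale, centers)
        return token_ids, scale

    tokens, scale = mean_scale_uniform_bins(torch.randn(336 + 96))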
diff --git a/ltsm/data_provider/data_factory.py b/ltsm/data_provider/data_factory.py
index 438c145..35c7693 100644
--- a/ltsm/data_provider/data_factory.py
+++ b/ltsm/data_provider/data_factory.py
@@ -32,7 +32,8 @@ def __init__(
         model: str= None,
         scale_on_train: bool = False,
         downsample_rate: int = 10,
-        split_test_sets: bool = True
+        split_test_sets: bool = True,
+        do_anomaly: bool = False
     ):
         """
         Initializes the DatasetFactory with the given arguments.
@@ -58,6 +59,7 @@ def __init__(
         self.scale_on_train = scale_on_train
         self.downsample_rate = downsample_rate
         self.split_test_sets = split_test_sets
+        self.do_anomaly = do_anomaly
 
         # Initialize dataset splitter
         self.splitter = SplitterByTimestamp(
@@ -168,6 +170,8 @@ def loadPrompts(self, data_path: str, prompt_data_path:str, buff: List[Union[int
                 prompt_data.append([self.data_paths.index(data_path)])
         else:
             for instance_idx in buff:
+                if instance_idx == 'anomaly':
+                    continue # 'anomaly' is the label row of anomaly data; no prompt data is available
                 instance_prompt = self.__get_prompt(
                     prompt_data_path,
                     data_path,
@@ -182,7 +186,7 @@
 
         return prompt_data, missing
 
-    def createTorchDS(self, data: List[np.ndarray], prompt_data: List[List[np.float64]], downsample_rate: int)->TSDataset:
+    def createTorchDS(self, data: List[np.ndarray], prompt_data: List[List[np.float64]], downsample_rate: int, do_anomaly:bool)->TSDataset:
         """
         Creates a pyTorch Dataset from a list of sequences and a list of their corresponding prompts.
 
@@ -203,7 +207,8 @@ def createTorchDS(self, data: List[np.ndarray], prompt_data: List[List[np.float6
                 prompt=prompt_data,
                 seq_len=self.seq_len,
                 pred_len=self.pred_len,
-                downsample_rate=downsample_rate
+                downsample_rate=downsample_rate,
+                do_anomaly=do_anomaly
             )
         else:
             return TSPromptDataset(
@@ -211,7 +216,8 @@ def createTorchDS(self, data: List[np.ndarray], prompt_data: List[List[np.float6
                 prompt=prompt_data,
                 seq_len=self.seq_len,
                 pred_len=self.pred_len,
-                downsample_rate=downsample_rate
+                downsample_rate=downsample_rate,
+                do_anomaly=do_anomaly
             )
 
     def getDatasets(self)->Tuple[TSDataset, TSDataset, List[TSDataset]]:
@@ -231,9 +237,16 @@ def getDatasets(self)->Tuple[TSDataset, TSDataset, List[TSDataset]]:
         for data_path in self.data_paths:
            # Step 0: Read data, the output is a list of 1-d time-series
             df_data = self.fetch(data_path)
+            #print(df_data.index)
+            #print(df_data.loc['anomaly'].iloc[:20])
+            #print(df_data.shape)
 
             # Step 1: Get train, val, and test splits
-            sub_train_data, sub_val_data, sub_test_data, buff = self.splitter.get_csv_splits(df_data)
+            sub_train_data, sub_val_data, sub_test_data, buff = self.splitter.get_csv_splits(df_data, self.do_anomaly)
+            # print(len(sub_train_data), len(sub_val_data), len(sub_test_data))
+            # print(sub_train_data[0].shape, sub_val_data[0].shape, sub_test_data[0].shape)
+            #print(sub_train_data[-1][:10])
+
             # Step 2: Scale the datasets. We fit on the whole sequence by default.
             # To fit on the train sequence only, set scale_on_train=True
@@ -242,8 +255,10 @@ def getDatasets(self)->Tuple[TSDataset, TSDataset, List[TSDataset]]:
                 train_data=sub_train_data,
                 val_data=sub_val_data,
                 test_data=sub_test_data,
-                fit_train_only=self.scale_on_train
+                fit_train_only=self.scale_on_train,
+                do_anomaly=self.do_anomaly
             )
+            #print(type(sub_train_data), type(sub_val_data), type(sub_test_data))
             logging.info(f"Data {data_path} has been split into train, val, test sets with the following shapes: {sub_train_data[0].shape}, {sub_val_data[0].shape}, {sub_test_data[0].shape}")
 
             # Step 2.5: Load prompt for each instance
@@ -275,28 +290,53 @@ def getDatasets(self)->Tuple[TSDataset, TSDataset, List[TSDataset]]:
             val_prompts = [data for data, instance_idx in zip(val_prompts, buff) if instance_idx not in missing]
             sub_test_prompt_data = [data for data, instance_idx in zip(sub_test_prompt_data, buff) if instance_idx not in missing]
 
+            # print(len(sub_train_data), len(sub_val_data), len(sub_test_data))
+            # print(sub_train_data[0].shape, sub_val_data[0].shape, sub_test_data[0].shape)
+
+            if self.do_anomaly:
+                label_train = sub_train_data[-1]
+                label_val = sub_val_data[-1]
+                label_test = sub_test_data[-1]
+
+                sub_train_data = [[(x,y) for x,y in zip(data, label_train)] for data in sub_train_data[:-1]]
+                sub_val_data = [[(x,y) for x,y in zip(data, label_val)] for data in sub_val_data[:-1]]
+                sub_test_data = [[(x,y) for x,y in zip(data, label_test)] for data in sub_test_data[:-1]]
+
+                # print(len(sub_train_data), len(sub_val_data), len(sub_test_data))
+                # print(sub_train_data[0][0:2], sub_val_data[0][0:2], sub_test_data[0][0:2])
+
+
             train_prompt_data.extend(train_prompts)
             val_prompt_data.extend(val_prompts)
 
             train_data.extend(sub_train_data)
             val_data.extend(sub_val_data)
+
+
+
             if self.split_test_sets:
                 # Create a Torch dataset for each sub test dataset
-                test_ds_list.append(self.createTorchDS(sub_test_data, sub_test_prompt_data, 1))
+                test_ds_list.append(self.createTorchDS(sub_test_data, sub_test_prompt_data, 1, self.do_anomaly))
             else:
                 test_data.extend(sub_test_data)
                 test_prompt_data.extend(sub_test_prompt_data)
 
         # Step 3: Create Torch datasets (samplers)
-        train_ds = self.createTorchDS(train_data, train_prompt_data, self.downsample_rate)
+        # print("====================================")
+        # print("len(train_data):", len(train_data))
+        # print("len(train_data[0]):", len(train_data[0]),len(train_data[1]))
+        # print("len(train_data[-1]):",len(train_data[-1]),len(train_data[-2]))
+        # print("====================================")
+
+        train_ds = self.createTorchDS(train_data, train_prompt_data, self.downsample_rate, self.do_anomaly)
 
         if os.path.split(os.path.dirname(self.data_paths[0]))[-1] == "monash":
-            val_ds = self.createTorchDS(val_data, val_prompt_data, 54)
+            val_ds = self.createTorchDS(val_data, val_prompt_data, 54, self.do_anomaly)
         else:
-            val_ds = self.createTorchDS(val_data, val_prompt_data, self.downsample_rate)
+            val_ds = self.createTorchDS(val_data, val_prompt_data, self.downsample_rate, self.do_anomaly)
 
         if not self.split_test_sets:
-            test_ds_list.append(self.createTorchDS(test_data, test_prompt_data, 1))
+            test_ds_list.append(self.createTorchDS(test_data, test_prompt_data, 1, self.do_anomaly))
 
         return train_ds, val_ds, test_ds_list
@@ -310,7 +350,8 @@ def get_datasets(args):
         train_ratio=args.train_ratio,
         val_ratio=args.val_ratio,
         model=args.model,
-        downsample_rate=args.downsample_rate
+        downsample_rate=args.downsample_rate,
+        do_anomaly=args.do_anomaly
     )
     train_ds, val_ds, test_ds_list= ds_factory.getDatasets()
@@ -327,7 +368,8 @@ def get_data_loaders(args):
         train_ratio=args.train_ratio,
         val_ratio=args.val_ratio,
         model=args.model,
-        split_test_sets=False
+        split_test_sets=False,
+        do_anomaly=args.do_anomaly
     )
     train_dataset, val_dataset, test_datasets = dataset_factory.getDatasets()
     print(f"Data loaded, train size {len(train_dataset)}, val size {len(val_dataset)}")
diff --git a/ltsm/data_provider/data_splitter.py b/ltsm/data_provider/data_splitter.py
index 9fc7999..953e05e 100644
--- a/ltsm/data_provider/data_splitter.py
+++ b/ltsm/data_provider/data_splitter.py
@@ -31,7 +31,7 @@ def __init__(self, seq_len: int, pred_len: int, train_ratio: float, val_ratio: f
         self.val_ratio = val_ratio
 
 
-    def get_csv_splits(self, df_data: pd.DataFrame) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
+    def get_csv_splits(self, df_data: pd.DataFrame, do_anomaly: bool=False) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
         """
         Splits the .csv data into training-validation-training sets.
 
@@ -54,8 +54,12 @@ def get_csv_splits(self, df_data: pd.DataFrame) -> Tuple[List[np.ndarray], List[
             num_train = int(len(sequence) * self.train_ratio)
             num_val = int(len(sequence) * self.val_ratio)
 
-            if num_train < self.seq_len + self.pred_len:
-                continue
+            if not do_anomaly:
+                if num_train < self.seq_len + self.pred_len:
+                    continue
+            else:
+                if num_train < self.seq_len:
+                    continue
 
             # We also add the previous seq_len points to the val and test sets
diff --git a/ltsm/data_provider/dataset.py b/ltsm/data_provider/dataset.py
index c97249c..9572811 100644
--- a/ltsm/data_provider/dataset.py
+++ b/ltsm/data_provider/dataset.py
@@ -5,39 +5,69 @@
 
 from ltsm.data_provider.tokenizer.tokenizer_processor import TokenizerConfig
 
+import logging
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+)
+
 class TSDataset(Dataset):
     def __init__(
         self,
         data,
         seq_len,
         pred_len,
+        do_anomaly=False,
     ):
         self.data = data
         self.seq_len = seq_len
-        self.pred_len = pred_len
+        self.do_anomaly = do_anomaly
+
+        if not self.do_anomaly:
+            self.pred_len = pred_len
+        else:
+            self.pred_len = self.seq_len
 
         # Create a map from item index to sequence index and offset
         self.num_items = 0
         self.item2sequence, self.item2offset = [], []
         for sequence_index, sequence in enumerate(self.data):
-            assert len(sequence) >= self.seq_len + self.pred_len, f"Sequence must have a lenth with at least seq_len + pred_len, the current length is {len(sequence)}"
+            if not self.do_anomaly:
+                assert len(sequence) >= self.seq_len + self.pred_len, f"Sequence must have a length of at least seq_len + pred_len, the current length is {len(sequence)}"
+                window_size = len(sequence) - self.seq_len - self.pred_len + 1
+            else:
+                window_size = len(sequence) - self.seq_len + 1
             cur_offset = 0
-            for _ in range(len(sequence) - self.seq_len - self.pred_len + 1):
+            for _ in range(window_size):
                 self.item2sequence.append(sequence_index)
                 self.item2offset.append(cur_offset)
                 cur_offset += 1
                 self.num_items += 1
 
     def __getitem__(self, index):
-        sequence_index = self.item2sequence[index]
-        x_begin = self.item2offset[index]
-        x_end = x_begin + self.seq_len
-        y_begin = x_end
-        y_end = y_begin + self.pred_len
-
-        seq_x = torch.from_numpy(np.expand_dims(self.data[sequence_index][x_begin:x_end], -1))
-        seq_y = torch.from_numpy(np.expand_dims(self.data[sequence_index][y_begin:y_end], -1))
+        if not self.do_anomaly:
+            sequence_index = self.item2sequence[index]
+            x_begin = self.item2offset[index]
+            x_end = x_begin + self.seq_len
+            y_begin = x_end
+            y_end = y_begin + self.pred_len
+
+            seq_x = torch.from_numpy(np.expand_dims(self.data[sequence_index][x_begin:x_end], -1))
+            seq_y = torch.from_numpy(np.expand_dims(self.data[sequence_index][y_begin:y_end], -1))
+        else:
+            sequence_index = self.item2sequence[index]
+            x_begin = self.item2offset[index]
+            x_end = x_begin + self.seq_len
+            y_begin = x_begin
+            y_end = x_end
+
+            data_x = np.array([x for x,y in self.data[sequence_index][x_begin:x_end]])
+            data_y = np.array([y for x,y in self.data[sequence_index][y_begin:y_end]])
+
+            seq_x = torch.from_numpy(np.expand_dims(data_x, -1))
+            seq_y = torch.from_numpy(np.expand_dims(data_y, -1))
 
         return seq_x, seq_y
@@ -52,36 +82,62 @@ def __init__(
         seq_len,
         pred_len,
         downsample_rate=10,
+        do_anomaly=False,
     ):
         self.prompt = prompt
         self.seq_len = seq_len
-        self.pred_len = pred_len
+        if not do_anomaly:
+            self.pred_len = pred_len
+        else:
+            self.pred_len = self.seq_len
 
         self.num_items = 0
         self.item2sequence, self.item2offset = [], []
         self.data = data
+        self.do_anomaly = do_anomaly
 
         for sequence_index, sequence in enumerate(self.data):
-            assert len(sequence) >= self.seq_len + self.pred_len, f"Sequence must have a length with at least seq_len + pred_len, the current length is {len(sequence)}"
+            if not self.do_anomaly:
+                assert len(sequence) >= self.seq_len + self.pred_len, f"Sequence must have a length of at least seq_len + pred_len, the current length is {len(sequence)}"
+                window_size = len(sequence) - self.seq_len - self.pred_len + 1
+            else:
+                window_size = len(sequence) - self.seq_len + 1
             cur_offset = 0
-            for cur_offset in range(0, len(sequence) - self.seq_len - self.pred_len + 1, downsample_rate):
+            for _ in range(window_size):
                 self.item2sequence.append(sequence_index)
                 self.item2offset.append(cur_offset)
+                cur_offset += 1
                 self.num_items += 1
 
     def __getitem__(self, index):
-        sequence_index = self.item2sequence[index]
-        x_begin = self.item2offset[index]
-        x_end = x_begin + self.seq_len
-        y_begin = x_end
-        y_end = y_begin + self.pred_len
-        prompt= self.prompt[sequence_index]
-
-        # prompt is a list, self.data[sequence_index][x_begin:x_end])is a numpy array with shape(seq_len,), like (366,)
-        seq_x = np.concatenate((prompt, self.data[sequence_index][x_begin:x_end]))
-        seq_x = torch.from_numpy(np.expand_dims(seq_x, -1))
-        seq_y = torch.from_numpy(np.expand_dims(self.data[sequence_index][y_begin:y_end], -1))
+        if not self.do_anomaly:
+            sequence_index = self.item2sequence[index]
+            x_begin = self.item2offset[index]
+            x_end = x_begin + self.seq_len
+            y_begin = x_end
+            y_end = y_begin + self.pred_len
+            prompt= self.prompt[sequence_index]
+
+            # prompt is a list; self.data[sequence_index][x_begin:x_end] is a numpy array with shape (seq_len,), like (366,)
+            seq_x = np.concatenate((prompt, self.data[sequence_index][x_begin:x_end]))
+            seq_x = torch.from_numpy(np.expand_dims(seq_x, -1))
+            seq_y = torch.from_numpy(np.expand_dims(self.data[sequence_index][y_begin:y_end], -1))
+        else:
+            sequence_index = self.item2sequence[index]
+            x_begin = self.item2offset[index]
+            x_end = x_begin + self.seq_len
+            y_begin = x_begin
+            y_end = x_end
+            prompt= self.prompt[sequence_index]
+
+            data_x = np.array([x for x,y in self.data[sequence_index][x_begin:x_end]])
+            data_y = np.array([y for x,y in self.data[sequence_index][y_begin:y_end]])
+
+            seq_x = np.concatenate((prompt, data_x))
+            seq_x = torch.from_numpy(np.expand_dims(seq_x, -1))
+            seq_y = torch.from_numpy(np.expand_dims(data_y, -1))
+
         return seq_x, seq_y
 
     def __len__(self):
@@ -95,9 +151,13 @@ def __init__(
         seq_len,
         pred_len,
         downsample_rate=10,
+        do_anomaly=False,
     ):
         self.seq_len = seq_len
-        self.pred_len = pred_len
+        if not do_anomaly:
+            self.pred_len = pred_len
+        else:
+            self.pred_len = self.seq_len
         self.num_items = 0
         self.item2sequence, self.item2offset = [], []
         self.data = data
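The dataset changes above are the core of anomaly mode: each element of a sequence becomes a (value, label) pair, windows slide one step at a time, and x and y cover the same seq_len positions, with y holding the labels. A toy rendering of that windowing (illustrative only, with made-up data):

    # Illustrative sketch mirroring TSDataset's anomaly branch;
    # not part of the patch.
    import numpy as np

    seq_len = 4
    series = [(0.1, 0), (0.2, 0), (5.0, 1), (0.2, 0), (0.1, 0), (6.0, 1)]

    windows = []
    for x_begin in range(len(series) - seq_len + 1):
        window = series[x_begin:x_begin + seq_len]
        data_x = np.array([x for x, y in window])  # values fed to the model
        data_y = np.array([y for x, y in window])  # per-step anomaly flags
        windows.append((data_x, data_y))

    print(len(windows))   # 3 windows for a length-6 series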
diff --git a/ltsm/data_provider/tokenizer/standard_scaler.py b/ltsm/data_provider/tokenizer/standard_scaler.py
index 63c2970..ffa9f2b 100644
--- a/ltsm/data_provider/tokenizer/standard_scaler.py
+++ b/ltsm/data_provider/tokenizer/standard_scaler.py
@@ -19,7 +19,7 @@ class StandardScaler(BaseProcessor):
     def __init__(self):
         self._scaler = None
 
-    def process(self, raw_data: np.ndarray, train_data: List[np.ndarray], val_data: List[np.ndarray], test_data: List[np.ndarray], fit_train_only:bool=False)->Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
+    def process(self, raw_data: np.ndarray, train_data: List[np.ndarray], val_data: List[np.ndarray], test_data: List[np.ndarray], fit_train_only:bool=False, do_anomaly:bool=False)->Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]:
         """
         Standardizes the training, validation, and test sets by removing the mean and scaling to unit variance.
 
@@ -35,12 +35,18 @@ def process(self, raw_data: np.ndarray, train_data: List[np.ndarray], val_data:
             A tuple of three lists containing the processed training, validation, and test data.
         """
         scaled_train_data, scaled_val_data, scaled_test_data = [], [], []
-        for raw_sequence, train_sequence, val_sequence, test_sequence in zip(
+        for i, (raw_sequence, train_sequence, val_sequence, test_sequence) in enumerate(zip(
             raw_data,
             train_data,
             val_data,
             test_data,
-        ):
+        )):
+            if do_anomaly and i == len(raw_data) - 1: # Skip anomaly label
+                scaled_train_data.append(train_sequence)
+                scaled_val_data.append(val_sequence)
+                scaled_test_data.append(test_sequence)
+                continue
+
             train_sequence = train_sequence.reshape(-1, 1)
             val_sequence = val_sequence.reshape(-1, 1)
             test_sequence = test_sequence.reshape(-1, 1)
diff --git a/ltsm/prompt_reader/stat_prompt/prompt_generate_split.py b/ltsm/prompt_reader/stat_prompt/prompt_generate_split.py
index e922d1a..7dc94fc 100644
--- a/ltsm/prompt_reader/stat_prompt/prompt_generate_split.py
+++ b/ltsm/prompt_reader/stat_prompt/prompt_generate_split.py
@@ -71,16 +71,16 @@ def prompt_generation(ts, ts_name):
         return None
 
     else:
-        #column_name = [name.replace("/", "-") for name in list(ts.columns)]
-        column_name_map = {}
-        column_name = []
-        for i, name in enumerate(ts.columns):
-            if not name.isnumeric():
-                new_name = str(i)
-            else:
-                new_name = name
-            column_name.append(new_name)
-            column_name_map[name] = new_name
+        column_name = [name.replace("/", "-") for name in list(ts.columns)]
+        # column_name_map = {}
+        # column_name = []
+        # for i, name in enumerate(ts.columns):
+        #     if not name.isnumeric():
+        #         new_name = str(i)
+        #     else:
+        #         new_name = name
+        #     column_name.append(new_name)
+        #     column_name_map[name] = new_name
         prompt_buf_train = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name)
         prompt_buf_val = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name)
         prompt_buf_test = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name)
@@ -98,13 +98,13 @@ def prompt_generation(ts, ts_name):
             prompt_val = prompt_generation_single(ts_val)
             prompt_test = prompt_generation_single(ts_test)
 
-            # prompt_buf_train[index.replace("/", "-")] = prompt_train.T.values
-            # prompt_buf_val[index.replace("/", "-")] = prompt_val.T.values
-            # prompt_buf_test[index.replace("/", "-")] = prompt_test.T.values
-            new_index = column_name_map[index]
-            prompt_buf_train[new_index] = prompt_train.T.values
-            prompt_buf_val[new_index] = prompt_val.T.values
-            prompt_buf_test[new_index] = prompt_test.T.values
+            prompt_buf_train[index.replace("/", "-")] = prompt_train.T.values
+            prompt_buf_val[index.replace("/", "-")] = prompt_val.T.values
+            prompt_buf_test[index.replace("/", "-")] = prompt_test.T.values
+            # new_index = column_name_map[index]
+            # prompt_buf_train[new_index] = prompt_train.T.values
+            # prompt_buf_val[new_index] = prompt_val.T.values
+            # prompt_buf_test[new_index] = prompt_test.T.values
 
         prompt_buf_total = {"train": prompt_buf_train, "val": prompt_buf_val, "test": prompt_buf_test}
         print(prompt_buf_total)
@@ -184,7 +184,7 @@
 
             print("Export", file_path, prompt_data.shape)
 
-def data_import(path, format="feather"):
+def data_import(path, format="feather", anomaly=False):
 
     if format == "feather":
         data = read_feather(path)
@@ -199,6 +199,9 @@ def data_import(path, format="feather"):
         data_dir = data_name[0:data_name.rfind("/")]
         if "date" in data.columns:
             data = data.drop("date", axis=1)
+        if "anomaly" in data.columns:
+            data = data.drop("anomaly", axis=1)
+            print("Drop anomaly column")
 
     return data, data_name, data_dir
diff --git a/tests/data_provider/data_factory_test.py b/tests/data_provider/data_factory_test.py
index 57c9695..1f7f6ec 100644
--- a/tests/data_provider/data_factory_test.py
+++ b/tests/data_provider/data_factory_test.py
@@ -30,7 +30,8 @@ def setup(tmp_path):
         train_ratio=0.7,
         val_ratio=0.1,
         model="",
-        scale_on_train=True
+        scale_on_train=True,
+        do_anomaly=False
     )
 
     # Save functions for threes different formats
@@ -192,7 +193,7 @@ def test_data_factory_load_missing_prompts(setup):
 
 def test_data_factory_createTorchDS_empty(setup):
     data_path, prompt_data_path, prompt_data_folder, datasetFactory = setup
-    dataset = datasetFactory.createTorchDS([], [], 1)
+    dataset = datasetFactory.createTorchDS([], [], 1, False)
     assert dataset is None
 
 def test_data_factory_createTorchDS(setup):
@@ -202,7 +203,7 @@ def test_data_factory_createTorchDS(setup):
     prompt_len = 10
     data = [np.array([2*i for i in range(100)])]
     prompt_data = [[0.1*i for i in range(10)]]
-    dataset = datasetFactory.createTorchDS(data, prompt_data, 1)
+    dataset = datasetFactory.createTorchDS(data, prompt_data, 1, False)
     assert type(dataset) == TSPromptDataset
     for i in range(len(dataset)):
         x = dataset[i][0]
@@ -223,7 +224,7 @@ def test_data_factory_createTorchDS_tokenizer(setup):
     datasetFactory.model = "LTSM_Tokenizer"
     data = [np.array([2*i for i in range(100)])]
     prompt_data = [[0.1*i for i in range(10)]]
-    dataset = datasetFactory.createTorchDS(data, prompt_data, 1)
+    dataset = datasetFactory.createTorchDS(data, prompt_data, 1, False)
 
     assert type(dataset) == TSTokenDataset
diff --git a/tests/test_scripts/anomaly_main_ltsm.py b/tests/test_scripts/anomaly_main_ltsm.py
new file mode 100644
index 0000000..eb191e1
--- /dev/null
+++ b/tests/test_scripts/anomaly_main_ltsm.py
@@ -0,0 +1,7 @@
+from ltsm.data_pipeline import AnomalyTrainingPipeline, anomaly_get_args, anomaly_seed_all
+
+if __name__ == "__main__":
+    args = anomaly_get_args()
+    anomaly_seed_all(args.seed)
+    pipeline = AnomalyTrainingPipeline(args)
+    pipeline.run()
\ No newline at end of file
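Taken together, the loader changes (df_data.loc['anomaly'], the 'anomaly' entry in buff, and prompt generation dropping an 'anomaly' column) suggest the anomaly datasets ship a 0/1 label series named 'anomaly' alongside the values. A hypothetical file in that spirit, for experimentation only:

    # Hypothetical toy dataset; column names and layout are assumptions
    # inferred from the loader code, not documented by this patch.
    import numpy as np
    import pandas as pd

    rng = np.random.default_rng(0)
    values = rng.normal(size=200)
    labels = np.zeros(200, dtype=int)
    values[50:55] += 6.0      # inject an anomalous stretch
    labels[50:55] = 1

    pd.DataFrame({"value": values, "anomaly": labels}).to_csv(
        "0.csv", index=False)  # cf. datasets/multi-Synthetic/0.csv below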
diff --git a/tests/test_scripts/main_tokenizer.py b/tests/test_scripts/main_tokenizer.py
index 5e1e678..f77729a 100644
--- a/tests/test_scripts/main_tokenizer.py
+++ b/tests/test_scripts/main_tokenizer.py
@@ -1,268 +1,7 @@
-import numpy as np
-import torch
-from torch import nn
-import os
-import argparse
-import random
-import sys
-
-sys.path.append("/home/zx57/ltsm") # path to the ltsm repo
-
-from ltsm.data_provider.data_factory import get_datasets
-from ltsm.data_provider.data_factory import DatasetFactory
-from ltsm.data_reader.csv_reader import transform_csv_dataset
-from ltsm.data_provider.data_loader import HF_Dataset
-from ltsm.data_provider.tokenizer.tokenizer_processor import TokenizerConfig
-from ltsm.models import get_model, LTSMConfig
-from peft import get_peft_model, LoraConfig
-
-from transformers import (
-    Trainer,
-    TrainingArguments,
-    EvalPrediction,
-    #set_seed,
-)
-def get_args():
-    parser = argparse.ArgumentParser(description='LTSM')
-
-    # Basic Config
-    parser.add_argument('--model_id', type=str, default='test_run', help='model id')
-    parser.add_argument('--model_name_or_path', type=str, default="gpt2-medium", help='model name')
-    parser.add_argument('--seed', type=int, default=2024, help='random seed')
-    parser.add_argument('--device', type=str, default="cuda:0")
-    parser.add_argument('--checkpoints', type=str, default='./checkpoints/')
-
-    # Data Settings
-    parser.add_argument('--data_path', nargs='+', default='dataset/weather.csv', help='data files')
-    parser.add_argument('--test_data_path', type=str, default='dataset/weather.csv', help='test data file')
-    parser.add_argument('--test_data_path_list', nargs='+', required=True, help='test data file')
-    parser.add_argument('--prompt_data_path', type=str, default='./weather.csv', help='prompt data file')
-    parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method')
-    parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio')
-    parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio')
-
-    # Forecasting Settings
-    parser.add_argument('--seq_len', type=int, default=336, help='input sequence length')
-    parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
-    parser.add_argument('--prompt_len', type=int, default=133, help='prompt sequence length')
-
-
-    # Model Settings
-    parser.add_argument('--lora', action="store_true", help='use lora')
-    parser.add_argument('--lora_dim', type=int, default=128, help='dimension of lora')
-    parser.add_argument('--gpt_layers', type=int, default=3, help='number of gpt layers')
-    parser.add_argument('--d_model', type=int, default=1024, help='dimension of model')
-    parser.add_argument('--n_heads', type=int, default=16, help='number of heads')
-    parser.add_argument('--d_ff', type=int, default=512, help='dimension of fcn')
-    parser.add_argument('--dropout', type=float, default=0.2, help='dropout')
-    parser.add_argument('--enc_in', type=int, default=1, help='encoder input size')
-    parser.add_argument('--c_out', type=int, default=862, help='output size')
-    parser.add_argument('--patch_size', type=int, default=16, help='patch size')
-    parser.add_argument('--pretrain', type=int, default=1, help='is pretrain')
-    parser.add_argument('--local_pretrain', type=str, default="None", help='local pretrain weight')
-    parser.add_argument('--freeze', type=int, default=1, help='is model weight frozen')
-    parser.add_argument('--model', type=str, default='model', help='model name, , options:[LTSM, LTSM_WordPrompt, LTSM_Tokenizer]')
-    parser.add_argument('--stride', type=int, default=8, help='stride')
-    parser.add_argument('--tmax', type=int, default=10, help='tmax')
-
-    # Training Settings
-    parser.add_argument('--eval', type=int, default=0, help='evaluation')
-    parser.add_argument('--itr', type=int, default=1, help='experiments times')
-    parser.add_argument('--output_dir', type=str, default='output/ltsm_train_lr0005/', help='output directory')
-    parser.add_argument('--downsample_rate', type=int, default=100, help='downsample rate')
-    parser.add_argument('--llm_layers', type=int, default=32)
-    parser.add_argument('--decay_fac', type=float, default=0.75, help='decay factor')
-    parser.add_argument('--learning_rate', type=float, default=0.0001, help='learning rate')
-    parser.add_argument('--batch_size', type=int, default=512, help='batch size')
-    parser.add_argument('--num_workers', type=int, default=10, help='number of workers')
-    parser.add_argument('--train_epochs', type=int, default=1, help='number of epochs')
-    parser.add_argument('--lradj', type=str, default='type1', help='learning rate adjustment type')
-    parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
-    parser.add_argument('--gradient_accumulation_steps', type=int, default=64, help='gradient accumulation steps')
-    args, unknown = parser.parse_known_args()
-
-    return args
-
-
-def seed_all(fixed_seed):
-    random.seed(fixed_seed)
-    torch.manual_seed(fixed_seed)
-    np.random.seed(fixed_seed)
-
-def freeze_parameters(model):
-
-    freeze_param_buf = ["gpt2"]
-    for n, p in model.named_parameters():
-        if any(fp in n for fp in freeze_param_buf):
-            p.requires_grad = False
-            print(f"{n} has been freeezed")
-
-    trainable_param_buf = ["ln", "wpe", "in_layer", "out_layer", "lora"]
-    for n, p in model.named_parameters():
-        if any(fp in n for fp in trainable_param_buf):
-            p.requires_grad = True
-
-def print_trainable_parameters(model):
-    for n, p in model.named_parameters():
-        if p.requires_grad:
-            print(f"{n} is trainable...")
-
-def run(args):
-    print(args)
-    model_config = LTSMConfig(**vars(args))
-    model = get_model(model_config)
-
-    if args.lora:
-        peft_config = LoraConfig(
-            target_modules=["c_attn"],  # ["q", "v"],
-            inference_mode=False,
-            r=args.lora_dim,
-            lora_alpha=32,
-            lora_dropout=0.1
-        )
-        model = get_peft_model(model, peft_config)
-        model.print_trainable_parameters()
-
-    elif args.freeze:
-        freeze_parameters(model)
-
-    print_trainable_parameters(model)
-
-
-    model_optim = torch.optim.Adam(model.parameters(), lr=args.learning_rate)
-    lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(model_optim, T_max=args.tmax, eta_min=1e-8)
-
-    # Load Tokenizer Config, Reference: https://github.com/amazon-science/chronos-forecasting
-    context_length = args.seq_len+args.pred_len
-    prediction_length = args.pred_len
-    n_tokens = 1024
-    n_special_tokens = 2
-    config = TokenizerConfig(
-        tokenizer_class="MeanScaleUniformBins",
-        tokenizer_kwargs=dict(low_limit=-3.0, high_limit=3.0),
-        n_tokens=n_tokens,
-        n_special_tokens=n_special_tokens,
-        pad_token_id=0,
-        eos_token_id=1,
-        use_eos_token=0,
-        model_type="causal",
-        context_length=context_length,
-        prediction_length=prediction_length,
-        num_samples=20,
-        temperature=1.0,
-        top_k=50,
-        top_p=1.0,
-    )
-
-    tokenizer = config.create_tokenizer()
-
-    def compute_metrics(p: EvalPrediction):
-        preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
-        preds = np.squeeze(preds)
-        if preds.shape != p.label_ids.shape:
-            label_ids = np.squeeze(p.label_ids)
-        else:
-            label_ids = p.label_ids
-        return {
-            "mse": ((preds - label_ids) ** 2).mean().item(),
-            "mae": (np.abs(preds - label_ids)).mean().item()}
-
-    def compute_loss(model, inputs, return_outputs=False):
-        outputs = model(inputs["input_data"])
-        B, L, M, _ = outputs.shape
-        loss = nn.functional.cross_entropy(outputs.reshape(B*L,-1), inputs["labels"][:,1:].long().reshape(B*L))
-        return (loss, outputs) if return_outputs else loss
-
-    def collate_fn(batch):
-        return {
-            'input_data': torch.from_numpy(np.stack([x['input_data'] for x in batch])).type(torch.float32),
-            'labels': torch.from_numpy(np.stack([x['labels'] for x in batch])).type(torch.float32),
-        }
-
-    @torch.no_grad()
-    def prediction_step(model, inputs, prediction_loss_only=False, ignore_keys=None):
-        input_data = inputs["input_data"].to(model.module.device)
-        labels = inputs["labels"].to(model.module.device)
-        scale = labels[:,0]
-        labels = labels[:,1:]
-        outputs = model(input_data)
-        indices = torch.max(outputs, dim=-1).indices
-
-        output_value = tokenizer.output_transform(indices, scale)
-        label_value = tokenizer.output_transform(labels.unsqueeze(-1).long(), scale)
-        loss = nn.functional.mse_loss(output_value, label_value)
-        return (loss, output_value, label_value)
-
-
-    training_args = TrainingArguments(
-        output_dir=args.output_dir,
-        per_device_train_batch_size=args.batch_size,
-        per_device_eval_batch_size=args.batch_size,
-        evaluation_strategy="steps",
-        num_train_epochs=args.train_epochs,
-        fp16=False,
-        save_steps=100,
-        eval_steps=25,
-        logging_steps=5,
-        learning_rate=args.learning_rate,
-        gradient_accumulation_steps=args.gradient_accumulation_steps,
-        save_total_limit=10,
-        remove_unused_columns=False,
-        push_to_hub=False,
-        load_best_model_at_end=True,
-    )
-
-    # Training settings
-    new_data_paths = []
-    for data_path in args.data_path:
-        new_path = data_path.split("/")
-        new_path[-3] += "_transformed"
-        new_path = "/".join(new_path)
-        new_data_paths.append(new_path)
-        print(os.path.dirname(data_path), os.path.dirname(new_path))
-        transform_csv_dataset(os.path.dirname(data_path), os.path.dirname(new_path))
-    args.data_path = new_data_paths
-    train_dataset, eval_dataset, test_datasets, _ = get_datasets(args)
-    train_dataset, eval_dataset= HF_Dataset(train_dataset), HF_Dataset(eval_dataset)
-    print("Start Training. Train dataset size:", len(train_dataset))
-    trainer = Trainer(
-        model=model,
-        args=training_args,
-        data_collator=collate_fn,
-        compute_metrics=compute_metrics,
-        train_dataset=train_dataset,
-        eval_dataset=eval_dataset,
-        tokenizer=None,
-        optimizers=(model_optim, lr_scheduler),
-    )
-
-    # Overload the trainer API
-    if not args.eval:
-        trainer.compute_loss = compute_loss
-        trainer.prediction_step = prediction_step
-        train_results = trainer.train()
-        trainer.save_model()
-        trainer.log_metrics("train", train_results.metrics)
-        trainer.save_metrics("train", train_results.metrics)
-        trainer.save_state()
-
-    # Testing settings
-    #for data_path in args.test_data_path_list:
-    for test_dataset in test_datasets:
-        trainer.compute_loss = compute_loss
-        trainer.prediction_step = prediction_step
-        # args.test_data_path = data_path
-        # _, _, test_dataset, _ = get_datasets(args)
-        test_dataset = HF_Dataset(test_dataset)
-
-        metrics = trainer.evaluate(test_dataset)
-        trainer.log_metrics("Test", metrics)
-        trainer.save_metrics("Test", metrics)
-
+from ltsm.data_pipeline import TokenizerTrainingPipeline, tokenizer_get_args, tokenizer_seed_all
 
 if __name__ == "__main__":
-    print("Starting LTSM-TOkenizer...")
-    args = get_args()
-    seed_all(args.seed)
-    print("Args loaded, start running...")
-    run(args)
+    args = tokenizer_get_args()
+    tokenizer_seed_all(args.seed)
+    pipeline = TokenizerTrainingPipeline(args)
+    pipeline.run()
\ No newline at end of file
diff --git a/tests/test_scripts/prompt_generation_norm.sh b/tests/test_scripts/prompt_generation_norm.sh
index 951dff2..218e2d4 100644
--- a/tests/test_scripts/prompt_generation_norm.sh
+++ b/tests/test_scripts/prompt_generation_norm.sh
@@ -1,6 +1,5 @@
 data_name="
-    exchange_rate
-    illness"
+    multi-Synthetic"
 
 save_format="pth.tar"
 
diff --git a/tests/test_scripts/train_anomaly_main_ltsm.sh b/tests/test_scripts/train_anomaly_main_ltsm.sh
new file mode 100644
index 0000000..6813fd6
--- /dev/null
+++ b/tests/test_scripts/train_anomaly_main_ltsm.sh
@@ -0,0 +1,33 @@
+TRAIN="../../datasets/multi-Synthetic/0.csv"
+
+
+TEST="../../datasets/multi-Synthetic/0.csv"
+
+PROMPT="../../prompt_bank/stat-prompt/prompt_data_normalize_split"
+
+epoch=10
+downsample_rate=20
+freeze=0
+lr=1e-3
+
+
+for seq_len in 113
+do
+    OUTPUT_PATH="output/ltsm_lr${lr}_loraFalse_down${downsample_rate}_freeze${freeze}_e${epoch}_seq${seq_len}/"
+    echo "Current OUTPUT_PATH: ${OUTPUT_PATH}"
+    CUDA_VISIBLE_DEVICES=5,6,7 python3 anomaly_main_ltsm.py \
+    --model LTSM \
+    --model_name_or_path gpt2-medium \
+    --train_epochs ${epoch} \
+    --batch_size 100 \
+    --seq_len ${seq_len} \
+    --gradient_accumulation_steps 64 \
+    --data_path ${TRAIN} \
+    --test_data_path_list ${TEST} \
+    --prompt_data_path ${PROMPT} \
+    --freeze ${freeze} \
+    --learning_rate ${lr} \
+    --downsample_rate ${downsample_rate} \
+    --output_dir ${OUTPUT_PATH} \
+    --eval 0
+done
\ No newline at end of file
diff --git a/tests/test_scripts/train_ltsm_csv.sh b/tests/test_scripts/train_ltsm_csv.sh
index 3c13bea..5c715a1 100755
--- a/tests/test_scripts/train_ltsm_csv.sh
+++ b/tests/test_scripts/train_ltsm_csv.sh
@@ -1,35 +1,21 @@
-TRAIN="../../datasets/ETT-small/ETTh1.csv
-    ../../datasets/ETT-small/ETTh2.csv
-    ../../datasets/ETT-small/ETTm1.csv
-    ../../datasets/ETT-small/ETTm2.csv
-    ../../datasets/electricity/electricity.csv
-    ../../datasets/exchange_rate/exchange_rate.csv
-    ../../datasets/traffic/traffic.csv
-    ../../datasets/weather/weather.csv"
+TRAIN="../../datasets/exchange_rate/exchange_rate.csv"
 
-TEST="../../datasets/ETT-small/ETTh1.csv
-    ../../datasets/ETT-small/ETTh2.csv
-    ../../datasets/ETT-small/ETTm1.csv
-    ../../datasets/ETT-small/ETTm2.csv
-    ../../datasets/electricity/electricity.csv
-    ../../datasets/exchange_rate/exchange_rate.csv
-    ../../datasets/traffic/traffic.csv
-    ../../datasets/weather/weather.csv"
+TEST="../../datasets/exchange_rate/exchange_rate.csv"
 
 PROMPT="../../prompt_bank/prompt_data_normalize_split"
 
-epoch=1000
+epoch=2
 downsample_rate=20
 freeze=0
 lr=1e-3
-
-for pred_len in 96 192 336 720
+# 96 192 336 720
+for pred_len in 96
 do
     OUTPUT_PATH="output/ltsm_lr${lr}_loraFalse_down${downsample_rate}_freeze${freeze}_e${epoch}_pred${pred_len}/"
     echo "Current OUTPUT_PATH: ${OUTPUT_PATH}"
-    CUDA_VISIBLE_DEVICES=0,1,2,3 python3 main_ltsm.py \
+    CUDA_VISIBLE_DEVICES=1,2,3 python3 main_ltsm.py \
     --model LTSM \
     --model_name_or_path gpt2-medium \
     --train_epochs ${epoch} \
diff --git a/tests/test_scripts/train_ltsm_tokenizer_csv.sh b/tests/test_scripts/train_ltsm_tokenizer_csv.sh
index abaeb9e..c07bbb9 100644
--- a/tests/test_scripts/train_ltsm_tokenizer_csv.sh
+++ b/tests/test_scripts/train_ltsm_tokenizer_csv.sh
@@ -6,9 +6,9 @@ TRAIN="
 TEST="
     datasets/exchange_rate/exchange_rate.csv
     datasets/illness/national_illness.csv"
-PROMPT="prompt_bank/stat-prompt/prompt_data_normalize_split"
+PROMPT="prompt_bank/prompt_data_normalize_split"
 lr=1e-3
-epoch=50
+epoch=10
 downsample_rate=20
 freeze=0
 d_ff=128
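The anomaly pipeline reports only the Trainer's loss-derived metrics. A hedged sketch of how its per-step predictions could be scored afterwards (hypothetical tensors; this post-processing is not part of the patch):

    # Illustrative post-hoc scoring; assumes per-step logits and labels
    # with the shapes used by the anomaly pipeline above.
    import torch

    logits = torch.randn(8, 113, 2)          # (batch, seq_len, classes)
    labels = torch.randint(0, 2, (8, 113))   # ground-truth anomaly flags

    preds = logits.argmax(dim=-1)
    tp = ((preds == 1) & (labels == 1)).sum().item()
    fp = ((preds == 1) & (labels == 0)).sum().item()
    fn = ((preds == 0) & (labels == 1)).sum().item()

    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * precision * recall / max(precision + recall, 1e-12)
    print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f}")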