From 33122be798bf1c55e45e5bfb47adef81ef611cca Mon Sep 17 00:00:00 2001 From: cristinazuhe Date: Wed, 24 Jan 2024 10:26:43 +0100 Subject: [PATCH 1/5] Deleted some scripts. TODO: Review the notebooks. --- flexnlp/notebooks/Centralized_QA.ipynb | 479 ------------------------- flexnlp/notebooks/Centralized_QA.py | 216 ----------- flexnlp/notebooks/FederatedSS.py | 185 ---------- flexnlp/notebooks/Federated_QA.py | 380 -------------------- 4 files changed, 1260 deletions(-) delete mode 100644 flexnlp/notebooks/Centralized_QA.ipynb delete mode 100644 flexnlp/notebooks/Centralized_QA.py delete mode 100644 flexnlp/notebooks/FederatedSS.py delete mode 100644 flexnlp/notebooks/Federated_QA.py diff --git a/flexnlp/notebooks/Centralized_QA.ipynb b/flexnlp/notebooks/Centralized_QA.ipynb deleted file mode 100644 index e74aa54..0000000 --- a/flexnlp/notebooks/Centralized_QA.ipynb +++ /dev/null @@ -1,479 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 15, - "metadata": {}, - "outputs": [], - "source": [ - "from datasets import load_dataset\n", - "from transformers import AutoTokenizer\n", - "from transformers import DefaultDataCollator\n", - "from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer\n", - "import collections\n", - "import numpy as np\n", - "import evaluate\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Load SQuAD dataset" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Found cached dataset squad (C:/Users/Cris/.cache/huggingface/datasets/squad/plain_text/1.0.0/d6ec3ceb99ca480ce37cdd35555d6cb2511d223b9150cce08a837ef62ffea453)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "DatasetDict({\n", - " train: Dataset({\n", - " features: ['id', 'title', 'context', 'question', 'answers'],\n", - " num_rows: 700\n", - " })\n", - " test: Dataset({\n", - " features: ['id', 'title', 'context', 'question', 'answers'],\n", - " num_rows: 176\n", - " })\n", - "})\n" - ] - } - ], - "source": [ - "# Load a percentage of squal\n", - "squad = load_dataset(\"squad\", split=\"train[:1%]\")\n", - "# Split 80% train, 20% test\n", - "squad = squad.train_test_split(test_size=0.2)\n", - "print(squad )" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Preprocess" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "metadata": {}, - "outputs": [], - "source": [ - "model_checkpoint = \"distilbert-base-uncased\"\n", - "#model_checkpoint = \"bert-base-cased\"\n", - "tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "data": { - "text/plain": [ - "(700, 728)" - ] - }, - "execution_count": 18, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "max_length = 384\n", - "stride = 128\n", - "\n", - "\n", - "def preprocess_training_examples(examples):\n", - " questions = [q.strip() for q in examples[\"question\"]]\n", - " inputs = tokenizer(\n", - " questions,\n", - " examples[\"context\"],\n", - " max_length=max_length,\n", - " truncation=\"only_second\",\n", - " stride=stride,\n", - " return_overflowing_tokens=True,\n", - " return_offsets_mapping=True,\n", - " padding=\"max_length\",\n", - " )\n", - "\n", - " offset_mapping = inputs.pop(\"offset_mapping\")\n", - " sample_map = inputs.pop(\"overflow_to_sample_mapping\")\n", - " answers = examples[\"answers\"]\n", - " start_positions = []\n", - " end_positions = []\n", - "\n", - " for i, offset in enumerate(offset_mapping):\n", - " sample_idx = sample_map[i]\n", - " answer = answers[sample_idx]\n", - " start_char = answer[\"answer_start\"][0]\n", - " end_char = answer[\"answer_start\"][0] + len(answer[\"text\"][0])\n", - " sequence_ids = inputs.sequence_ids(i)\n", - "\n", - " # Find the start and end of the context\n", - " idx = 0\n", - " while sequence_ids[idx] != 1:\n", - " idx += 1\n", - " context_start = idx\n", - " while sequence_ids[idx] == 1:\n", - " idx += 1\n", - " context_end = idx - 1\n", - "\n", - " # If the answer is not fully inside the context, label is (0, 0)\n", - " if offset[context_start][0] > start_char or offset[context_end][1] < end_char:\n", - " start_positions.append(0)\n", - " end_positions.append(0)\n", - " else:\n", - " # Otherwise it's the start and end token positions\n", - " idx = context_start\n", - " while idx <= context_end and offset[idx][0] <= start_char:\n", - " idx += 1\n", - " start_positions.append(idx - 1)\n", - "\n", - " idx = context_end\n", - " while idx >= context_start and offset[idx][1] >= end_char:\n", - " idx -= 1\n", - " end_positions.append(idx + 1)\n", - "\n", - " inputs[\"start_positions\"] = start_positions\n", - " inputs[\"end_positions\"] = end_positions\n", - " return inputs\n", - "\n", - "train_dataset = squad[\"train\"].map(\n", - " preprocess_training_examples,\n", - " batched=True,\n", - " remove_columns=squad[\"train\"].column_names,\n", - ")\n", - "len(squad[\"train\"]), len(train_dataset)" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " \r" - ] - }, - { - "data": { - "text/plain": [ - "(176, 180)" - ] - }, - "execution_count": 19, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "def preprocess_validation_examples(examples):\n", - " questions = [q.strip() for q in examples[\"question\"]]\n", - " inputs = tokenizer(\n", - " questions,\n", - " examples[\"context\"],\n", - " max_length=max_length,\n", - " truncation=\"only_second\",\n", - " stride=stride,\n", - " return_overflowing_tokens=True,\n", - " return_offsets_mapping=True,\n", - " padding=\"max_length\",\n", - " )\n", - "\n", - " sample_map = inputs.pop(\"overflow_to_sample_mapping\")\n", - " example_ids = []\n", - "\n", - " for i in range(len(inputs[\"input_ids\"])):\n", - " sample_idx = sample_map[i]\n", - " example_ids.append(examples[\"id\"][sample_idx])\n", - "\n", - " sequence_ids = inputs.sequence_ids(i)\n", - " offset = inputs[\"offset_mapping\"][i]\n", - " inputs[\"offset_mapping\"][i] = [\n", - " o if sequence_ids[k] == 1 else None for k, o in enumerate(offset)\n", - " ]\n", - "\n", - " inputs[\"example_id\"] = example_ids\n", - " return inputs\n", - "\n", - "validation_dataset = squad[\"test\"].map(\n", - " preprocess_validation_examples,\n", - " batched=True,\n", - " remove_columns=squad[\"test\"].column_names,\n", - ")\n", - "len(squad[\"test\"]), len(validation_dataset)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Training" - ] - }, - { - "cell_type": "code", - "execution_count": 20, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "Some weights of DistilBertForQuestionAnswering were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['qa_outputs.bias', 'qa_outputs.weight']\n", - "You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.\n" - ] - } - ], - "source": [ - "model = AutoModelForQuestionAnswering.from_pretrained(\"distilbert-base-uncased\")" - ] - }, - { - "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - " 0%| | 0/138 [00:00 max_answer_length\n", - " if (\n", - " end_index < start_index\n", - " or end_index - start_index + 1 > max_answer_length\n", - " ):\n", - " continue\n", - "\n", - " answer = {\n", - " \"text\": context[offsets[start_index][0] : offsets[end_index][1]],\n", - " \"logit_score\": start_logit[start_index] + end_logit[end_index],\n", - " }\n", - " answers.append(answer)\n", - "\n", - " # Select the answer with the best score\n", - " if len(answers) > 0:\n", - " best_answer = max(answers, key=lambda x: x[\"logit_score\"])\n", - " predicted_answers.append(\n", - " {\"id\": example_id, \"prediction_text\": best_answer[\"text\"]}\n", - " )\n", - " else:\n", - " predicted_answers.append({\"id\": example_id, \"prediction_text\": \"\"})\n", - "\n", - " theoretical_answers = [{\"id\": ex[\"id\"], \"answers\": ex[\"answers\"]} for ex in examples]\n", - " return metric.compute(predictions=predicted_answers, references=theoretical_answers)" - ] - }, - { - "cell_type": "code", - "execution_count": 23, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "100%|██████████| 12/12 [00:58<00:00, 4.88s/it]\n" - ] - } - ], - "source": [ - "predictions, _, _ = trainer.predict(validation_dataset)\n", - "start_logits, end_logits = predictions" - ] - }, - { - "cell_type": "code", - "execution_count": 24, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "180\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "100%|██████████| 176/176 [00:00<00:00, 573.78it/s]\n" - ] - }, - { - "data": { - "text/plain": [ - "{'exact_match': 10.227272727272727, 'f1': 13.03407354677408}" - ] - }, - "execution_count": 24, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "print(len(predictions[0]))\n", - "compute_metrics(start_logits, end_logits, validation_dataset, squad[\"test\"])" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "flexible", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.3" - }, - "orig_nbformat": 4 - }, - "nbformat": 4, - "nbformat_minor": 2 -} diff --git a/flexnlp/notebooks/Centralized_QA.py b/flexnlp/notebooks/Centralized_QA.py deleted file mode 100644 index 14c82fa..0000000 --- a/flexnlp/notebooks/Centralized_QA.py +++ /dev/null @@ -1,216 +0,0 @@ -import torch -from datasets import load_dataset -from transformers import AutoTokenizer -from transformers import DefaultDataCollator -from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer -import collections -import numpy as np -import evaluate - - -device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - -# Load a percentage of squal -squad = load_dataset("squad", split="train") -# Split 80% train, 20% test -squad = squad.train_test_split(test_size=0.2) -print(squad ) - -model_checkpoint = "distilbert-base-uncased" -#model_checkpoint = "bert-base-cased" -tokenizer = AutoTokenizer.from_pretrained(model_checkpoint) - -max_length = 384 -stride = 128 - - -def preprocess_training_examples(examples): - questions = [q.strip() for q in examples["question"]] - inputs = tokenizer( - questions, - examples["context"], - max_length=max_length, - truncation="only_second", - stride=stride, - return_overflowing_tokens=True, - return_offsets_mapping=True, - padding="max_length", - ) - - offset_mapping = inputs.pop("offset_mapping") - sample_map = inputs.pop("overflow_to_sample_mapping") - answers = examples["answers"] - start_positions = [] - end_positions = [] - - for i, offset in enumerate(offset_mapping): - sample_idx = sample_map[i] - answer = answers[sample_idx] - start_char = answer["answer_start"][0] - end_char = answer["answer_start"][0] + len(answer["text"][0]) - sequence_ids = inputs.sequence_ids(i) - - # Find the start and end of the context - idx = 0 - while sequence_ids[idx] != 1: - idx += 1 - context_start = idx - while sequence_ids[idx] == 1: - idx += 1 - context_end = idx - 1 - - # If the answer is not fully inside the context, label is (0, 0) - if offset[context_start][0] > start_char or offset[context_end][1] < end_char: - start_positions.append(0) - end_positions.append(0) - else: - # Otherwise it's the start and end token positions - idx = context_start - while idx <= context_end and offset[idx][0] <= start_char: - idx += 1 - start_positions.append(idx - 1) - - idx = context_end - while idx >= context_start and offset[idx][1] >= end_char: - idx -= 1 - end_positions.append(idx + 1) - - inputs["start_positions"] = start_positions - inputs["end_positions"] = end_positions - return inputs - -train_dataset = squad["train"].map( - preprocess_training_examples, - batched=True, - remove_columns=squad["train"].column_names, -) - -print("Len train y test") -print(len(squad["train"]), len(train_dataset)) - -def preprocess_validation_examples(examples): - questions = [q.strip() for q in examples["question"]] - inputs = tokenizer( - questions, - examples["context"], - max_length=max_length, - truncation="only_second", - stride=stride, - return_overflowing_tokens=True, - return_offsets_mapping=True, - padding="max_length", - ) - - sample_map = inputs.pop("overflow_to_sample_mapping") - example_ids = [] - - for i in range(len(inputs["input_ids"])): - sample_idx = sample_map[i] - example_ids.append(examples["id"][sample_idx]) - - sequence_ids = inputs.sequence_ids(i) - offset = inputs["offset_mapping"][i] - inputs["offset_mapping"][i] = [ - o if sequence_ids[k] == 1 else None for k, o in enumerate(offset) - ] - - inputs["example_id"] = example_ids - return inputs - -validation_dataset = squad["test"].map( - preprocess_validation_examples, - batched=True, - remove_columns=squad["test"].column_names, -) - -print("Len val y test") -len(squad["test"]), len(validation_dataset) - -# Training -model = AutoModelForQuestionAnswering.from_pretrained("distilbert-base-uncased") - -training_args = TrainingArguments( - output_dir="my_awesome_qa_model", - evaluation_strategy="epoch", - learning_rate=2e-5, - per_device_train_batch_size=16, - per_device_eval_batch_size=16, - num_train_epochs=3, - weight_decay=0.01, - use_cpu=False, - torch_compile=True, -) - -trainer = Trainer( - model=model, - args=training_args, - train_dataset=train_dataset, - eval_dataset=validation_dataset, - tokenizer=tokenizer, - # data_collator=data_collator, -) - -output = trainer.train() - -from tqdm.auto import tqdm - -n_best = 20 -max_answer_length = 30 -predicted_answers = [] -metric = evaluate.load("squad") - -def compute_metrics(start_logits, end_logits, features, examples): - example_to_features = collections.defaultdict(list) - for idx, feature in enumerate(features): - example_to_features[feature["example_id"]].append(idx) - - predicted_answers = [] - for example in tqdm(examples): - example_id = example["id"] - context = example["context"] - answers = [] - - # Loop through all features associated with that example - for feature_index in example_to_features[example_id]: - start_logit = start_logits[feature_index] - end_logit = end_logits[feature_index] - offsets = features[feature_index]["offset_mapping"] - - start_indexes = np.argsort(start_logit)[-1 : -n_best - 1 : -1].tolist() - end_indexes = np.argsort(end_logit)[-1 : -n_best - 1 : -1].tolist() - for start_index in start_indexes: - for end_index in end_indexes: - # Skip answers that are not fully in the context - if offsets[start_index] is None or offsets[end_index] is None: - continue - # Skip answers with a length that is either < 0 or > max_answer_length - if ( - end_index < start_index - or end_index - start_index + 1 > max_answer_length - ): - continue - - answer = { - "text": context[offsets[start_index][0] : offsets[end_index][1]], - "logit_score": start_logit[start_index] + end_logit[end_index], - } - answers.append(answer) - - # Select the answer with the best score - if len(answers) > 0: - best_answer = max(answers, key=lambda x: x["logit_score"]) - predicted_answers.append( - {"id": example_id, "prediction_text": best_answer["text"]} - ) - else: - predicted_answers.append({"id": example_id, "prediction_text": ""}) - - theoretical_answers = [{"id": ex["id"], "answers": ex["answers"]} for ex in examples] - return metric.compute(predictions=predicted_answers, references=theoretical_answers) - -predictions, _, _ = trainer.predict(validation_dataset) -start_logits, end_logits = predictions - -print(len(predictions[0])) -compute_metrics(start_logits, end_logits, validation_dataset, squad["test"]) - diff --git a/flexnlp/notebooks/FederatedSS.py b/flexnlp/notebooks/FederatedSS.py deleted file mode 100644 index 7bb0de1..0000000 --- a/flexnlp/notebooks/FederatedSS.py +++ /dev/null @@ -1,185 +0,0 @@ -from copy import deepcopy -from datasets import load_dataset -from sentence_transformers import SentenceTransformer, models # , util -from sentence_transformers import InputExample -from sentence_transformers import losses -from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator, TripletEvaluator -import torch -import torch.nn as nn -from torch.utils.data import DataLoader -from datasets import Dataset as HFDataset - -# FLEXible imports -from flex.data import FedDatasetConfig, FedDataDistribution -from flex.data import Dataset -from flex.model import FlexModel -from flex.pool import FlexPool -from flex.pool.decorators import init_server_model -from flex.pool.decorators import deploy_server_model -from flex.pool import deploy_server_model, deploy_server_model_pt -from torch.utils.data import Dataset as TorchDataset -from flex.pool import collect_clients_weights_pt -from flex.pool import fed_avg -from flex.pool import set_aggregated_weights_pt -from flex.pool import evaluate_server_model -from flexnlp.utils.adapters import ss_triplet_input_adapter - -device = ( - "cuda" - if torch.cuda.is_available() - else "mps" - if torch.backends.mps.is_available() - else "cpu" -) - -print(device) - - -# Load the dataset -dataset_id = "embedding-data/QQP_triplets" -# dataset_id = "embedding-data/sentence-compression" - -data = load_dataset(dataset_id, split=['train[:1%]'])[0].train_test_split(test_size=0.1) -dataset, test_dataset = data['train'], data['test'] -print(f"- The {dataset_id} dataset has {dataset.num_rows} examples.") -print(f"- Each example is a {type(dataset[0])} with a {type(dataset[0]['set'])} as value.") -print(f"- Examples look like this: {dataset[0]}") - - -# Load the model -## Step 1: use an existing language model -# word_embedding_model = models.Transformer('distilroberta-base') - -## Step 2: use a pool function over the token embeddings -# pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension()) - -## Join steps 1 and 2 using the modules argument -# model = SentenceTransformer(modules=[word_embedding_model, pooling_model]) - - -# From centralized to federated data - -config = FedDatasetConfig(seed=0) -config.n_clients = 2 -config.replacement = False # ensure that clients do not share any data -config.client_names = ['client1', 'client2'] # Optional -flex_dataset = FedDataDistribution.from_config_with_huggingface_dataset(data=dataset, config=config, - X_columns=['set'], # 'title', 'context', 'question'], - label_columns=['set'] - # X_columns=['input_ids', 'attention_mask'], - # label_columns=['start_positions', 'end_positions'] - ) - -@init_server_model -def build_server_model(): - server_flex_model = FlexModel() - word_embedding_model = models.Transformer('distilroberta-base') - pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension()) - server_flex_model['model'] = SentenceTransformer(modules=[word_embedding_model, pooling_model]) - - return server_flex_model - -flex_pool = FlexPool.client_server_pool(fed_dataset=flex_dataset, init_func=build_server_model) - -clients = flex_pool.clients -servers = flex_pool.servers -aggregators = flex_pool.aggregators - -print(f"Number of nodes in the pool {len(flex_pool)}: {len(servers)} server plus {len(clients)} clients. The server is also an aggregator") - -# Deploy the model -@deploy_server_model -def copy_server_model_to_clients(server_flex_model: FlexModel): - return deepcopy(server_flex_model) - -servers.map(copy_server_model_to_clients, clients) # Using the function created with the decorator -# servers.map(deploy_server_model_pt, clients) # Using the primitive function - -# Prepare data for training phase - -def create_input_examples_for_training(X_data_as_list, X_test_as_list): - """Function to create a DataLoader to train/finetune the model at client level - - Args: - X_data_as_list (list): List containing the examples. Each example is a dict - with the following keys: query, pos, neg. - """ - train_examples = [InputExample(texts=[example['query'], example['pos'][0], example['neg'][0]]) for example in X_data_as_list] - dev_examples = [InputExample(texts=[example['query'], example['pos'][0], example['neg'][0]]) for example in X_test_as_list] - return DataLoader(train_examples, shuffle=True, batch_size=16), dev_examples - -def train(client_flex_model: FlexModel, client_data: Dataset): - print("Training client") - model = client_flex_model['model'] - sentences = ['This is an example sentence', 'Each sentence is converted'] - encodings = model.encode(sentences) - print(f"Old encodings: {encodings}") - X_data = client_data.X_data.tolist() - tam_train = int(len(X_data) * 0.75) - X_data, X_test = X_data[:tam_train], X_data[tam_train:] - train_dataloader, dev_examples = ss_triplet_input_adapter(X_train_as_list=X_data, X_test_as_list=X_test) - train_loss = losses.TripletLoss(model=model) - evaluator = TripletEvaluator.from_input_examples(dev_examples) - warmup_steps = int(len(train_dataloader) * 1 * 0.1) #10% of train data - model.fit(train_objectives=[(train_dataloader, train_loss)], - epochs=1, - warmup_steps=warmup_steps, - evaluator=evaluator, - evaluation_steps=1000, - ) - # model.evaluate(evaluator, 'model_evaluation') - sentences = ['This is an example sentence', 'Each sentence is converted'] - encodings = model.encode(sentences) - print(f"New encodings: {encodings}") - -clients.map(train) - -aggregators.map(collect_clients_weights_pt, clients) - -aggregators.map(fed_avg) - -aggregators.map(set_aggregated_weights_pt, servers) - -def create_input_examples_for_testing(X_test_as_list): - """Function to create a DataLoader to train/finetune the model at client level - - Args: - X_test_as_list (list): List containing the examples. Each example is a dict - with the following keys: query, pos, neg. - """ - return [InputExample(texts=[example['query'], example['pos'][0], example['neg'][0]]) for example in X_test_as_list] - -test_dataset = Dataset.from_huggingface_dataset(test_dataset, X_columns=['set']) - -@evaluate_server_model -def evaluate_global_model(server_flex_model: FlexModel, test_data=None): - _, X_test = ss_triplet_input_adapter(X_test_as_list=test_dataset.X_data.tolist(), train=False) - model = server_flex_model["model"] - evaluator = TripletEvaluator.from_input_examples(X_test) - model.evaluate(evaluator, 'server_evaluation') - print("Model evaluation saved to file.") - -servers.map(evaluate_global_model, test_data=test_dataset) - -def train_n_rounds(n_rounds, clients_per_round=2): - pool = FlexPool.client_server_pool(fed_dataset=flex_dataset, init_func=build_server_model) - for i in range(n_rounds): - print(f"\nRunning round: {i+1} of {n_rounds}") - selected_clients_pool = pool.clients.select(clients_per_round) - selected_clients = selected_clients_pool.clients - print(f"Selected clients for this round: {len(selected_clients)}") - # Deploy the server model to the selected clients - pool.servers.map(deploy_server_model_pt, selected_clients) - # Each selected client trains her model - selected_clients.map(train) - # The aggregador collects weights from the selected clients and aggregates them - pool.aggregators.map(collect_clients_weights_pt, selected_clients) - pool.aggregators.map(fed_avg) - # The aggregator send its aggregated weights to the server - pool.aggregators.map(set_aggregated_weights_pt, pool.servers) - servers.map(evaluate_global_model, test_data=test_dataset) - -# Train the model for n_rounds -# train_n_rounds(5) - -# End diff --git a/flexnlp/notebooks/Federated_QA.py b/flexnlp/notebooks/Federated_QA.py deleted file mode 100644 index 2620345..0000000 --- a/flexnlp/notebooks/Federated_QA.py +++ /dev/null @@ -1,380 +0,0 @@ -from copy import deepcopy -from tqdm.auto import tqdm -import torch -import torch.nn as nn -from datasets import load_dataset -from datasets import Dataset as HFDataset -from transformers import AutoTokenizer -from transformers import DefaultDataCollator -from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer -import collections -import numpy as np -import evaluate - -# FLEXible imports -from flex.data import FedDatasetConfig, FedDataDistribution -from flex.data import Dataset -from flex.model import FlexModel -from flex.pool import FlexPool -from flex.pool.decorators import init_server_model -from flex.pool.decorators import deploy_server_model -from flex.pool import deploy_server_model, deploy_server_model_pt -from torch.utils.data import Dataset as TorchDataset -from flex.pool import collect_clients_weights_pt -from flex.pool import fed_avg -from flex.pool import set_aggregated_weights_pt -from flex.pool import evaluate_server_model - - -device = ( - "cuda" - if torch.cuda.is_available() - else "mps" - if torch.backends.mps.is_available() - else "cpu" -) - -print(device) - -# Load the dataset -# Load a percentage of squal -squad = load_dataset("squad", split="train[:1%]") -# Split 80% train, 20% test -squad = squad.train_test_split(test_size=0.9) -print(squad) - -# Preprocess functions - -model_checkpoint = "distilbert-base-uncased" -#model_checkpoint = "bert-base-cased" -tokenizer = AutoTokenizer.from_pretrained(model_checkpoint) - -max_length = 512 -stride = 128 - - -def preprocess_training_examples(examples): - questions = [q.strip() for q in examples["question"]] - inputs = tokenizer( - questions, - examples["context"], - max_length=max_length, - truncation="only_second", - stride=stride, - return_overflowing_tokens=True, - return_offsets_mapping=True, - padding="max_length", - ) - - offset_mapping = inputs.pop("offset_mapping") - sample_map = inputs.pop("overflow_to_sample_mapping") - answers = examples["answers"] - start_positions = [] - end_positions = [] - - for i, offset in enumerate(offset_mapping): - sample_idx = sample_map[i] - answer = answers[sample_idx] - start_char = answer["answer_start"][0] - end_char = answer["answer_start"][0] + len(answer["text"][0]) - sequence_ids = inputs.sequence_ids(i) - - # Find the start and end of the context - idx = 0 - while sequence_ids[idx] != 1: - idx += 1 - context_start = idx - while sequence_ids[idx] == 1: - idx += 1 - context_end = idx - 1 - - # If the answer is not fully inside the context, label is (0, 0) - if offset[context_start][0] > start_char or offset[context_end][1] < end_char: - start_positions.append(0) - end_positions.append(0) - else: - # Otherwise it's the start and end token positions - idx = context_start - while idx <= context_end and offset[idx][0] <= start_char: - idx += 1 - start_positions.append(idx - 1) - - idx = context_end - while idx >= context_start and offset[idx][1] >= end_char: - idx -= 1 - end_positions.append(idx + 1) - - inputs["start_positions"] = start_positions - inputs["end_positions"] = end_positions - return inputs - -def preprocess_training_examples_as_lists(examples, answers_examples): - """ - Function that preprocess the data that comes as a list - instead as a Dataset type. - Args: - examples (list[list]): List of lists containg the examples to - preprocess. ['id', 'title', 'context', 'question'] - answers (list[str]): List containing the answers - """ - questions = [q[3].strip() for q in examples] - contexts = [c[2] for c in examples] - inputs = tokenizer( - questions, - # examples["context"], - contexts, - max_length=max_length, - truncation="only_second", - stride=stride, - return_overflowing_tokens=True, - return_offsets_mapping=True, - padding="max_length", - ) - - offset_mapping = inputs.pop("offset_mapping") - sample_map = inputs.pop("overflow_to_sample_mapping") - # answers = examples["answers"] - answers = [answers_examples[1][i] for i in range(len(answers_examples[1]))] - start_positions = [] - end_positions = [] - - for i, offset in enumerate(offset_mapping): - sample_idx = sample_map[i] - answer = answers[sample_idx] - start_char = answer["answer_start"][0] - end_char = answer["answer_start"][0] + len(answer["text"][0]) - sequence_ids = inputs.sequence_ids(i) - - # Find the start and end of the context - idx = 0 - while sequence_ids[idx] != 1: - idx += 1 - context_start = idx - while sequence_ids[idx] == 1: - idx += 1 - context_end = idx - 1 - - # If the answer is not fully inside the context, label is (0, 0) - if offset[context_start][0] > start_char or offset[context_end][1] < end_char: - start_positions.append(0) - end_positions.append(0) - else: - # Otherwise it's the start and end token positions - idx = context_start - while idx <= context_end and offset[idx][0] <= start_char: - idx += 1 - start_positions.append(idx - 1) - - idx = context_end - while idx >= context_start and offset[idx][1] >= end_char: - idx -= 1 - end_positions.append(idx + 1) - - inputs["start_positions"] = start_positions - inputs["end_positions"] = end_positions - return HFDataset.from_dict(inputs) - - -def preprocess_validation_examples(examples): - questions = [q.strip() for q in examples["question"]] - inputs = tokenizer( - questions, - examples["context"], - max_length=max_length, - truncation="only_second", - stride=stride, - return_overflowing_tokens=True, - return_offsets_mapping=True, - padding="max_length", - ) - - sample_map = inputs.pop("overflow_to_sample_mapping") - example_ids = [] - - for i in range(len(inputs["input_ids"])): - sample_idx = sample_map[i] - example_ids.append(examples["id"][sample_idx]) - - sequence_ids = inputs.sequence_ids(i) - offset = inputs["offset_mapping"][i] - inputs["offset_mapping"][i] = [ - o if sequence_ids[k] == 1 else None for k, o in enumerate(offset) - ] - - inputs["example_id"] = example_ids - return inputs - - -train_dataset_processed = squad["train"].map( - preprocess_training_examples, - batched=True, - remove_columns=squad["train"].column_names, -) - - -train_dataset = squad["train"] - -test_dataset = squad["test"].map( - preprocess_validation_examples, - batched=True, - remove_columns=squad["test"].column_names, -) - -# From centralized to federated data - -config = FedDatasetConfig(seed=0) -config.n_clients = 2 -config.replacement = False # ensure that clients do not share any data -config.client_names = ['client1', 'client2'] # Optional -flex_dataset = FedDataDistribution.from_config_with_huggingface_dataset(data=train_dataset, config=config, - X_columns=['id', 'title', 'context', 'question'], - label_columns=['answers'] - # X_columns=['input_ids', 'attention_mask'], - # label_columns=['start_positions', 'end_positions'] - ) - -# Init the server model and deploy it -@init_server_model -def build_server_model(): - server_flex_model = FlexModel() - - server_flex_model['model'] = AutoModelForQuestionAnswering.from_pretrained("distilbert-base-uncased") - # Required to store this for later stages of the FL training process - server_flex_model['training_args'] = TrainingArguments( - output_dir="my_awesome_qa_model", - # evaluation_strategy="epoch", - learning_rate=2e-5, - per_device_train_batch_size=16, - per_device_eval_batch_size=16, - num_train_epochs=3, - weight_decay=0.01, - ) - - # server_flex_model['trainer'] = trainer - - return server_flex_model - -flex_pool = FlexPool.client_server_pool(fed_dataset=flex_dataset, init_func=build_server_model) - -clients = flex_pool.clients -servers = flex_pool.servers -aggregators = flex_pool.aggregators - -print(f"Number of nodes in the pool {len(flex_pool)}: {len(servers)} server plus {len(clients)} clients. The server is also an aggregator") - -# Deploy the model -@deploy_server_model -def copy_server_model_to_clients(server_flex_model: FlexModel): - return deepcopy(server_flex_model) - -servers.map(copy_server_model_to_clients, clients) # Using the function created with the decorator -# servers.map(deploy_server_model_pt, clients) # Using the primitive function - -# Train each client's model -def train(client_flex_model: FlexModel, client_data: Dataset): - print("Training client") - model = client_flex_model['model'] - training_args = client_flex_model['training_args'] - # client_train_dataset = client_data.to_numpy() - X_data = client_data.X_data.tolist() - y_data = client_data.to_list() - client_train_dataset = preprocess_training_examples_as_lists(examples=X_data, answers_examples=y_data) - # breakpoint() - trainer = Trainer( - model = model, - args=training_args, - train_dataset=client_train_dataset, - # eval_dataset=validation_dataset, - tokenizer=tokenizer, - # data_collator=data_collator, - ) - trainer.train() - -clients.map(train) - -aggregators.map(collect_clients_weights_pt, clients) - -aggregators.map(fed_avg) - -aggregators.map(set_aggregated_weights_pt, servers) - -# TODO: Add the evaluate function -n_best = 20 -max_answer_length = 30 -predicted_answers = [] -metric = evaluate.load("squad") - -def compute_metrics(start_logits, end_logits, features, examples): - example_to_features = collections.defaultdict(list) - for idx, feature in enumerate(features): - example_to_features[feature["example_id"]].append(idx) - - predicted_answers = [] - for example in tqdm(examples): - example_id = example["id"] - context = example["context"] - answers = [] - - # Loop through all features associated with that example - for feature_index in example_to_features[example_id]: - start_logit = start_logits[feature_index] - end_logit = end_logits[feature_index] - offsets = features[feature_index]["offset_mapping"] - - start_indexes = np.argsort(start_logit)[-1 : -n_best - 1 : -1].tolist() - end_indexes = np.argsort(end_logit)[-1 : -n_best - 1 : -1].tolist() - for start_index in start_indexes: - for end_index in end_indexes: - # Skip answers that are not fully in the context - if offsets[start_index] is None or offsets[end_index] is None: - continue - # Skip answers with a length that is either < 0 or > max_answer_length - if ( - end_index < start_index - or end_index - start_index + 1 > max_answer_length - ): - continue - - answer = { - "text": context[offsets[start_index][0] : offsets[end_index][1]], - "logit_score": start_logit[start_index] + end_logit[end_index], - } - answers.append(answer) - - # Select the answer with the best score - if len(answers) > 0: - best_answer = max(answers, key=lambda x: x["logit_score"]) - predicted_answers.append( - {"id": example_id, "prediction_text": best_answer["text"]} - ) - else: - predicted_answers.append({"id": example_id, "prediction_text": ""}) - - theoretical_answers = [{"id": ex["id"], "answers": ex["answers"]} for ex in examples] - return metric.compute(predictions=predicted_answers, references=theoretical_answers) - -@evaluate_server_model -def evaluate_global_model(server_flex_model: FlexModel, test_data=None): - model = server_flex_model["model"] - training_args = server_flex_model["training_args"] - trainer = Trainer( - model = model, - args=training_args, - train_dataset=test_data, - # eval_dataset=validation_dataset, - tokenizer=tokenizer, - # data_collator=data_collator, - ) - predictions, _, _ = trainer.predict(test_data) - start_logits, end_logits = predictions - print(len(predictions[0])) - return compute_metrics(start_logits, end_logits, test_data, squad["test"]) - -# predictions, _, _ = trainer.predict(validation_dataset) -# start_logits, end_logits = predictions - -# print(len(predictions[0])) -# compute_metrics(start_logits, end_logits, validation_dataset, squad["test"]) -metrics = servers.map(evaluate_global_model, test_data=test_dataset) - -print(metrics) From 927261a4d989f421275196e354c27e9e2bc53406 Mon Sep 17 00:00:00 2001 From: cristinazuhe Date: Wed, 24 Jan 2024 10:57:12 +0100 Subject: [PATCH 2/5] Moved the notebooks out of flexnlp folder --- .../Federated IMDb PT using FLExible with a GRU.ipynb | 2 +- .../Federated QA with Hugginface using FLEXIBLE.ipynb | 2 +- .../Federated SS with SentenceTransformers using FLEXible.ipynb | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename {flexnlp/notebooks => notebooks}/Federated IMDb PT using FLExible with a GRU.ipynb (99%) rename {flexnlp/notebooks => notebooks}/Federated QA with Hugginface using FLEXIBLE.ipynb (99%) rename {flexnlp/notebooks => notebooks}/Federated SS with SentenceTransformers using FLEXible.ipynb (100%) diff --git a/flexnlp/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb similarity index 99% rename from flexnlp/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb rename to notebooks/Federated IMDb PT using FLExible with a GRU.ipynb index 6478bc4..747c893 100644 --- a/flexnlp/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb +++ b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb @@ -703,7 +703,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.11.3" }, "orig_nbformat": 4 }, diff --git a/flexnlp/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb b/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb similarity index 99% rename from flexnlp/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb rename to notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb index d2fa8ce..6eb0eae 100644 --- a/flexnlp/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb +++ b/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb @@ -622,7 +622,7 @@ "metadata": {}, "outputs": [], "source": [ - "# train_n_rounds(5)" + "train_n_rounds(5)" ] } ], diff --git a/flexnlp/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb b/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb similarity index 100% rename from flexnlp/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb rename to notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb From 2a4587e2ad78cbbe3bc47b6de36a565d8f0b36b9 Mon Sep 17 00:00:00 2001 From: cristinazuhe Date: Wed, 24 Jan 2024 14:59:50 +0100 Subject: [PATCH 3/5] Added some comments to the notebooks, still need to review the SA notebnook --- ...ed IMDb PT using FLExible with a GRU.ipynb | 34 ++++-- ...ed QA with Hugginface using FLEXIBLE.ipynb | 105 +++++++----------- ... SentenceTransformers using FLEXible.ipynb | 33 +++--- 3 files changed, 77 insertions(+), 95 deletions(-) diff --git a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb index 747c893..7f3dba2 100644 --- a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb +++ b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb @@ -1,5 +1,16 @@ { "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FLEXNLP tutorial: Sentiment Analysis using Pytorch.\n", + "\n", + "FLEXNLP is an extension of the FLEXible library, developed to add specify features for Natural Language Processing (NLP). We offer tools to adapt your code easily into a federated environment. If you are not familiar with FLEXible, we recommend first to look at the tutorials, in order to understand how to convert your centralized code into a federated one.\n", + "\n", + "In this notebook, we show how to federate a recurrent neural network (RNN) with PyTorch. We use some primitives from FLEXible, but you can create your own ones." + ] + }, { "cell_type": "code", "execution_count": null, @@ -7,9 +18,7 @@ "outputs": [], "source": [ "from copy import deepcopy\n", - "import numpy as np\n", "\n", - "from datasets.load import load_dataset\n", "import torch\n", "import torch.nn as nn\n", "import torch.nn.functional as F\n", @@ -26,8 +35,6 @@ "metadata": {}, "outputs": [], "source": [ - "\n", - "# device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n", "device = (\n", " \"cuda\"\n", " if torch.cuda.is_available()\n", @@ -43,7 +50,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Cargamos el dataset" + "Load the dataset. We use the torchtext library to load the IMDb dataset." ] }, { @@ -52,7 +59,6 @@ "metadata": {}, "outputs": [], "source": [ - "# imdb_dataset = load_dataset('imdb', split=['train', 'test']) # Get the dataset from huggingface library\n", "train_dataset, test_dataset = torchtext.datasets.IMDB() # Get the dataset from torchtext library\n", "unique_classes = set([label for (label, text) in train_dataset])\n", "num_classes = len(unique_classes)" @@ -62,7 +68,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Preparativos como los embeddings, el vocabulario, etc" + "As we are using an RNN, we have to prepare the embeddings, in this case the GloVe embeddings, and we need to create prepare the vocabulary for the embeddings layer." ] }, { @@ -72,7 +78,7 @@ "outputs": [], "source": [ "from torchtext.data.utils import get_tokenizer\n", - "from torchtext.vocab import GloVe, FastText, vocab" + "from torchtext.vocab import GloVe, vocab" ] }, { @@ -91,6 +97,13 @@ "print(f\"Shape of embeddings: {glove.vectors.shape}\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Once the embeddings and the vocabulary are loaded, we show how to use it." + ] + }, { "cell_type": "code", "execution_count": null, @@ -254,8 +267,7 @@ "from flex.model import FlexModel\n", "from flex.pool import FlexPool\n", "\n", - "from flex.pool.decorators import init_server_model\n", - "from flex.pool.decorators import deploy_server_model" + "from flex.pool.decorators import init_server_model" ] }, { @@ -288,7 +300,7 @@ " self.gru = nn.GRU(self.embedding_size,\n", " hidden_size,\n", " batch_first=True,\n", - " num_layers=1\n", + " num_layers=1,\n", " bidirectional=True,\n", " dropout=0.5\n", " )\n", diff --git a/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb b/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb index 6eb0eae..4106709 100644 --- a/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb +++ b/notebooks/Federated QA with Hugginface using FLEXIBLE.ipynb @@ -1,5 +1,16 @@ { "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FLEXLP tutotial: Training a Question Answering model using Pytorch and Huggingface\n", + "\n", + "FLEXNLP is an extension of the FLEXible library, developed to add specify features for Natural Language Processing (NLP). We offer tools to adapt your code easily into a federated environment. If you are not familiar with FLEXible, we recommend first to look at the tutorials, in order to understand how to convert your centralized code into a federated one.\n", + "\n", + "In this notebook, we show how to federate a HuggingFace model for Question Answering. We use some primitives from FLEXible, but you can create your own ones." + ] + }, { "cell_type": "code", "execution_count": null, @@ -12,7 +23,6 @@ "from datasets import load_dataset\n", "from datasets import Dataset as HFDataset\n", "from transformers import AutoTokenizer\n", - "from transformers import DefaultDataCollator\n", "from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer\n", "import collections\n", "import numpy as np\n", @@ -40,7 +50,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Load el dataset" + "### Load the dataset\n", + "\n", + "First we load the dataset. As there isn't federated datasets for this task, it is needed to load a centralized dataset and federate it. In this tutorial we are using the ´squad´ dataset from **Huggigface Datasets**. This dataset is usually used as a benchmark for question answering models, and it is compatible with FLEXIble, as we show below. For this tutorial we are using 1% of the data, to just show how to load the data and use the model. We split the data into train/test instead of using the train/test split from the dataset." ] }, { @@ -60,7 +72,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Preprocess" + "### Preprocess\n", + "\n", + "In order to use the dataset, we need to preprocess it to adapt the data into the expected input. We have created to different functions to preprocess the data, one for the training examples and another for the test/validation examples." ] }, { @@ -171,13 +185,9 @@ "\n", " inputs[\"example_id\"] = example_ids\n", " return inputs\n", - "\"\"\"\n", - "train_dataset = squad[\"train\"].map(\n", - " preprocess_training_examples,\n", - " batched=True,\n", - " remove_columns=squad[\"train\"].column_names,\n", - ")\n", - "\"\"\"\n", + "\n", + "train_dataset = squad[\"train\"]\n", + "\n", "test_dataset = squad[\"test\"].map(\n", " preprocess_validation_examples,\n", " batched=True,\n", @@ -185,33 +195,13 @@ ")\n" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "train_dataset = squad[\"train\"]\n", - "print(train_dataset)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# test_dataset = squad[\"test\"]\n", - "print(test_dataset)" - ] - }, { "cell_type": "markdown", "metadata": {}, "source": [ "# From centralized data to federated data\n", "\n", - "First we're going to federate the dataset using the FedDataDristibution class, that has functions to load multiple datasets from deep learning libraries such as PyTorch or TensorFlow. In this notebook we are using PyTorch, so we need to use the functions from the PyTorch ecosystem, and for the text datasets, we need to use the function `from_config_with_huggingface_dataset`." + "First we're going to federate the dataset using the FedDataDristibution class, that has functions to load multiple datasets from deep learning libraries such as PyTorch or TensorFlow. In this notebook we are using HuggingFace with PyTorch, so we need to use the primitives functions from the PyTorch ecosystem. The data is available in the *datasets* library, from HuggingFace, that's why here we use the function `from_config_with_huggingface_dataset`." ] }, { @@ -229,8 +219,6 @@ "flex_dataset = FedDataDistribution.from_config_with_huggingface_dataset(data=train_dataset, config=config,\n", " X_columns=['id', 'title', 'context', 'question'],\n", " label_columns=['answers']\n", - " # X_columns=['input_ids', 'attention_mask'],\n", - " # label_columns=['start_positions', 'end_positions']\n", " )" ] }, @@ -250,15 +238,14 @@ "from flex.data import Dataset\n", "\n", "test_dataset = Dataset.from_huggingface_dataset(test_dataset,\n", - " X_columns=['input_ids', 'attention_mask'])\n", - " # label_columns=['start_positions', 'end_positions'])" + " X_columns=['input_ids', 'attention_mask'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# 2) Federate a model with FLEXible.\n", + "# Federate a model with FLEXible.\n", "\n", "Once we've federated the dataset, it's time to create the FlexPool. The FlexPool class is the one that simulates the real-time scenario for federated learning, so it is in charge of the communications across actors. " ] @@ -282,7 +269,7 @@ "source": [ "In this notebook we are going to simulate a client-server architecture, which we can easily build using the FlexPool class, using the function `client_server_architecture`. This function needs a FlexDataset, which we already have prepared, and a function to initialize the server model, which we have to create.\n", "\n", - "The model we are going to use is a simple LSTM, which will have the embeddings, the LSTM, a Linear layer and the output layer." + "The model we are going to use is `distilbert-base-uncased` for Question Answering, and we load it as follows." ] }, { @@ -307,8 +294,6 @@ " weight_decay=0.01,\n", " )\n", "\n", - " # server_flex_model['trainer'] = trainer\n", - "\n", " return server_flex_model" ] }, @@ -373,29 +358,6 @@ "As we have preprocesed the text before federating the data, and we are using the `Trainer` class from the Transformers library, we can train the client's models using the `train` function from the `Trainer` class" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "train_dataset" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from torch.utils.data import Dataset as TorchDataset\n", - "\n", - "class QADataset(TorchDataset):\n", - " def __init__(self, encodings, labels) -> None:\n", - " self.encodings = encodings\n", - " self.labels = labels" - ] - }, { "cell_type": "code", "execution_count": null, @@ -407,7 +369,6 @@ " print(\"Training client\")\n", " model = client_flex_model['model']\n", " training_args = client_flex_model['training_args']\n", - " # client_train_dataset = client_data.to_numpy()\n", " X_data = client_data.X_data.tolist()\n", " y_data = client_data.to_list()\n", " client_train_dataset = preprocess_training_examples_as_lists(examples=X_data, answers_examples=y_data)\n", @@ -480,7 +441,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now it's turn to evaluate the global model. To do so, we have to create a function using the decoratod `evaluate_server_model`. \n", + "### Evaluate the model\n", + "\n", + "Now it's turn to evaluate the global model. To do so, we have to create a function using the decorator `evaluate_server_model`. \n", "\n", "In question answering we have to postprocess the predictions obtained, so we have created the function `compute_metrics` that will give us the performance of the model. Here we use the trainer function too. To do so, we creater a trainer instance in the server's FlexModel." ] @@ -491,6 +454,8 @@ "metadata": {}, "outputs": [], "source": [ + "from tqdm import tqdm\n", + "\n", "n_best = 20\n", "max_answer_length = 30\n", "predicted_answers = []\n", @@ -552,7 +517,6 @@ "metadata": {}, "outputs": [], "source": [ - "# TODO: Test the training phase on GPU before evaluating the model\n", "from flex.pool import evaluate_server_model\n", "\n", "\n", @@ -564,9 +528,7 @@ " model = model,\n", " args=training_args,\n", " train_dataset=test_data,\n", - " # eval_dataset=validation_dataset,\n", " tokenizer=tokenizer,\n", - " # data_collator=data_collator,\n", " )\n", " predictions, _, _ = trainer.predict(test_data)\n", " start_logits, end_logits = predictions\n", @@ -624,6 +586,15 @@ "source": [ "train_n_rounds(5)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# End\n", + "\n", + "Congratulations, you have just trained a Question Answering model using the flexnlp library from the FLEXible environment." + ] } ], "metadata": { diff --git a/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb b/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb index e225a97..b0d5fea 100644 --- a/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb +++ b/notebooks/Federated SS with SentenceTransformers using FLEXible.ipynb @@ -4,7 +4,11 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Training a Semantic Similarity/Semantic Search with Sentence Transformers using FLEXible." + "# FLEXLP tutotial: Training a Semantic Similarity/Semantic Search with Sentence Transformers using Sentence Transformers.\n", + "\n", + "FLEXNLP is an extension of the FLEXible library, developed to add specify features for Natural Language Processing (NLP). We offer tools to adapt your code easily into a federated environment. If you are not familiar with FLEXible, we recommend first to look at the tutorials, in order to understand how to convert your centralized code into a federated one.\n", + "\n", + "In this notebook, we show how to federate a Sentence Transformers model. We use some primitives from FLEXible, but you can create your own ones." ] }, { @@ -15,14 +19,10 @@ "source": [ "from copy import deepcopy\n", "from datasets import load_dataset\n", - "from sentence_transformers import SentenceTransformer, models # , util\n", - "from sentence_transformers import InputExample\n", + "from sentence_transformers import SentenceTransformer, models\n", "from sentence_transformers import losses\n", - "from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator, TripletEvaluator\n", - "import torch\n", - "import torch.nn as nn\n", - "from torch.utils.data import DataLoader\n", - "from datasets import Dataset as HFDataset" + "from sentence_transformers.evaluation import TripletEvaluator\n", + "import torch" ] }, { @@ -46,9 +46,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Load el dataset\n", + "### Load the dataset\n", "\n", - "First we load the dataset. As there isn't federated datasets for this task, it is needed to load a centralized dataset and federate it. In this tutorial we are using the ´embedding-data/QQP_triplets´ dataset from **Huggigface Datasets**." + "First we load the dataset. As there isn't federated datasets for this task, it is needed to load a centralized dataset and federate it. In this tutorial we are using the ´embedding-data/QQP_triplets´ dataset from **Huggigface Datasets**. We split the data into train/test, so we can evaluate in too on server's side." ] }, { @@ -74,7 +74,7 @@ "source": [ "# From centralized data to federated data\n", "\n", - "First we're going to federate the dataset using the FedDataDristibution class, that has functions to load multiple datasets from deep learning libraries such as PyTorch or TensorFlow. In this notebook we are using PyTorch, so we need to use the functions from the PyTorch ecosystem, and for the text datasets, we need to use the function `from_config_with_torchtext_dataset`." + "First we're going to federate the dataset using the FedDataDristibution class, that has functions to load multiple datasets from deep learning libraries such as PyTorch, TensorFlow or HuggingFace. In this notebook we are using PyTorch, so we need to use the functions from the PyTorch ecosystem with Huggingface, and for the text datasets, we need to use the function `from_config_with_huggingface_dataset`." ] }, { @@ -90,7 +90,7 @@ "config.replacement = False # ensure that clients do not share any data\n", "config.client_names = ['client1', 'client2'] # Optional\n", "flex_dataset = FedDataDistribution.from_config_with_huggingface_dataset(data=dataset, config=config,\n", - " X_columns=['set'], # 'title', 'context', 'question'],\n", + " X_columns=['set'],\n", " label_columns=['set']\n", " )" ] @@ -113,8 +113,7 @@ "from flex.model import FlexModel\n", "from flex.pool import FlexPool\n", "\n", - "from flex.pool.decorators import init_server_model\n", - "from flex.pool.decorators import deploy_server_model" + "from flex.pool.decorators import init_server_model" ] }, { @@ -123,7 +122,7 @@ "source": [ "In this notebook we are going to simulate a client-server architecture, which we can easily build using the FlexPool class, using the function `client_server_architecture`. This function needs a FlexDataset, which we already have prepared, and a function to initialize the server model, which we have to create.\n", "\n", - "The model we are going to use is a simple LSTM, which will have the embeddings, the LSTM, a Linear layer and the output layer." + "The model we are going to use is a `distilroberta-base` from the SentenceTransformers library." ] }, { @@ -198,7 +197,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "We need to prepare the data for the model. In this case, we have to create a DataLoader with de ´InputExample´ format from **SentenceTransformers**. We have commented the evaluator of the model in the clients, but we keep it on the server side. " + "We need to prepare the data for the model. We have created an adapter for a triplet dataset, so once you load the data into FLEXible, we can just use the `ss_triplet_input_adapter` function. We have commented the evaluator of the model in the clients, but we keep it on the server side. " ] }, { @@ -301,7 +300,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Now it's turn to evaluate the global model. To do so, we have to create a function using the decoratod `evaluate_server_model`." + "Now it's turn to evaluate the global model. To do so, we have to create a function using the decoratod `evaluate_server_model`. We use the adapter function too, and here we show how to evaluate the model. The **SentenceTransformers** library has evaluators depeding on the dataset, and using the function `ss_triplet_input_adapter` let us use it to evaluate the model within the seleted data. The results are saved into a csv file on a folder, by default *model_evaluation*." ] }, { From 8ca6d5962cb6c2d11399cfefe35fafc3e30f2778 Mon Sep 17 00:00:00 2001 From: AlArgente Date: Thu, 25 Jan 2024 09:25:40 +0100 Subject: [PATCH 4/5] Added the collate functions to the SA notebook. --- flexnlp/utils/collators/collate_functions.py | 4 +- ...ed IMDb PT using FLExible with a GRU.ipynb | 39 ++++++++++++------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/flexnlp/utils/collators/collate_functions.py b/flexnlp/utils/collators/collate_functions.py index ba26457..a897ec0 100644 --- a/flexnlp/utils/collators/collate_functions.py +++ b/flexnlp/utils/collators/collate_functions.py @@ -42,8 +42,8 @@ def basic_collate_pad_sequence_classification(batch): function and the labels for the batch. """ label_list, text_list = [], [] - for (_label, _text) in batch: + for (_text, _label) in batch: label_list.append(_label) - text_list.append(torch.tensor(_text)) if not isinstance(_text, torch.tensor) else text_list.append(_text) + text_list.append(_text) if torch.is_tensor(_text) else text_list.append(torch.tensor(_text)) label_list = torch.tensor(label_list, dtype=torch.int64) return torch.nn.utils.rnn.pad_sequence(text_list, batch_first=True), label_list diff --git a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb index 7f3dba2..b2a550a 100644 --- a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb +++ b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb @@ -405,6 +405,7 @@ "from tqdm import tqdm\n", "\n", "from torch.nn.utils.rnn import pad_sequence\n", + "from flexnlp.utils.collators import basic_collate_pad_sequence_classification\n", "\n", "BATCH_SIZE = 256\n", "NUM_EPOCHS = 10\n", @@ -443,6 +444,10 @@ " label_list = torch.tensor(label_list, dtype=torch.int64)\n", " return pad_sequence(text_list, padding_value=pad_index, batch_first=True), label_list\n", "\n", + "def preprocess_text(text):\n", + " text_transform = lambda x: [vocabulary[\"\"]]+[vocabulary[token] for token in spacy_tokenizer(x)]+[vocabulary[\"\"]]\n", + " return list(text_transform(clean_str(text)))\n", + "\n", "def batch_sampler_v2(batch_size, indices):\n", " random.shuffle(indices)\n", " pooled_indices = []\n", @@ -463,8 +468,17 @@ " client_flex_model['train_indices'] = train_indices\n", " else:\n", " train_indices = client_flex_model['train_indices']\n", + "\n", + " X_data = client_data.X_data.tolist()\n", + " X_data = [preprocess_text(text) for text in X_data]\n", + " y_data = client_data.y_data.tolist()\n", + " label_transform = lambda x: int(x) - 1\n", + " label_list = [label_transform(_label) for _label in y_data]\n", + "\n", + " client_data = client_data.from_array(X_data, label_list)\n", + "\n", " # batch_size=BATCH_SIZE, shuffle=True, # No es necesario usarlo porque usamos el batch_sampler\n", - " client_dataloader = DataLoader(client_data, collate_fn=collate_batch, batch_size=BATCH_SIZE,\n", + " client_dataloader = DataLoader(client_data, collate_fn=basic_collate_pad_sequence_classification, batch_size=BATCH_SIZE,\n", " shuffle=True)\n", " #  batch_sampler=batch_sampler_v2(BATCH_SIZE, train_indices))\n", " model = client_flex_model[\"model\"]\n", @@ -590,11 +604,16 @@ " model = model.to(device)\n", " criterion=server_flex_model['criterion']\n", " # get test data as a torchvision object\n", - " test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=True, pin_memory=False, collate_fn=collate_batch)\n", - " X_data, _ = test_dataset.to_list()\n", - " test_indices = [(i, len(tokenizer(s[0]))) for i, s in enumerate(X_data)]\n", - " test_dataloader = DataLoader(test_dataset, collate_fn=collate_batch,\n", - " batch_sampler=batch_sampler_v2(BATCH_SIZE, test_indices))\n", + " # test_dataloader = DataLoader(test_data, batch_size=256, shuffle=True, pin_memory=False, collate_fn=collate_batch)\n", + " X_data, y_data = test_data.X_data.tolist(), test_data.y_data.tolist()\n", + " X_data = [preprocess_text(text) for text in X_data]\n", + " label_transform = lambda x: int(x) - 1\n", + " label_list = [label_transform(_label) for _label in y_data]\n", + " # test_indices = [(i, len(tokenizer(s[0]))) for i, s in enumerate(X_data)]\n", + " test_data = test_data.from_array(X_data, label_list)\n", + " test_dataloader = DataLoader(test_data, batch_size=256, shuffle=True, pin_memory=False, collate_fn=basic_collate_pad_sequence_classification)\n", + " # test_dataloader = DataLoader(test_data, collate_fn=basic_collate_pad_sequence_classification,\n", + " #  batch_sampler=batch_sampler_v2(BATCH_SIZE, test_indices))\n", " losses = []\n", " with torch.no_grad():\n", " for data, target in test_dataloader:\n", @@ -605,12 +624,6 @@ " losses.append(loss.item())\n", " test_acc += (output.argmax(1) == target).sum().item()\n", " total_count += target.shape[0]\n", - " # print(f\"Prediciones: {pred.squeeze(dim=1)}\")\n", - " # print(f\"Output: {output.data.max(1, keepdim=True)}\")\n", - " # print(f\"Target: {target}\")\n", - " # print(pred.eq(target.data.view_as(pred)).long().cpu().sum().item())\n", - " # test_acc += pred.eq(target.data.view_as(pred)).long().cpu().sum().item()\n", - " # print(f\"Test accuracy: {test_acc}\")\n", "\n", " test_loss = sum(losses) / len(losses)\n", " test_acc /= total_count\n", @@ -715,7 +728,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.3" + "version": "3.11.4" }, "orig_nbformat": 4 }, From aa9407c0555d47f355f8660c32f370886b94df37 Mon Sep 17 00:00:00 2001 From: cristinazuhe Date: Thu, 25 Jan 2024 09:31:10 +0100 Subject: [PATCH 5/5] Notebook ready. Collators an adapters ready too. It would be ok to add 2 aggregators normally used on NLP problems. --- ...ed IMDb PT using FLExible with a GRU.ipynb | 49 ++++--------------- 1 file changed, 9 insertions(+), 40 deletions(-) diff --git a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb index b2a550a..3a09787 100644 --- a/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb +++ b/notebooks/Federated IMDb PT using FLExible with a GRU.ipynb @@ -431,36 +431,10 @@ "\n", " return string.strip().lower()\n", "\n", - "def collate_batch(batch):\n", - " def preprocess_text(text):\n", - " text_transform = lambda x: [vocabulary[\"\"]]+[vocabulary[token] for token in spacy_tokenizer(x)]+[vocabulary[\"\"]]\n", - " return list(text_transform(clean_str(text)))\n", - " label_list, text_list = [], []\n", - " for (_text, _label) in batch:\n", - " label_transform = lambda x: int(x) - 1\n", - " label_list.append(label_transform(_label))\n", - " processed_text = torch.tensor(preprocess_text(_text))\n", - " text_list.append(processed_text)\n", - " label_list = torch.tensor(label_list, dtype=torch.int64)\n", - " return pad_sequence(text_list, padding_value=pad_index, batch_first=True), label_list\n", - "\n", "def preprocess_text(text):\n", " text_transform = lambda x: [vocabulary[\"\"]]+[vocabulary[token] for token in spacy_tokenizer(x)]+[vocabulary[\"\"]]\n", " return list(text_transform(clean_str(text)))\n", "\n", - "def batch_sampler_v2(batch_size, indices):\n", - " random.shuffle(indices)\n", - " pooled_indices = []\n", - " # create pool of indices with similar lengths \n", - " for i in range(0, len(indices), batch_size * 100):\n", - " pooled_indices.extend(sorted(indices[i:i + batch_size * 100], key=lambda x: x[1]))\n", - "\n", - " pooled_indices = [x[0] for x in pooled_indices]\n", - "\n", - " # yield indices for current batch\n", - " for i in range(0, len(pooled_indices), batch_size):\n", - " yield pooled_indices[i:i + batch_size]\n", - "\n", "def train(client_flex_model: FlexModel, client_data: Dataset):\n", " X_data, y_data = client_data.to_list()\n", " if 'train_indices' not in client_flex_model:\n", @@ -476,28 +450,20 @@ " label_list = [label_transform(_label) for _label in y_data]\n", "\n", " client_data = client_data.from_array(X_data, label_list)\n", - "\n", - " # batch_size=BATCH_SIZE, shuffle=True, # No es necesario usarlo porque usamos el batch_sampler\n", " client_dataloader = DataLoader(client_data, collate_fn=basic_collate_pad_sequence_classification, batch_size=BATCH_SIZE,\n", " shuffle=True)\n", - " #  batch_sampler=batch_sampler_v2(BATCH_SIZE, train_indices))\n", " model = client_flex_model[\"model\"]\n", - " # lr = 0.001\n", " optimizer = client_flex_model['optimizer_func'](model.parameters(), lr=0.01, **client_flex_model[\"optimizer_kwargs\"])\n", " model = model.train()\n", " model = model.to(device)\n", " criterion = client_flex_model[\"criterion\"]\n", - " # Al usar batch_sampler, hay que recargar el DataLoader en cada epoch.\n", " for _ in tqdm(range(NUM_EPOCHS)):\n", - " # client_dataloader = DataLoader(client_data, collate_fn=collate_batch,\n", - " # batch_sampler=batch_sampler_v2(BATCH_SIZE, train_indices))\n", " losses = []\n", " total_acc, total_count = 0, 0\n", " for texts, labels in client_dataloader:\n", " optimizer.zero_grad()\n", " texts, labels = texts.to(device), labels.to(device)\n", " predicted_labels = model(texts).squeeze(dim=0)\n", - " # pred = pred.squeeze(dim=0)\n", " loss = criterion(predicted_labels, labels)\n", " if predicted_labels.isnan().any():\n", " print(f\"Text in batch: {texts}\")\n", @@ -603,17 +569,13 @@ " total_count = 0\n", " model = model.to(device)\n", " criterion=server_flex_model['criterion']\n", - " # get test data as a torchvision object\n", - " # test_dataloader = DataLoader(test_data, batch_size=256, shuffle=True, pin_memory=False, collate_fn=collate_batch)\n", + " # Prepare the test data for the prediction\n", " X_data, y_data = test_data.X_data.tolist(), test_data.y_data.tolist()\n", " X_data = [preprocess_text(text) for text in X_data]\n", " label_transform = lambda x: int(x) - 1\n", " label_list = [label_transform(_label) for _label in y_data]\n", - " # test_indices = [(i, len(tokenizer(s[0]))) for i, s in enumerate(X_data)]\n", " test_data = test_data.from_array(X_data, label_list)\n", " test_dataloader = DataLoader(test_data, batch_size=256, shuffle=True, pin_memory=False, collate_fn=basic_collate_pad_sequence_classification)\n", - " # test_dataloader = DataLoader(test_data, collate_fn=basic_collate_pad_sequence_classification,\n", - " #  batch_sampler=batch_sampler_v2(BATCH_SIZE, test_indices))\n", " losses = []\n", " with torch.no_grad():\n", " for data, target in test_dataloader:\n", @@ -641,6 +603,13 @@ "metrics = servers.map(evaluate_global_model, test_data=test_dataset)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Show the metrics after evaluating the model." + ] + }, { "cell_type": "code", "execution_count": null, @@ -681,7 +650,7 @@ " pool.aggregators.map(fed_avg)\n", " # The aggregator send its aggregated weights to the server\n", " pool.aggregators.map(set_aggregated_weights_pt, pool.servers)\n", - " metrics = pool.servers.map(evaluate_global_model, test_data=test_imdb_dataset)\n", + " metrics = pool.servers.map(evaluate_global_model, test_data=test_dataset)\n", " loss, acc = metrics[0]\n", " print(f\"Server: Test acc: {acc:.4f}, test loss: {loss:.4f}\")" ]