Commit

Merge pull request #263 from dice-group/refactoring_memorymap
Refactoring memorymap
Demirrr authored Oct 25, 2024
2 parents 2eb5db4 + 64b0487 commit 5e82778
Showing 18 changed files with 358 additions and 266 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -118,7 +118,7 @@ Under the hood, dicee executes the run.py script and uses [lightning](https://li
# (1)
dicee --dataset_dir "KGs/UMLS" --model Keci --eval_model "train_val_test"
# (2)
CUDA_VISIBLE_DEVICES=0,1 python dicee/scripts/run.py --trainer PL --dataset_dir "KGs/UMLS" --model Keci --eval_model "train_val_test"
CUDA_VISIBLE_DEVICES=0,1 dicee --trainer PL --dataset_dir "KGs/UMLS" --model Keci --eval_model "train_val_test"
```
Similarly, models can be easily trained with torchrun
```bash
@@ -141,13 +141,13 @@ _:1 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07
<http://www.benchmark.org/family#hasChild> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#ObjectProperty> .
<http://www.benchmark.org/family#hasParent> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#ObjectProperty> .
```
**Continual Training:** the training phase of a pretrained model can be resumed.
**Continual Training:** the training phase of a pretrained model can be resumed. The model will be saved in the same directory given via ```--continual_learning "KeciFamilyRun"```.
```bash
dicee --continual_learning KeciFamilyRun --path_single_kg "KGs/Family/family-benchmark_rich_background.owl" --model Keci --path_to_store_single_run KeciFamilyRun --backend rdflib --eval_model None
dicee --continual_learning "KeciFamilyRun" --path_single_kg "KGs/Family/family-benchmark_rich_background.owl" --model Keci --backend rdflib --eval_model None
```
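Once such a run directory exists, the stored model can also be reloaded programmatically. A minimal sketch, assuming the `KGE` loader class that dicee exposes for pretrained runs; the directory name is taken from the example above and is otherwise an assumption:

```python
from dicee import KGE

# Reload the pretrained model from the run directory created by the command above.
# "KeciFamilyRun" mirrors the CLI example; substitute your own run directory.
pre_trained_kge = KGE(path="KeciFamilyRun")
```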

**Apart from n-triples or standard link prediction dataset formats, we support ["owl", "nt", "turtle", "rdf/xml", "n3"]**.
Moreover, a KGE model can also be trained by providing **an endpoint of a triple store**.
```bash
dicee --sparql_endpoint "http://localhost:3030/mutagenesis/" --model Keci
```
3 changes: 3 additions & 0 deletions dicee/config.py
@@ -50,6 +50,9 @@ def __init__(self, **kwargs):
self.backend: str = "pandas"
"""Backend to read, process, and index input knowledge graph. pandas, polars and rdflib available"""

self.separator: str = r"\s+"
"""Separator used to extract the head, relation, and tail from a triple"""

self.trainer: str = 'torchCPUTrainer'
"""Trainer for knowledge graph embedding model"""

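The new `separator` option above is a regular expression applied when splitting raw triple lines. A small, standalone sketch of the intended behaviour (illustrative only; the actual parsing lives in dicee's pandas/polars/rdflib backends):

```python
import re

# Default separator from the config above: any run of whitespace.
separator = r"\s+"

# Toy triple line; real files are read by dicee's preprocessing backends.
line = "ex:alice ex:hasChild   ex:bob ."
head, relation, tail = re.split(separator, line.rstrip(" ."))[:3]
print(head, relation, tail)  # ex:alice ex:hasChild ex:bob
```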
12 changes: 4 additions & 8 deletions dicee/dataset_classes.py
@@ -244,9 +244,7 @@ class OnevsAllDataset(torch.utils.data.Dataset):

def __init__(self, train_set_idx: np.ndarray, entity_idxs):
super().__init__()
assert isinstance(train_set_idx, np.memmap)

assert isinstance(train_set_idx, np.ndarray)
assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
assert len(train_set_idx) > 0
self.train_data = train_set_idx
self.target_dim = len(entity_idxs)
@@ -300,8 +298,7 @@ def __init__(self, train_set_idx: np.ndarray, entity_idxs, relation_idxs, form,
label_smoothing_rate: float = 0.0):
super().__init__()
assert len(train_set_idx) > 0
assert isinstance(train_set_idx, np.memmap)
assert isinstance(train_set_idx, np.ndarray)
assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
self.train_data = None
self.train_target = None
self.label_smoothing_rate = torch.tensor(label_smoothing_rate)
@@ -397,8 +394,7 @@ def __init__(self, train_set_idx: np.ndarray, entity_idxs, relation_idxs,
label_smoothing_rate=0.0):
super().__init__()
assert len(train_set_idx) > 0
assert isinstance(train_set_idx, np.memmap)
assert isinstance(train_set_idx, np.ndarray)
assert isinstance(train_set_idx, np.memmap) or isinstance(train_set_idx, np.ndarray)
self.train_data = None
self.train_target = None
self.label_smoothing_rate = torch.tensor(label_smoothing_rate)
@@ -689,7 +685,7 @@ def __len__(self):
return self.length

def __getitem__(self, idx):
return torch.from_numpy(self.train_set[idx])
return torch.from_numpy(self.train_set[idx].copy())

def collate_fn(self, batch: List[torch.Tensor]):
batch = torch.stack(batch, dim=0)
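The `.copy()` added in `__getitem__` matters for the memory-map refactoring: an `np.memmap` opened with `mode='r'` is read-only, and `torch.from_numpy` warns on non-writable arrays and would otherwise share that read-only buffer. A toy sketch of the fix, with a small array standing in for the indexed training set:

```python
import numpy as np
import torch

# Build a tiny read-only memory map standing in for the indexed training set.
np.arange(9, dtype=np.int64).reshape(3, 3).tofile("toy_train_set.npy")
train_set = np.memmap("toy_train_set.npy", mode="r", dtype=np.int64, shape=(3, 3))

# torch.from_numpy(train_set[0]) would warn that the array is not writable,
# and the resulting tensor could not be modified safely.
triple = torch.from_numpy(train_set[0].copy())  # .copy() yields a writable ndarray
print(triple)  # tensor([0, 1, 2])
```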
142 changes: 81 additions & 61 deletions dicee/executer.py
@@ -6,13 +6,12 @@
import os
import datetime
from pytorch_lightning import seed_everything

from dicee.knowledge_graph import KG
from dicee.evaluator import Evaluator
# Avoid
from dicee.static_preprocess_funcs import preprocesses_input_args
from dicee.trainer import DICE_Trainer
from dicee.static_funcs import timeit, continual_training_setup_executor, read_or_load_kg, load_json, store
from .knowledge_graph import KG
from .evaluator import Evaluator
from .static_preprocess_funcs import preprocesses_input_args
from .trainer import DICE_Trainer
from .static_funcs import timeit, read_or_load_kg, load_json, store, create_experiment_folder
import numpy as np

logging.getLogger('pytorch_lightning').setLevel(0)
warnings.filterwarnings(action="ignore", category=DeprecationWarning)
@@ -34,43 +33,33 @@ def __init__(self, args, continuous_training=False):
# (3) Set the continual training flag
self.is_continual_training = continuous_training
# (4) Create an experiment folder or use the previous one
continual_training_setup_executor(self)
self.setup_executor()
# (5) A variable is initialized for pytorch lightning trainer or DICE_Trainer()
self.trainer = None
self.trained_model = None
# (6) A variable is initialized for storing input data.
self.knowledge_graph = None
# (7) Store few data in memory for numerical results, e.g. runtime, H@1 etc.
self.report = dict()
# (8) Create an object to carry out link prediction evaluations
self.evaluator = None # e.g. Evaluator(self)
# (8) Create an object to carry out link prediction evaluations, e.g. Evaluator(self)
self.evaluator = None
# (9) Execution start time
self.start_time = None

def read_or_load_kg(self):
print('*** Read or Load Knowledge Graph ***')
start_time = time.time()
kg = KG(dataset_dir=self.args.dataset_dir,
byte_pair_encoding=self.args.byte_pair_encoding,
padding=True if self.args.byte_pair_encoding and self.args.model != "BytE" else False,
add_noise_rate=self.args.add_noise_rate,
sparql_endpoint=self.args.sparql_endpoint,
path_single_kg=self.args.path_single_kg,
add_reciprocal=self.args.apply_reciprical_or_noise,
eval_model=self.args.eval_model,
read_only_few=self.args.read_only_few,
sample_triples_ratio=self.args.sample_triples_ratio,
path_for_serialization=self.args.full_storage_path,
path_for_deserialization=self.args.path_experiment_folder if hasattr(self.args,
'path_experiment_folder') else None,
backend=self.args.backend,
training_technique=self.args.scoring_technique)
print(f'Preprocessing took: {time.time() - start_time:.3f} seconds')
# (2) Share some info about data for easy access.
print(kg.description_of_input)
return kg

def read_preprocess_index_serialize_data(self) -> None:
def setup_executor(self) -> None:
if self.is_continual_training is False:
# Create a single directory containing KGE and all related data
if self.args.path_to_store_single_run:
os.makedirs(self.args.path_to_store_single_run, exist_ok=True)
self.args.full_storage_path = self.args.path_to_store_single_run
else:
self.args.full_storage_path = create_experiment_folder(folder_name=self.args.storage_path)

with open(self.args.full_storage_path + '/configuration.json', 'w') as file_descriptor:
temp = vars(self.args)
json.dump(temp, file_descriptor, indent=3)

def dept_read_preprocess_index_serialize_data(self) -> None:
""" Read & Preprocess & Index & Serialize Input Data
(1) Read or load the data from disk into memory.
@@ -84,9 +73,6 @@ def read_preprocess_index_serialize_data(self) -> None:
None
"""
# (1) Read & Preprocess & Index & Serialize Input Data.
self.knowledge_graph = self.read_or_load_kg()

# (2) Store the stats and share parameters
self.args.num_entities = self.knowledge_graph.num_entities
self.args.num_relations = self.knowledge_graph.num_relations
@@ -102,19 +88,6 @@

self.report['runtime_kg_loading'] = time.time() - self.start_time

def load_indexed_data(self) -> None:
""" Load the indexed data from disk into memory
Parameter
----------
Return
----------
None
"""
self.knowledge_graph = read_or_load_kg(self.args, cls=KG)

@timeit
def save_trained_model(self) -> None:
""" Save a knowledge graph embedding model
@@ -143,18 +116,18 @@ def save_trained_model(self) -> None:
store(trainer=self.trainer,
trained_model=self.trained_model,
model_name='model',
full_storage_path=self.storage_path,
full_storage_path=self.args.full_storage_path,#self.storage_path,
save_embeddings_as_csv=self.args.save_embeddings_as_csv)
else:
store(trainer=self.trainer,
trained_model=self.trained_model,
model_name='model_' + str(datetime.datetime.now()),
full_storage_path=self.storage_path, save_embeddings_as_csv=self.args.save_embeddings_as_csv)
full_storage_path=self.args.full_storage_path,#self.storage_path,
save_embeddings_as_csv=self.args.save_embeddings_as_csv)

self.report['path_experiment_folder'] = self.storage_path
self.report['path_experiment_folder'] = self.args.full_storage_path#self.storage_path
self.report['num_entities'] = self.args.num_entities
self.report['num_relations'] = self.args.num_relations
self.report['path_experiment_folder'] = self.storage_path

def end(self, form_of_labelling: str) -> dict:
"""
@@ -188,6 +161,7 @@ def end(self, form_of_labelling: str) -> dict:

def write_report(self) -> None:
""" Report training related information in a report.json file """
# @TODO: Move to static funcs
# Report total runtime.
self.report['Runtime'] = time.time() - self.start_time
print(f"Total Runtime: {self.report['Runtime']:.3f} seconds")
@@ -213,15 +187,45 @@ def start(self) -> dict:
"""
self.start_time = time.time()
print(f"Start time:{datetime.datetime.now()}")
# (1) Loading the Data
# Load the indexed data from disk or read a raw data from disk into knowledge_graph attribute
self.load_indexed_data() if self.is_continual_training else self.read_preprocess_index_serialize_data()
# (1) Reload the memory-map of the indexed knowledge graph stored as a numpy ndarray.
if self.args.path_to_store_single_run and os.path.exists(self.args.path_to_store_single_run+"/memory_map_train_set.npy"):
# (1.1) Read information about memory-map of KG.
with open(self.args.path_to_store_single_run+'/memory_map_details.json', 'r') as file_descriptor:
memory_map_details = json.load(file_descriptor)
self.knowledge_graph = np.memmap(self.args.path_to_store_single_run + '/memory_map_train_set.npy',
mode='r',
dtype=memory_map_details["dtype"],
shape=tuple(memory_map_details["shape"]))
self.args.num_entities = memory_map_details["num_entities"]
self.args.num_relations = memory_map_details["num_relations"]
self.args.num_tokens = None
self.args.max_length_subword_tokens = None
self.args.ordered_bpe_entities = None
else:
self.knowledge_graph = read_or_load_kg(self.args, cls=KG)
self.args.num_entities = self.knowledge_graph.num_entities
self.args.num_relations = self.knowledge_graph.num_relations
self.args.num_tokens = self.knowledge_graph.num_tokens
self.args.max_length_subword_tokens = self.knowledge_graph.max_length_subword_tokens
self.args.ordered_bpe_entities = self.knowledge_graph.ordered_bpe_entities
self.report['num_train_triples'] = len(self.knowledge_graph.train_set)
self.report['num_entities'] = self.knowledge_graph.num_entities
self.report['num_relations'] = self.knowledge_graph.num_relations
self.report['max_length_subword_tokens'] = self.knowledge_graph.max_length_subword_tokens if self.knowledge_graph.max_length_subword_tokens else None
self.report['runtime_kg_loading'] = time.time() - self.start_time
data={"shape":tuple(self.knowledge_graph.train_set.shape),
"dtype":self.knowledge_graph.train_set.dtype.str,
"num_entities":self.knowledge_graph.num_entities,
"num_relations":self.knowledge_graph.num_relations}
with open(self.args.full_storage_path + '/memory_map_details.json', 'w') as file_descriptor:
json.dump(data, file_descriptor, indent=4)

# (2) Create an evaluator object.
self.evaluator = Evaluator(args=self.args)
# (3) Create a trainer object.
self.trainer = DICE_Trainer(args=self.args,
is_continual_training=self.is_continual_training,
storage_path=self.storage_path,
storage_path=self.args.full_storage_path,
evaluator=self.evaluator)
# (4) Start the training
self.trained_model, form_of_labelling = self.trainer.start(knowledge_graph=self.knowledge_graph)
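The reload branch above hinges on `memory_map_details.json` recording exactly what `np.memmap` needs to reinterpret the raw file: shape, dtype string, and the entity/relation counts. A condensed, self-contained sketch of that round trip, with toy data and a hypothetical run directory in place of the real indexed training set:

```python
import json
import os
import numpy as np

run_dir = "KeciFamilyRun"  # hypothetical run directory
os.makedirs(run_dir, exist_ok=True)

# End of a run: persist the indexed triples as raw bytes plus their metadata.
train_set = np.arange(12, dtype=np.int32).reshape(4, 3)  # toy (head, relation, tail) indices
train_set.tofile(f"{run_dir}/memory_map_train_set.npy")
details = {"shape": train_set.shape, "dtype": train_set.dtype.str,
           "num_entities": 7, "num_relations": 3}
with open(f"{run_dir}/memory_map_details.json", "w") as f:
    json.dump(details, f, indent=4)

# Next run: re-open the training set without re-reading or re-indexing the raw KG.
with open(f"{run_dir}/memory_map_details.json") as f:
    details = json.load(f)
knowledge_graph = np.memmap(f"{run_dir}/memory_map_train_set.npy", mode="r",
                            dtype=details["dtype"], shape=tuple(details["shape"]))
print(knowledge_graph.shape)  # (4, 3)
```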
@@ -279,10 +283,26 @@ def continual_start(self) -> dict:
"""
# (1)
self.trainer = DICE_Trainer(args=self.args, is_continual_training=True,
self.trainer = DICE_Trainer(args=self.args,
is_continual_training=True,
storage_path=self.args.continual_learning)
# (2)
self.trained_model, form_of_labelling = self.trainer.continual_start()

assert os.path.exists(f"{self.args.continual_learning}/memory_map_train_set.npy")
# (1) Reload the memory-map of the indexed knowledge graph stored as a numpy ndarray.
with open(f"{self.args.continual_learning}/memory_map_details.json", 'r') as file_descriptor:
memory_map_details = json.load(file_descriptor)
knowledge_graph = np.memmap(f"{self.args.continual_learning}/memory_map_train_set.npy",
mode='r',
dtype=memory_map_details["dtype"],
shape=tuple(memory_map_details["shape"]))
self.args.num_entities = memory_map_details["num_entities"]
self.args.num_relations = memory_map_details["num_relations"]
self.args.num_tokens = None
self.args.max_length_subword_tokens = None
self.args.ordered_bpe_entities = None

self.trained_model, form_of_labelling = self.trainer.continual_start(knowledge_graph)

# (5) Store trained model.
self.save_trained_model()
@@ -292,4 +312,4 @@ def continual_start(self) -> dict:
else:
self.evaluator = Evaluator(args=self.args, is_continual_training=True)
self.evaluator.dummy_eval(self.trained_model, form_of_labelling)
return {**self.report, **self.evaluator.report}
23 changes: 13 additions & 10 deletions dicee/knowledge_graph.py
@@ -3,8 +3,6 @@
import sys
import pandas as pd
import polars as pl
import numpy as np
from .read_preprocess_save_load_kg.util import load_numpy_ndarray
class KG:
""" Knowledge Graph """

@@ -18,7 +16,7 @@ def __init__(self, dataset_dir: str = None,
add_reciprocal: bool = None, eval_model: str = None,
read_only_few: int = None, sample_triples_ratio: float = None,
path_for_serialization: str = None,
entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None):
entity_to_idx=None, relation_to_idx=None, backend=None, training_technique: str = None, separator:str=None):
"""
:param dataset_dir: A path of a folder containing train.txt, valid.txt, test.text
:param byte_pair_encoding: Apply Byte pair encoding.
@@ -36,13 +34,14 @@
:param training_technique
"""
self.dataset_dir = dataset_dir
self.sparql_endpoint = sparql_endpoint
self.path_single_kg = path_single_kg

self.byte_pair_encoding = byte_pair_encoding
self.ordered_shaped_bpe_tokens = None
self.sparql_endpoint = sparql_endpoint
self.add_noise_rate = add_noise_rate
self.num_entities = None
self.num_relations = None
self.path_single_kg = path_single_kg
self.path_for_deserialization = path_for_deserialization
self.add_reciprocal = add_reciprocal
self.eval_model = eval_model
@@ -72,6 +71,7 @@ def __init__(self, dataset_dir: str = None,
self.target_dim = None
self.train_target_indices = None
self.ordered_bpe_entities = None
self.separator=separator

if self.path_for_deserialization is None:
# Read a knowledge graph into memory
@@ -83,11 +83,14 @@
else:
LoadSaveToDisk(kg=self).load()
assert len(self.train_set) > 0, "Training set is empty"
self._describe()
self.description_of_input=None
self.describe()


# TODO: Simplify
if self.entity_to_idx is not None:
assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx,
pl.DataFrame), f"entity_to_idx must be a dict or a polars DataFrame: {type(self.entity_to_idx)}"
assert isinstance(self.entity_to_idx, dict) or isinstance(self.entity_to_idx, pd.DataFrame) or isinstance(self.entity_to_idx,
pl.DataFrame), f"entity_to_idx must be a dict or a pandas/polars DataFrame: {type(self.entity_to_idx)}"

if isinstance(self.entity_to_idx, dict):
self.idx_to_entity = {v: k for k, v in self.entity_to_idx.items()}
@@ -96,8 +99,8 @@ def __init__(self, dataset_dir: str = None,
print(f"No inverse mapping created as self.entity_to_idx is not a type of dictionary but {type(self.entity_to_idx)}\n"
f"Backend might be selected as polars")

def _describe(self) -> None:
self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir} -------------------'
def describe(self) -> None:
self.description_of_input = f'\n------------------- Description of Dataset {self.dataset_dir if isinstance(self.dataset_dir, str) else self.sparql_endpoint if isinstance(self.sparql_endpoint, str) else self.path_single_kg} -------------------'
if self.byte_pair_encoding:
self.description_of_input += f'\nNumber of tokens:{self.num_tokens}' \
f'\nNumber of max sequence of sub-words: {self.max_length_subword_tokens}' \
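For orientation, a hedged sketch of how the extended constructor might be called directly with the new `separator` argument. Parameter names come from the signature in the diff above; the concrete values and the assumption that the remaining arguments can stay at their defaults are illustrative only, and additional arguments may be required in practice:

```python
from dicee.knowledge_graph import KG

# Hypothetical direct construction; normally Execute/read_or_load_kg builds this object.
kg = KG(dataset_dir="KGs/UMLS",   # folder containing train.txt, valid.txt, test.txt
        backend="pandas",
        separator=r"\s+",         # new in this commit
        eval_model="train_val_test")
print(kg.description_of_input)
```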