"""Boilerplate code to optimize a simple PyTorch model using PriorBand.
NOTE!!! This code is not meant to be executed.
It is only to serve as a template to help interface NePS with an existing ML/DL pipeline.
The following script is designed as a template for using `PriorBand` from NePS.
It describes the crucial components that a user needs to provide in order to interface PriorBand.
The 2 crucial components are:
* The search space, called the `pipeline_space` in NePS
* This defines the set of hyperparameters that the optimizer will search over
* This declaration also allows injecting priors in the form of defaults per hyperparameter
* The `run_pipeline` function
* This function is called by the optimizer and is responsible for running the pipeline
* The function should at the minimum expect the hyperparameters as keyword arguments
* The function should return the loss of the pipeline as a float
* If the return value is a dictionary, it should have a key called "loss" with the loss as a float
Overall, running an optimizer from NePS involves 4 clear steps:
1. Importing neccessary packages including neps.
2. Designing the search space as a dictionary.
3. Creating the run_pipeline and returning the loss and other wanted metrics.
4. Using neps run with the optimizer of choice.
"""
from __future__ import annotations  # assumption: needed for the `dict | float` annotation below on Python < 3.10

import logging

import torch
import torch.nn as nn
import torch.nn.functional as F

import neps
from neps.utils.common import load_checkpoint, save_checkpoint

logger = logging.getLogger("neps_template.run")


def pipeline_space() -> dict:
    # Create the search space based on NePS parameters and return the dictionary.
    # IMPORTANT: the `default` values below inject the prior that PriorBand exploits,
    # and exactly one parameter must be marked as the fidelity via `is_fidelity=True`.
    space = dict(
        lr=neps.Float(
            lower=1e-5,
            upper=1e-2,
            log=True,  # If True, the search space is sampled in log space
            default=1e-3,  # a non-None value here acts as the mode of the prior distribution
        ),
        wd=neps.Float(
            lower=1e-5,  # a log-scaled parameter requires a strictly positive lower bound
            upper=1e-1,
            log=True,
            default=1e-3,
        ),
        epoch=neps.Integer(
            lower=1,
            upper=10,
            is_fidelity=True,  # IMPORTANT to set this to True for the fidelity parameter
        ),
    )
    return space
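

# A minimal sketch of how the space above could be extended with a categorical
# hyperparameter that also carries a prior. Assumptions: this NePS version exposes
# `neps.Categorical` with `choices`/`default` arguments analogous to `neps.Float`,
# and `pipeline_space_with_optimizer` is a hypothetical helper, not part of the
# original template:
def pipeline_space_with_optimizer() -> dict:
    space = pipeline_space()
    space["optimizer_name"] = neps.Categorical(
        choices=["adam", "sgd"],  # the discrete options to search over
        default="adam",  # as above, a non-None default acts as the prior mode
    )
    return space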


def run_pipeline(
    pipeline_directory,  # The directory where this config's checkpoint is saved
    previous_pipeline_directory,  # The directory of the config at its immediate lower fidelity (None if no such checkpoint exists)
    **config,  # The hyperparameters to be used in the pipeline
) -> dict | float:
    # Defining the model
    # Can be defined outside the function, or imported from a file, package, etc.
    class MyModel(nn.Module):
        def __init__(self) -> None:
            super().__init__()
            self.linear1 = nn.Linear(in_features=224, out_features=512)
            self.linear2 = nn.Linear(in_features=512, out_features=10)

        def forward(self, x):
            x = F.relu(self.linear1(x))
            x = self.linear2(x)
            return x

    # Instantiate the model
    model = MyModel()
    # IMPORTANT: Extracting hyperparameters from the passed config
    learning_rate = config["lr"]
    weight_decay = config["wd"]

    # Initializing the optimizer
    optimizer = torch.optim.Adam(
        model.parameters(), lr=learning_rate, weight_decay=weight_decay
    )
    ## Checkpointing
    # Loading the checkpoint if it exists
    previous_state = load_checkpoint(  # predefined function from neps
        directory=previous_pipeline_directory,
        model=model,  # relies on pass-by-reference
        optimizer=optimizer,  # relies on pass-by-reference
    )
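    # NOTE: `load_checkpoint` returns the dictionary passed as `values_to_save` to the
    # matching `save_checkpoint` call further below, and None when no checkpoint exists
    # yet (e.g., on the first evaluation of a config, where
    # `previous_pipeline_directory` is None).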
    # Adjusting the run budget based on the checkpoint
    if previous_state is not None:
        epoch_already_trained = previous_state["epochs"]
        # + anything else saved in the checkpoint.
    else:
        epoch_already_trained = 0
        # + anything else with a default value.
    # Extracting the target epochs from the config
    max_epochs = config["epoch"]

    # User TODO:
    # Load the relevant data for training and validation

    # Actual model training
    for epoch in range(epoch_already_trained, max_epochs):
        # Training loop
        ...
        # Validation loop
        ...
        logger.info(f"Epoch: {epoch}, Loss: {...}, Val. acc.: {...}")
    # Save the checkpoint data in the current directory
    save_checkpoint(
        directory=pipeline_directory,
        values_to_save={"epochs": max_epochs},
        model=model,
        optimizer=optimizer,
    )

    # Return a dictionary with the results, or a single float value (loss)
    return {
        "loss": ...,
        "info_dict": {
            "train_accuracy": ...,
            "test_accuracy": ...,
        },
    }
# end of run_pipeline


if __name__ == "__main__":
    neps.run(
        run_pipeline=run_pipeline,  # User TODO (defined above)
        pipeline_space=pipeline_space(),  # User TODO (defined above)
        root_directory="results",
        max_evaluations_total=25,  # total number of times `run_pipeline` is called
        searcher="priorband",  # use "priorband_bo" for longer budgets, and set `initial_design_size`
    )
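
# A sketch of how to inspect progress after (or during) the run. NePS writes one
# subfolder per evaluated config under `root_directory` ("results" above). The
# status module invocation below is an assumption about the installed NePS
# version and may differ in yours:
#
#     python -m neps.status results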