-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuild.py
153 lines (133 loc) · 5.16 KB
/
build.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import torch.distributed as dist
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data.dataloader import DataLoader
from dataset.dataset import FSC_Dataset
from model.Extractor import (Resnet18FPN, Resnet50FPN)
from model.Regressor import CountRegressor
import torch
import torch.nn as nn
from torch.optim import Adam, AdamW
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR
from loss import BasicLoss
def build_optimizer(params, opt: dict):
    """Create the optimizer selected by ``opt["name"]``.

    Args:
        params: Parameters (or parameter groups) handed unchanged to the
            optimizer constructor.
        opt: Optimizer settings. Reads ``"name"`` (``"adam"`` or ``"adamw"``,
            case-insensitive), ``"lr"`` and ``"weight_decay"``. Presumably
            this is the ``"optimizer"`` section of the full config -- confirm
            against the caller.

    Returns:
        The constructed ``Adam`` or ``AdamW`` instance.

    Raises:
        ValueError: If ``opt["name"]`` does not name a supported optimizer.
    """
    optimizer_name = opt["name"].lower()
    optimizer_classes = {"adam": Adam, "adamw": AdamW}
    if optimizer_name not in optimizer_classes:
        # The exception must be raised explicitly; merely constructing it
        # would let execution fall through to an unbound local.
        raise ValueError(f"Not available optimizer {optimizer_name}")
    optimizer_cls = optimizer_classes[optimizer_name]
    return optimizer_cls(params, lr=opt["lr"], weight_decay=opt["weight_decay"])
def build_model(opt: dict, extractor: bool = False) -> nn.Module:
    """Build either the feature extractor or the count regressor.

    Args:
        opt: Model settings. Reads ``opt["extractor"]["name"]`` when
            ``extractor`` is true, otherwise ``opt["regressor"]["name"]``
            (both case-insensitive), plus the optional ``"bn_momentum"`` key.
            Presumably this is the ``"model"`` section of the full config --
            confirm against the caller.
        extractor: Select the extractor (``"resnet18"`` / ``"resnet50"``)
            instead of the regressor (``"countregressor"``).

    Returns:
        The constructed model. When ``opt["bn_momentum"]`` is present and not
        ``None``, the momentum of every ``BatchNorm1d`` / ``BatchNorm2d`` /
        ``SyncBatchNorm`` module in the model is set to that value.

    Raises:
        ValueError: If the requested extractor or regressor name is unknown.
    """
    if extractor:
        model_name = opt["extractor"]["name"].lower()
        if model_name == "resnet18":
            model = Resnet18FPN()
        elif model_name == "resnet50":
            model = Resnet50FPN()
        else:
            raise ValueError(f"Not available extractor: {model_name}")
    else:
        model_name = opt["regressor"]["name"].lower()
        if model_name == "countregressor":
            model = CountRegressor(6, pool="mean")
        else:
            raise ValueError(f"Not available regressor: {model_name}")

    # Optional override: configs without the key keep the modules' defaults.
    bn_momentum = opt.get("bn_momentum")
    if bn_momentum is not None:
        batch_norm_types = (nn.BatchNorm1d, nn.BatchNorm2d, nn.SyncBatchNorm)
        for module in model.modules():
            if isinstance(module, batch_norm_types):
                module.momentum = bn_momentum
    return model
def build_criterion(opt: dict, name):
    """Create the loss object for the requested training variant.

    Args:
        opt: Loss settings. Reads ``"mse_weight"``. Presumably this is the
            ``"loss"`` section of the full config -- confirm against the
            caller.
        name: Variant name, case-insensitive. ``"baseline"`` and
            ``"advanced"`` both map to ``BasicLoss``.

    Returns:
        A ``BasicLoss`` built with ``mse_weight=opt["mse_weight"]``.

    Raises:
        ValueError: If ``name`` is not a supported variant.
    """
    criterion_name = name.lower()
    if criterion_name not in ("baseline", "advanced"):
        # Raise explicitly so an unknown name cannot reach the return with
        # the loss unbound.
        raise ValueError(f"No Available Type of Loss: {criterion_name}")
    return BasicLoss(mse_weight=opt["mse_weight"])
def build_scheduler(opt: dict, optimizer, loader, start_epoch):
    """Create the learning-rate scheduler named by ``opt["scheduler"]["name"]``.

    Unlike the other builders in this file, ``opt`` is the full config here:
    the ``"scheduler"``, ``"optimizer"`` and ``"train"`` sections are all read.

    Args:
        opt: Full configuration dictionary.
        optimizer: Optimizer the scheduler is attached to.
        loader: Sized object; ``len(loader)`` divided by
            ``opt["train"]["num_accum"]`` gives the steps per epoch for
            ``"onecycle"``.
        start_epoch: Passed as ``last_epoch=start_epoch - 1`` for
            ``"onecycle"``; not used for ``"cos_annealing"``.

    Returns:
        A ``OneCycleLR`` or ``CosineAnnealingLR`` instance.

    Raises:
        ValueError: If the scheduler name is neither ``"onecycle"`` nor
            ``"cos_annealing"`` (case-insensitive).
    """
    sched_cfg = opt["scheduler"]
    kind = sched_cfg['name'].lower()

    if kind == "cos_annealing":
        return CosineAnnealingLR(
            optimizer,
            T_max=sched_cfg["t_max"],
            eta_min=sched_cfg["eta_min"],
        )

    if kind != "onecycle":
        raise ValueError(f"Unsupported scheduler type {kind}.")

    peak_lr = opt['optimizer']['lr']
    train_cfg = opt['train']
    one_cycle_kwargs = {
        "max_lr": peak_lr,
        "epochs": train_cfg['epoch'] + 1,
        "steps_per_epoch": len(loader) // train_cfg["num_accum"],
        "cycle_momentum": sched_cfg.get("cycle_momentum", True),
        "base_momentum": 0.85,
        "max_momentum": 0.95,
        "pct_start": sched_cfg["pct_start"],
        "last_epoch": start_epoch - 1,
        "div_factor": sched_cfg['div_factor'],
        "final_div_factor": sched_cfg['final_div_factor'],
    }
    return torch.optim.lr_scheduler.OneCycleLR(optimizer, **one_cycle_kwargs)  # noqa
def build_dataset(opt: dict, mode: str = "train") -> FSC_Dataset:
    """Instantiate ``FSC_Dataset`` from the dataset settings.

    Args:
        opt: Dataset settings. Reads ``"data_path"`` and ``"data_type"``.
            Presumably this is the ``"dataset"`` section of the full config
            -- confirm against the caller.
        mode: Forwarded unchanged as the dataset's ``mode`` argument.

    Returns:
        The constructed ``FSC_Dataset``.
    """
    dataset_kwargs = {
        "data_path": opt["data_path"],
        "data_type": opt["data_type"],
        "mode": mode,
    }
    return FSC_Dataset(**dataset_kwargs)
def build_dataloader(dataset: FSC_Dataset, opt: dict, shuffle: bool = True) -> DataLoader:
    """Wrap ``dataset`` in a ``DataLoader``, sharding it when running distributed.

    Without an initialized process group a plain loader is returned. With
    one, the data is split across ranks by a ``DistributedSampler`` and the
    configured batch size and worker count are divided by the world size
    (batch size rounded down, worker count rounded up).

    Args:
        dataset: Dataset to load from.
        opt: Loader settings. Reads ``"batch_size"`` and the optional
            ``"num_workers"`` (default 4). Presumably this is the
            ``"dataloader"`` section of the full config -- confirm against
            the caller.
        shuffle: Whether to shuffle. The same flag is also used as
            ``drop_last`` for the single-process loader and for the
            distributed sampler.

    Returns:
        The configured ``DataLoader`` (always with ``pin_memory=True``).
    """
    num_workers = opt.get("num_workers", 4)

    # Check availability first: builds without distributed support do not
    # expose the rest of the ``torch.distributed`` API.
    if not (dist.is_available() and dist.is_initialized()):
        return DataLoader(
            dataset,
            batch_size=opt["batch_size"],
            shuffle=shuffle,
            num_workers=num_workers,
            pin_memory=True,
            drop_last=shuffle,
        )

    world_size = dist.get_world_size()
    ddp_sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=dist.get_rank(),
        shuffle=shuffle,
        drop_last=shuffle,
    )
    # NOTE(review): unlike the single-process loader, no ``drop_last`` is set
    # on this DataLoader itself -- confirm that is intended.
    return DataLoader(
        dataset,
        batch_size=opt["batch_size"] // world_size,
        num_workers=(num_workers + world_size - 1) // world_size,
        pin_memory=True,
        sampler=ddp_sampler,
    )
def split_params_for_optimizer(model, opt):
    """Partition the model's parameters into four optimizer parameter groups.

    Parameters whose name contains ``"encoder"`` get a learning rate of
    ``opt["lr"] * 0.1``; all others inherit the optimizer's defaults.
    Within each of those two sets, parameters with ``ndim <= 1`` are placed in
    a separate group with ``weight_decay`` set to ``0.0``.

    Args:
        model: Object exposing ``named_parameters()``.
        opt: Optimizer settings. Reads ``"lr"``. Presumably this is the
            ``"optimizer"`` section of the full config -- confirm against the
            caller.

    Returns:
        A list of four group dicts, in this order: base lr, base lr without
        weight decay, reduced lr, reduced lr without weight decay.
    """
    # Buckets are keyed by (name contains "encoder", parameter has ndim > 1).
    buckets = {
        (in_encoder, multi_dim): []
        for in_encoder in (False, True)
        for multi_dim in (True, False)
    }
    for name, tensor in model.named_parameters():
        buckets["encoder" in name, tensor.ndim > 1].append(tensor)

    reduced_lr = opt["lr"] * 0.1
    return [
        {"params": buckets[False, True]},
        {"params": buckets[False, False], "weight_decay": 0.0},
        {"params": buckets[True, True], "lr": reduced_lr},
        {"params": buckets[True, False], "lr": reduced_lr, "weight_decay": 0.0},
    ]