server.py
"""Server implementation for FedLSF. Aggregation and Broadcasting using FedAvg"""
import ray
import torch
from model import Specformer
from train_class import Trainer_General
class Server:
"""Server Class"""
def __init__(
self,
class_num,
feature_dim,
nlayer,
hidden_dim,
num_heads,
tran_dropout,
feat_dropout,
prop_dropout,
norm,
trainers: list[Trainer_General],
):
self.feature_dim = feature_dim
self.nlayer = nlayer
self.hidden_dim = hidden_dim
self.num_heads = num_heads
self.tran_dropout = tran_dropout
self.feat_dropout = feat_dropout
self.prop_dropout = prop_dropout
self.norm = norm
self.model = Specformer(
class_num,
self.feature_dim,
self.nlayer,
self.hidden_dim,
self.num_heads,
self.tran_dropout,
self.feat_dropout,
self.prop_dropout,
self.norm,
)
self.trainers = trainers
self.num_of_trainers = len(trainers)
self.broadcast_params(-1)
@torch.no_grad()
def zero_params(self):
for p in self.model.parameters():
p.zero_()
@torch.no_grad()
def train(self, current_global_epoch):
for trainer in self.trainers:
trainer.train.remote(current_global_epoch)
params = [trainer.get_params.remote() for trainer in self.trainers]
self.zero_params()
while True:
ready, left = ray.wait(params, num_returns=1, timeout=None)
if ready:
for t in ready:
for p, mp in zip(ray.get(t), self.model.parameters()):
mp.data += p.cpu()
params = left
if not params:
break
for p in self.model.parameters():
p /= self.num_of_trainers
self.broadcast_params(current_global_epoch)
def broadcast_params(self, current_global_epoch):
for trainer in self.trainers:
trainer.update_params.remote(
tuple(self.model.parameters()), current_global_epoch
)
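

# --- Minimal usage sketch (not part of the original file) --------------------
# Assumes Trainer_General is a @ray.remote actor class (as implied by the
# .remote calls above). `make_trainers` is a hypothetical helper standing in
# for whatever builds one trainer per client subgraph in train_class.py, and
# the hyperparameter values are illustrative only.
if __name__ == "__main__":
    ray.init()

    # Hypothetical helper: returns a list of Trainer_General Ray actor handles.
    trainers = make_trainers(num_clients=10)  # assumption, not a real API

    server = Server(
        class_num=7,          # example values only
        feature_dim=1433,
        nlayer=2,
        hidden_dim=64,
        num_heads=4,
        tran_dropout=0.2,
        feat_dropout=0.4,
        prop_dropout=0.5,
        norm="none",
        trainers=trainers,
    )

    # Run a few global rounds: local training, FedAvg aggregation, broadcast.
    for epoch in range(100):
        server.train(epoch)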