# model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models

# ______________________________________Monodepth Autoencoder Network__________________________________________________
class UpSample(nn.Sequential):
    def __init__(self, skip_input, output_features):
        super(UpSample, self).__init__()
        self.convA = nn.Conv2d(skip_input, output_features, kernel_size=3, stride=1, padding=1)
        self.leakyreluA = nn.LeakyReLU(0.2)
        self.convB = nn.Conv2d(output_features, output_features, kernel_size=3, stride=1, padding=1)
        self.leakyreluB = nn.LeakyReLU(0.2)

    def forward(self, x, concat_with):
        # Upsample x to the skip connection's spatial size, then fuse them.
        up_x = F.interpolate(x, size=[concat_with.size(2), concat_with.size(3)], mode='bilinear', align_corners=True)
        return self.leakyreluB(self.convB(self.leakyreluA(self.convA(torch.cat([up_x, concat_with], dim=1)))))

class Decoder(nn.Module):
    def __init__(self, num_features=1664, decoder_width=1.0):
        super(Decoder, self).__init__()
        features = int(num_features * decoder_width)
        self.conv2 = nn.Conv2d(num_features, features, kernel_size=1, stride=1, padding=0)
        self.up1 = UpSample(skip_input=features//1 + 256, output_features=features//2)
        self.up2 = UpSample(skip_input=features//2 + 128, output_features=features//4)
        self.up3 = UpSample(skip_input=features//4 + 64, output_features=features//8)
        self.up4 = UpSample(skip_input=features//8 + 64, output_features=features//16)
        self.conv3 = nn.Conv2d(features//16, 1, kernel_size=3, stride=1, padding=1)

    def forward(self, features):
        # Skip connections tapped from DenseNet-169 stages:
        # relu0 (H/2), pool0 (H/4), transition1 (H/8), transition2 (H/16), norm5 (H/32).
        x_block0, x_block1, x_block2, x_block3, x_block4 = features[3], features[4], features[6], features[8], features[12]
        x_d0 = self.conv2(F.relu(x_block4))
        x_d1 = self.up1(x_d0, x_block3)
        x_d2 = self.up2(x_d1, x_block2)
        x_d3 = self.up3(x_d2, x_block1)
        x_d4 = self.up4(x_d3, x_block0)
        return self.conv3(x_d4)

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        # Pretrained NYU weights are loaded into the full PTModel later, so
        # the ImageNet weights are not needed here. (Newer torchvision
        # versions spell this models.densenet169(weights=None).)
        self.original_model = models.densenet169(pretrained=False)

    def forward(self, x):
        # Collect the input plus every intermediate DenseNet feature map.
        features = [x]
        for k, v in self.original_model.features._modules.items():
            features.append(v(features[-1]))
        return features

class PTModel(nn.Module):
    def __init__(self):
        super(PTModel, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()

    def forward(self, x):
        return self.decoder(self.encoder(x))
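
# A minimal shape sanity check (illustrative sketch; the helper name is ours,
# not part of the original code). DenseNet-169 downsamples by 32 and the
# decoder upsamples four times (16x), so the predicted depth map comes out at
# half the input resolution.
def _check_depth_shape():
    net = PTModel().float()
    with torch.no_grad():
        depth = net(torch.zeros(1, 3, 432, 768))
    assert depth.shape == (1, 1, 216, 384)  # half of 432 x 768
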
# __________________________________________Downsampling Block_____________________________________________________
class WPNet(nn.Module):
    """Waypoint Prediction Network"""
    def __init__(self, input_shape, monodepth_model, lr=0.001):
        # Don't define flatten and ReLU in the constructor
        super(WPNet, self).__init__()
        self.monodepth_model = monodepth_model
        # Layer freezing -- change later to unfreeze some layers; currently all layers are frozen
        for param in self.monodepth_model.parameters():
            param.requires_grad = False
        self.input_shape = input_shape  # input batch shape
        self.conv_net = torch.nn.Sequential(
            nn.Conv2d(4, 16, 3),  # 4 channels: RGB + predicted depth
            # nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 3),
            # nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, 3),
            # nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2))
        # Hack to find the input size of the flatten layer: run a dummy batch
        # through the frozen depth model and the conv trunk, then read off
        # the resulting shape.
        with torch.no_grad():
            dummy = torch.zeros(input_shape)
            x = self.monodepth_model(dummy)
            dummy = F.interpolate(dummy, scale_factor=(0.5, 0.5), mode='nearest')
            x = torch.cat([dummy, x], 1)
            x = self.conv_net(x)
            s = x.shape
            fc_size = s[1] * s[2] * s[3]  # e.g. a (4, 3, 432, 768) batch yields 64 x 25 x 46, so fc_size = 73600
        self.fc_net = torch.nn.Sequential(
            nn.Linear(fc_size, 500),
            # nn.BatchNorm1d(500),
            nn.Dropout(p=0.2),
            nn.Linear(500, 100),
            # nn.BatchNorm1d(100),
            # nn.Dropout(p=0.2),
            nn.Linear(100, 20))
        self.fc3 = nn.Linear(20, 3)
        self.fc4 = nn.Linear(20, 4)
    # Separation of carts and quats, possibly because of their different scaling
    def cartesian_forward(self, input_batch):
        """Downsampling block to predict x, y, z waypoints"""
        x = self.monodepth_model(input_batch)
        # Halve the RGB input so it matches the depth map's resolution.
        input_batch = F.interpolate(input_batch, scale_factor=(0.5, 0.5), mode='nearest')
        x = torch.cat([input_batch, x], 1)
        x = self.conv_net(x)
        x = torch.flatten(x, 1)
        x = self.fc_net(x)
        output = self.fc3(x)
        return output

    def quaternion_forward(self, input_batch):
        """Downsampling block to predict the rotational state as qw, qx, qy, qz quaternions"""
        x = self.monodepth_model(input_batch)
        input_batch = F.interpolate(input_batch, scale_factor=(0.5, 0.5), mode='nearest')
        x = torch.cat([input_batch, x], 1)
        x = self.conv_net(x)
        x = torch.flatten(x, 1)
        x = self.fc_net(x)
        output = self.fc4(x)
        return output
    def forward(self, input_batch):
        """Return a concatenated vector of waypoints and quaternions.

        TODO: decide whether to return predictions for n frames.
        """
        trans = self.cartesian_forward(input_batch)
        rot = self.quaternion_forward(input_batch)
        output = torch.cat([trans, rot], dim=1)
        return output
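
# Hypothetical loss sketch (our addition, not from the original repo): the
# comment above on separating carts and quats suggests weighting the two
# heads differently. `quat_weight` and the unit-quaternion target convention
# are assumptions for illustration.
def waypoint_loss(pred, target, quat_weight=1.0):
    trans_loss = F.mse_loss(pred[:, :3], target[:, :3])
    # Compare quaternions on the unit sphere, since only their direction
    # encodes a rotation.
    quat_loss = F.mse_loss(F.normalize(pred[:, 3:], dim=1),
                           F.normalize(target[:, 3:], dim=1))
    return trans_loss + quat_weight * quat_loss
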
# _____________________________________________________________________________________________________________________
if __name__ == "__main__":
    tensor = torch.zeros((4, 3, 432, 768))
    model = PTModel().float()
    # map_location keeps this runnable on CPU-only machines even if the
    # checkpoint was saved from a GPU.
    model.load_state_dict(torch.load("models/nyu.pt", map_location="cpu"))
    wpnet = WPNet(tuple(tensor.shape), model)
    # print(wpnet.cartesian_forward(tensor))
    # print(wpnet.quaternion_forward(tensor))
    print(wpnet.forward(tensor))
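
    # Illustrative one-step training sketch (assumed setup, not from the
    # original repo): the monodepth weights are frozen in WPNet.__init__, so
    # only the waypoint head receives gradients. `targets` is a placeholder
    # batch of (x, y, z, qw, qx, qy, qz) labels.
    optimizer = optim.Adam((p for p in wpnet.parameters() if p.requires_grad), lr=0.001)
    targets = torch.zeros(4, 7)
    loss = waypoint_loss(wpnet(tensor), targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    print(loss.item())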