# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import mxnet as mx


def squash(data, squash_axis, name=''):
    """Squash non-linearity: v = (||s||^2 / (1 + ||s||^2)) * (s / ||s||).

    Shrinks short vectors toward zero and pushes long vectors toward unit
    length while preserving direction. epsilon keeps the sqrt away from zero.
    """
    epsilon = 1e-08
    s_squared_norm = mx.sym.sum(data=mx.sym.square(data, name='square_' + name),
                                axis=squash_axis, keepdims=True,
                                name='s_squared_norm_' + name)
    scale = s_squared_norm / (1 + s_squared_norm) / mx.sym.sqrt(data=(s_squared_norm + epsilon),
                                                                name='s_squared_norm_sqrt_' + name)
    squashed_net = mx.sym.broadcast_mul(scale, data, name='squashed_net_' + name)
    return squashed_net
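

# Worked example of squash() (illustrative numbers, not taken from this file):
# for s = [3, 4], ||s||^2 = 25, so scale = (25 / 26) / 5 ~= 0.192 and
# v ~= [0.577, 0.769] with ||v|| ~= 0.96: direction kept, norm pushed below 1.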


def primary_caps(data, dim_vector, n_channels, kernel, strides, name=''):
    """Convolutional primary-capsule layer.

    Applies a convolution with dim_vector * n_channels filters, reshapes the
    feature map into (batch_size, num_capsules, dim_vector) and squashes each
    capsule vector.
    """
    out = mx.sym.Convolution(data=data,
                             num_filter=dim_vector * n_channels,
                             kernel=kernel,
                             stride=strides,
                             name=name)
    out = mx.sym.Reshape(data=out, shape=(0, -1, dim_vector))
    out = squash(out, squash_axis=2)
    return out
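

# Hedged usage sketch: with the CapsNet sizes from Sabour et al. (2017), an
# assumption (nothing in this file fixes them), a 20x20x256 feature map run
# through primary_caps with kernel=(9, 9), strides=(2, 2), dim_vector=8 and
# n_channels=32 yields 6*6*32 = 1152 capsules of dimension 8:
#
#   primarycaps = primary_caps(conv1, dim_vector=8, n_channels=32,
#                              kernel=(9, 9), strides=(2, 2), name='primarycaps')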


class CapsuleLayer:
    """The capsule layer with dynamic routing.

    [batch_size, input_num_capsule, input_dim_vector] => [batch_size, num_capsule, dim_vector]
    """
    def __init__(self, num_capsule, dim_vector, batch_size, kernel_initializer, bias_initializer, num_routing=3):
        self.num_capsule = num_capsule
        self.dim_vector = dim_vector
        self.batch_size = batch_size
        self.num_routing = num_routing
        self.kernel_initializer = kernel_initializer
        self.bias_initializer = bias_initializer

    def __call__(self, data):
        # Infer the incoming capsule layout; the graph input is assumed to be
        # a single-channel 28x28 image (MNIST-sized).
        _, out_shapes, __ = data.infer_shape(data=(self.batch_size, 1, 28, 28))
        _, input_num_capsule, input_dim_vector = out_shapes[0]

        # Build W and the routing logits:
        # W    : (1, input_num_capsule, num_capsule, input_dim_vector, dim_vector)
        # bias : (batch_size, input_num_capsule, num_capsule, 1, 1)
        w = mx.sym.Variable('Weight',
                            shape=(1, input_num_capsule, self.num_capsule, input_dim_vector, self.dim_vector),
                            init=self.kernel_initializer)
        bias = mx.sym.Variable('Bias',
                               shape=(self.batch_size, input_num_capsule, self.num_capsule, 1, 1),
                               init=self.bias_initializer)
        # The routing logits are updated by agreement, not by back-propagation.
        bias = mx.sym.BlockGrad(bias)
        bias_ = bias

        # input         : (batch_size, input_num_capsule, input_dim_vector)
        # inputs_expand : (batch_size, input_num_capsule, 1, input_dim_vector, 1)
        inputs_expand = mx.sym.Reshape(data=data, shape=(0, 0, -4, -1, 1))
        inputs_expand = mx.sym.Reshape(data=inputs_expand, shape=(0, 0, -4, 1, -1, 0))
        # inputs_tiled : (batch_size, input_num_capsule, num_capsule, input_dim_vector, 1)
        inputs_tiled = mx.sym.tile(data=inputs_expand, reps=(1, 1, self.num_capsule, 1, 1))
        # w_tiled : (batch_size, input_num_capsule, num_capsule, input_dim_vector, dim_vector)
        w_tiled = mx.sym.tile(w, reps=(self.batch_size, 1, 1, 1, 1))
        # Prediction vectors u_hat = W^T u :
        # (batch_size, input_num_capsule, num_capsule, 1, dim_vector)
        inputs_hat = mx.sym.linalg_gemm2(w_tiled, inputs_tiled, transpose_a=True)
        inputs_hat = mx.sym.swapaxes(data=inputs_hat, dim1=3, dim2=4)
        # Gradients flow through inputs_hat only on the last routing iteration.
        inputs_hat_stopped = mx.sym.BlockGrad(inputs_hat)

        for i in range(self.num_routing):
            # Coupling coefficients: softmax of the logits over the output capsules.
            c = mx.sym.softmax(bias_, axis=2, name='c' + str(i))
            if i == self.num_routing - 1:
                outputs = squash(
                    mx.sym.sum(mx.sym.broadcast_mul(c, inputs_hat, name='broadcast_mul_' + str(i)),
                               axis=1, keepdims=True,
                               name='sum_' + str(i)), name='output_' + str(i), squash_axis=4)
            else:
                outputs = squash(
                    mx.sym.sum(mx.sym.broadcast_mul(c, inputs_hat_stopped, name='broadcast_mul_' + str(i)),
                               axis=1, keepdims=True,
                               name='sum_' + str(i)), name='output_' + str(i), squash_axis=4)
                # Routing by agreement: b_ij += u_hat_ij . v_j
                bias_ = bias_ + mx.sym.sum(mx.sym.broadcast_mul(inputs_hat_stopped, outputs,
                                                                name='bias_broadcast_mul' + str(i)),
                                           axis=4,
                                           keepdims=True, name='bias_' + str(i))

        outputs = mx.sym.Reshape(data=outputs, shape=(-1, self.num_capsule, self.dim_vector))
        return outputs
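

# A minimal end-to-end sketch, run only as a script. Assumptions flagged
# inline: the hard-coded infer_shape call in CapsuleLayer.__call__ ties the
# graph to MNIST-shaped input (batch_size, 1, 28, 28); the layer sizes and
# initializers below follow the common CapsNet configuration and are
# illustrative, not required by this file.
if __name__ == '__main__':
    batch_size = 8
    data = mx.sym.Variable('data')
    # 28x28x1 image -> 20x20x256 feature map
    conv1 = mx.sym.Convolution(data=data, num_filter=256, kernel=(9, 9),
                               stride=(1, 1), name='conv1')
    conv1 = mx.sym.Activation(data=conv1, act_type='relu', name='conv1_act')
    # 20x20x256 -> 1152 primary capsules of dimension 8
    primarycaps = primary_caps(conv1, dim_vector=8, n_channels=32,
                               kernel=(9, 9), strides=(2, 2), name='primarycaps')
    # 1152 capsules -> 10 digit capsules of dimension 16 via 3 routing iterations
    digitcaps = CapsuleLayer(num_capsule=10, dim_vector=16, batch_size=batch_size,
                             kernel_initializer=mx.init.Xavier(),
                             bias_initializer=mx.init.Zero(),
                             num_routing=3)(primarycaps)
    _, out_shapes, _ = digitcaps.infer_shape(data=(batch_size, 1, 28, 28))
    print(out_shapes)  # expected: [(8, 10, 16)]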