Add VGSLModelGenerator.model_to_vgsl() function
TimKoornstra committed Sep 29, 2023
1 parent a6112f0 commit 607acfe
Showing 5 changed files with 494 additions and 21 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/run_tests.yml
@@ -50,6 +50,13 @@ jobs:
         env:
           TF_CPP_MIN_LOG_LEVEL: '2'

+      - name: Test model to VGSL
+        if: always()
+        run: |
+          python -m unittest tests/test_model_to_vgsl.py
+        env:
+          TF_CPP_MIN_LOG_LEVEL: '2'
+
       - name: Test replacing model layers
         if: always()
         run: |
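The new CI step runs tests/test_model_to_vgsl.py, whose diff is not expanded in this view. As a rough illustration only — the actual test file may differ — a round-trip check for the new function could be sketched like this; the name of the build entry point is assumed, not confirmed by this commit:

import unittest

from vgsl_model_generator import VGSLModelGenerator


class TestModelToVGSL(unittest.TestCase):
    def test_round_trip(self):
        # Generate a model from a spec, convert it back, and compare
        spec = "None,64,None,1 Cr3,3,32 Mp2,2,2,2 O1m10"
        generator = VGSLModelGenerator(spec)
        model = generator.build()  # hypothetical method name
        self.assertEqual(VGSLModelGenerator.model_to_vgsl(model), spec)


if __name__ == "__main__":
    unittest.main()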
16 changes: 15 additions & 1 deletion src/custom_layers.py
@@ -11,7 +11,7 @@


 class ResidualBlock(layers.Layer):
-    def __init__(self, filters, x, y, initializer, downsample):
+    def __init__(self, filters, x, y, initializer=tf.initializers.GlorotNormal, downsample=False):
         super().__init__()
         self.downsample = downsample
         self.conv1 = layers.Conv2D(filters,
@@ -41,6 +41,20 @@ def __init__(self, filters, x, y, initializer, downsample):
         self.elu_layer = layers.ELU()
         self.bn_layer = layers.BatchNormalization()

+    def get_config(self):
+        config = super().get_config()
+        config.update({
+            'filters': self.conv1.filters,
+            'x': self.conv1.kernel_size[0],
+            'y': self.conv1.kernel_size[1],
+
+            # Serializing the initializer
+            'initializer': tf.keras.initializers.serialize(
+                self.conv1.kernel_initializer),
+            'downsample': self.downsample
+        })
+        return config

     def call(self, x, training=False):
         y = self.conv1(x)
         y = self.conv2(y)
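A quick way to exercise the new get_config() is a config round-trip. The snippet below is a minimal sketch, assuming src/custom_layers.py is importable as custom_layers; note that get_config() also inherits base-Layer keys (name, trainable, dtype) from super().get_config(), so the sketch rebuilds the block from the relevant keys rather than via from_config:

import tensorflow as tf
from custom_layers import ResidualBlock

block = ResidualBlock(16, 3, 3,
                      initializer=tf.keras.initializers.GlorotNormal(seed=42),
                      downsample=True)
config = block.get_config()

# 'initializer' is stored in serialized (dict) form, so deserialize it first
rebuilt = ResidualBlock(
    config['filters'], config['x'], config['y'],
    initializer=tf.keras.initializers.deserialize(config['initializer']),
    downsample=config['downsample'])

assert rebuilt.conv1.filters == block.conv1.filters
assert rebuilt.downsample == block.downsample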
121 changes: 111 additions & 10 deletions src/vgsl_model_generator.py
@@ -337,6 +337,101 @@ def print_models():
     ########################
     # Helper functions     #
     ########################

+    @staticmethod
+    def model_to_vgsl(model):
+        def get_dropout(dropout, recurrent_dropout=0):
+            """Helper function to generate dropout specifications."""
+
+            dropout_spec = f",D{int(dropout*100)}" if dropout > 0 else ""
+            recurrent_dropout_spec = f",Rd{int(recurrent_dropout*100)}" \
+                if recurrent_dropout > 0 else ""
+
+            return f"{dropout_spec}{recurrent_dropout_spec}"
+
+        def get_stride_spec(strides):
+            """Helper function to generate stride specifications."""
+
+            return f",{strides[0]},{strides[1]}" if strides != (1, 1) else ""
+
+        vgsl_parts = []
+        activation_map = {'sigmoid': 's', 'tanh': 't', 'relu': 'r',
+                          'elu': 'e', 'linear': 'l', 'softmax': 'm'}
+
+        # Map Input layer
+        if isinstance(model.layers[0], tf.keras.layers.InputLayer):
+            input_shape = model.layers[0].input_shape[0]
+            start_idx = 1
+        else:
+            input_shape = model.layers[0].input_shape
+            start_idx = 0
+        vgsl_parts.append(
+            f"{input_shape[0]},{input_shape[2]},{input_shape[1]},"
+            f"{input_shape[3]}")
+
+        for idx in range(start_idx, len(model.layers)):
+            layer = model.layers[idx]
+            if isinstance(layer, tf.keras.layers.Conv2D):
+                act = activation_map[layer.get_config()["activation"]]
+
+                vgsl_parts.append(
+                    f"C{act}{layer.kernel_size[0]},{layer.kernel_size[1]},"
+                    f"{layer.filters}{get_stride_spec(layer.strides)}")
+
+            elif isinstance(layer, tf.keras.layers.Dense):
+                act = activation_map[layer.get_config()["activation"]]
+                prefix = "O1" if idx == len(model.layers) - 1 or isinstance(
+                    model.layers[idx + 1], tf.keras.layers.Activation) else "F"
+
+                vgsl_parts.append(f"{prefix}{act}{layer.units}")
+
+            elif isinstance(layer, (tf.keras.layers.LSTM,
+                                    tf.keras.layers.GRU)):
+                act = 'L' if isinstance(layer, tf.keras.layers.LSTM) else 'G'
+                direction = 'r' if layer.go_backwards else 'f'
+                return_sequences = "s" if layer.return_sequences else ""
+
+                vgsl_parts.append(
+                    f"{act}{direction}{return_sequences}{layer.units}"
+                    f"{get_dropout(layer.dropout, layer.recurrent_dropout)}")
+
+            elif isinstance(layer, layers.Bidirectional):
+                wrapped_layer = layer.layer
+                cell_type = 'l' if isinstance(
+                    wrapped_layer, tf.keras.layers.LSTM) else 'g'
+                dropout = get_dropout(wrapped_layer.dropout,
+                                      wrapped_layer.recurrent_dropout)
+
+                vgsl_parts.append(
+                    f"B{cell_type}{wrapped_layer.units}{dropout}")
+
+            elif isinstance(layer, layers.BatchNormalization):
+                vgsl_parts.append("Bn")
+
+            elif isinstance(layer, layers.MaxPooling2D):
+                vgsl_parts.append(
+                    f"Mp{layer.pool_size[0]},{layer.pool_size[1]},"
+                    f"{layer.strides[0]},{layer.strides[1]}")
+
+            elif isinstance(layer, layers.AveragePooling2D):
+                vgsl_parts.append(
+                    f"Ap{layer.pool_size[0]},{layer.pool_size[1]},"
+                    f"{layer.strides[0]},{layer.strides[1]}")
+
+            elif isinstance(layer, layers.Dropout):
+                vgsl_parts.append(f"D{int(layer.rate*100)}")
+
+            elif isinstance(layer, layers.Reshape):
+                vgsl_parts.append("Rc")
+
+            elif isinstance(layer, ResidualBlock):
+                downsample_spec = "d" if layer.downsample else ""
+
+                vgsl_parts.append(
+                    f"RB{downsample_spec}{layer.conv1.kernel_size[0]},"
+                    f"{layer.conv1.kernel_size[1]},{layer.conv1.filters}")
+
+        return " ".join(vgsl_parts)

     @staticmethod
     def get_units_or_outputs(layer: str) -> int:
         """
@@ -780,16 +875,18 @@ def lstm_generator(self,
"format: L(f|r)[s]<n>[,D<rate>,Rd<rate>].")

direction, summarize, n, dropout, recurrent_dropout = match.groups()
dropout = 0 if dropout is None else int(dropout.replace('D',""))
recurrent_dropout = 0 if recurrent_dropout is None else int(recurrent_dropout.replace("Rd",""))
dropout = 0 if dropout is None else int(dropout.replace('D', ""))
recurrent_dropout = 0 if recurrent_dropout is None else int(
recurrent_dropout.replace("Rd", ""))

# Check if the dropout is valid
if dropout < 0 or dropout > 100:
raise ValueError("Dropout rate must be in the range [0, 100].")

# Check if the recurrent dropout is valid
if recurrent_dropout < 0 or recurrent_dropout > 100:
raise ValueError("Recurrent dropout rate must be in the range [0, 100].")
raise ValueError(
"Recurrent dropout rate must be in the range [0, 100].")

n = int(n)

@@ -854,16 +951,18 @@ def gru_generator(self,
"format: G(f|r)[s]<n>[,D<rate>,Rd<rate>].")

direction, summarize, n, dropout, recurrent_dropout = match.groups()
dropout = 0 if dropout is None else int(dropout.replace('D',""))
recurrent_dropout = 0 if recurrent_dropout is None else int(recurrent_dropout.replace("Rd",""))
dropout = 0 if dropout is None else int(dropout.replace('D', ""))
recurrent_dropout = 0 if recurrent_dropout is None else int(
recurrent_dropout.replace("Rd", ""))

# Check if the dropout is valid
if dropout < 0 or dropout > 100:
raise ValueError("Dropout rate must be in the range [0, 100].")

# Check if the recurrent dropout is valid
if recurrent_dropout < 0 or recurrent_dropout > 100:
raise ValueError("Recurrent dropout rate must be in the range [0, 100].")
raise ValueError(
"Recurrent dropout rate must be in the range [0, 100].")

# Convert n to integer
n = int(n)
@@ -938,16 +1037,18 @@ def bidirectional_generator(self,
"(recurrent) dropout rate.")

layer_type, units, dropout, recurrent_dropout = match.groups()
dropout = 0 if dropout is None else int(dropout.replace('D',""))
recurrent_dropout = 0 if recurrent_dropout is None else int(recurrent_dropout.replace("Rd",""))
dropout = 0 if dropout is None else int(dropout.replace('D', ""))
recurrent_dropout = 0 if recurrent_dropout is None else int(
recurrent_dropout.replace("Rd", ""))

# Check if the dropout is valid
if dropout < 0 or dropout > 100:
raise ValueError("Dropout rate must be in the range [0, 100].")

# Check if the recurrent dropout is valid
if recurrent_dropout < 0 or recurrent_dropout > 100:
raise ValueError("Recurrent dropout rate must be in the range [0, 100].")
raise ValueError(
"Recurrent dropout rate must be in the range [0, 100].")

units = int(units)

@@ -1127,7 +1228,7 @@ def get_output_layer(self,

         if linearity == "s":
             return layers.Dense(classes,
-                                activation='softmax',
+                                activation='sigmoid',
                                 kernel_initializer=self._initializer)
         elif linearity == "l":
             return layers.Dense(classes,
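Note that the get_output_layer hunk above swaps the 's' output activation from softmax to sigmoid, which keeps spec parsing consistent with model_to_vgsl's activation_map ('s' → sigmoid, 'm' → softmax). A minimal usage sketch of the new function, assuming src/vgsl_model_generator.py is importable as vgsl_model_generator; the input layout follows the dimension swap in the function, with the time/width axis first:

import tensorflow as tf

from vgsl_model_generator import VGSLModelGenerator

# Variable-width input, height 64, 1 channel: (time/width, height, channels)
inputs = tf.keras.Input(shape=(None, 64, 1))
x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu')(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(x)
outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
model = tf.keras.Model(inputs, outputs)

print(VGSLModelGenerator.model_to_vgsl(model))
# Per the mapping above: "None,64,None,1 Cr3,3,32 Mp2,2,2,2 O1m10"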
20 changes: 10 additions & 10 deletions tests/test_model_creation.py
@@ -3,6 +3,7 @@
 # > Third party dependencies
 import numpy as np

+import tensorflow as tf
 from tensorflow.keras import layers
 from tensorflow.keras import activations
@@ -420,7 +421,6 @@ def test_lstm_error_handling(self):
"LSTM layer Lf128,D is of unexpected format. Expected "
"format: L(f|r)[s]<n>[,D<rate>,Rd<rate>].")


# Invalid dropout rate (value greater than 100)
vgsl_spec_string = "None,64,None,1 Lf128,D101 O1s10"
with self.assertRaises(ValueError) as context:
@@ -591,19 +591,19 @@ def test_bidirectional_error_handling(self):
             self.VGSLModelGenerator(vgsl_spec_string)
         self.assertEqual(str(context.exception),
                          "Layer B128 is of unexpected format. "
-            "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
-            "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
-            "(recurrent) dropout rate.")
+                         "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
+                         "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
+                         "(recurrent) dropout rate.")

         # Invalid RNN layer type
         vgsl_spec_string = "None,64,None,1 Rc Bx128 O1s10"
         with self.assertRaises(ValueError) as context:
             self.VGSLModelGenerator(vgsl_spec_string)
         self.assertEqual(str(context.exception),
                          "Layer Bx128 is of unexpected format. "
-            "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
-            "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
-            "(recurrent) dropout rate.")
+                         "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
+                         "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
+                         "(recurrent) dropout rate.")

         # Invalid number of units (negative)
         vgsl_spec_string = "None,64,None,1 Rc Bg-128 O1s10"
@@ -618,9 +618,9 @@
             self.VGSLModelGenerator(vgsl_spec_string)
         self.assertEqual(str(context.exception),
                          "Layer Bl128,D is of unexpected format. "
-            "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
-            "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
-            "(recurrent) dropout rate.")
+                         "Expected format: B(g|l)<n>[,D<rate>,Rd<rate>] where 'g' stands for "
+                         "GRU, 'l' stands for LSTM, 'n' is the number of units, 'rate' is the"
+                         "(recurrent) dropout rate.")

         # Invalid dropout rate (negative value)
         vgsl_spec_string = "None,64,None,1 Bl128,D-50 O1s10"
The fifth changed file (the new tests/test_model_to_vgsl.py) did not load in this view.
