-
Notifications
You must be signed in to change notification settings - Fork 158
SigmaS4Delta Neuronmodel and Layer with Unittests #830
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
0f2dd9b
first wokring version
smm-ncl f44df1d
S4D model cleaned
smm-ncl dba843f
update license
smm-ncl dbb699e
fix imports
smm-ncl 4462206
linting
smm-ncl c9f7646
incorporate reviews
smm-ncl 77c7a40
Merge branch 'main' into S4
smm-ncl 6138d59
update docstring
smm-ncl File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# See: https://spdx.org/licenses/ | ||
|
||
import numpy as np | ||
from typing import Any, Dict | ||
from lava.proc.sdn.models import AbstractSigmaDeltaModel | ||
from lava.magma.core.decorator import implements, requires, tag | ||
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol | ||
from lava.proc.s4d.process import SigmaS4dDelta, SigmaS4dDeltaLayer | ||
from lava.magma.core.resources import CPU | ||
from lava.magma.core.model.py.ports import PyInPort, PyOutPort | ||
from lava.magma.core.model.py.type import LavaPyType | ||
from lava.magma.core.model.sub.model import AbstractSubProcessModel | ||
from lava.proc.sparse.process import Sparse | ||
|
||
|
||
class AbstractSigmaS4dDeltaModel(AbstractSigmaDeltaModel): | ||
a_in = None | ||
s_out = None | ||
|
||
# SigmaDelta Variables | ||
vth = None | ||
sigma = None | ||
act = None | ||
residue = None | ||
error = None | ||
state_exp = None | ||
bias = None | ||
|
||
# S4 Variables | ||
a = None | ||
b = None | ||
c = None | ||
s4_state = None | ||
s4_exp = None | ||
|
||
def __init__(self, proc_params: Dict[str, Any]) -> None: | ||
""" | ||
Sigma delta neuron model that implements S4D | ||
(as described by Gu et al., 2022) dynamics as its activation function. | ||
|
||
Relevant parameters in proc_params | ||
-------------------------- | ||
a: np.ndarray | ||
Diagonal elements of the state matrix of the S4D model. | ||
b: np.ndarray | ||
Diagonal elements of the input matrix of the S4D model. | ||
c: np.ndarray | ||
Diagonal elements of the output matrix of the S4D model. | ||
s4_state: np.ndarray | ||
State vector of the S4D model. | ||
""" | ||
super().__init__(proc_params) | ||
self.a = self.proc_params['a'] | ||
self.b = self.proc_params['b'] | ||
self.c = self.proc_params['c'] | ||
self.s4_state = self.proc_params['s4_state'] | ||
|
||
def activation_dynamics(self, sigma_data: np.ndarray) -> np.ndarray: | ||
"""Sigma Delta activation dynamics. Performs S4D dynamics. | ||
|
||
This function simulates the behavior of a linear time-invariant system | ||
with diagonalized state-space representation. | ||
(For reference see Gu et al., 2022) | ||
|
||
The state-space equations are given by: | ||
s4_state_{k+1} = A * s4_state_k + B * input_k | ||
act_k = C * s4_state_k | ||
|
||
where: | ||
- s4_state_k is the state vector at time step k, | ||
- input_k is the input vector at time step k, | ||
- act_k is the output vector at time step k, | ||
- A is the diagonal state matrix, | ||
- B is the diagonal input matrix, | ||
- C is the diagonal output matrix. | ||
|
||
The function computes the next output step of the | ||
system for the given input signal. | ||
|
||
Parameters | ||
---------- | ||
sigma_data: np.ndarray | ||
sigma decoded data | ||
|
||
Returns | ||
------- | ||
np.ndarray | ||
activation output | ||
""" | ||
|
||
self.s4_state = self.s4_state * self.a + sigma_data * self.b | ||
act = self.c * self.s4_state * 2 | ||
return act | ||
|
||
|
||
@implements(proc=SigmaS4dDelta, protocol=LoihiProtocol) | ||
@requires(CPU) | ||
@tag('floating_pt') | ||
class PySigmaS4dDeltaModelFloat(AbstractSigmaS4dDeltaModel): | ||
"""Floating point implementation of SigmaS4dDelta neuron.""" | ||
a_in = LavaPyType(PyInPort.VEC_DENSE, float) | ||
s_out = LavaPyType(PyOutPort.VEC_DENSE, float) | ||
|
||
vth: np.ndarray = LavaPyType(np.ndarray, float) | ||
sigma: np.ndarray = LavaPyType(np.ndarray, float) | ||
act: np.ndarray = LavaPyType(np.ndarray, float) | ||
residue: np.ndarray = LavaPyType(np.ndarray, float) | ||
error: np.ndarray = LavaPyType(np.ndarray, float) | ||
bias: np.ndarray = LavaPyType(np.ndarray, float) | ||
|
||
state_exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=3) | ||
cum_error: np.ndarray = LavaPyType(np.ndarray, bool, precision=1) | ||
spike_exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=3) | ||
s4_exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=3) | ||
|
||
# S4 vaiables | ||
s4_state: np.ndarray = LavaPyType(np.ndarray, float) | ||
a: np.ndarray = LavaPyType(np.ndarray, float) | ||
b: np.ndarray = LavaPyType(np.ndarray, float) | ||
c: np.ndarray = LavaPyType(np.ndarray, float) | ||
|
||
def run_spk(self) -> None: | ||
# Receive synaptic input | ||
a_in_data = self.a_in.recv() | ||
s_out = self.dynamics(a_in_data) | ||
self.s_out.send(s_out) | ||
|
||
|
||
@implements(proc=SigmaS4dDeltaLayer, protocol=LoihiProtocol) | ||
class SubDenseLayerModel(AbstractSubProcessModel): | ||
def __init__(self, proc): | ||
"""Builds (Sparse -> S4D -> Sparse) connection of the process.""" | ||
conn_weights = proc.proc_params.get("conn_weights") | ||
shape = proc.proc_params.get("shape") | ||
state_exp = proc.proc_params.get("state_exp") | ||
num_message_bits = proc.proc_params.get("num_message_bits") | ||
s4_exp = proc.proc_params.get("s4_exp") | ||
d_states = proc.proc_params.get("d_states") | ||
a = proc.proc_params.get("a") | ||
b = proc.proc_params.get("b") | ||
c = proc.proc_params.get("c") | ||
vth = proc.proc_params.get("vth") | ||
|
||
# Instantiate processes | ||
self.sparse1 = Sparse(weights=conn_weights.T, weight_exp=state_exp, | ||
num_message_bits=num_message_bits) | ||
self.sigma_S4d_delta = SigmaS4dDelta(shape=(shape[0] * d_states,), | ||
vth=vth, | ||
state_exp=state_exp, | ||
s4_exp=s4_exp, | ||
a=a, | ||
b=b, | ||
c=c) | ||
self.sparse2 = Sparse(weights=conn_weights, weight_exp=state_exp, | ||
num_message_bits=num_message_bits) | ||
|
||
# Make connections Sparse -> SigmaS4Delta -> Sparse | ||
proc.in_ports.s_in.connect(self.sparse1.in_ports.s_in) | ||
self.sparse1.out_ports.a_out.connect(self.sigma_S4d_delta.in_ports.a_in) | ||
self.sigma_S4d_delta.out_ports.s_out.connect(self.sparse2.s_in) | ||
self.sparse2.out_ports.a_out.connect(proc.out_ports.a_out) | ||
|
||
# Set aliases | ||
proc.vars.a.alias(self.sigma_S4d_delta.vars.a) | ||
proc.vars.b.alias(self.sigma_S4d_delta.vars.b) | ||
proc.vars.c.alias(self.sigma_S4d_delta.vars.c) | ||
proc.vars.s4_state.alias(self.sigma_S4d_delta.vars.s4_state) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: BSD-3-Clause | ||
# See: https://spdx.org/licenses/ | ||
|
||
import typing as ty | ||
import numpy as np | ||
from lava.magma.core.process.process import AbstractProcess | ||
from lava.magma.core.process.variable import Var | ||
from lava.magma.core.process.ports.ports import InPort, OutPort | ||
from lava.proc.sdn.process import ActivationMode, SigmaDelta | ||
|
||
|
||
class SigmaS4dDelta(SigmaDelta, AbstractProcess): | ||
def __init__( | ||
self, | ||
shape: ty.Tuple[int, ...], | ||
vth: ty.Union[int, float], | ||
a: float, | ||
b: float, | ||
c: float, | ||
state_exp: ty.Optional[int] = 0, | ||
s4_exp: ty.Optional[int] = 0) -> None: | ||
""" | ||
Sigma delta neuron process that implements S4D (described by | ||
Gu et al., 2022) dynamics as its activation function. | ||
|
||
This process simulates the behavior of a linear time-invariant system | ||
with diagonal state-space representation. | ||
The state-space equations are given by: | ||
s4_state_{k+1} = A * s4_state_k + B * inp_k | ||
act_k = C * s4_state_k | ||
|
||
where: | ||
- s4_state_k is the state vector at time step k, | ||
- inp_k is the input vector at time step k, | ||
- act_k is the output vector at time step k, | ||
- A is the diagonal state matrix, | ||
- B is the diagonal input matrix, | ||
- C is the diagonal output matrix. | ||
|
||
Parameters | ||
---------- | ||
shape: Tuple | ||
Shape of the sigma process. | ||
vth: int or float | ||
Threshold of the delta encoder. | ||
a: np.ndarray | ||
Diagonal elements of the state matrix of the S4D model. | ||
b: np.ndarray | ||
Diagonal elements of the input matrix of the S4D model. | ||
c: np.ndarray | ||
Diagonal elements of the output matrix of the S4D model. | ||
state_exp: int | ||
Scaling exponent with base 2 for the reconstructed sigma variables. | ||
Note: This should only be used for nc models. | ||
Default is 0. | ||
s4_exp: int | ||
Scaling exponent with base 2 for the S4 state variables. | ||
Note: This should only be used for nc models. | ||
Default is 0. | ||
""" | ||
|
||
super().__init__(shape=shape, | ||
vth=vth, | ||
a=a, | ||
b=b, | ||
c=c, | ||
s4_state=0, | ||
state_exp=state_exp, | ||
s4_exp=s4_exp) | ||
|
||
# Variables for S4 | ||
self.a = Var(shape=shape, init=a) | ||
self.b = Var(shape=shape, init=b) | ||
self.c = Var(shape=shape, init=c) | ||
self.s4_state = Var(shape=shape, init=0) | ||
self.s4_exp = Var(shape=(1,), init=s4_exp) | ||
|
||
|
||
class SigmaS4dDeltaLayer(AbstractProcess): | ||
def __init__( | ||
self, | ||
shape: ty.Tuple[int, ...], | ||
vth: ty.Union[int, float], | ||
a: float, | ||
b: float, | ||
c: float, | ||
d_states: ty.Optional[int] = 1, | ||
s4_exp: ty.Optional[int] = 0, | ||
num_message_bits: ty.Optional[int] = 24, | ||
state_exp: ty.Optional[int] = 0) -> None: | ||
""" | ||
Combines S4D neuron with Sparse Processes that allow for multiple | ||
d_states. | ||
|
||
Connectivity: Sparse -> SigmaS4dDelta -> Sparse. | ||
Relieves user from computing required connection weights for multiple | ||
d_states. | ||
|
||
Parameters | ||
---------- | ||
shape: Tuple | ||
Shape of the sigma process. | ||
vth: int or float | ||
Threshold of the delta encoder. | ||
a: np.ndarray | ||
Diagonal elements of the state matrix of the S4D model. | ||
b: np.ndarray | ||
Diagonal elements of the input matrix of the S4D model. | ||
c: np.ndarray | ||
Diagonal elements of the output matrix of the S4D model. | ||
d_states: int | ||
Number of hidden states of the S4D model. | ||
Default is 1. | ||
state_exp: int | ||
Scaling exponent with base 2 for the reconstructed sigma variables. | ||
Note: Only relevant for nc model. | ||
Default is 0. | ||
num_message_bits: int | ||
Number of message bits to be used in Sparse connection processes. | ||
Note: Only relevant for nc model. | ||
s4_exp: int | ||
Scaling exponent with base 2 for the S4 state variables. | ||
Note: Only relevant for nc model. | ||
Default is 0. | ||
""" | ||
|
||
# Automatically takes care of expansion and reduction of dimensionality | ||
# for multiple hidden states (d_states) | ||
conn_weights = np.kron(np.eye(shape[0]), np.ones(d_states)) | ||
s4_state = 0 | ||
super().__init__(shape=shape, | ||
vth=vth, | ||
a=a, | ||
b=b, | ||
c=c, | ||
s4_exp=s4_exp, | ||
s4_state=s4_state, | ||
conn_weights=conn_weights, | ||
num_message_bits=num_message_bits, | ||
d_states=d_states, | ||
state_exp=state_exp, | ||
act_mode=ActivationMode.UNIT) | ||
|
||
# Ports | ||
self.s_in = InPort(shape=shape) | ||
self.a_out = OutPort(shape=shape) | ||
|
||
# General variables | ||
self.state_exp = Var(shape=(1,), init=state_exp) | ||
|
||
# Variables for S4 | ||
self.a = Var(shape=(shape[0] * d_states,), init=a) | ||
self.b = Var(shape=(shape[0] * d_states,), init=b) | ||
self.c = Var(shape=(shape[0] * d_states,), init=c) | ||
self.s4_state = Var(shape=(shape[0] * d_states,), init=0) | ||
self.S4_exp = Var(shape=(1,), init=s4_exp) | ||
|
||
# Variables for connecting Dense processes | ||
# Project input_dim to input_dim * d_states | ||
self.conn_weights = Var(shape=shape, init=conn_weights) | ||
self.num_message_bits = Var(shape=(1,), init=num_message_bits) | ||
|
||
@property | ||
def shape(self) -> ty.Tuple[int, ...]: | ||
"""Return shape of the Process.""" | ||
return self.proc_params['shape'] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Git LFS file not shown
Git LFS file not shown
Git LFS file not shown
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.