Skip to content

Commit

Permalink
modified injector and extractor classes
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilippPlank committed Mar 1, 2024
1 parent a8cba42 commit d7e9348
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 138 deletions.
74 changes: 41 additions & 33 deletions src/lava/magma/runtime/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import typing as ty

import numpy as np
from lava.magma.core.model.py.ports import PyInPort, PyOutPort
from scipy.sparse import csr_matrix
from lava.magma.compiler.var_model import AbstractVarModel, LoihiSynapseVarModel
from lava.magma.core.process.message_interface_enum import ActorType
Expand Down Expand Up @@ -310,38 +309,9 @@ def _build_processes(self):
exception_q = Queue()
self.exception_q.append(exception_q)

for name, py_port in proc_builder.py_ports.items():
port = getattr(proc, name)

if port.external_pipe_flag:
if isinstance(port, InPort):
pypychannel = PyPyChannel(
message_infrastructure=self._messaging_infrastructure,
src_name="src",
dst_name=name,
shape=py_port.shape,
dtype=py_port.d_type,
size=64)

proc_builder.set_csp_ports([pypychannel.dst_port])

port.external_pipe_csp_send_port = pypychannel.src_port
port.external_pipe_csp_send_port.start()

if isinstance(port, OutPort):
pypychannel = PyPyChannel(
message_infrastructure=self._messaging_infrastructure,
src_name=name,
dst_name="dst",
shape=py_port.shape,
dtype=py_port.d_type,
size=64)

proc_builder.set_csp_ports([pypychannel.src_port])

port.external_pipe_csp_recv_port = pypychannel.dst_port
port.external_pipe_csp_recv_port.start()

# Create any external pypychannels
self._create_external_channels(proc, proc_builder)

Check notice on line 314 in src/lava/magma/runtime/runtime.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/lava/magma/runtime/runtime.py#L314

Trailing whitespace
self._messaging_infrastructure.build_actor(target_fn,
proc_builder,
exception_q)
Expand All @@ -357,6 +327,44 @@ def _build_runtime_services(self):
rs_builder,
self.exception_q[-1])

def _create_external_channels(self,
proc: AbstractProcess,
proc_builder: AbstractProcessBuilder):
"""Creates a csp channel which can be connected to/from a
non-procss/Lava python environment. This enables I/O to Lava from
external sources."""
for name, py_port in proc_builder.py_ports.items():
port = getattr(proc, name)

if port.external_pipe_flag:
if isinstance(port, InPort):
pypychannel = PyPyChannel(
message_infrastructure=self._messaging_infrastructure,
src_name="src",
dst_name=name,
shape=py_port.shape,
dtype=py_port.d_type,
size=port.external_pipe_buffer_size)

proc_builder.set_csp_ports([pypychannel.dst_port])

port.external_pipe_csp_send_port = pypychannel.src_port
port.external_pipe_csp_send_port.start()

if isinstance(port, OutPort):
pypychannel = PyPyChannel(
message_infrastructure=self._messaging_infrastructure,
src_name=name,
dst_name="dst",
shape=py_port.shape,
dtype=py_port.d_type,
size=port.external_pipe_buffer_size)

proc_builder.set_csp_ports([pypychannel.src_port])

port.external_pipe_csp_recv_port = pypychannel.dst_port
port.external_pipe_csp_recv_port.start()

def _get_resp_for_run(self):
"""
Gets response from RuntimeServices
Expand Down
58 changes: 22 additions & 36 deletions src/lava/proc/io/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import typing as ty

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import InPort, RefPort, Var
from lava.magma.core.process.ports.ports import InPort, OutPort, RefPort, Var
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyLoihiProcessModel
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.model.py.ports import PyInPort, PyRefPort
from lava.magma.core.model.py.ports import PyInPort, PyOutPort, PyRefPort
from lava.magma.compiler.channels.pypychannel import PyPyChannel
from lava.magma.runtime.message_infrastructure.multiprocessing import \
MultiProcessing
Expand Down Expand Up @@ -51,9 +51,9 @@ class Extractor(AbstractProcess):
def __init__(self,
shape: ty.Tuple[int, ...],
buffer_size: ty.Optional[int] = 50,
channel_config: ty.Optional[utils.ChannelConfig] = None) -> \
None:
super().__init__()
channel_config: ty.Optional[utils.ChannelConfig] = None,
**kwargs) -> None:
super().__init__(shape_1=shape, **kwargs)

channel_config = channel_config or utils.ChannelConfig()

Expand All @@ -63,78 +63,64 @@ def __init__(self,

self._shape = shape

self._multi_processing = MultiProcessing()
self._multi_processing.start()

# Stands for ProcessModel to Process
pm_to_p = PyPyChannel(message_infrastructure=self._multi_processing,
src_name="src",
dst_name="dst",
shape=self._shape,
dtype=float,
size=buffer_size)
self._pm_to_p_dst_port = pm_to_p.dst_port
self._pm_to_p_dst_port.start()

self.proc_params["channel_config"] = channel_config
self.proc_params["pm_to_p_src_port"] = pm_to_p.src_port

self._receive_when_empty = channel_config.get_receive_empty_function()
self._receive_when_not_empty = \
channel_config.get_receive_not_empty_function()

self.in_port = InPort(shape=self._shape)
self.in_port = InPort(shape=shape)
self.out_port = OutPort(shape=shape)
self.out_port.flag_external_pipe(buffer_size=buffer_size)

def receive(self) -> np.ndarray:
"""Receive data from the ProcessModel.
The data is received from pm_to_p.dst_port.
The data is received from out_port.
Returns
----------
data : np.ndarray
Data received.
"""
elements_in_buffer = self._pm_to_p_dst_port._queue.qsize()
if not hasattr(self.out_port, 'external_pipe_csp_recv_port'):
raise AssertionError("The Runtime needs to be created before"
"calling <send>. Please use the method "
"<create_runtime> or <run> on your Lava"
" network before using <send>.")

elements_in_buffer = self.out_port.external_pipe_csp_recv_port._queue.qsize()

if elements_in_buffer == 0:
data = self._receive_when_empty(
self._pm_to_p_dst_port,
self.out_port.external_pipe_csp_recv_port,
np.zeros(self._shape))
else:
data = self._receive_when_not_empty(
self._pm_to_p_dst_port,
self.out_port.external_pipe_csp_recv_port,
np.zeros(self._shape),
elements_in_buffer)

return data

def __del__(self) -> None:
super().__del__()

self._multi_processing.stop()
self._pm_to_p_dst_port.join()


@implements(proc=Extractor, protocol=LoihiProtocol)
@requires(CPU)
class PyLoihiExtractorModel(PyLoihiProcessModel):
"""PyLoihiProcessModel for the Extractor Process."""
in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float)
out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float)

def __init__(self, proc_params: dict) -> None:
super().__init__(proc_params=proc_params)

channel_config = self.proc_params["channel_config"]
self._pm_to_p_src_port = self.proc_params["pm_to_p_src_port"]
self._pm_to_p_src_port.start()

self._send = channel_config.get_send_full_function()

def run_spk(self) -> None:
self._send(self._pm_to_p_src_port, self.in_port.recv())

def __del__(self) -> None:
self._pm_to_p_src_port.join()
self._send(self.out_port.csp_ports[-1],
self.in_port.recv())


class VarWire(AbstractProcess):
Expand Down
71 changes: 27 additions & 44 deletions src/lava/proc/io/injector.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
import typing as ty

from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.process.ports.ports import OutPort
from lava.magma.core.process.ports.ports import InPort, OutPort
from lava.magma.core.resources import CPU
from lava.magma.core.decorator import implements, requires
from lava.magma.core.model.py.model import PyLoihiProcessModel
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.core.model.py.ports import PyOutPort
from lava.magma.runtime.message_infrastructure.multiprocessing import \
MultiProcessing
from lava.magma.compiler.channels.pypychannel import PyPyChannel
from lava.magma.core.model.py.ports import PyInPort, PyOutPort
from lava.proc.io import utils


Expand Down Expand Up @@ -47,74 +44,63 @@ class Injector(AbstractProcess):
buffer is full and how the dst_port behaves when the buffer is empty
and not empty.
"""

def __init__(self,
shape: ty.Tuple[int, ...],
buffer_size: ty.Optional[int] = 50,
channel_config: ty.Optional[utils.ChannelConfig] = None) -> \
None:
super().__init__()
channel_config: ty.Optional[utils.ChannelConfig] = None,
**kwargs) -> None:
super().__init__(shape_1=shape, **kwargs)

channel_config = channel_config or utils.ChannelConfig()

utils.validate_shape(shape)
utils.validate_buffer_size(buffer_size)
utils.validate_channel_config(channel_config)

self._multi_processing = MultiProcessing()
self._multi_processing.start()

# Stands for Process to ProcessModel
p_to_pm = PyPyChannel(message_infrastructure=self._multi_processing,
src_name="src",
dst_name="dst",
shape=shape,
dtype=float,
size=buffer_size)
self._p_to_pm_src_port = p_to_pm.src_port
self._p_to_pm_src_port.start()
self.in_port = InPort(shape=shape)
self.in_port.flag_external_pipe(buffer_size=buffer_size)
self.out_port = OutPort(shape=shape)

self.proc_params["shape"] = shape
self.proc_params["channel_config"] = channel_config
self.proc_params["p_to_pm_dst_port"] = p_to_pm.dst_port

self._send = channel_config.get_send_full_function()

self.out_port = OutPort(shape=shape)


Check notice on line 68 in src/lava/proc/io/injector.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/lava/proc/io/injector.py#L68

Trailing whitespace
def send(self, data: np.ndarray) -> None:
"""Send data to the ProcessModel.
The data is sent through p_to_pm.src_port.
"""Send data to connected process.
Parameters
----------
data : np.ndarray
Data to be sent.
Raises
------
AssertionError
If the runtime of the Lava network was not created.
"""
self._send(self._p_to_pm_src_port, data)

def __del__(self) -> None:
super().__del__()

self._multi_processing.stop()
self._p_to_pm_src_port.join()
# The csp channel is created by the runtime
if hasattr(self.in_port, 'external_pipe_csp_send_port'):
self._send(self.in_port.external_pipe_csp_send_port, data)
else:
raise AssertionError("The Runtime needs to be created before"
"calling <send>. Please use the method "
"<create_runtime> or <run> on your Lava"
" network before using <send>.")


@implements(proc=Injector, protocol=LoihiProtocol)
@requires(CPU)
class PyLoihiInjectorModel(PyLoihiProcessModel):
"""PyLoihiProcessModel for the Injector Process."""
in_port: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float)
out_port: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float)

def __init__(self, proc_params: dict) -> None:
super().__init__(proc_params=proc_params)

shape = self.proc_params["shape"]
channel_config = self.proc_params["channel_config"]
self._p_to_pm_dst_port = self.proc_params["p_to_pm_dst_port"]
self._p_to_pm_dst_port.start()

self._zeros = np.zeros(shape)

self._receive_when_empty = channel_config.get_receive_empty_function()
Expand All @@ -123,19 +109,16 @@ def __init__(self, proc_params: dict) -> None:

def run_spk(self) -> None:
self._zeros.fill(0)
elements_in_buffer = self._p_to_pm_dst_port._queue.qsize()
elements_in_buffer = self.in_port.csp_ports[-1]._queue.qsize()

if elements_in_buffer == 0:
data = self._receive_when_empty(
self._p_to_pm_dst_port,
self.in_port,
self._zeros)
else:
data = self._receive_when_not_empty(
self._p_to_pm_dst_port,
self.in_port,
self._zeros,
elements_in_buffer)

self.out_port.send(data)

def __del__(self) -> None:
self._p_to_pm_dst_port.join()
Loading

0 comments on commit d7e9348

Please sign in to comment.