Skip to content

Commit 8d13369

Browse files
author
gkarray
committed
prototype implementing injector/extractor function, not wrapped in a Process
1 parent 73499c2 commit 8d13369

File tree

3 files changed

+152
-2
lines changed

3 files changed

+152
-2
lines changed

src/lava/magma/core/process/ports/ports.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,17 @@ class OutPort(AbstractIOPort, AbstractSrcPort):
432432
sub processes.
433433
"""
434434

435+
def __init__(self, shape: ty.Tuple[int, ...]):
436+
super().__init__(shape)
437+
self.external_pipe_flag = False
438+
self.external_pipe_buffer_size = 64
439+
440+
def flag_external_pipe(self, buffer_size=None):
441+
self.external_pipe_flag = True
442+
443+
if buffer_size is not None:
444+
self.external_pipe_buffer_size = buffer_size
445+
435446
def connect(
436447
self,
437448
ports: ty.Union["AbstractIOPort", ty.List["AbstractIOPort"]],
@@ -493,6 +504,15 @@ def __init__(
493504
super().__init__(shape)
494505
self._reduce_op = reduce_op
495506

507+
self.external_pipe_flag = False
508+
self.external_pipe_buffer_size = 64
509+
510+
def flag_external_pipe(self, buffer_size=None):
511+
self.external_pipe_flag = True
512+
513+
if buffer_size is not None:
514+
self.external_pipe_buffer_size = buffer_size
515+
496516
def connect(self,
497517
ports: ty.Union["InPort", ty.List["InPort"]],
498518
connection_configs: ty.Optional[ConnectionConfigs] = None):

src/lava/magma/runtime/runtime.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import typing as ty
1111

1212
import numpy as np
13+
from lava.magma.core.model.py.ports import PyInPort, PyOutPort
1314
from scipy.sparse import csr_matrix
1415
from lava.magma.compiler.var_model import AbstractVarModel, LoihiSynapseVarModel
1516
from lava.magma.core.process.message_interface_enum import ActorType
@@ -26,7 +27,7 @@
2627
if ty.TYPE_CHECKING:
2728
from lava.magma.core.process.process import AbstractProcess
2829
from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort, \
29-
CspSelector
30+
CspSelector, PyPyChannel
3031
from lava.magma.compiler.builders.channel_builder import (
3132
ChannelBuilderMp, RuntimeChannelBuilderMp, ServiceChannelBuilderMp,
3233
ChannelBuilderPyNc)
@@ -38,7 +39,7 @@
3839
ChannelType
3940
from lava.magma.compiler.executable import Executable
4041
from lava.magma.compiler.node import NodeConfig
41-
from lava.magma.core.process.ports.ports import create_port_id
42+
from lava.magma.core.process.ports.ports import create_port_id, InPort, OutPort
4243
from lava.magma.core.run_conditions import (AbstractRunCondition,
4344
RunContinuous, RunSteps)
4445
from lava.magma.compiler.channels.watchdog import WatchdogManagerInterface
@@ -308,6 +309,39 @@ def _build_processes(self):
308309
proc._runtime = self
309310
exception_q = Queue()
310311
self.exception_q.append(exception_q)
312+
313+
for name, py_port in proc_builder.py_ports.items():
314+
port = getattr(proc, name)
315+
316+
if port.external_pipe_flag:
317+
if isinstance(port, InPort):
318+
pypychannel = PyPyChannel(
319+
message_infrastructure=self._messaging_infrastructure,
320+
src_name="src",
321+
dst_name=name,
322+
shape=py_port.shape,
323+
dtype=py_port.d_type,
324+
size=64)
325+
326+
proc_builder.set_csp_ports([pypychannel.dst_port])
327+
328+
port.external_pipe_csp_send_port = pypychannel.src_port
329+
port.external_pipe_csp_send_port.start()
330+
331+
if isinstance(port, OutPort):
332+
pypychannel = PyPyChannel(
333+
message_infrastructure=self._messaging_infrastructure,
334+
src_name=name,
335+
dst_name="dst",
336+
shape=py_port.shape,
337+
dtype=py_port.d_type,
338+
size=64)
339+
340+
proc_builder.set_csp_ports([pypychannel.src_port])
341+
342+
port.external_pipe_csp_recv_port = pypychannel.dst_port
343+
port.external_pipe_csp_recv_port.start()
344+
311345
self._messaging_infrastructure.build_actor(target_fn,
312346
proc_builder,
313347
exception_q)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright (C) 2022 Intel Corporation
2+
# SPDX-License-Identifier: LGPL 2.1 or later
3+
# See: https://spdx.org/licenses/
4+
import threading
5+
import unittest
6+
import typing as ty
7+
8+
import numpy as np
9+
10+
from lava.magma.core.decorator import implements, requires
11+
from lava.magma.core.model.py.model import PyLoihiProcessModel
12+
from lava.magma.core.model.py.ports import PyOutPort, PyInPort
13+
from lava.magma.core.model.py.type import LavaPyType
14+
from lava.magma.core.process.ports.ports import OutPort, InPort
15+
from lava.magma.core.process.process import AbstractProcess
16+
from lava.magma.core.resources import CPU
17+
from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
18+
from lava.magma.core.run_conditions import RunSteps
19+
from lava.magma.core.run_configs import Loihi2SimCfg
20+
21+
22+
class Process1(AbstractProcess):
23+
def __init__(self, shape_1: ty.Tuple, **kwargs):
24+
super().__init__(shape_1=shape_1, **kwargs)
25+
26+
self.in_1 = InPort(shape=shape_1)
27+
self.out_1 = OutPort(shape=shape_1)
28+
29+
30+
@implements(proc=Process1, protocol=LoihiProtocol)
31+
@requires(CPU)
32+
class LoihiDenseSpkPyProcess1PM(PyLoihiProcessModel):
33+
in_1: PyInPort = LavaPyType(PyInPort.VEC_DENSE, float)
34+
out_1: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, float)
35+
36+
def __init__(self, proc_params):
37+
super().__init__(proc_params)
38+
39+
def run_spk(self):
40+
print(f"Receiving in Process...")
41+
data_1 = self.in_1.recv()
42+
print(f"Received {data_1} in Process...")
43+
44+
print(f"Sending {data_1} from Process...")
45+
self.out_1.send(data_1)
46+
print(f"Sent {data_1} from Process!")
47+
48+
49+
class TestExternalPipeIO(unittest.TestCase):
50+
def test_run_steps_non_blocking(self):
51+
data = [[1], [2], [3], [4], [5]]
52+
53+
relay = Process1(shape_1=(1,))
54+
# Control buffer size with buffer_size arg, default is 64
55+
relay.in_1.flag_external_pipe()
56+
# Control buffer size with buffer_size arg, default is 64
57+
relay.out_1.flag_external_pipe()
58+
59+
run_cfg = Loihi2SimCfg()
60+
run_condition = RunSteps(num_steps=5, blocking=False)
61+
62+
def thread_inject_fn() -> None:
63+
for send_data_single_item in data:
64+
print(f"Sending {send_data_single_item} from thread_inject...")
65+
# Use probe() before send() to know whether or not send() will
66+
# block (i.e if the buffer of external_pipe_csp_send_port
67+
# is full).
68+
relay.in_1.external_pipe_csp_send_port.send(
69+
np.array(send_data_single_item))
70+
print(f"Sent {send_data_single_item} from thread_inject!")
71+
72+
def thread_extract_fn() -> None:
73+
for _ in range(len(data)):
74+
print(f"Receiving in thread_extract...")
75+
# Use probe() before recv() to know whether or not recv() will
76+
# block (i.e if the buffer of external_pipe_csp_recv_port
77+
# is empty).
78+
received_data = relay.out_1.external_pipe_csp_recv_port.recv()
79+
print(f"Received {received_data} in thread_extract!")
80+
81+
thread_inject = threading.Thread(target=thread_inject_fn,
82+
daemon=True)
83+
thread_extract = threading.Thread(target=thread_extract_fn,
84+
daemon=True)
85+
86+
relay.run(condition=run_condition, run_cfg=run_cfg)
87+
88+
thread_inject.start()
89+
thread_extract.start()
90+
91+
relay.wait()
92+
relay.stop()
93+
94+
95+
if __name__ == "__main__":
96+
unittest.main()

0 commit comments

Comments
 (0)