Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/synnaxlabs/synnax into rc
Browse files Browse the repository at this point in the history
  • Loading branch information
emilbon99 committed Aug 18, 2024
2 parents 83d4856 + 2cb4ab6 commit 4a9ac36
Show file tree
Hide file tree
Showing 14 changed files with 602 additions and 105 deletions.
21 changes: 21 additions & 0 deletions client/py/examples/control/abort/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
To run this example, you'll need three python shells open. In the first shell, start the
simulated_daq:

```bash
python simulated_daq.py
```

In the second shell, start the abort sequence listener:

```bash
python abort_sequence.py
```

In the third shell, run the nominal sequence:

```bash
python nominal_sequence.py
```

We highly recommend using the Synnax [Console](https://docs.synnaxlabs.com/reference/console/get-started)
to visualize the data in these examples.
41 changes: 41 additions & 0 deletions client/py/examples/control/abort/abort_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy
import time

# We've logged in via the CLI, so there's no need to provide credentials here. See
# https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# Define the control channel names
PRESS_VALVE = "press_vlv_cmd"
VENT_VALVE = "vent_vlv_cmd"
PRESSURE = "pressure"

# Open a control sequence under a context manager, so that the control is released when
# the block exits
with client.control.acquire(
name="Abort Sequence",
# Defines the authority the control sequence has by default over the channels.
# A value of 100 is lower than the default value of 200 in the nominal sequence
# i.e. until the abort condition is met, the nominal sequence will have control.
write_authorities=[100],
write=[PRESS_VALVE, VENT_VALVE],
read=[PRESSURE],
) as controller:
# Wait until we hit an abort condition.
controller.wait_until(lambda c: c[PRESSURE] > 30)
# Change the control authority to the highest level - 1. This is higher than
# the 200 value in the nominal sequence, so the abort sequence will take control.
controller.set_authority({PRESS_VALVE: 254, VENT_VALVE: 254})
# Vent the system
controller.set({PRESS_VALVE: False, VENT_VALVE: True})
# Hold control until the user presses Ctrl+C
time.sleep(1e6)
86 changes: 86 additions & 0 deletions client/py/examples/control/abort/nominal_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy
import time

# We've logged in via the CLI, so there's no need to provide credentials here. See
# https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# Define the control channel names
PRESS_VALVE = "press_vlv_cmd"
VENT_VALVE = "vent_vlv_cmd"
PRESSURE = "pressure"

# Open a control sequence under a context manager, so that the control is released when
# the block exits
with client.control.acquire(
name="Press Sequence",
# Defines the authorities at which the sequence controls the valve channels.
#
# ####
# Notice that we take a higher control authority here than we do at the start of the
# abort sequence (which is at 100). This means that the abort sequence will take
# control OVER this sequence.
# ####
write_authorities=[200],
# We need to set the channels we'll be writing to and reading from.
write=[PRESS_VALVE, VENT_VALVE],
read=[PRESSURE],
) as controller:
# Mark the start of the sequence
start = sy.TimeStamp.now()

# Close the vent valve
controller[VENT_VALVE] = False

# Set the initial target pressure
curr_target = 20

# Pressurize the system five times in 20 psi increments
for i in range(5):
# Open the pressurization valve
controller[PRESS_VALVE] = True
if controller.wait_until(
# Wait until the pressure is greater than the current target
lambda c: c[PRESSURE] > curr_target,
# If the pressure doesn't reach the target in 20 seconds, break the loop and
# vent the system
timeout=20 * sy.TimeSpan.SECOND,
):
# Close the pressurization valve
controller[PRESS_VALVE] = False
# Wait for 2 seconds
time.sleep(2)
# Increment the target
curr_target += 20
else:
break

# Vent the system
controller[VENT_VALVE] = True

# Wait until the pressure is less than 5 psi
controller.wait_until(lambda c: c[PRESSURE] < 5)

# Close the vent valve
controller[VENT_VALVE] = False

# Mark the end of the sequence
end = sy.TimeStamp.now()

# Label the sequence with the end time
client.ranges.create(
name=f"Auto Pressurization Sequence {end}",
time_range=sy.TimeRange(
start=start,
end=end,
),
)
161 changes: 161 additions & 0 deletions client/py/examples/control/abort/simulated_daq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Copyright 2024 Synnax Labs, Inc.
#
# Use of this software is governed by the Business Source License included in the file
# licenses/BSL.txt.
#
# As of the Change Date specified in that file, in accordance with the Business Source
# License, use of this software will be governed by the Apache License, Version 2.0,
# included in the file licenses/APL.txt.

import synnax as sy
import time

"""
This is a simple simulated data acquisition computer that has two valves and a single
pressure sensor. When the press valve is open (press_vlv), the pressure increases. When
the vent valve is open (vent_vlv), the pressure decreases. The pressure is sampled at a
Valves (or any commanded actuator), typically has three associated channels:
1. The command channel - this is where commands are sent down to actuate the valve.
2. The command channel time - stores the timestamps for the command channel,
and 'indexes' the command channel.
3. The state channel - this is where the state of the valve is stored. The DAQ updates
this value when a command is executed. The state channel is indexed by the regular
DAQ timestamp channel.
If you want to add another valve to your simulation, follow the same pattern explained
above using the code below as reference.
"""

# We've logged in via the CLI, so there's no need to provide credentials here. See
# https://docs.synnaxlabs.com/reference/python-client/get-started for more information.
client = sy.Synnax()

# This will store the timestamps for the samples recorded by the simulated DAQ.
daq_time_ch = client.channels.create(
name="daq_time",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# A pressure channel to store simulated pressure values.
pressure = client.channels.create(
name="pressure",
# This says that timestamps are stored in the channel 'daq_time'.
index=daq_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# Stores the state of the press valve.
press_vlv_state = client.channels.create(
name="press_vlv_state",
# Again, notice that we're storing the timestamps in the 'daq_time' channel. This
# is because the DAQ samples from the pressure, vent valve state, and press valve
# state channels at the same time.
index=daq_time_ch.key,
data_type=sy.DataType.UINT8,
retrieve_if_name_exists=True,
)

# Stores the state of the vent valve.
vent_vlv_state = client.channels.create(
name="vent_vlv_state",
# Once again, we're storing the timestamps in the 'daq_time' channel.
index=daq_time_ch.key,
data_type=sy.DataType.UINT8,
retrieve_if_name_exists=True,
)

# An independent time channel for the press valve command, because it doesn't
# necessarily get emitted in sync with any other channels.
press_vlv_cmd_time_ch = client.channels.create(
name="press_vlv_cmd_time",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# An independent time channel for the vent valve command, because it doesn't
# necessarily get emitted in sync with any other channels.
vent_vlv_cmd_time_ch = client.channels.create(
name="vent_vlv_cmd_time",
is_index=True,
data_type=sy.DataType.TIMESTAMP,
retrieve_if_name_exists=True,
)

# Channel for the vent valve command.
vent_vlv_cmd_ch = client.channels.create(
name="vent_vlv_cmd",
# This is the index channel for the vent valve command, completely independent of
# the other channels.
index=vent_vlv_cmd_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)

# Channel for the press valve command. This is the index channel for the press valve
# command, completely independent of the
press_vlv_cmd_ch = client.channels.create(
name="press_vlv_cmd",
# This is the index channel for the press valve command, completely independent of
index=press_vlv_cmd_time_ch.key,
data_type=sy.DataType.FLOAT32,
retrieve_if_name_exists=True,
)


state = {
"press_vlv_state": 0,
"vent_vlv_state": 0,
"pressure": 0,
"daq_time": sy.TimeStamp.now(),
}

rate = (sy.Rate.HZ * 50).period.seconds

with client.open_streamer(["press_vlv_cmd", "vent_vlv_cmd"]) as streamer:
with client.open_writer(
sy.TimeStamp.now(),
channels=[
"daq_time",
"pressure",
"press_vlv_state",
"vent_vlv_state",
],
name="Simulated DAQ",
enable_auto_commit=True,
) as writer:
while True:
time.sleep(rate)
while True:
# Read incoming commands with a non-blocking timeout.
f = streamer.read(0)
# Means we don't have any new data.
if f is None:
break
# If the press valve has been commanded, update its state.
if "press_vlv_cmd" in f:
state["press_vlv_state"] = f["press_vlv_cmd"][-1]
# If the vent valve has been commanded, update its state.
if "vent_vlv_cmd" in f:
state["vent_vlv_state"] = f["vent_vlv_cmd"][-1]

state["daq_time"] = sy.TimeStamp.now()

# If the press valve is open, increase the pressure.
if state["press_vlv_state"] == 1:
state["pressure"] += 0.1
# If the vent valve is open, decrease the pressure.
if state["vent_vlv_state"] == 1:
state["pressure"] -= 0.1

# Clamp the pressure to a minimum of 0, anything less would be physically
# impossible.
if state["pressure"] < 0:
state["pressure"] = 0

writer.write(state)
2 changes: 1 addition & 1 deletion client/py/examples/control/simple_auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
auto.wait_until(lambda auto: auto["pressure_1"] < 100)

# Acquire absolute control on valve 2
auto.authorize("valve_2_cmd", sy.Authority.ABSOLUTE)
auto.set_authority("valve_2_cmd", sy.Authority.ABSOLUTE)

# Open valve 2
auto["valve_2_cmd"] = True
Expand Down
8 changes: 8 additions & 0 deletions client/py/examples/dev/plot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import synnax as sy
import numpy as np

client = sy.Synnax()

data = client.read(sy.TimeRange(1722464238568134144, 1722464246886652416), "press_pt_1")

print("Average", np.average(data))
43 changes: 40 additions & 3 deletions client/py/synnax/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,46 @@ def set(
ch = retrieve_required(self._retriever, ch)[0]
self._writer.write({ch.key: value, ch.index: TimeStamp.now()})

def authorize(self, ch: ChannelKey | ChannelName, value: Authority):
ch = retrieve_required(self._retriever, ch)[0]
self._writer.set_authority({ch.key: value, ch.index: value})
@overload
def set_authority(
self,
value: CrudeAuthority,
) -> bool:
...

@overload
def set_authority(
self,
value: dict[ChannelKey | ChannelName, CrudeAuthority],
) -> bool:
...

@overload
def set_authority(
self,
ch: ChannelKey | ChannelName,
value: CrudeAuthority,
) -> bool:
...

def set_authority(
self,
value: (
dict[ChannelKey | ChannelName | ChannelPayload, CrudeAuthority]
| ChannelKey
| ChannelName
| CrudeAuthority
),
authority: CrudeAuthority | None = None,
) -> bool:
if isinstance(value, dict):
channels = retrieve_required(self._retriever, list(value.keys()))
for ch in channels:
value[ch.index] = value.get(ch.key, value.get(ch.name))
elif authority is not None:
ch = retrieve_required(self._retriever, value)[0]
value = {ch.key: authority, ch.index: authority}
return self._writer.set_authority(value)

def wait_until(
self,
Expand Down
Loading

0 comments on commit 4a9ac36

Please sign in to comment.