Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SY 1129 allow ni channels to be enabled and disabled #818

Merged
merged 11 commits into from
Sep 13, 2024
1 change: 1 addition & 0 deletions driver/ni/analog_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ std::pair<synnax::Frame, freighter::Error> ni::AnalogReadSource::read(
// Construct and populate synnax frame
size_t data_index = 0;
for (int ch = 0; ch < num_channels; ch++) {
if (this->reader_config.channels[ch].enabled == false) continue;
if (this->reader_config.channels[ch].channel_type == "index") {
auto t = synnax::Series(synnax::TIMESTAMP, d.samples_read_per_channel);
for (uint64_t i = 0; i < d.samples_read_per_channel; ++i)
Expand Down
4 changes: 3 additions & 1 deletion driver/ni/digital_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void ni::DigitalReadSource::parse_channels(config::Parser &parser) {
config.channel_key = channel_builder.required<uint32_t>("channel");
config.name = (this->reader_config.device_name + "/" + port + "/" +
line);
config.enabled = channel_builder.optional<bool>("enabled", true);
this->reader_config.channels.push_back(config);
});
if (!parser.ok())
Expand All @@ -48,7 +49,7 @@ int ni::DigitalReadSource::create_channels() {
int err = 0;
auto channels = this->reader_config.channels;
for (auto &channel: channels) {
if (channel.channel_type != "index") {
if (channel.channel_type != "index" || !channel.enabled) {
Lham42 marked this conversation as resolved.
Show resolved Hide resolved
err = this->check_ni_error(
ni::NiDAQmxInterface::CreateDIChan(task_handle,
channel.name.c_str(),
Expand Down Expand Up @@ -141,6 +142,7 @@ std::pair<synnax::Frame, freighter::Error> ni::DigitalReadSource::read(
uint64_t data_index = 0;

for (int i = 0; i < num_channels; i++) {
if (this->reader_config.channels[i].enabled == false) continue;
Lham42 marked this conversation as resolved.
Show resolved Hide resolved
if (this->reader_config.channels[i].channel_type == "index") {
auto t = synnax::Series(synnax::TIMESTAMP, this->num_samples_per_channel);
for (uint64_t j = 0; j < d.samples_read_per_channel; ++j)
Expand Down
69 changes: 38 additions & 31 deletions driver/ni/digital_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
// Helper Functions //
///////////////////////////////////////////////////////////////////////////////////
void ni::DigitalWriteSink::get_index_keys() {
if(this->writer_config.state_channel_keys.empty()) return;
if (this->writer_config.state_channel_keys.empty()) return;
auto state_channel = this->writer_config.state_channel_keys[0];
auto [state_channel_info, err] = this->ctx->client->channels.
retrieve(state_channel);
Expand All @@ -41,8 +41,7 @@ ni::DigitalWriteSink::DigitalWriteSink(
: task_handle(task_handle),
ctx(ctx),
task(task),
err_info({}){

err_info({}) {
auto config_parser = config::Parser(task.config);
this->writer_config.task_name = task.name;
this->parse_config(config_parser);
Expand Down Expand Up @@ -93,17 +92,20 @@ void ni::DigitalWriteSink::parse_config(config::Parser &parser) {
ni::ChannelConfig config;
// digital channel names are formatted: <device_name>/port<port_number>/line<line_number>
auto port = "port" + std::to_string(
channel_builder.required<std::uint64_t>(
"port"));
channel_builder.required<std::uint64_t>(
"port"));
auto line = "line" + std::to_string(
channel_builder.required<std::uint64_t>(
"line"));
channel_builder.required<std::uint64_t>(
"line"));

config.name = (this->writer_config.device_name + "/" + port + "/" +
line);

config.channel_key = channel_builder.required<uint32_t>(
"cmd_channel");

config.enabled = channel_builder.optional<bool>("enabled", true);

this->writer_config.drive_cmd_channel_keys.push_back(
config.channel_key);

Expand All @@ -112,6 +114,8 @@ void ni::DigitalWriteSink::parse_config(config::Parser &parser) {
this->writer_config.state_channel_keys.push_back(
state_key);

config.state_channel_key = state_key;

this->channel_map[config.name] =
"channels." + std::to_string(c_count);
this->writer_config.channels.push_back(config);
Expand All @@ -125,7 +129,7 @@ int ni::DigitalWriteSink::init() {
auto channels = this->writer_config.channels;

for (auto &channel: channels) {
if (channel.channel_type != "index") {
if (channel.channel_type != "index" || !channel.enabled) {
Lham42 marked this conversation as resolved.
Show resolved Hide resolved
err = this->check_ni_error(ni::NiDAQmxInterface::CreateDOChan(
this->task_handle, channel.name.c_str(), "",
DAQmx_Val_ChanPerLine));
Expand All @@ -152,7 +156,7 @@ freighter::Error ni::DigitalWriteSink::cycle() {
return freighter::NIL;
}

freighter::Error ni::DigitalWriteSink::start_ni(){
freighter::Error ni::DigitalWriteSink::start_ni() {
if (this->check_ni_error(ni::NiDAQmxInterface::StartTask(this->task_handle))) {
this->log_error(
"failed to start writer for task " + this->writer_config.task_name);
Expand All @@ -165,7 +169,7 @@ freighter::Error ni::DigitalWriteSink::start_ni(){
}


freighter::Error ni::DigitalWriteSink::stop_ni(){
freighter::Error ni::DigitalWriteSink::stop_ni() {
if (this->check_ni_error(ni::NiDAQmxInterface::StopTask(task_handle))) {
this->log_error(
"failed to stop writer for task " + this->writer_config.task_name);
Expand All @@ -180,7 +184,7 @@ freighter::Error ni::DigitalWriteSink::start(const std::string &cmd_key) {
if (this->breaker.running() || !this->ok()) return freighter::NIL;
this->breaker.start();
freighter::Error err = this->start_ni();
if(err) return err;
if (err) return err;
ctx->setState({
.task = this->task.key,
.key = cmd_key,
Expand All @@ -198,7 +202,7 @@ freighter::Error ni::DigitalWriteSink::stop(const std::string &cmd_key) {
if (!this->breaker.running()) return freighter::NIL;
this->breaker.stop();
freighter::Error err = this->stop_ni();
if(err) return err;
if (err) return err;
ctx->setState({
.task = this->task.key,
.key = cmd_key,
Expand All @@ -216,13 +220,13 @@ freighter::Error ni::DigitalWriteSink::write(synnax::Frame frame) {
format_data(std::move(frame));

if (this->check_ni_error(ni::NiDAQmxInterface::WriteDigitalLines(this->task_handle,
1, // number of samples per channel
1, // auto start
10.0, // timeout
DAQmx_Val_GroupByChannel, // data layout
write_buffer, // data
&samplesWritten, // samples written
NULL))) {
1, // number of samples per channel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bad formatting. Fix this

1, // auto start
10.0, // timeout
DAQmx_Val_GroupByChannel, // data layout
write_buffer, // data
&samplesWritten, // samples written
NULL))) {
this->log_error("failed while writing digital data");
return freighter::Error(driver::CRITICAL_HARDWARE_ERROR,
"Error writing digital data");
Expand Down Expand Up @@ -275,12 +279,14 @@ void ni::DigitalWriteSink::clear_task() {
std::vector<synnax::ChannelKey> ni::DigitalWriteSink::get_cmd_channel_keys() {
std::vector<synnax::ChannelKey> keys;
for (auto &channel: this->writer_config.channels)
if (channel.channel_type != "index") keys.push_back(channel.channel_key);
if (channel.channel_type != "index" && channel.enabled) keys.push_back(channel.channel_key);
return keys;
}

std::vector<synnax::ChannelKey> ni::DigitalWriteSink::get_state_channel_keys() {
std::vector<synnax::ChannelKey> keys = this->writer_config.state_channel_keys;
std::vector<synnax::ChannelKey> keys;
for (auto &channel: this->writer_config.channels)
if (channel.channel_type != "index" && channel.enabled) keys.push_back(channel.state_channel_key);
keys.push_back(this->writer_config.state_index_key);
return keys;
}
Expand Down Expand Up @@ -350,7 +356,8 @@ void ni::DigitalWriteSink::jsonify_error(std::string s) {
for (const auto &field: fields) {
size_t pos = s.find("\n" + field);
if (pos != std::string::npos && (
first_field_pos == std::string::npos || pos < first_field_pos)) first_field_pos = pos;
first_field_pos == std::string::npos || pos < first_field_pos))
first_field_pos = pos;
}

if (first_field_pos != std::string::npos) message = s.substr(0, first_field_pos);
Expand All @@ -372,7 +379,7 @@ void ni::DigitalWriteSink::jsonify_error(std::string s) {
std::smatch channel_match;
if (std::regex_search(s, physical_channel_match, physical_channel_regex)) {
cn = physical_channel_match[1].str();
if (!device.empty()) cn = device + "/" + cn; // Combine device and physical channel name
if (!device.empty()) cn = device + "/" + cn; // Combine device and physical channel name
} else if (std::regex_search(s, channel_match, channel_regex)) cn = channel_match[1].str();

// Check if the channel name is in the channel map
Expand Down Expand Up @@ -427,13 +434,13 @@ synnax::Frame ni::StateSource::get_state() {
// frame size = # monitored states + 1 state index channel
auto state_frame = synnax::Frame(this->state_map.size() + 1);
state_frame.add(
this->state_index_key,
synnax::Series(
synnax::TimeStamp::now().value,
synnax::TIMESTAMP
)
);
for (auto &[key, value] : this->state_map)
this->state_index_key,
synnax::Series(
synnax::TimeStamp::now().value,
synnax::TIMESTAMP
)
);
for (auto &[key, value]: this->state_map)
state_frame.add(key, synnax::Series(value));
return state_frame;
}
Expand All @@ -449,4 +456,4 @@ void ni::StateSource::update_state(
modified_state_values.pop();
}
waiting_reader.notify_one();
}
}
53 changes: 28 additions & 25 deletions driver/ni/ni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,16 @@ void ni::Source::get_index_keys() {
for (auto &channel: this->reader_config.channels) {
auto [channel_info, err] = this->ctx->client->channels.retrieve(
channel.channel_key);
if (err) return this->log_error(
"failed to retrieve channel " + std::to_string(channel.channel_key));
if (err)
return this->log_error(
"failed to retrieve channel " + std::to_string(channel.channel_key));
index_keys.insert(channel_info.index);
}
for(auto &index_key : index_keys) {
for (auto &index_key: index_keys) {
auto [channel_info, err] = this->ctx->client->channels.retrieve(index_key);
if (err) return this->log_error(
"failed to retrieve channel " + std::to_string(index_key));
if (err)
return this->log_error(
"failed to retrieve channel " + std::to_string(index_key));
ni::ChannelConfig index_channel;
index_channel.channel_key = channel_info.key;
index_channel.channel_type = "index";
Expand All @@ -507,11 +509,10 @@ void ni::Source::get_index_keys() {
ni::Source::Source(
TaskHandle task_handle,
const std::shared_ptr<task::Context> &ctx,
const synnax::Task task) :
task_handle(task_handle),
ctx(ctx),
task(task),
err_info({}){
const synnax::Task task) : task_handle(task_handle),
ctx(ctx),
task(task),
err_info({}) {
}

void ni::Source::parse_config(config::Parser &parser) {
Expand Down Expand Up @@ -570,11 +571,13 @@ int ni::Source::init() {
"failed to create channels for " + this->reader_config.task_name);
return -1;
}
if (this->reader_config.sample_rate < this->reader_config.stream_rate || this->reader_config.sample_rate.value < 1) {
if (this->reader_config.sample_rate < this->reader_config.stream_rate || this->reader_config.sample_rate.value <
1) {
this->log_error(
"Failed while configuring timing for NI hardware for task " + this->
reader_config.task_name);
this->err_info["message"] = "sample rate must be greater than or equal to 1 and greater than or equal to the stream rate";
this->err_info["message"] =
"sample rate must be greater than or equal to 1 and greater than or equal to the stream rate";
this->err_info["running"] = false;

this->ctx->setState({
Expand All @@ -592,15 +595,15 @@ int ni::Source::init() {
return 0;
}

freighter::Error ni::Source::cycle(){
freighter::Error ni::Source::cycle() {
auto err = this->start_ni();
if(err) return err;
if (err) return err;
err = this->stop_ni();
if(err) return err;
if (err) return err;
return freighter::NIL;
}

freighter::Error ni::Source::start_ni(){
freighter::Error ni::Source::start_ni() {
if (this->check_ni_error(ni::NiDAQmxInterface::StartTask(this->task_handle))) {
this->log_error(
"failed while starting reader for task " + this->reader_config.task_name +
Expand All @@ -611,7 +614,7 @@ freighter::Error ni::Source::start_ni(){
return freighter::NIL;
}

freighter::Error ni::Source::stop_ni(){
freighter::Error ni::Source::stop_ni() {
if (this->check_ni_error(ni::NiDAQmxInterface::StopTask(this->task_handle))) {
this->log_error(
"failed while stopping reader for task " + this->reader_config.task_name);
Expand Down Expand Up @@ -664,12 +667,12 @@ void ni::Source::clear_task() {

ni::Source::~Source() {
this->clear_task();
if(this->sample_thread.joinable()) this->sample_thread.join();
if (this->sample_thread.joinable()) this->sample_thread.join();
VLOG(1) << "[ni.reader] joined sample thread";
}

int ni::Source::check_ni_error(int32 error) {
if(error == 0) return 0;
if (error == 0) return 0;

char errBuff[4096] = {'\0'};

Expand All @@ -693,10 +696,10 @@ bool ni::Source::ok() {
return this->ok_state;
}

std::vector<synnax::ChannelKey> ni::Source::getChannelKeys() {
std::vector<synnax::ChannelKey> ni::Source::get_channel_keys() {
std::vector<synnax::ChannelKey> keys;
for (auto &channel: this->reader_config.channels) keys.push_back(
channel.channel_key);
for (auto &channel: this->reader_config.channels)
if (channel.enabled) keys.push_back(channel.channel_key);
return keys;
}

Expand Down Expand Up @@ -741,7 +744,7 @@ void ni::Source::jsonify_error(std::string s) {
std::regex task_name_line_regex(R"(\nTask Name:.*\n?)");
s = std::regex_replace(s, task_name_line_regex, "");

// Extract status code
// Extract status code
std::string sc = "";
std::smatch status_code_match;
if (std::regex_search(s, status_code_match, status_code_regex)) sc = status_code_match[1].str();
Expand Down Expand Up @@ -770,7 +773,7 @@ void ni::Source::jsonify_error(std::string s) {
std::string p = "";
std::smatch property_match;
if (std::regex_search(s, property_match, property_regex)) p = property_match[1].str();
if(sc == "-200170") p = "port";
if (sc == "-200170") p = "port";

// Extract possible values
std::string possible_values = "";
Expand Down Expand Up @@ -812,4 +815,4 @@ void ni::Source::jsonify_error(std::string s) {
j.push_back(this->err_info);

LOG(INFO) << this->err_info.dump(4);
}
}
5 changes: 3 additions & 2 deletions driver/ni/ni.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ inline const std::map<std::string, int32_t> UNITS_MAP = {

struct ChannelConfig {
uint32_t channel_key;
uint32_t state_channel_key;
std::string name;
std::string channel_type;
std::shared_ptr<ni::Analog> ni_channel;
bool enabled = false;
bool enabled = true;
synnax::DataType data_type;
};

Expand Down Expand Up @@ -166,7 +167,7 @@ class Source : public pipeline::Source {

void log_error(std::string err_msg);

std::vector<synnax::ChannelKey> getChannelKeys();
std::vector<synnax::ChannelKey> get_channel_keys();

virtual void parse_config(config::Parser &parser);

Expand Down
2 changes: 1 addition & 1 deletion driver/ni/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ std::unique_ptr<task::Task> ni::ReaderTask::configure(
}
source = ni_source;
ni_source->init();
channel_keys = ni_source->getChannelKeys();
channel_keys = ni_source->get_channel_keys();

auto writer_config = synnax::WriterConfig{
.channels = channel_keys,
Expand Down
Loading