Skip to content

Commit 8ec2aa8

Browse files
U-NDC\nmryanU-NDC\nmryan
authored andcommitted
added ability to have multiple endpoints. each endpoint can have its own allowed apids. parsing the input allows any apid destined for any output.
1 parent 31aabe9 commit 8ec2aa8

File tree

5 files changed

+188
-85
lines changed

5 files changed

+188
-85
lines changed

Cargo.lock

Lines changed: 0 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ccsds_router.json

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"input_settings": {
33
"file": {
4-
"file_name": "Logged_Data_Large"
4+
"file_name": "Logged_Data"
55
},
66
"tcp_client": {
77
"port": 8000,
@@ -17,34 +17,60 @@
1717
}
1818
},
1919
"input_selection": "File",
20-
"output_settings": {
21-
"file": {
22-
"file_name": "data.bin"
23-
},
24-
"tcp_client": {
25-
"port": 8000,
26-
"ip": "127.0.0.1"
20+
"output_settings": [
21+
{
22+
"file": {
23+
"file_name": "data.bin"
24+
},
25+
"tcp_client": {
26+
"port": 8000,
27+
"ip": "127.0.0.1"
28+
},
29+
"tcp_server": {
30+
"port": 8000,
31+
"ip": "127.0.0.1"
32+
},
33+
"udp": {
34+
"port": 8001,
35+
"ip": "127.0.0.1"
36+
}
2737
},
28-
"tcp_server": {
29-
"port": 8000,
30-
"ip": "127.0.0.1"
31-
},
32-
"udp": {
33-
"port": 8001,
34-
"ip": "127.0.0.1"
38+
{
39+
"file": {
40+
"file_name": "data2.bin"
41+
},
42+
"tcp_client": {
43+
"port": 8000,
44+
"ip": "127.0.0.1"
45+
},
46+
"tcp_server": {
47+
"port": 8000,
48+
"ip": "127.0.0.1"
49+
},
50+
"udp": {
51+
"port": 8001,
52+
"ip": "127.0.0.1"
53+
}
3554
}
36-
},
37-
"output_selection": "File",
55+
],
56+
"output_selection": [
57+
"File",
58+
"File"
59+
],
3860
"allowed_apids": [
39-
511,
40-
1301,
41-
400,
42-
1401,
43-
414,
44-
3,
45-
16,
46-
1,
47-
411
61+
[
62+
1,
63+
16,
64+
411,
65+
1301,
66+
414,
67+
3,
68+
400,
69+
1401
70+
],
71+
[
72+
1
73+
]
4874
],
4975
"theme": "Dark",
5076
"packet_size": "Variable",
@@ -55,14 +81,14 @@
5581
"postfix_bytes": 0,
5682
"keep_postfix": false
5783
},
58-
"max_length_bytes": 4096,
84+
"max_length_bytes": 65542,
5985
"timestamp_setting": "Asap",
6086
"timestamp_def": {
6187
"offset": 0,
62-
"num_bytes_seconds": "FourBytes",
63-
"num_bytes_subseconds": "FourBytes",
64-
"subsecond_resolution": 0.000001,
65-
"is_little_endian": true
88+
"num_bytes_seconds": "ZeroBytes",
89+
"num_bytes_subseconds": "ZeroBytes",
90+
"subsecond_resolution": 0.0,
91+
"is_little_endian": false
6692
},
67-
"auto_start": true
93+
"auto_start": false
6894
}

src/main.rs

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ use std::sync::mpsc::{channel, Sender, Receiver};
8484
use std::fs::File;
8585
use std::fs::create_dir;
8686
use std::path::PathBuf;
87+
use std::cmp::{min, max};
8788

8889
use simplelog::*;
8990

@@ -183,6 +184,17 @@ fn main() {
183184
},
184185
}
185186

187+
// make sure there is at least one of the output settings
188+
if config.output_settings.len() == 0 {
189+
config.output_settings = vec!(Default::default());
190+
}
191+
if config.output_selection.len() == 0 {
192+
config.output_selection = vec!(Default::default());
193+
}
194+
if config.allowed_apids.len() == 0 {
195+
config.allowed_apids = vec!(None);
196+
}
197+
186198
// Spawn processing thread
187199
let (gui_sender, gui_receiver) = channel::<GuiMessage>();
188200
let (proc_sender, proc_receiver) = channel::<ProcessingMsg>();
@@ -296,6 +308,8 @@ fn run_gui(config: &mut AppConfig, config_file_name: &mut String, receiver: Rece
296308

297309
let mut packets_dropped = 0;
298310

311+
let mut output_index = 0;
312+
299313
// index of selection for how to treat timestamps
300314
let mut timestamp_selection: i32 = 1;
301315

@@ -430,6 +444,37 @@ fn run_gui(config: &mut AppConfig, config_file_name: &mut String, receiver: Rece
430444
output_settings_shown = !output_settings_shown;
431445
}
432446
});
447+
ui.same_line(0.0);
448+
if ui.small_button(im_str!("New")) {
449+
config.output_selection.push(Default::default());
450+
config.output_settings.push(Default::default());
451+
config.allowed_apids.push(None);
452+
output_index += 1;
453+
}
454+
ui.same_line(0.0);
455+
if ui.small_button(im_str!("Prev")) {
456+
if output_index > 0 {
457+
output_index -= 1;
458+
}
459+
}
460+
ui.same_line(0.0);
461+
ui.text(format!("{}", output_index));
462+
ui.same_line(0.0);
463+
if ui.small_button(im_str!("Next")) {
464+
output_index = min(output_index + 1, config.output_selection.len() - 1);
465+
}
466+
ui.same_line(0.0);
467+
if ui.small_button(im_str!("Delete")) {
468+
// only allow deletion if this is not the last output
469+
if config.output_selection.len() > 1 {
470+
config.output_selection.remove(output_index);
471+
config.output_settings.remove(output_index);
472+
config.allowed_apids.remove(output_index);
473+
output_index = min(output_index, config.output_selection.len() - 1);
474+
}
475+
}
476+
ui.same_line(0.0);
477+
ui.text(format!("({})", config.output_selection.len()));
433478
if output_settings_shown {
434479
ui.child_frame(im_str!("SelectOutputType"), ((WINDOW_WIDTH - 15.0), OUTPUT_SETTINGS_FRAME_HEIGHT))
435480
.movable(true)
@@ -438,7 +483,11 @@ fn run_gui(config: &mut AppConfig, config_file_name: &mut String, receiver: Rece
438483
.show_scrollbar(true)
439484
.always_show_vertical_scroll_bar(true)
440485
.build(|| {
441-
output_stream_ui(&ui, &mut config.output_selection, &mut config.output_settings, &mut config.allowed_apids, &mut imgui_str);
486+
output_stream_ui(&ui,
487+
&mut config.output_selection[output_index],
488+
&mut config.output_settings[output_index],
489+
&mut config.allowed_apids[output_index],
490+
&mut imgui_str);
442491
});
443492
}
444493

@@ -829,27 +878,22 @@ fn packet_statistics_ui(ui: &Ui, processing_stats: &ProcessingStats, packets_dro
829878
ui.separator();
830879

831880
for packet_stats in processing_stats.packet_history.values() {
832-
packet_summary_ui(ui, &packet_stats);
833881
ui.next_column();
834882
ui.text(format!(" {:>5}", &packet_stats.apid.to_string()));
835883
packet_summary_ui(ui, &packet_stats);
836884

837-
packet_summary_ui(ui, &packet_stats);
838885
ui.next_column();
839886
ui.text(format!(" {:>5}", packet_stats.packet_count.to_string()));
840887
packet_summary_ui(ui, &packet_stats);
841888

842-
packet_summary_ui(ui, &packet_stats);
843889
ui.next_column();
844890
ui.text(format!(" {:>9}", &packet_stats.byte_count.to_string()));
845891
packet_summary_ui(ui, &packet_stats);
846892

847-
packet_summary_ui(ui, &packet_stats);
848893
ui.next_column();
849894
ui.text(format!(" {:>5}", &packet_stats.last_len.to_string()));
850895
packet_summary_ui(ui, &packet_stats);
851896

852-
packet_summary_ui(ui, &packet_stats);
853897
ui.next_column();
854898
ui.text(format!(" {:>5}", &packet_stats.last_seq.to_string()));
855899
packet_summary_ui(ui, &packet_stats);

src/processing.rs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,19 @@ fn start_input_thread(app_config: AppConfig, sender: SyncSender<PacketMsg>) {
242242
let packet_size = app_config.packet_size;
243243

244244
let mut ccsds_parser_config: CcsdsParserConfig = CcsdsParserConfig::new();
245-
ccsds_parser_config.allowed_apids = app_config.allowed_apids.clone();
245+
246+
let mut allowed_apids: Vec<u16> = vec!();
247+
for allowed_apid_vec in app_config.allowed_apids.iter() {
248+
allowed_apid_vec.clone().map(|vec| allowed_apids.extend(vec));
249+
}
250+
if allowed_apids.len() == 0 {
251+
ccsds_parser_config.allowed_apids = None;
252+
} else {
253+
allowed_apids.sort();
254+
allowed_apids.dedup();
255+
ccsds_parser_config.allowed_apids = Some(allowed_apids);
256+
}
257+
246258
match app_config.packet_size {
247259
PacketSize::Variable => ccsds_parser_config.max_packet_length = None,
248260
PacketSize::Fixed(num_bytes) => ccsds_parser_config.max_packet_length = Some(num_bytes),
@@ -273,7 +285,7 @@ pub fn process_thread(sender: Sender<GuiMessage>, receiver: Receiver<ProcessingM
273285
bytes: Vec::with_capacity(4096),
274286
};
275287

276-
let mut out_stream = WriteStream::Null;
288+
let mut output_streams = vec!();
277289

278290
let mut endianness: Endianness = Endianness::Little;
279291

@@ -286,7 +298,7 @@ pub fn process_thread(sender: Sender<GuiMessage>, receiver: Receiver<ProcessingM
286298
'state_loop: loop {
287299
match state {
288300
ProcessingState::Idle => {
289-
out_stream = WriteStream::Null;
301+
output_streams = vec!();
290302

291303
let msg_result = receiver.recv().ok();
292304
match msg_result {
@@ -303,24 +315,31 @@ pub fn process_thread(sender: Sender<GuiMessage>, receiver: Receiver<ProcessingM
303315
}
304316

305317
// open streams
306-
match open_output_stream(&app_config.output_settings, app_config.output_selection) {
307-
Ok(stream) => {
308-
out_stream = stream;
309-
310-
// spawn off a thread for reading the input stream
311-
// TODO make this a config option for depth
312-
let (sender, receiver) = sync_channel(100);
313-
packet_receiver = receiver;
314-
315-
start_input_thread(app_config.clone(), sender);
316-
state = ProcessingState::Processing;
317-
},
318-
319-
Err(err_string) => {
320-
sender.send(GuiMessage::Error(err_string)).unwrap();
321-
sender.send(GuiMessage::Finished).unwrap();
322-
},
318+
for index in 0..app_config.output_settings.len() {
319+
let output_stream = open_output_stream(&app_config.output_settings[index],
320+
app_config.output_selection[index]);
321+
match output_stream {
322+
Ok(stream) => {
323+
output_streams.push(stream)
324+
},
325+
326+
Err(err_string) => {
327+
sender.send(GuiMessage::Error(err_string)).unwrap();
328+
sender.send(GuiMessage::Finished).unwrap();
329+
state = ProcessingState::Idle;
330+
output_streams = vec!();
331+
continue 'state_loop;
332+
},
333+
}
323334
}
335+
336+
// spawn off a thread for reading the input stream
337+
// TODO make this a config option for depth
338+
let (sender, receiver) = sync_channel(100);
339+
packet_receiver = receiver;
340+
341+
start_input_thread(app_config.clone(), sender);
342+
state = ProcessingState::Processing;
324343
},
325344

326345
Some(ProcessingMsg::Terminate) => {
@@ -435,7 +454,21 @@ pub fn process_thread(sender: Sender<GuiMessage>, receiver: Receiver<ProcessingM
435454
remaining_timeout = SystemTime::now().duration_since(time_to_send).unwrap_or(Duration::from_secs(0));
436455
}
437456

438-
stream_send(&mut out_stream, &packet.bytes);
457+
for index in 0..output_streams.len() {
458+
let apid_allowed;
459+
460+
match app_config.allowed_apids[index] {
461+
Some(ref apids) => {
462+
apid_allowed = apids.contains(&packet.header.control.apid());
463+
},
464+
465+
None => apid_allowed = true,
466+
}
467+
468+
if apid_allowed {
469+
stream_send(&mut output_streams[index], &packet.bytes);
470+
}
471+
}
439472

440473
/* Report packet to GUI */
441474
let mut packet_update = PacketUpdate { apid: packet.header.control.apid(),

0 commit comments

Comments
 (0)