-
Notifications
You must be signed in to change notification settings - Fork 5
/
0071-eventio.rs
132 lines (120 loc) · 3.52 KB
/
0071-eventio.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*!
```rudra-poc
[target]
crate = "eventio"
version = "0.4.0"
indexed_version = "0.3.5"
[[target.peer]]
crate = "crossbeam-channel"
version = "0.5.0"
[[target.peer]]
crate = "pcap-parser"
version = "0.9.3"
[report]
issue_url = "https://github.com/petabi/eventio/issues/33"
issue_date = 2020-12-20
rustsec_url = "https://github.com/RustSec/advisory-db/pull/600"
rustsec_id = "RUSTSEC-2020-0108"
[[bugs]]
analyzer = "SendSyncVariance"
bug_class = "SendSyncVariance"
rudra_report_locations = ["src/pcap.rs:22:1: 22:55"]
```
!*/
#![forbid(unsafe_code)]
use crossbeam_channel::unbounded;
use eventio::{pcap, Input};
use pcap_parser::{LegacyPcapBlock, PcapHeader, ToVec};
use std::{
cell::Cell,
io::{self, Cursor},
rc::Rc,
sync::atomic::{AtomicUsize, Ordering},
thread,
};
/// Non-Send object implementing `Read` trait.
struct CustomRead {
read: Cursor<Vec<u8>>,
non_send_counter: Rc<Cell<usize>>,
atomic_cnt: bool,
}
impl CustomRead {
fn new(non_send_counter: Rc<Cell<usize>>, atomic_cnt: bool) -> Self {
CustomRead {
read: Self::create_pcap(),
non_send_counter,
atomic_cnt,
}
}
fn create_pcap() -> Cursor<Vec<u8>> {
let fake_content = b"fake packet";
let pkt = LegacyPcapBlock {
ts_sec: 0,
ts_usec: 0,
caplen: fake_content.len() as u32,
origlen: fake_content.len() as u32,
data: fake_content,
}
.to_vec_raw()
.unwrap();
let mut buf = PcapHeader::new().to_vec_raw().unwrap();
for _ in 0..N_PACKETS {
buf.extend(pkt.iter());
}
Cursor::new(buf)
}
fn update_read_cnt(&self) {
if self.atomic_cnt {
REAL_CNT.fetch_add(1, Ordering::Release);
} else {
self.non_send_counter.set(self.non_send_counter.get() + 1);
}
}
}
impl std::io::Read for CustomRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// Increment counter to keep track of '# of read events'.
self.update_read_cnt();
self.read.read(buf)
}
}
const N_PACKETS: usize = 9_000_000;
const NTHREADS: usize = 8;
static REAL_CNT: AtomicUsize = AtomicUsize::new(0);
fn experiment(atomic_cnt: bool) -> usize {
let (data_tx, data_rx) = unbounded();
let (ack_tx, ack_rx) = unbounded();
// Non-Sync counter that counts the # of read events that happen within this thread.
// This counter should only be updated within a single thread.
// Updating this counter concurrently from multiple threads will result in incorrect counts.
let read_cnt_in_thread = Rc::new(Cell::new(0_usize));
let mut children = Vec::with_capacity(NTHREADS);
for _ in 0..NTHREADS {
let input = pcap::Input::with_read(
data_tx.clone(),
ack_rx.clone(),
CustomRead::new(Rc::clone(&read_cnt_in_thread), atomic_cnt),
);
children.push(thread::spawn(move || {
// `input` is moved from parent thread to child thread.
input.run().unwrap()
}));
}
std::mem::drop(data_tx);
for ev in data_rx.iter() {
ack_tx.send(ev.seq_no).unwrap();
}
std::mem::drop(ack_tx);
for child in children.into_iter() {
child.join().unwrap();
}
if atomic_cnt {
REAL_CNT.load(Ordering::Acquire)
} else {
read_cnt_in_thread.get()
}
}
fn main() {
// Check that `read_cnt_in_thread` maintains incorrect count of events.
assert_eq!(experiment(true), experiment(false),);
}