Skip to content

Commit 56afe26

Browse files
authored
Merge pull request #216 from memorysafety/handle-interface-change
Handle interface change
2 parents 4ad2723 + 00a3696 commit 56afe26

File tree

5 files changed

+116
-41
lines changed

5 files changed

+116
-41
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ntp-daemon/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ serde_json = "1.0.82"
2121
sentry = { version = "0.27.0", optional = true }
2222
sentry-tracing = { version = "0.27.0", optional = true }
2323
rand = "0.8.5"
24+
libc = "0.2.126"
2425

2526
[dev-dependencies]
2627
ntp-proto = { path = "../ntp-proto", features=["ext-test"]}

ntp-daemon/src/peer.rs

Lines changed: 93 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{future::Future, marker::PhantomData, ops::ControlFlow, pin::Pin, sync::Arc};
1+
use std::{future::Future, marker::PhantomData, pin::Pin, sync::Arc};
22

33
use ntp_proto::{
44
IgnoreReason, NtpClock, NtpHeader, NtpInstant, NtpTimestamp, Peer, PeerSnapshot, ReferenceId,
@@ -47,6 +47,8 @@ impl ResetEpoch {
4747
pub enum MsgForSystem {
4848
/// Received a Kiss-o'-Death and must demobilize
4949
MustDemobilize(PeerIndex),
50+
/// Experienced a network issue and must be restarted
51+
NetworkIssue(PeerIndex),
5052
/// Received an acceptable packet and made a new peer snapshot
5153
/// A new measurement should try to trigger a clock select
5254
NewMeasurement(PeerIndex, ResetEpoch, PeerSnapshot),
@@ -100,6 +102,18 @@ pub(crate) struct PeerTask<C: 'static + NtpClock + Send, T: Wait> {
100102
reset_epoch: ResetEpoch,
101103
}
102104

105+
#[derive(Debug)]
106+
enum PollResult {
107+
Ok,
108+
NetworkGone,
109+
}
110+
111+
#[derive(Debug)]
112+
enum PacketResult {
113+
Ok,
114+
Demobilize,
115+
}
116+
103117
impl<C, T> PeerTask<C, T>
104118
where
105119
C: 'static + NtpClock + Send,
@@ -120,7 +134,7 @@ where
120134
.reset(self.last_poll_sent + poll_interval);
121135
}
122136

123-
async fn handle_poll(&mut self, poll_wait: &mut Pin<&mut T>) {
137+
async fn handle_poll(&mut self, poll_wait: &mut Pin<&mut T>) -> PollResult {
124138
let system_snapshot = *self.channels.system_snapshots.read().await;
125139
let packet = self.peer.generate_poll_message(system_snapshot);
126140

@@ -145,7 +159,17 @@ where
145159

146160
if let Err(error) = self.socket.send(&packet.serialize()).await {
147161
warn!(?error, "poll message could not be sent");
162+
163+
match error.raw_os_error() {
164+
Some(libc::EHOSTDOWN)
165+
| Some(libc::EHOSTUNREACH)
166+
| Some(libc::ENETDOWN)
167+
| Some(libc::ENETUNREACH) => return PollResult::NetworkGone,
168+
_ => {}
169+
}
148170
}
171+
172+
PollResult::Ok
149173
}
150174

151175
async fn handle_packet(
@@ -154,7 +178,7 @@ where
154178
packet: NtpHeader,
155179
send_timestamp: NtpTimestamp,
156180
recv_timestamp: NtpTimestamp,
157-
) -> ControlFlow<(), ()> {
181+
) -> PacketResult {
158182
let ntp_instant = NtpInstant::now();
159183

160184
let system_snapshot = *self.channels.system_snapshots.read().await;
@@ -184,14 +208,14 @@ where
184208
let msg = MsgForSystem::MustDemobilize(self.index);
185209
self.channels.msg_for_system_sender.send(msg).await.ok();
186210

187-
return ControlFlow::Break(());
211+
return PacketResult::Demobilize;
188212
}
189213
Err(ignore_reason) => {
190214
debug!(?ignore_reason, "packet ignored");
191215
}
192216
}
193217

194-
ControlFlow::Continue(())
218+
PacketResult::Ok
195219
}
196220

197221
async fn run(&mut self, mut poll_wait: Pin<&mut T>) {
@@ -200,7 +224,13 @@ where
200224

201225
tokio::select! {
202226
() = &mut poll_wait => {
203-
self.handle_poll(&mut poll_wait).await;
227+
match self.handle_poll(&mut poll_wait).await {
228+
PollResult::Ok => {},
229+
PollResult::NetworkGone => {
230+
self.channels.msg_for_system_sender.send(MsgForSystem::NetworkIssue(self.index)).await.ok();
231+
break;
232+
}
233+
}
204234
},
205235
result = self.channels.reset.changed() => {
206236
if let Ok(()) = result {
@@ -214,19 +244,26 @@ where
214244
}
215245
}
216246
result = self.socket.recv(&mut buf) => {
217-
let send_timestamp = match self.last_send_timestamp {
218-
Some(ts) => ts,
219-
None => {
220-
warn!("we received a message without having sent one; discarding");
221-
continue;
222-
}
223-
};
224-
225-
if let Some((packet, recv_timestamp)) = accept_packet(result, &buf) {
226-
match self.handle_packet(&mut poll_wait, packet, send_timestamp, recv_timestamp).await{
227-
ControlFlow::Continue(_) => continue,
228-
ControlFlow::Break(_) => break,
229-
}
247+
match accept_packet(result, &buf) {
248+
AcceptResult::Accept(packet, recv_timestamp) => {
249+
let send_timestamp = match self.last_send_timestamp {
250+
Some(ts) => ts,
251+
None => {
252+
warn!("we received a message without having sent one; discarding");
253+
continue;
254+
}
255+
};
256+
257+
match self.handle_packet(&mut poll_wait, packet, send_timestamp, recv_timestamp).await {
258+
PacketResult::Ok => {},
259+
PacketResult::Demobilize => break,
260+
}
261+
},
262+
AcceptResult::NetworkGone => {
263+
self.channels.msg_for_system_sender.send(MsgForSystem::NetworkIssue(self.index)).await.ok();
264+
break;
265+
},
266+
AcceptResult::Ignore => {},
230267
}
231268
},
232269
}
@@ -239,17 +276,26 @@ where
239276
C: 'static + NtpClock + Send,
240277
{
241278
#[instrument(skip(clock, channels))]
242-
pub async fn spawn<A: ToSocketAddrs + std::fmt::Debug>(
279+
pub fn spawn<A: ToSocketAddrs + std::fmt::Debug + Send + Sync + 'static>(
243280
index: PeerIndex,
244281
addr: A,
245282
clock: C,
283+
network_wait_period: std::time::Duration,
246284
mut channels: PeerChannels,
247-
) -> std::io::Result<tokio::task::JoinHandle<()>> {
248-
let socket = UdpSocket::new("0.0.0.0:0", addr).await?;
249-
let our_id = ReferenceId::from_ip(socket.as_ref().local_addr().unwrap().ip());
250-
let peer_id = ReferenceId::from_ip(socket.as_ref().peer_addr().unwrap().ip());
285+
) -> tokio::task::JoinHandle<()> {
286+
tokio::spawn(async move {
287+
let socket = loop {
288+
match UdpSocket::new("0.0.0.0:0", &addr).await {
289+
Ok(socket) => break socket,
290+
Err(error) => {
291+
warn!(?error, "Could not open socket");
292+
tokio::time::sleep(network_wait_period).await;
293+
}
294+
}
295+
};
296+
let our_id = ReferenceId::from_ip(socket.as_ref().local_addr().unwrap().ip());
297+
let peer_id = ReferenceId::from_ip(socket.as_ref().peer_addr().unwrap().ip());
251298

252-
let handle = tokio::spawn(async move {
253299
let local_clock_time = NtpInstant::now();
254300
let peer = Peer::new(our_id, peer_id, local_clock_time);
255301

@@ -273,16 +319,21 @@ where
273319
};
274320

275321
process.run(poll_wait).await
276-
});
277-
278-
Ok(handle)
322+
})
279323
}
280324
}
281325

326+
#[derive(Debug)]
327+
enum AcceptResult {
328+
Accept(NtpHeader, NtpTimestamp),
329+
Ignore,
330+
NetworkGone,
331+
}
332+
282333
fn accept_packet(
283334
result: Result<(usize, Option<NtpTimestamp>), std::io::Error>,
284335
buf: &[u8; 48],
285-
) -> Option<(NtpHeader, NtpTimestamp)> {
336+
) -> AcceptResult {
286337
match result {
287338
Ok((size, Some(recv_timestamp))) => {
288339
// Note: packets are allowed to be bigger when including extensions.
@@ -292,26 +343,32 @@ fn accept_packet(
292343
if size < 48 {
293344
warn!(expected = 48, actual = size, "received packet is too small");
294345

295-
None
346+
AcceptResult::Ignore
296347
} else {
297348
match NtpHeader::deserialize(buf) {
298-
Ok(packet) => Some((packet, recv_timestamp)),
349+
Ok(packet) => AcceptResult::Accept(packet, recv_timestamp),
299350
Err(e) => {
300351
warn!("received invalid packet: {}", e);
301-
None
352+
AcceptResult::Ignore
302353
}
303354
}
304355
}
305356
}
306357
Ok((size, None)) => {
307358
warn!(?size, "received a packet without a timestamp");
308359

309-
None
360+
AcceptResult::Ignore
310361
}
311362
Err(receive_error) => {
312363
warn!(?receive_error, "could not receive packet");
313364

314-
None
365+
match receive_error.raw_os_error() {
366+
Some(libc::EHOSTDOWN)
367+
| Some(libc::EHOSTUNREACH)
368+
| Some(libc::ENETDOWN)
369+
| Some(libc::ENETUNREACH) => AcceptResult::NetworkGone,
370+
_ => AcceptResult::Ignore,
371+
}
315372
}
316373
}
317374
}
@@ -502,15 +559,14 @@ mod tests {
502559
PeerIndex::from_inner(0),
503560
"127.0.0.1:8003",
504561
TestClock {},
562+
std::time::Duration::from_secs(60),
505563
PeerChannels {
506564
msg_for_system_sender,
507565
system_snapshots,
508566
system_config,
509567
reset,
510568
},
511-
)
512-
.await
513-
.unwrap();
569+
);
514570

515571
let peer_epoch = match msg_for_system_receiver.recv().await.unwrap() {
516572
MsgForSystem::UpdatedSnapshot(_, peer_epoch, _) => peer_epoch,

ntp-daemon/src/peer_manager.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use crate::{
88
use ntp_proto::{NtpClock, PeerSnapshot};
99
use tokio::task::JoinHandle;
1010

11+
const NETWORK_WAIT_PERIOD: std::time::Duration = std::time::Duration::from_secs(60);
12+
1113
#[derive(Debug, Clone, Copy)]
1214
pub enum PeerStatus {
1315
/// We are waiting for the first snapshot from this peer _in the current reset epoch_.
@@ -72,17 +74,27 @@ impl<C: NtpClock> Peers<C> {
7274
}
7375
}
7476

75-
pub async fn add_peer(&mut self, config: PeerConfig) -> std::io::Result<JoinHandle<()>> {
77+
fn add_peer_internal(&mut self, config: Arc<PeerConfig>) -> JoinHandle<()> {
7678
let index = self.indexer.get();
7779
let addr = config.addr.clone();
7880
self.peers.insert(
7981
index,
8082
PeerData {
8183
status: PeerStatus::NoMeasurement,
82-
config: Arc::new(config),
84+
config,
8385
},
8486
);
85-
PeerTask::spawn(index, &addr, self.clock.clone(), self.channels.clone()).await
87+
PeerTask::spawn(
88+
index,
89+
addr,
90+
self.clock.clone(),
91+
NETWORK_WAIT_PERIOD,
92+
self.channels.clone(),
93+
)
94+
}
95+
96+
pub async fn add_peer(&mut self, config: PeerConfig) -> JoinHandle<()> {
97+
self.add_peer_internal(Arc::new(config))
8698
}
8799

88100
#[cfg(test)]
@@ -151,6 +163,11 @@ impl<C: NtpClock> Peers<C> {
151163
self.peers.get_mut(&index).unwrap().status = PeerStatus::Measurement(snapshot);
152164
}
153165
}
166+
MsgForSystem::NetworkIssue(index) => {
167+
// Restart the peer reusing its configuration.
168+
let config = self.peers.remove(&index).unwrap().config;
169+
self.add_peer_internal(config);
170+
}
154171
}
155172
}
156173

ntp-daemon/src/system.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn spawn(
5050
UnixNtpClock::new(),
5151
);
5252
for peer_config in peer_configs.iter() {
53-
peers.add_peer(peer_config.to_owned()).await?;
53+
peers.add_peer(peer_config.to_owned()).await;
5454
}
5555
let peers = Arc::new(tokio::sync::RwLock::new(peers));
5656

0 commit comments

Comments
 (0)