Skip to content

Commit

Permalink
Ensure that RRDP delta hashes don’t changes between updates. (#951)
Browse files Browse the repository at this point in the history
This PR ensures that the hash of an RRDP delta with a given serial doesn’t
change between updates. It stores the list of delta serials and hashes with
the RRDP repository state in its archive and checks that hashes for serial
numbers present both in the repository state and a new notification are
equal. Otherwise falls back to a snapshot update.

This PR implements the draft-ietf-sidrops-rrdp-desynchronization-00. The
draft suggests to limit the number of deltas stored. We are not yet doing
that. Instead this should be part of limiting the number of deltas taken out
of the notification file when parsing in a follow up PR in rpki-rs.

This PR changes the format of the repository state and thus increases its
version to 1. Strictly speaking, we never released version 0, but it’s been
in main from quite some time, so an increase feels prudent.
  • Loading branch information
partim authored Apr 10, 2024
1 parent 385e74d commit 070a608
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 6 deletions.
51 changes: 47 additions & 4 deletions src/collector/rrdp/archive.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{cmp, io, fs};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -345,7 +346,7 @@ impl archive::ObjectMeta for RrdpObjectMeta {
//------------ RepositoryState -----------------------------------------------

/// The current state of an RRDP repository.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RepositoryState {
/// The rpkiNotify URI of the repository.
pub rpki_notify: uri::Https,
Expand Down Expand Up @@ -376,14 +377,22 @@ pub struct RepositoryState {
/// This is the complete tag including the quotation marks and possibly
/// the weak prefix.
pub etag: Option<Bytes>,

/// Information of the deltas since in the last notificiation.
pub delta_state: HashMap<u64, rrdp::Hash>,
}

impl RepositoryState {
/// The current version of the data.
///
/// This is 1 since version 0 was in the main branch for quite some time.
const VERSION: u8 = 1;

/// Reads the state from an IO reader.
fn parse(reader: &mut impl io::Read) -> Result<Self, io::Error> {
// Version number. Must be 0u8.
// Version number.
let version = u8::parse(reader)?;
if version != 0 {
if version != Self::VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("unexpected version {}", version)
Expand All @@ -398,19 +407,21 @@ impl RepositoryState {
best_before_ts: Parse::parse(reader)?,
last_modified_ts: Parse::parse(reader)?,
etag: Parse::parse(reader)?,
delta_state: Parse::parse(reader)?,
})
}

/// Composes the encoded state.
fn compose(&self, writer: &mut impl io::Write) -> Result<(), io::Error> {
0u8.compose(writer)?; // version
Self::VERSION.compose(writer)?; // version
self.rpki_notify.compose(writer)?;
self.session.compose(writer)?;
self.serial.compose(writer)?;
self.updated_ts.compose(writer)?;
self.best_before_ts.compose(writer)?;
self.last_modified_ts.compose(writer)?;
self.etag.compose(writer)?;
self.delta_state.compose(writer)?;
Ok(())
}

Expand Down Expand Up @@ -519,3 +530,35 @@ impl From<archive::AccessError<HashMismatch>> for AccessError {
}
}


//============ Tests =========================================================

#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;

#[test]
fn compose_parse_repository_state() {
let state = RepositoryState {
rpki_notify: uri::Https::from_str(
"https://foo.bar/baz"
).unwrap(),
session: Uuid::from_u128(0xa1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8u128),
serial: 0x1234567812345678u64,
updated_ts: -12,
best_before_ts: 123789123789123,
last_modified_ts: Some(239123908123),
etag: None,
delta_state: [
(18, rrdp::Hash::from_data(b"123")),
(19, rrdp::Hash::from_data(b"332")),
].iter().cloned().collect(),
};
let mut buf = Vec::new();
state.compose(&mut buf).unwrap();
let parsed = RepositoryState::parse(&mut buf.as_slice()).unwrap();
assert_eq!(state, parsed);
}
}

5 changes: 4 additions & 1 deletion src/collector/rrdp/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,10 @@ impl<'a> RepositoryUpdate<'a> {
mut archive: RrdpArchive,
state: RepositoryState,
) -> Result<Option<SnapshotReason>, RunFailed> {
if let Err(reason) = notify.check_deltas(&state) {
return Ok(Some(reason))
}

let deltas = match self.calc_deltas(notify.content(), &state) {
Ok(deltas) => deltas,
Err(reason) => return Ok(Some(reason)),
Expand Down Expand Up @@ -1013,6 +1017,5 @@ impl<'a> RepositoryUpdate<'a> {

Ok(deltas)
}

}

24 changes: 24 additions & 0 deletions src/collector/rrdp/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,28 @@ impl Notification {
best_before_ts: fallback.best_before().timestamp(),
last_modified_ts: self.last_modified.map(|x| x.timestamp()),
etag: self.etag.clone(),
delta_state: self.content.deltas().iter().map(|delta| {
(delta.serial(), delta.hash())
}).collect(),
}
}

/// Checks that the deltas match those present in `state`.
///
/// Ensures that for delta serial numbers present both in the notification
/// and the state the hash values match.
pub fn check_deltas(
&self, state: &RepositoryState
) -> Result<(), SnapshotReason> {
for delta in self.content().deltas() {
if let Some(state_hash) = state.delta_state.get(&delta.serial()) {
if delta.hash() != *state_hash {
return Err(SnapshotReason::DeltaMutation)
}
}
}
Ok(())
}
}


Expand Down Expand Up @@ -571,6 +591,9 @@ pub enum SnapshotReason {
/// The delta set in the notification file is inconsistent.
BadDeltaSet,

/// At least one delta hash has changed from a previous update.
DeltaMutation,

/// A larger-than-supported serial number was encountered.
LargeSerial,

Expand All @@ -596,6 +619,7 @@ impl SnapshotReason {
NewRepository => "new-repository",
NewSession => "new-session",
BadDeltaSet => "inconsistent-delta-set",
DeltaMutation => "delta-mutation",
LargeSerial => "large-serial",
OutdatedLocal => "outdate-local",
ConflictingDelta => "conflicting-delta",
Expand Down
41 changes: 40 additions & 1 deletion src/utils/binio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
//! how to serialize themselves. The module implements the traits for all the
//! types we need.

use std::{error, fmt, io, slice};
use std::{error, fmt, hash, io, slice};
use std::collections::HashMap;
use bytes::Bytes;
use rpki::{rrdp, uri};
use rpki::repository::x509::Serial;
Expand Down Expand Up @@ -329,6 +330,44 @@ impl<R: io::Read> Parse<R> for Serial {
}


//------------ HashMap<K, V> -------------------------------------------------
//
// Encoded as the number of items as a u64 followed by pairs of key and value.

impl<K: Compose<W>, V: Compose<W>, W: io::Write> Compose<W> for HashMap<K, V> {
fn compose(&self, target: &mut W) -> Result<(), io::Error> {
u64::try_from(self.len())
.map_err(|_| ParseError::format("excessively large vec"))?
.compose(target)?;
for (key, value) in self {
key.compose(target)?;
value.compose(target)?;
}
Ok(())
}
}

impl<K, V, R> Parse<R> for HashMap<K, V>
where
K: Parse<R> + Eq + hash::Hash,
V: Parse<R>,
R: io::Read
{
fn parse(source: &mut R) -> Result<Self, ParseError> {
let len = usize::try_from(u64::parse(source)?).map_err(|_| {
ParseError::format("too many items in vec")
})?;
let mut res = HashMap::with_capacity(len);
for _ in 0..len {
if res.insert(K::parse(source)?, V::parse(source)?).is_some() {
return Err(ParseError::format("duplicate keys"))
}
}
Ok(res)
}
}


//------------ ParseError ----------------------------------------------------

#[derive(Debug)]
Expand Down

0 comments on commit 070a608

Please sign in to comment.