Skip to content

Commit f0f47af

Browse files
authored
Merge pull request #1556 from nightkr/feature/error-boundary
Add error boundary wrapper type
2 parents f83043d + d2142ae commit f0f47af

File tree

6 files changed

+242
-0
lines changed

6 files changed

+242
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ schemars = "0.8.6"
7474
secrecy = "0.8.0"
7575
serde = "1.0.130"
7676
serde_json = "1.0.68"
77+
serde-value = "0.7.0"
7778
serde_yaml = "0.9.19"
7879
syn = "2.0.38"
7980
tame-oauth = "0.10.0"

examples/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ path = "pod_reflector.rs"
183183
name = "pod_watcher"
184184
path = "pod_watcher.rs"
185185

186+
[[example]]
187+
name = "errorbounded_configmap_watcher"
188+
path = "errorbounded_configmap_watcher.rs"
189+
186190
[[example]]
187191
name = "request_raw"
188192
path = "request_raw.rs"
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use std::borrow::Cow;
2+
3+
use futures::prelude::*;
4+
use k8s_openapi::{api::core::v1::Pod, NamespaceResourceScope};
5+
use kube::{
6+
api::{Api, ObjectMeta, ResourceExt},
7+
core::DeserializeGuard,
8+
runtime::{reflector::ObjectRef, watcher, WatchStreamExt},
9+
Client, Resource,
10+
};
11+
use serde::Deserialize;
12+
use tracing::*;
13+
14+
// Variant of ConfigMap that only accepts ConfigMaps with a CA certificate
15+
// to demonstrate parsing failure
16+
#[derive(Deserialize, Debug, Clone)]
17+
struct CaConfigMap {
18+
metadata: ObjectMeta,
19+
data: CaConfigMapData,
20+
}
21+
22+
#[derive(Deserialize, Debug, Clone)]
23+
struct CaConfigMapData {
24+
#[serde(rename = "ca.crt")]
25+
ca_crt: String,
26+
}
27+
28+
// Normally you would derive this, but ConfigMap doesn't follow the standard spec/status pattern
29+
impl Resource for CaConfigMap {
30+
type DynamicType = ();
31+
type Scope = NamespaceResourceScope;
32+
33+
fn kind(&(): &Self::DynamicType) -> Cow<'_, str> {
34+
Cow::Borrowed("ConfigMap")
35+
}
36+
37+
fn group(&(): &Self::DynamicType) -> Cow<'_, str> {
38+
Cow::Borrowed("")
39+
}
40+
41+
fn version(&(): &Self::DynamicType) -> Cow<'_, str> {
42+
Cow::Borrowed("v1")
43+
}
44+
45+
fn plural(&(): &Self::DynamicType) -> Cow<'_, str> {
46+
Cow::Borrowed("configmaps")
47+
}
48+
49+
fn meta(&self) -> &ObjectMeta {
50+
&self.metadata
51+
}
52+
53+
fn meta_mut(&mut self) -> &mut ObjectMeta {
54+
&mut self.metadata
55+
}
56+
}
57+
58+
#[tokio::main]
59+
async fn main() -> anyhow::Result<()> {
60+
tracing_subscriber::fmt::init();
61+
let client = Client::try_default().await?;
62+
let api = Api::<DeserializeGuard<CaConfigMap>>::default_namespaced(client);
63+
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
64+
let wc = if use_watchlist {
65+
// requires WatchList feature gate on 1.27 or later
66+
watcher::Config::default().streaming_lists()
67+
} else {
68+
watcher::Config::default()
69+
};
70+
71+
watcher(api, wc)
72+
.applied_objects()
73+
.default_backoff()
74+
.try_for_each(|cm| async move {
75+
info!("saw {}", ObjectRef::from_obj(&cm));
76+
match cm.0 {
77+
Ok(cm) => info!("contents: {cm:?}"),
78+
Err(err) => warn!("failed to parse: {err}"),
79+
}
80+
Ok(())
81+
})
82+
.await?;
83+
Ok(())
84+
}

kube-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ json-patch = { workspace = true, optional = true }
3535
chrono = { workspace = true, features = ["now"] }
3636
schemars = { workspace = true, optional = true }
3737
k8s-openapi.workspace = true
38+
serde-value.workspace = true
3839

3940
[dev-dependencies]
4041
k8s-openapi = { workspace = true, features = ["latest"] }

kube-core/src/error_boundary.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//! Types for isolating deserialization failures. See [`DeserializeGuard`].
2+
3+
use std::{borrow::Cow, fmt::Display};
4+
5+
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
6+
use serde::Deserialize;
7+
use serde_value::DeserializerError;
8+
9+
use crate::{PartialObjectMeta, Resource};
10+
11+
/// A wrapper type for K that lets deserializing the parent object succeed, even if the K is invalid.
12+
///
13+
/// For example, this can be used to still access valid objects from an `Api::list` call or `watcher`.
14+
// We can't implement Deserialize on Result<K, InvalidObject> directly, both because of the orphan rule and because
15+
// it would conflict with serde's blanket impl on Result<K, E>, even if E isn't Deserialize.
16+
#[derive(Debug, Clone)]
17+
pub struct DeserializeGuard<K>(pub Result<K, InvalidObject>);
18+
19+
/// An object that failed to be deserialized by the [`DeserializeGuard`].
20+
#[derive(Debug, Clone)]
21+
pub struct InvalidObject {
22+
// Should ideally be D::Error, but we don't know what type it has outside of Deserialize::deserialize()
23+
// It *could* be Box<std::error::Error>, but we don't know that it is Send+Sync
24+
/// The error message from deserializing the object.
25+
pub error: String,
26+
/// The metadata of the invalid object.
27+
pub metadata: ObjectMeta,
28+
}
29+
30+
impl Display for InvalidObject {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
self.error.fmt(f)
33+
}
34+
}
35+
36+
impl<'de, K> Deserialize<'de> for DeserializeGuard<K>
37+
where
38+
K: Deserialize<'de>,
39+
// Not actually used, but we assume that K is a Kubernetes-style resource with a `metadata` section
40+
K: Resource,
41+
{
42+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
43+
where
44+
D: serde::Deserializer<'de>,
45+
{
46+
// Deserialize::deserialize consumes the deserializer, and we want to retry parsing as an ObjectMetaContainer
47+
// if the initial parse fails, so that we can still implement Resource for the error case
48+
let buffer = serde_value::Value::deserialize(deserializer)?;
49+
50+
// FIXME: can we avoid cloning the whole object? metadata should be enough, and even then we could prune managedFields
51+
K::deserialize(buffer.clone())
52+
.map(Ok)
53+
.or_else(|err| {
54+
let PartialObjectMeta { metadata, .. } =
55+
PartialObjectMeta::<K>::deserialize(buffer).map_err(DeserializerError::into_error)?;
56+
Ok(Err(InvalidObject {
57+
error: err.to_string(),
58+
metadata,
59+
}))
60+
})
61+
.map(DeserializeGuard)
62+
}
63+
}
64+
65+
impl<K: Resource> Resource for DeserializeGuard<K> {
66+
type DynamicType = K::DynamicType;
67+
type Scope = K::Scope;
68+
69+
fn kind(dt: &Self::DynamicType) -> Cow<str> {
70+
K::kind(dt)
71+
}
72+
73+
fn group(dt: &Self::DynamicType) -> Cow<str> {
74+
K::group(dt)
75+
}
76+
77+
fn version(dt: &Self::DynamicType) -> Cow<str> {
78+
K::version(dt)
79+
}
80+
81+
fn plural(dt: &Self::DynamicType) -> Cow<str> {
82+
K::plural(dt)
83+
}
84+
85+
fn meta(&self) -> &ObjectMeta {
86+
self.0.as_ref().map_or_else(|err| &err.metadata, K::meta)
87+
}
88+
89+
fn meta_mut(&mut self) -> &mut ObjectMeta {
90+
self.0.as_mut().map_or_else(|err| &mut err.metadata, K::meta_mut)
91+
}
92+
}
93+
94+
#[cfg(test)]
95+
mod tests {
96+
use k8s_openapi::api::core::v1::{ConfigMap, Pod};
97+
use serde_json::json;
98+
99+
use crate::{DeserializeGuard, Resource};
100+
101+
#[test]
102+
fn should_parse_meta_of_invalid_objects() {
103+
let pod_error = serde_json::from_value::<DeserializeGuard<Pod>>(json!({
104+
"metadata": {
105+
"name": "the-name",
106+
"namespace": "the-namespace",
107+
},
108+
"spec": {
109+
"containers": "not-a-list",
110+
},
111+
}))
112+
.unwrap();
113+
assert_eq!(pod_error.meta().name.as_deref(), Some("the-name"));
114+
assert_eq!(pod_error.meta().namespace.as_deref(), Some("the-namespace"));
115+
pod_error.0.unwrap_err();
116+
}
117+
118+
#[test]
119+
fn should_allow_valid_objects() {
120+
let configmap = serde_json::from_value::<DeserializeGuard<ConfigMap>>(json!({
121+
"metadata": {
122+
"name": "the-name",
123+
"namespace": "the-namespace",
124+
},
125+
"data": {
126+
"foo": "bar",
127+
},
128+
}))
129+
.unwrap();
130+
assert_eq!(configmap.meta().name.as_deref(), Some("the-name"));
131+
assert_eq!(configmap.meta().namespace.as_deref(), Some("the-namespace"));
132+
assert_eq!(
133+
configmap.0.unwrap().data,
134+
Some([("foo".to_string(), "bar".to_string())].into())
135+
)
136+
}
137+
138+
#[test]
139+
fn should_catch_invalid_objects() {
140+
serde_json::from_value::<DeserializeGuard<Pod>>(json!({
141+
"spec": {
142+
"containers": "not-a-list"
143+
}
144+
}))
145+
.unwrap()
146+
.0
147+
.unwrap_err();
148+
}
149+
}

kube-core/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,6 @@ pub use error::ErrorResponse;
7070

7171
mod version;
7272
pub use version::Version;
73+
74+
pub mod error_boundary;
75+
pub use error_boundary::DeserializeGuard;

0 commit comments

Comments
 (0)