Skip to content

Commit

Permalink
List applying EventPolicies in KafkaSink (#4084)
Browse files Browse the repository at this point in the history
* List applying EventPolicies in KafkaSink

* Update policies in status after contract update

* Only fallback to default authz mode policies, when OIDC is enabled

* Update broker and channel to new utils function

* Fix linter issue

* Return early if OIDC is disabled

* Fix function naming
  • Loading branch information
creydr authored Aug 30, 2024
1 parent 241e6a7 commit 5825486
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 116 deletions.
19 changes: 18 additions & 1 deletion control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionAddressable apis.ConditionType = "Addressable"
ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

var conditionSet apis.ConditionSet
Expand Down Expand Up @@ -54,3 +55,19 @@ func (ks *KafkaSinkStatus) SetAddress(addr *duckv1.Addressable) {
func (kss *KafkaSinkStatus) InitializeConditions() {
kss.GetConditionSet().Manage(kss).InitializeConditions()
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrue() {
kss.GetConditionSet().Manage(kss).MarkTrue(ConditionEventPoliciesReady)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kss *KafkaSinkStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kss.GetConditionSet().Manage(kss).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
24 changes: 14 additions & 10 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"sort"
"strings"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"

"github.com/rickb777/date/period"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
duck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/resolver"

Expand All @@ -55,14 +55,18 @@ func ContentModeFromString(mode string) contract.ContentMode {
}
}

// EventPoliciesFromAppliedEventPoliciesStatus resolves a AppliedEventPoliciesStatus into a list of contract.EventPolicy
func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPoliciesStatus, lister v1alpha1.EventPolicyLister, namespace string, features feature.Flags) ([]*contract.EventPolicy, error) {
eventPolicies := make([]*contract.EventPolicy, 0, len(status.Policies))
// ContractEventPoliciesFromEventPolicies resolves a list of v1alpha1.EventPolicy into a list of contract.EventPolicy
func ContractEventPoliciesFromEventPolicies(applyingEventPolicies []*eventingv1alpha1.EventPolicy, namespace string, features feature.Flags) []*contract.EventPolicy {
if !features.IsOIDCAuthentication() {
return nil
}

for _, appliedPolicy := range status.Policies {
policy, err := lister.EventPolicies(namespace).Get(appliedPolicy.Name)
if err != nil {
return nil, fmt.Errorf("failed to get eventPolicy %s: %w", appliedPolicy.Name, err)
eventPolicies := make([]*contract.EventPolicy, 0, len(applyingEventPolicies))

for _, policy := range applyingEventPolicies {
if !policy.Status.IsReady() {
// only add ready eventpolicies to the contract
continue
}

contractPolicy := &contract.EventPolicy{}
Expand Down Expand Up @@ -132,7 +136,7 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
// else: deny all -> add no additional policy
}

return eventPolicies, nil
return eventPolicies
}

func EgressConfigFromDelivery(
Expand Down
148 changes: 118 additions & 30 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ import (
"testing"
"time"

corev1 "k8s.io/api/core/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"google.golang.org/protobuf/encoding/protojson"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
reconcilertesting "knative.dev/pkg/reconciler/testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/runtime/protoimpl"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/client/injection/ducks/duck/v1/addressable"
Expand All @@ -48,7 +47,6 @@ import (
"knative.dev/eventing-kafka-broker/control-plane/pkg/contract"

eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
eventpolicyinformerfake "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake"
)

func TestContentModeFromString(t *testing.T) {
Expand Down Expand Up @@ -513,7 +511,7 @@ func TestMergeEgressConfig(t *testing.T) {
}
}

func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
func TestContractEventPoliciesEventPolicies(t *testing.T) {

tests := []struct {
name string
Expand All @@ -522,7 +520,7 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
namespace string
defaultAuthorizationMode feature.Flag
expected []*contract.EventPolicy
wantErr bool
oidcDisabled bool
}{
{
name: "Exact match",
Expand All @@ -539,6 +537,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -566,6 +572,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -594,6 +608,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -604,6 +626,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-2-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -643,6 +673,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -660,6 +698,14 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
From: []string{
"from-2-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
},
Expand Down Expand Up @@ -728,46 +774,88 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{},
}, {
name: "Applying policy does not exist",
name: "Applying policy not ready",
applyingPolicies: []string{
"not-found",
"policy-1",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-*",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionFalse,
},
},
},
},
},
},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{},
}, {
name: "No policy when OIDC is disabled",
oidcDisabled: true,
applyingPolicies: []string{
"policy-1",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-1",
},
Status: duckv1.Status{
Conditions: duckv1.Conditions{
{
Type: eventingv1alpha1.EventPolicyConditionReady,
Status: corev1.ConditionFalse, // is false, as OIDC is disabled
},
},
},
},
},
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationAllowSameNamespace,
expected: []*contract.EventPolicy{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

ctx, _ := reconcilertesting.SetupFakeContext(t)
features := feature.Flags{
feature.AuthorizationDefaultMode: tt.defaultAuthorizationMode,
feature.OIDCAuthentication: feature.Enabled,
}

for _, ep := range tt.existingEventPolicies {
err := eventpolicyinformerfake.Get(ctx).Informer().GetStore().Add(ep)
if err != nil {
t.Fatal(err)
}
if tt.oidcDisabled {
features[feature.OIDCAuthentication] = feature.Disabled
}

applyingPoliciesStatus := eventingduck.AppliedEventPoliciesStatus{}
for _, ep := range tt.applyingPolicies {
applyingPoliciesStatus.Policies = append(applyingPoliciesStatus.Policies, eventingduck.AppliedEventPolicyRef{
Name: ep,
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
})
}

got, err := EventPoliciesFromAppliedEventPoliciesStatus(applyingPoliciesStatus, eventpolicyinformerfake.Get(ctx).Lister(), tt.namespace, features)
if (err != nil) != tt.wantErr {
t.Errorf("EventPoliciesFromAppliedEventPoliciesStatus() error = %v, wantErr %v", err, tt.wantErr)
return
applyingPolicies := []*eventingv1alpha1.EventPolicy{}
for _, applyingPolicyName := range tt.applyingPolicies {
for _, existingPolicy := range tt.existingEventPolicies {
if applyingPolicyName == existingPolicy.Name {
applyingPolicies = append(applyingPolicies, existingPolicy)
}
}
}

got := ContractEventPoliciesFromEventPolicies(applyingPolicies, tt.namespace, features)
expectedJSON, err := protojson.Marshal(&contract.Ingress{
EventPolicies: tt.expected,
})
Expand Down
Loading

0 comments on commit 5825486

Please sign in to comment.