From ae4cc16e51ce5d9ef6c881f6b1337da5978d63b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20D=C3=B6ll?= Date: Wed, 28 Feb 2024 10:18:06 +0000 Subject: [PATCH] wip: add nats target --- cmd/natstarget-adapter/main.go | 11 + cmd/xslttransformation-adapter/Dockerfile | 4 +- config/301-natstarget.yaml | 372 +++++++++++++++ go.mod | 3 + go.sum | 6 + pkg/apis/targets/v1alpha1/nats_lifecycle.go | 49 ++ pkg/apis/targets/v1alpha1/nats_types.go | 49 ++ .../targets/v1alpha1/zz_generated.deepcopy.go | 82 ++++ .../targets/v1alpha1/fake/fake_natstarget.go | 125 ++++++ .../v1alpha1/fake/fake_targets_client.go | 4 + .../targets/v1alpha1/generated_expansion.go | 2 + .../typed/targets/v1alpha1/natstarget.go | 179 ++++++++ .../typed/targets/v1alpha1/targets_client.go | 5 + .../informers/externalversions/generic.go | 2 + .../targets/v1alpha1/interface.go | 7 + .../targets/v1alpha1/natstarget.go | 74 +++ .../targets/v1alpha1/natstarget/fake/fake.go | 24 + .../v1alpha1/natstarget/filtered/fake/fake.go | 36 ++ .../natstarget/filtered/natstarget.go | 49 ++ .../targets/v1alpha1/natstarget/natstarget.go | 36 ++ .../targets/v1alpha1/natstarget/controller.go | 154 +++++++ .../targets/v1alpha1/natstarget/reconciler.go | 424 ++++++++++++++++++ .../targets/v1alpha1/natstarget/state.go | 81 ++++ .../targets/v1alpha1/expansion_generated.go | 8 + .../listers/targets/v1alpha1/natstarget.go | 83 ++++ pkg/targets/adapter/natstarget/adapter.go | 97 ++++ 26 files changed, 1964 insertions(+), 2 deletions(-) create mode 100644 cmd/natstarget-adapter/main.go create mode 100644 config/301-natstarget.yaml create mode 100644 pkg/apis/targets/v1alpha1/nats_lifecycle.go create mode 100644 pkg/apis/targets/v1alpha1/nats_types.go create mode 100644 pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_natstarget.go create mode 100644 pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/natstarget.go create mode 100644 pkg/client/generated/informers/externalversions/targets/v1alpha1/natstarget.go create mode 100644 pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/fake/fake.go create mode 100644 pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/fake/fake.go create mode 100644 pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/natstarget.go create mode 100644 pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/natstarget.go create mode 100644 pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/controller.go create mode 100644 pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/reconciler.go create mode 100644 pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/state.go create mode 100644 pkg/client/generated/listers/targets/v1alpha1/natstarget.go create mode 100644 pkg/targets/adapter/natstarget/adapter.go diff --git a/cmd/natstarget-adapter/main.go b/cmd/natstarget-adapter/main.go new file mode 100644 index 00000000..74c7e24a --- /dev/null +++ b/cmd/natstarget-adapter/main.go @@ -0,0 +1,11 @@ +package main + +import ( + pkgadapter "knative.dev/eventing/pkg/adapter/v2" + + "github.com/zeiss/typhoon/pkg/targets/adapter/natstarget" +) + +func main() { + pkgadapter.Main("natstarget", natstarget.EnvAccessorCtor, natstarget.NewTarget) +} diff --git a/cmd/xslttransformation-adapter/Dockerfile b/cmd/xslttransformation-adapter/Dockerfile index e689c975..7fbd3d58 100644 --- a/cmd/xslttransformation-adapter/Dockerfile +++ b/cmd/xslttransformation-adapter/Dockerfile @@ -7,7 +7,7 @@ RUN set -eux; \ apt-get install -y --no-install-recommends libxml2-dev libxslt1-dev liblzma-dev zlib1g-dev -WORKDIR /go/triggermesh +WORKDIR /go/typhoon COPY . . RUN go build -o /xslttransformation-adapter ./cmd/xslttransformation-adapter @@ -36,7 +36,7 @@ FROM gcr.io/distroless/base-debian11:nonroot # Ensure the /kodata entries used by Knative to augment the logger with the # current VCS revision are present. -COPY --from=builder /go/triggermesh/.git/HEAD /go/triggermesh/.git/refs/ /kodata/ +COPY --from=builder /go/typhoon/.git/HEAD /go/typhoon/.git/refs/ /kodata/ ENV KO_DATA_PATH=/kodata # (!) COPY follows symlinks diff --git a/config/301-natstarget.yaml b/config/301-natstarget.yaml new file mode 100644 index 00000000..7da1e436 --- /dev/null +++ b/config/301-natstarget.yaml @@ -0,0 +1,372 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: natstargets.targets.typhoon.zeiss.com + labels: + knative.dev/crd-install: 'true' + typhoon.zeiss.com/crd-install: 'true' + duck.knative.dev/addressable: 'true' + annotations: + registry.typhoon.zeiss.com/acceptedEventTypes: | + [ + { "type": "*" } + ] +spec: + group: targets.typhoon.zeiss.com + names: + kind: natsTarget + plural: natstargets + categories: + - all + - knative + - eventing + - targets + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + subresources: + status: {} + schema: + openAPIV3Schema: + type: object + description: Typhoon event target for nats. + properties: + spec: + description: Desired state of event target. + type: object + properties: + topic: + description: Topic name to stream the target events to. + type: string + minLength: 1 + topicReplicationFactor: + description: The number of replicas required to stream to the topic. + type: integer + minimum: 1 + maximum: 32767 + topicPartitions: + description: The number of partitions used by the topic to stream an event to. + type: integer + minimum: 1 + maximum: 2147483647 + bootstrapServers: + description: Array of nats servers used to bootstrap the connection. + type: array + items: + type: string + minLength: 1 + auth: + description: Authentication method used to interact with nats. + type: object + properties: + saslEnable: + description: Boolean to indicate if SASL is enabled. + type: boolean + tlsEnable: + description: Boolean to indicate if TLS is enabled. + type: boolean + securityMechanism: + description: securityMechanism attribute indicate which mechanism to use. + type: string + enum: [GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] + username: + description: nats account username when using SASL. + type: string + password: + description: nats account password when using SASL. + type: object + properties: + value: + description: Plain text password. + type: string + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the password. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + oneOf: + - required: [value] + - required: [valueFromSecret] + kerberos: + description: Kerberos Authentication method to interact with nats. + type: object + properties: + realm: + description: Name of the Kerberos Realm. + type: string + serviceName: + description: The primary name of the nats server configured. + type: string + username: + description: Kerberos username or Kerberos Principal Name. The Username or the Principal doesn't require + the Realm in it. + type: string + password: + description: Kerberos Password. + type: object + properties: + value: + description: Plain text password. + type: string + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the password. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + oneOf: + - required: [value] + - required: [valueFromSecret] + config: + type: object + properties: + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the kerberos configuration file + (krb5.conf). + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + required: + - valueFromSecret + keytab: + type: object + properties: + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the kerberos keytab file contents. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + required: + - valueFromSecret + tls: + description: TLS Authentication method to interact with nats. + type: object + properties: + ca: + description: The value to the configured CA. + type: object + properties: + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the value. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + required: + - valueFromSecret + clientCert: + description: The value of the SSL Client Cert. + type: object + properties: + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the Client Cert content. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + required: + - valueFromSecret + clientKey: + description: The value of the SSL Client Key. + type: object + properties: + valueFromSecret: + description: A reference to a Kubernetes Secret object containing the Client Key content. + type: object + properties: + name: + type: string + key: + type: string + required: + - name + - key + required: + - valueFromSecret + skipVerify: + description: SkipVerify controls whether a client verifies the server's certificate chain and host + name. If skipVerify is true, crypto/tls accepts any certificate presented by the server and any + host name in that certificate. In this mode, TLS is susceptible to machine-in-the-middle attacks + unless custom verification is used. This should be used only for testing. + type: boolean + required: + - saslEnable + discardCloudEventContext: + description: Whether to omit CloudEvent context attributes in messages sent to nats. When this property is + false (default), the entire CloudEvent payload is included. When this property is true, only the CloudEvent + data is included. + type: boolean + adapterOverrides: + description: Kubernetes object parameters to apply on top of default adapter values. + type: object + properties: + annotations: + description: Adapter annotations. + type: object + additionalProperties: + type: string + labels: + description: Adapter labels. + type: object + additionalProperties: + type: string + env: + description: Adapter environment variables. + type: array + items: + type: object + properties: + name: + type: string + value: + type: string + public: + description: Adapter visibility scope. + type: boolean + resources: + description: Compute Resources required by the adapter. More info at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + properties: + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: Limits describes the maximum amount of compute resources allowed. More info at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: Requests describes the minimum amount of compute resources required. If Requests is omitted + for a container, it defaults to Limits if that is explicitly specified, otherwise to an implementation-defined + value. More info at https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + tolerations: + description: Pod tolerations, as documented at https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/ + Tolerations require additional configuration for Knative-based deployments - https://knative.dev/docs/serving/configuration/feature-flags/ + type: array + items: + type: object + properties: + key: + description: Taint key that the toleration applies to. + type: string + operator: + description: Key's relationship to the value. + type: string + enum: [Exists, Equal] + value: + description: Taint value the toleration matches to. + type: string + effect: + description: Taint effect to match. + type: string + enum: [NoSchedule, PreferNoSchedule, NoExecute] + tolerationSeconds: + description: Period of time a toleration of effect NoExecute tolerates the taint. + type: integer + format: int64 + nodeSelector: + description: NodeSelector only allow the object pods to be created at nodes where all selector labels + are present, as documented at https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector. + NodeSelector require additional configuration for Knative-based deployments - https://knative.dev/docs/serving/configuration/feature-flags/ + type: object + additionalProperties: + type: string + affinity: + description: Scheduling constraints of the pod. More info at https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity. + Affinity require additional configuration for Knative-based deployments - https://knative.dev/docs/serving/configuration/feature-flags/ + type: object + x-kubernetes-preserve-unknown-fields: true + required: + - bootstrapServers + - topic + status: + type: object + description: Reported status of the event target. + properties: + observedGeneration: + type: integer + format: int64 + conditions: + type: array + items: + type: object + properties: + type: + type: string + status: + type: string + enum: ['True', 'False', Unknown] + severity: + type: string + enum: [Error, Warning, Info] + reason: + type: string + message: + type: string + lastTransitionTime: + type: string + format: date-time + required: + - type + - status + address: + type: object + properties: + url: + type: string + additionalPrinterColumns: + - name: URL + type: string + jsonPath: .status.address.url + - name: Ready + type: string + jsonPath: .status.conditions[?(@.type=='Ready')].status + - name: Reason + type: string + jsonPath: .status.conditions[?(@.type=='Ready')].reason + - name: Age + type: date + jsonPath: .metadata.creationTimestamp \ No newline at end of file diff --git a/go.mod b/go.mod index 3cbbcf72..2bd99311 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/katallaxie/pkg v0.5.12 github.com/kelseyhightower/envconfig v1.4.0 github.com/logzio/logzio-go v1.0.6 + github.com/nats-io/nats.go v1.33.1 github.com/oapi-codegen/fiber-middleware v1.0.1 github.com/oapi-codegen/runtime v1.1.1 github.com/sethvargo/go-limiter v0.7.2 @@ -206,6 +207,8 @@ require ( github.com/moricho/tparallel v0.3.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nakabonne/nestif v0.3.1 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/nishanths/exhaustive v0.11.0 // indirect github.com/nishanths/predeclared v0.2.2 // indirect github.com/nunnatsa/ginkgolinter v0.14.1 // indirect diff --git a/go.sum b/go.sum index 40128a47..9055b157 100644 --- a/go.sum +++ b/go.sum @@ -822,6 +822,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nakabonne/nestif v0.3.1 h1:wm28nZjhQY5HyYPx+weN3Q65k6ilSBxDb8v5S81B81U= github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4Ngq6aY7OE= +github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= +github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/exhaustive v0.11.0 h1:T3I8nUGhl/Cwu5Z2hfc92l0e04D2GEW6e0l8pzda2l0= github.com/nishanths/exhaustive v0.11.0/go.mod h1:RqwDsZ1xY0dNdqHho2z6X+bgzizwbLYOWnZbbl2wLB4= diff --git a/pkg/apis/targets/v1alpha1/nats_lifecycle.go b/pkg/apis/targets/v1alpha1/nats_lifecycle.go new file mode 100644 index 00000000..a78dead7 --- /dev/null +++ b/pkg/apis/targets/v1alpha1/nats_lifecycle.go @@ -0,0 +1,49 @@ +package v1alpha1 + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime/schema" + + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" + + "github.com/zeiss/typhoon/pkg/apis/common/v1alpha1" +) + +// GetGroupVersionKind implements kmeta.OwnerRefable. +func (*NatsTarget) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("KafkaTarget") +} + +// GetConditionSet implements duckv1.KRShaped. +func (*NatsTarget) GetConditionSet() apis.ConditionSet { + return v1alpha1.DefaultConditionSet +} + +// GetStatus implements duckv1.KRShaped. +func (t *NatsTarget) GetStatus() *duckv1.Status { + return &t.Status.Status +} + +// GetStatusManager implements Reconcilable. +func (t *NatsTarget) GetStatusManager() *v1alpha1.StatusManager { + return &v1alpha1.StatusManager{ + ConditionSet: t.GetConditionSet(), + Status: &t.Status, + } +} + +// GetAdapterOverrides implements AdapterConfigurable. +func (t *NatsTarget) GetAdapterOverrides() *v1alpha1.AdapterOverrides { + return t.Spec.AdapterOverrides +} + +// SetDefaults implements apis.Defaultable +func (t *NatsTarget) SetDefaults(ctx context.Context) { +} + +// Validate implements apis.Validatable +func (t *NatsTarget) Validate(ctx context.Context) *apis.FieldError { + return nil +} diff --git a/pkg/apis/targets/v1alpha1/nats_types.go b/pkg/apis/targets/v1alpha1/nats_types.go new file mode 100644 index 00000000..462acc54 --- /dev/null +++ b/pkg/apis/targets/v1alpha1/nats_types.go @@ -0,0 +1,49 @@ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/zeiss/typhoon/pkg/apis/common/v1alpha1" +) + +// +genclient +// +genreconciler +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NatsTarget is the Schema for an NatsTarget. +type NatsTarget struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec NatsTargetSpec `json:"spec"` + Status v1alpha1.Status `json:"status,omitempty"` +} + +// Check the interfaces the event target should be implementing. +var ( + _ v1alpha1.Reconcilable = (*NatsTarget)(nil) + _ v1alpha1.AdapterConfigurable = (*NatsTarget)(nil) +) + +// NatsTargetSpec defines the desired state of the event target. +type NatsTargetSpec struct { + // Subject where messages are produced. + Subject string `json:"subject"` + + // URL of the Nats server. + URL string `json:"url"` + + // Adapter spec overrides parameters. + // +optional + AdapterOverrides *v1alpha1.AdapterOverrides `json:"adapterOverrides,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NatsTargetList is a list of event target instances. +type NatsTargetList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []NatsTarget `json:"items"` +} diff --git a/pkg/apis/targets/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/targets/v1alpha1/zz_generated.deepcopy.go index 7057cd26..c3b0932f 100644 --- a/pkg/apis/targets/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/targets/v1alpha1/zz_generated.deepcopy.go @@ -862,6 +862,88 @@ func (in *LogzTargetSpec) DeepCopy() *LogzTargetSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NatsTarget) DeepCopyInto(out *NatsTarget) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NatsTarget. +func (in *NatsTarget) DeepCopy() *NatsTarget { + if in == nil { + return nil + } + out := new(NatsTarget) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NatsTarget) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NatsTargetList) DeepCopyInto(out *NatsTargetList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]NatsTarget, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NatsTargetList. +func (in *NatsTargetList) DeepCopy() *NatsTargetList { + if in == nil { + return nil + } + out := new(NatsTargetList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NatsTargetList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NatsTargetSpec) DeepCopyInto(out *NatsTargetSpec) { + *out = *in + if in.AdapterOverrides != nil { + in, out := &in.AdapterOverrides, &out.AdapterOverrides + *out = new(commonv1alpha1.AdapterOverrides) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NatsTargetSpec. +func (in *NatsTargetSpec) DeepCopy() *NatsTargetSpec { + if in == nil { + return nil + } + out := new(NatsTargetSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OracleFunctionSpecSpec) DeepCopyInto(out *OracleFunctionSpecSpec) { *out = *in diff --git a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_natstarget.go b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_natstarget.go new file mode 100644 index 00000000..309f1300 --- /dev/null +++ b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_natstarget.go @@ -0,0 +1,125 @@ +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeNatsTargets implements NatsTargetInterface +type FakeNatsTargets struct { + Fake *FakeTargetsV1alpha1 + ns string +} + +var natstargetsResource = v1alpha1.SchemeGroupVersion.WithResource("natstargets") + +var natstargetsKind = v1alpha1.SchemeGroupVersion.WithKind("NatsTarget") + +// Get takes name of the natsTarget, and returns the corresponding natsTarget object, and an error if there is any. +func (c *FakeNatsTargets) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NatsTarget, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(natstargetsResource, c.ns, name), &v1alpha1.NatsTarget{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NatsTarget), err +} + +// List takes label and field selectors, and returns the list of NatsTargets that match those selectors. +func (c *FakeNatsTargets) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NatsTargetList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(natstargetsResource, natstargetsKind, c.ns, opts), &v1alpha1.NatsTargetList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.NatsTargetList{ListMeta: obj.(*v1alpha1.NatsTargetList).ListMeta} + for _, item := range obj.(*v1alpha1.NatsTargetList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested natsTargets. +func (c *FakeNatsTargets) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(natstargetsResource, c.ns, opts)) + +} + +// Create takes the representation of a natsTarget and creates it. Returns the server's representation of the natsTarget, and an error, if there is any. +func (c *FakeNatsTargets) Create(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.CreateOptions) (result *v1alpha1.NatsTarget, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(natstargetsResource, c.ns, natsTarget), &v1alpha1.NatsTarget{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NatsTarget), err +} + +// Update takes the representation of a natsTarget and updates it. Returns the server's representation of the natsTarget, and an error, if there is any. +func (c *FakeNatsTargets) Update(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (result *v1alpha1.NatsTarget, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(natstargetsResource, c.ns, natsTarget), &v1alpha1.NatsTarget{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NatsTarget), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeNatsTargets) UpdateStatus(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (*v1alpha1.NatsTarget, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(natstargetsResource, "status", c.ns, natsTarget), &v1alpha1.NatsTarget{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NatsTarget), err +} + +// Delete takes name of the natsTarget and deletes it. Returns an error if one occurs. +func (c *FakeNatsTargets) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(natstargetsResource, c.ns, name, opts), &v1alpha1.NatsTarget{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeNatsTargets) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(natstargetsResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.NatsTargetList{}) + return err +} + +// Patch applies the patch and returns the patched natsTarget. +func (c *FakeNatsTargets) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NatsTarget, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(natstargetsResource, c.ns, name, pt, data, subresources...), &v1alpha1.NatsTarget{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.NatsTarget), err +} diff --git a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_targets_client.go b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_targets_client.go index 8ac48b7d..1ca405f6 100644 --- a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_targets_client.go +++ b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/fake/fake_targets_client.go @@ -36,6 +36,10 @@ func (c *FakeTargetsV1alpha1) LogzTargets(namespace string) v1alpha1.LogzTargetI return &FakeLogzTargets{c, namespace} } +func (c *FakeTargetsV1alpha1) NatsTargets(namespace string) v1alpha1.NatsTargetInterface { + return &FakeNatsTargets{c, namespace} +} + func (c *FakeTargetsV1alpha1) OracleTargets(namespace string) v1alpha1.OracleTargetInterface { return &FakeOracleTargets{c, namespace} } diff --git a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/generated_expansion.go b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/generated_expansion.go index a0dcae0c..bc8cbf6b 100644 --- a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/generated_expansion.go +++ b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/generated_expansion.go @@ -14,6 +14,8 @@ type LogzMetricsTargetExpansion interface{} type LogzTargetExpansion interface{} +type NatsTargetExpansion interface{} + type OracleTargetExpansion interface{} type SplunkTargetExpansion interface{} diff --git a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/natstarget.go b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/natstarget.go new file mode 100644 index 00000000..c56103d0 --- /dev/null +++ b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/natstarget.go @@ -0,0 +1,179 @@ +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + scheme "github.com/zeiss/typhoon/pkg/client/generated/clientset/internalclientset/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// NatsTargetsGetter has a method to return a NatsTargetInterface. +// A group's client should implement this interface. +type NatsTargetsGetter interface { + NatsTargets(namespace string) NatsTargetInterface +} + +// NatsTargetInterface has methods to work with NatsTarget resources. +type NatsTargetInterface interface { + Create(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.CreateOptions) (*v1alpha1.NatsTarget, error) + Update(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (*v1alpha1.NatsTarget, error) + UpdateStatus(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (*v1alpha1.NatsTarget, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.NatsTarget, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.NatsTargetList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NatsTarget, err error) + NatsTargetExpansion +} + +// natsTargets implements NatsTargetInterface +type natsTargets struct { + client rest.Interface + ns string +} + +// newNatsTargets returns a NatsTargets +func newNatsTargets(c *TargetsV1alpha1Client, namespace string) *natsTargets { + return &natsTargets{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the natsTarget, and returns the corresponding natsTarget object, and an error if there is any. +func (c *natsTargets) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.NatsTarget, err error) { + result = &v1alpha1.NatsTarget{} + err = c.client.Get(). + Namespace(c.ns). + Resource("natstargets"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of NatsTargets that match those selectors. +func (c *natsTargets) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.NatsTargetList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.NatsTargetList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("natstargets"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested natsTargets. +func (c *natsTargets) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("natstargets"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a natsTarget and creates it. Returns the server's representation of the natsTarget, and an error, if there is any. +func (c *natsTargets) Create(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.CreateOptions) (result *v1alpha1.NatsTarget, err error) { + result = &v1alpha1.NatsTarget{} + err = c.client.Post(). + Namespace(c.ns). + Resource("natstargets"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(natsTarget). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a natsTarget and updates it. Returns the server's representation of the natsTarget, and an error, if there is any. +func (c *natsTargets) Update(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (result *v1alpha1.NatsTarget, err error) { + result = &v1alpha1.NatsTarget{} + err = c.client.Put(). + Namespace(c.ns). + Resource("natstargets"). + Name(natsTarget.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(natsTarget). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *natsTargets) UpdateStatus(ctx context.Context, natsTarget *v1alpha1.NatsTarget, opts v1.UpdateOptions) (result *v1alpha1.NatsTarget, err error) { + result = &v1alpha1.NatsTarget{} + err = c.client.Put(). + Namespace(c.ns). + Resource("natstargets"). + Name(natsTarget.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(natsTarget). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the natsTarget and deletes it. Returns an error if one occurs. +func (c *natsTargets) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("natstargets"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *natsTargets) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("natstargets"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched natsTarget. +func (c *natsTargets) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.NatsTarget, err error) { + result = &v1alpha1.NatsTarget{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("natstargets"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/targets_client.go b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/targets_client.go index 994712df..37ee7e01 100644 --- a/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/targets_client.go +++ b/pkg/client/generated/clientset/internalclientset/typed/targets/v1alpha1/targets_client.go @@ -18,6 +18,7 @@ type TargetsV1alpha1Interface interface { KafkaTargetsGetter LogzMetricsTargetsGetter LogzTargetsGetter + NatsTargetsGetter OracleTargetsGetter SplunkTargetsGetter } @@ -51,6 +52,10 @@ func (c *TargetsV1alpha1Client) LogzTargets(namespace string) LogzTargetInterfac return newLogzTargets(c, namespace) } +func (c *TargetsV1alpha1Client) NatsTargets(namespace string) NatsTargetInterface { + return newNatsTargets(c, namespace) +} + func (c *TargetsV1alpha1Client) OracleTargets(namespace string) OracleTargetInterface { return newOracleTargets(c, namespace) } diff --git a/pkg/client/generated/informers/externalversions/generic.go b/pkg/client/generated/informers/externalversions/generic.go index 2506a59d..e49a2dad 100644 --- a/pkg/client/generated/informers/externalversions/generic.go +++ b/pkg/client/generated/informers/externalversions/generic.go @@ -87,6 +87,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Targets().V1alpha1().LogzMetricsTargets().Informer()}, nil case targetsv1alpha1.SchemeGroupVersion.WithResource("logztargets"): return &genericInformer{resource: resource.GroupResource(), informer: f.Targets().V1alpha1().LogzTargets().Informer()}, nil + case targetsv1alpha1.SchemeGroupVersion.WithResource("natstargets"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Targets().V1alpha1().NatsTargets().Informer()}, nil case targetsv1alpha1.SchemeGroupVersion.WithResource("oracletargets"): return &genericInformer{resource: resource.GroupResource(), informer: f.Targets().V1alpha1().OracleTargets().Informer()}, nil case targetsv1alpha1.SchemeGroupVersion.WithResource("splunktargets"): diff --git a/pkg/client/generated/informers/externalversions/targets/v1alpha1/interface.go b/pkg/client/generated/informers/externalversions/targets/v1alpha1/interface.go index 2f996310..46aaf4ac 100644 --- a/pkg/client/generated/informers/externalversions/targets/v1alpha1/interface.go +++ b/pkg/client/generated/informers/externalversions/targets/v1alpha1/interface.go @@ -20,6 +20,8 @@ type Interface interface { LogzMetricsTargets() LogzMetricsTargetInformer // LogzTargets returns a LogzTargetInformer. LogzTargets() LogzTargetInformer + // NatsTargets returns a NatsTargetInformer. + NatsTargets() NatsTargetInformer // OracleTargets returns a OracleTargetInformer. OracleTargets() OracleTargetInformer // SplunkTargets returns a SplunkTargetInformer. @@ -67,6 +69,11 @@ func (v *version) LogzTargets() LogzTargetInformer { return &logzTargetInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// NatsTargets returns a NatsTargetInformer. +func (v *version) NatsTargets() NatsTargetInformer { + return &natsTargetInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // OracleTargets returns a OracleTargetInformer. func (v *version) OracleTargets() OracleTargetInformer { return &oracleTargetInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/generated/informers/externalversions/targets/v1alpha1/natstarget.go b/pkg/client/generated/informers/externalversions/targets/v1alpha1/natstarget.go new file mode 100644 index 00000000..24bbe802 --- /dev/null +++ b/pkg/client/generated/informers/externalversions/targets/v1alpha1/natstarget.go @@ -0,0 +1,74 @@ +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + targetsv1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + internalclientset "github.com/zeiss/typhoon/pkg/client/generated/clientset/internalclientset" + internalinterfaces "github.com/zeiss/typhoon/pkg/client/generated/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/zeiss/typhoon/pkg/client/generated/listers/targets/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// NatsTargetInformer provides access to a shared informer and lister for +// NatsTargets. +type NatsTargetInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.NatsTargetLister +} + +type natsTargetInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewNatsTargetInformer constructs a new informer for NatsTarget type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewNatsTargetInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredNatsTargetInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredNatsTargetInformer constructs a new informer for NatsTarget type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredNatsTargetInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.TargetsV1alpha1().NatsTargets(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.TargetsV1alpha1().NatsTargets(namespace).Watch(context.TODO(), options) + }, + }, + &targetsv1alpha1.NatsTarget{}, + resyncPeriod, + indexers, + ) +} + +func (f *natsTargetInformer) defaultInformer(client internalclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredNatsTargetInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *natsTargetInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&targetsv1alpha1.NatsTarget{}, f.defaultInformer) +} + +func (f *natsTargetInformer) Lister() v1alpha1.NatsTargetLister { + return v1alpha1.NewNatsTargetLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/fake/fake.go b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/fake/fake.go new file mode 100644 index 00000000..66a64fdc --- /dev/null +++ b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/fake/fake.go @@ -0,0 +1,24 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + fake "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/factory/fake" + natstarget "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" +) + +var Get = natstarget.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Targets().V1alpha1().NatsTargets() + return context.WithValue(ctx, natstarget.Key{}, inf), inf.Informer() +} diff --git a/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/fake/fake.go b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/fake/fake.go new file mode 100644 index 00000000..f735c20c --- /dev/null +++ b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/fake/fake.go @@ -0,0 +1,36 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + context "context" + + factoryfiltered "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/factory/filtered" + filtered "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +var Get = filtered.Get + +func init() { + injection.Fake.RegisterFilteredInformers(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(factoryfiltered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := factoryfiltered.Get(ctx, selector) + inf := f.Targets().V1alpha1().NatsTargets() + ctx = context.WithValue(ctx, filtered.Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} diff --git a/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/natstarget.go b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/natstarget.go new file mode 100644 index 00000000..74f9c327 --- /dev/null +++ b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/filtered/natstarget.go @@ -0,0 +1,49 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package filtered + +import ( + context "context" + + v1alpha1 "github.com/zeiss/typhoon/pkg/client/generated/informers/externalversions/targets/v1alpha1" + filtered "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/factory/filtered" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterFilteredInformers(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct { + Selector string +} + +func withInformer(ctx context.Context) (context.Context, []controller.Informer) { + untyped := ctx.Value(filtered.LabelKey{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch labelkey from context.") + } + labelSelectors := untyped.([]string) + infs := []controller.Informer{} + for _, selector := range labelSelectors { + f := filtered.Get(ctx, selector) + inf := f.Targets().V1alpha1().NatsTargets() + ctx = context.WithValue(ctx, Key{Selector: selector}, inf) + infs = append(infs, inf.Informer()) + } + return ctx, infs +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context, selector string) v1alpha1.NatsTargetInformer { + untyped := ctx.Value(Key{Selector: selector}) + if untyped == nil { + logging.FromContext(ctx).Panicf( + "Unable to fetch github.com/zeiss/typhoon/pkg/client/generated/informers/externalversions/targets/v1alpha1.NatsTargetInformer with selector %s from context.", selector) + } + return untyped.(v1alpha1.NatsTargetInformer) +} diff --git a/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/natstarget.go b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/natstarget.go new file mode 100644 index 00000000..c1b645ad --- /dev/null +++ b/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget/natstarget.go @@ -0,0 +1,36 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package natstarget + +import ( + context "context" + + v1alpha1 "github.com/zeiss/typhoon/pkg/client/generated/informers/externalversions/targets/v1alpha1" + factory "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Targets().V1alpha1().NatsTargets() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1alpha1.NatsTargetInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch github.com/zeiss/typhoon/pkg/client/generated/informers/externalversions/targets/v1alpha1.NatsTargetInformer from context.") + } + return untyped.(v1alpha1.NatsTargetInformer) +} diff --git a/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/controller.go b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/controller.go new file mode 100644 index 00000000..24ed2f4d --- /dev/null +++ b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/controller.go @@ -0,0 +1,154 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package natstarget + +import ( + context "context" + fmt "fmt" + reflect "reflect" + strings "strings" + + internalclientsetscheme "github.com/zeiss/typhoon/pkg/client/generated/clientset/internalclientset/scheme" + client "github.com/zeiss/typhoon/pkg/client/generated/injection/client" + natstarget "github.com/zeiss/typhoon/pkg/client/generated/injection/informers/targets/v1alpha1/natstarget" + zap "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + scheme "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + record "k8s.io/client-go/tools/record" + kubeclient "knative.dev/pkg/client/injection/kube/client" + controller "knative.dev/pkg/controller" + logging "knative.dev/pkg/logging" + logkey "knative.dev/pkg/logging/logkey" + reconciler "knative.dev/pkg/reconciler" +) + +const ( + defaultControllerAgentName = "natstarget-controller" + defaultFinalizerName = "natstargets.targets.typhoon.zeiss.com" +) + +// NewImpl returns a controller.Impl that handles queuing and feeding work from +// the queue through an implementation of controller.Reconciler, delegating to +// the provided Interface and optional Finalizer methods. OptionsFn is used to return +// controller.ControllerOptions to be used by the internal reconciler. +func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsFn) *controller.Impl { + logger := logging.FromContext(ctx) + + // Check the options function input. It should be 0 or 1. + if len(optionsFns) > 1 { + logger.Fatal("Up to one options function is supported, found: ", len(optionsFns)) + } + + natstargetInformer := natstarget.Get(ctx) + + lister := natstargetInformer.Lister() + + var promoteFilterFunc func(obj interface{}) bool + var promoteFunc = func(bkt reconciler.Bucket) {} + + rec := &reconcilerImpl{ + LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ + PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + + // Signal promotion event + promoteFunc(bkt) + + all, err := lister.List(labels.Everything()) + if err != nil { + return err + } + for _, elt := range all { + if promoteFilterFunc != nil { + if ok := promoteFilterFunc(elt); !ok { + continue + } + } + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } + return nil + }, + }, + Client: client.Get(ctx), + Lister: lister, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + ctrType := reflect.TypeOf(r).Elem() + ctrTypeName := fmt.Sprintf("%s.%s", ctrType.PkgPath(), ctrType.Name()) + ctrTypeName = strings.ReplaceAll(ctrTypeName, "/", ".") + + logger = logger.With( + zap.String(logkey.ControllerType, ctrTypeName), + zap.String(logkey.Kind, "targets.typhoon.zeiss.com.NatsTarget"), + ) + + impl := controller.NewContext(ctx, rec, controller.ControllerOptions{WorkQueueName: ctrTypeName, Logger: logger}) + agentName := defaultControllerAgentName + + // Pass impl to the options. Save any optional results. + for _, fn := range optionsFns { + opts := fn(impl) + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.AgentName != "" { + agentName = opts.AgentName + } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } + if opts.PromoteFilterFunc != nil { + promoteFilterFunc = opts.PromoteFilterFunc + } + if opts.PromoteFunc != nil { + promoteFunc = opts.PromoteFunc + } + } + + rec.Recorder = createRecorder(ctx, agentName) + + return impl +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + return recorder +} + +func init() { + internalclientsetscheme.AddToScheme(scheme.Scheme) +} diff --git a/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/reconciler.go b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/reconciler.go new file mode 100644 index 00000000..484a493b --- /dev/null +++ b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/reconciler.go @@ -0,0 +1,424 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package natstarget + +import ( + context "context" + json "encoding/json" + fmt "fmt" + + v1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + internalclientset "github.com/zeiss/typhoon/pkg/client/generated/clientset/internalclientset" + targetsv1alpha1 "github.com/zeiss/typhoon/pkg/client/generated/listers/targets/v1alpha1" + zap "go.uber.org/zap" + "go.uber.org/zap/zapcore" + v1 "k8s.io/api/core/v1" + equality "k8s.io/apimachinery/pkg/api/equality" + errors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + sets "k8s.io/apimachinery/pkg/util/sets" + record "k8s.io/client-go/tools/record" + controller "knative.dev/pkg/controller" + kmp "knative.dev/pkg/kmp" + logging "knative.dev/pkg/logging" + reconciler "knative.dev/pkg/reconciler" +) + +// Interface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1alpha1.NatsTarget. +type Interface interface { + // ReconcileKind implements custom logic to reconcile v1alpha1.NatsTarget. Any changes + // to the objects .Status or .Finalizers will be propagated to the stored + // object. It is recommended that implementors do not call any update calls + // for the Kind inside of ReconcileKind, it is the responsibility of the calling + // controller to propagate those properties. The resource passed to ReconcileKind + // will always have an empty deletion timestamp. + ReconcileKind(ctx context.Context, o *v1alpha1.NatsTarget) reconciler.Event +} + +// Finalizer defines the strongly typed interfaces to be implemented by a +// controller finalizing v1alpha1.NatsTarget. +type Finalizer interface { + // FinalizeKind implements custom logic to finalize v1alpha1.NatsTarget. Any changes + // to the objects .Status or .Finalizers will be ignored. Returning a nil or + // Normal type reconciler.Event will allow the finalizer to be deleted on + // the resource. The resource passed to FinalizeKind will always have a set + // deletion timestamp. + FinalizeKind(ctx context.Context, o *v1alpha1.NatsTarget) reconciler.Event +} + +// ReadOnlyInterface defines the strongly typed interfaces to be implemented by a +// controller reconciling v1alpha1.NatsTarget if they want to process resources for which +// they are not the leader. +type ReadOnlyInterface interface { + // ObserveKind implements logic to observe v1alpha1.NatsTarget. + // This method should not write to the API. + ObserveKind(ctx context.Context, o *v1alpha1.NatsTarget) reconciler.Event +} + +type doReconcile func(ctx context.Context, o *v1alpha1.NatsTarget) reconciler.Event + +// reconcilerImpl implements controller.Reconciler for v1alpha1.NatsTarget resources. +type reconcilerImpl struct { + // LeaderAwareFuncs is inlined to help us implement reconciler.LeaderAware. + reconciler.LeaderAwareFuncs + + // Client is used to write back status updates. + Client internalclientset.Interface + + // Listers index properties about resources. + Lister targetsv1alpha1.NatsTargetLister + + // Recorder is an event recorder for recording Event resources to the + // Kubernetes API. + Recorder record.EventRecorder + + // configStore allows for decorating a context with config maps. + // +optional + configStore reconciler.ConfigStore + + // reconciler is the implementation of the business logic of the resource. + reconciler Interface + + // finalizerName is the name of the finalizer to reconcile. + finalizerName string + + // skipStatusUpdates configures whether or not this reconciler automatically updates + // the status of the reconciled resource. + skipStatusUpdates bool +} + +// Check that our Reconciler implements controller.Reconciler. +var _ controller.Reconciler = (*reconcilerImpl)(nil) + +// Check that our generated Reconciler is always LeaderAware. +var _ reconciler.LeaderAware = (*reconcilerImpl)(nil) + +func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client internalclientset.Interface, lister targetsv1alpha1.NatsTargetLister, recorder record.EventRecorder, r Interface, options ...controller.Options) controller.Reconciler { + // Check the options function input. It should be 0 or 1. + if len(options) > 1 { + logger.Fatal("Up to one options struct is supported, found: ", len(options)) + } + + // Fail fast when users inadvertently implement the other LeaderAware interface. + // For the typed reconcilers, Promote shouldn't take any arguments. + if _, ok := r.(reconciler.LeaderAware); ok { + logger.Fatalf("%T implements the incorrect LeaderAware interface. Promote() should not take an argument as genreconciler handles the enqueuing automatically.", r) + } + + rec := &reconcilerImpl{ + LeaderAwareFuncs: reconciler.LeaderAwareFuncs{ + PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { + all, err := lister.List(labels.Everything()) + if err != nil { + return err + } + for _, elt := range all { + // TODO: Consider letting users specify a filter in options. + enq(bkt, types.NamespacedName{ + Namespace: elt.GetNamespace(), + Name: elt.GetName(), + }) + } + return nil + }, + }, + Client: client, + Lister: lister, + Recorder: recorder, + reconciler: r, + finalizerName: defaultFinalizerName, + } + + for _, opts := range options { + if opts.ConfigStore != nil { + rec.configStore = opts.ConfigStore + } + if opts.FinalizerName != "" { + rec.finalizerName = opts.FinalizerName + } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } + } + + return rec +} + +// Reconcile implements controller.Reconciler +func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { + logger := logging.FromContext(ctx) + + // Initialize the reconciler state. This will convert the namespace/name + // string into a distinct namespace and name, determine if this instance of + // the reconciler is the leader, and any additional interfaces implemented + // by the reconciler. Returns an error is the resource key is invalid. + s, err := newState(key, r) + if err != nil { + logger.Error("Invalid resource key: ", key) + return nil + } + + // If we are not the leader, and we don't implement either ReadOnly + // observer interfaces, then take a fast-path out. + if s.isNotLeaderNorObserver() { + return controller.NewSkipKey(key) + } + + // If configStore is set, attach the frozen configuration to the context. + if r.configStore != nil { + ctx = r.configStore.ToContext(ctx) + } + + // Add the recorder to context. + ctx = controller.WithEventRecorder(ctx, r.Recorder) + + // Get the resource with this namespace/name. + + getter := r.Lister.NatsTargets(s.namespace) + + original, err := getter.Get(s.name) + + if errors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing and call + // the ObserveDeletion handler if appropriate. + logger.Debugf("Resource %q no longer exists", key) + if del, ok := r.reconciler.(reconciler.OnDeletionInterface); ok { + return del.ObserveDeletion(ctx, types.NamespacedName{ + Namespace: s.namespace, + Name: s.name, + }) + } + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy. + resource := original.DeepCopy() + + var reconcileEvent reconciler.Event + + name, do := s.reconcileMethodFor(resource) + // Append the target method to the logger. + logger = logger.With(zap.String("targetMethod", name)) + switch name { + case reconciler.DoReconcileKind: + // Set and update the finalizer on resource if r.reconciler + // implements Finalizer. + if resource, err = r.setFinalizerIfFinalizer(ctx, resource); err != nil { + return fmt.Errorf("failed to set finalizers: %w", err) + } + + if !r.skipStatusUpdates { + reconciler.PreProcessReconcile(ctx, resource) + } + + // Reconcile this copy of the resource and then write back any status + // updates regardless of whether the reconciliation errored out. + reconcileEvent = do(ctx, resource) + + if !r.skipStatusUpdates { + reconciler.PostProcessReconcile(ctx, resource, original) + } + + case reconciler.DoFinalizeKind: + // For finalizing reconcilers, if this resource being marked for deletion + // and reconciled cleanly (nil or normal event), remove the finalizer. + reconcileEvent = do(ctx, resource) + + if resource, err = r.clearFinalizer(ctx, resource, reconcileEvent); err != nil { + return fmt.Errorf("failed to clear finalizers: %w", err) + } + + case reconciler.DoObserveKind: + // Observe any changes to this resource, since we are not the leader. + reconcileEvent = do(ctx, resource) + + } + + // Synchronize the status. + switch { + case r.skipStatusUpdates: + // This reconciler implementation is configured to skip resource updates. + // This may mean this reconciler does not observe spec, but reconciles external changes. + case equality.Semantic.DeepEqual(original.Status, resource.Status): + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the injectionInformer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + case !s.isLeader: + // High-availability reconcilers may have many replicas watching the resource, but only + // the elected leader is expected to write modifications. + logger.Warn("Saw status changes when we aren't the leader!") + default: + if err = r.updateStatus(ctx, logger, original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, v1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } + } + + // Report the reconciler event, if any. + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + logger.Infow("Returned an event", zap.Any("event", reconcileEvent)) + r.Recorder.Event(resource, event.EventType, event.Reason, event.Error()) + + // the event was wrapped inside an error, consider the reconciliation as failed + if _, isEvent := reconcileEvent.(*reconciler.ReconcilerEvent); !isEvent { + return reconcileEvent + } + return nil + } + + if controller.IsSkipKey(reconcileEvent) { + // This is a wrapped error, don't emit an event. + } else if ok, _ := controller.IsRequeueKey(reconcileEvent); ok { + // This is a wrapped error, don't emit an event. + } else { + logger.Errorw("Returned an error", zap.Error(reconcileEvent)) + r.Recorder.Event(resource, v1.EventTypeWarning, "InternalError", reconcileEvent.Error()) + } + return reconcileEvent + } + + return nil +} + +func (r *reconcilerImpl) updateStatus(ctx context.Context, logger *zap.SugaredLogger, existing *v1alpha1.NatsTarget, desired *v1alpha1.NatsTarget) error { + existing = existing.DeepCopy() + return reconciler.RetryUpdateConflicts(func(attempts int) (err error) { + // The first iteration tries to use the injectionInformer's state, subsequent attempts fetch the latest state via API. + if attempts > 0 { + + getter := r.Client.TargetsV1alpha1().NatsTargets(desired.Namespace) + + existing, err = getter.Get(ctx, desired.Name, metav1.GetOptions{}) + if err != nil { + return err + } + } + + // If there's nothing to update, just return. + if equality.Semantic.DeepEqual(existing.Status, desired.Status) { + return nil + } + + if logger.Desugar().Core().Enabled(zapcore.DebugLevel) { + if diff, err := kmp.SafeDiff(existing.Status, desired.Status); err == nil && diff != "" { + logger.Debug("Updating status with: ", diff) + } + } + + existing.Status = desired.Status + + updater := r.Client.TargetsV1alpha1().NatsTargets(existing.Namespace) + + _, err = updater.UpdateStatus(ctx, existing, metav1.UpdateOptions{}) + return err + }) +} + +// updateFinalizersFiltered will update the Finalizers of the resource. +// TODO: this method could be generic and sync all finalizers. For now it only +// updates defaultFinalizerName or its override. +func (r *reconcilerImpl) updateFinalizersFiltered(ctx context.Context, resource *v1alpha1.NatsTarget, desiredFinalizers sets.Set[string]) (*v1alpha1.NatsTarget, error) { + // Don't modify the informers copy. + existing := resource.DeepCopy() + + var finalizers []string + + // If there's nothing to update, just return. + existingFinalizers := sets.New[string](existing.Finalizers...) + + if desiredFinalizers.Has(r.finalizerName) { + if existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Add the finalizer. + finalizers = append(existing.Finalizers, r.finalizerName) + } else { + if !existingFinalizers.Has(r.finalizerName) { + // Nothing to do. + return resource, nil + } + // Remove the finalizer. + existingFinalizers.Delete(r.finalizerName) + finalizers = sets.List(existingFinalizers) + } + + mergePatch := map[string]interface{}{ + "metadata": map[string]interface{}{ + "finalizers": finalizers, + "resourceVersion": existing.ResourceVersion, + }, + } + + patch, err := json.Marshal(mergePatch) + if err != nil { + return resource, err + } + + patcher := r.Client.TargetsV1alpha1().NatsTargets(resource.Namespace) + + resourceName := resource.Name + updated, err := patcher.Patch(ctx, resourceName, types.MergePatchType, patch, metav1.PatchOptions{}) + if err != nil { + r.Recorder.Eventf(existing, v1.EventTypeWarning, "FinalizerUpdateFailed", + "Failed to update finalizers for %q: %v", resourceName, err) + } else { + r.Recorder.Eventf(updated, v1.EventTypeNormal, "FinalizerUpdate", + "Updated %q finalizers", resource.GetName()) + } + return updated, err +} + +func (r *reconcilerImpl) setFinalizerIfFinalizer(ctx context.Context, resource *v1alpha1.NatsTarget) (*v1alpha1.NatsTarget, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + + finalizers := sets.New[string](resource.Finalizers...) + + // If this resource is not being deleted, mark the finalizer. + if resource.GetDeletionTimestamp().IsZero() { + finalizers.Insert(r.finalizerName) + } + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource, finalizers) +} + +func (r *reconcilerImpl) clearFinalizer(ctx context.Context, resource *v1alpha1.NatsTarget, reconcileEvent reconciler.Event) (*v1alpha1.NatsTarget, error) { + if _, ok := r.reconciler.(Finalizer); !ok { + return resource, nil + } + if resource.GetDeletionTimestamp().IsZero() { + return resource, nil + } + + finalizers := sets.New[string](resource.Finalizers...) + + if reconcileEvent != nil { + var event *reconciler.ReconcilerEvent + if reconciler.EventAs(reconcileEvent, &event) { + if event.EventType == v1.EventTypeNormal { + finalizers.Delete(r.finalizerName) + } + } + } else { + finalizers.Delete(r.finalizerName) + } + + // Synchronize the finalizers filtered by r.finalizerName. + return r.updateFinalizersFiltered(ctx, resource, finalizers) +} diff --git a/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/state.go b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/state.go new file mode 100644 index 00000000..0659388b --- /dev/null +++ b/pkg/client/generated/injection/reconciler/targets/v1alpha1/natstarget/state.go @@ -0,0 +1,81 @@ +// Code generated by injection-gen. DO NOT EDIT. + +package natstarget + +import ( + fmt "fmt" + + v1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + types "k8s.io/apimachinery/pkg/types" + cache "k8s.io/client-go/tools/cache" + reconciler "knative.dev/pkg/reconciler" +) + +// state is used to track the state of a reconciler in a single run. +type state struct { + // key is the original reconciliation key from the queue. + key string + // namespace is the namespace split from the reconciliation key. + namespace string + // name is the name split from the reconciliation key. + name string + // reconciler is the reconciler. + reconciler Interface + // roi is the read only interface cast of the reconciler. + roi ReadOnlyInterface + // isROI (Read Only Interface) the reconciler only observes reconciliation. + isROI bool + // isLeader the instance of the reconciler is the elected leader. + isLeader bool +} + +func newState(key string, r *reconcilerImpl) (*state, error) { + // Convert the namespace/name string into a distinct namespace and name. + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return nil, fmt.Errorf("invalid resource key: %s", key) + } + + roi, isROI := r.reconciler.(ReadOnlyInterface) + + isLeader := r.IsLeaderFor(types.NamespacedName{ + Namespace: namespace, + Name: name, + }) + + return &state{ + key: key, + namespace: namespace, + name: name, + reconciler: r.reconciler, + roi: roi, + isROI: isROI, + isLeader: isLeader, + }, nil +} + +// isNotLeaderNorObserver checks to see if this reconciler with the current +// state is enabled to do any work or not. +// isNotLeaderNorObserver returns true when there is no work possible for the +// reconciler. +func (s *state) isNotLeaderNorObserver() bool { + if !s.isLeader && !s.isROI { + // If we are not the leader, and we don't implement the ReadOnly + // interface, then take a fast-path out. + return true + } + return false +} + +func (s *state) reconcileMethodFor(o *v1alpha1.NatsTarget) (string, doReconcile) { + if o.GetDeletionTimestamp().IsZero() { + if s.isLeader { + return reconciler.DoReconcileKind, s.reconciler.ReconcileKind + } else if s.isROI { + return reconciler.DoObserveKind, s.roi.ObserveKind + } + } else if fin, ok := s.reconciler.(Finalizer); s.isLeader && ok { + return reconciler.DoFinalizeKind, fin.FinalizeKind + } + return "unknown", nil +} diff --git a/pkg/client/generated/listers/targets/v1alpha1/expansion_generated.go b/pkg/client/generated/listers/targets/v1alpha1/expansion_generated.go index 529ab1fb..3d34e534 100644 --- a/pkg/client/generated/listers/targets/v1alpha1/expansion_generated.go +++ b/pkg/client/generated/listers/targets/v1alpha1/expansion_generated.go @@ -50,6 +50,14 @@ type LogzTargetListerExpansion interface{} // LogzTargetNamespaceLister. type LogzTargetNamespaceListerExpansion interface{} +// NatsTargetListerExpansion allows custom methods to be added to +// NatsTargetLister. +type NatsTargetListerExpansion interface{} + +// NatsTargetNamespaceListerExpansion allows custom methods to be added to +// NatsTargetNamespaceLister. +type NatsTargetNamespaceListerExpansion interface{} + // OracleTargetListerExpansion allows custom methods to be added to // OracleTargetLister. type OracleTargetListerExpansion interface{} diff --git a/pkg/client/generated/listers/targets/v1alpha1/natstarget.go b/pkg/client/generated/listers/targets/v1alpha1/natstarget.go new file mode 100644 index 00000000..75368793 --- /dev/null +++ b/pkg/client/generated/listers/targets/v1alpha1/natstarget.go @@ -0,0 +1,83 @@ +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/zeiss/typhoon/pkg/apis/targets/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// NatsTargetLister helps list NatsTargets. +// All objects returned here must be treated as read-only. +type NatsTargetLister interface { + // List lists all NatsTargets in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.NatsTarget, err error) + // NatsTargets returns an object that can list and get NatsTargets. + NatsTargets(namespace string) NatsTargetNamespaceLister + NatsTargetListerExpansion +} + +// natsTargetLister implements the NatsTargetLister interface. +type natsTargetLister struct { + indexer cache.Indexer +} + +// NewNatsTargetLister returns a new NatsTargetLister. +func NewNatsTargetLister(indexer cache.Indexer) NatsTargetLister { + return &natsTargetLister{indexer: indexer} +} + +// List lists all NatsTargets in the indexer. +func (s *natsTargetLister) List(selector labels.Selector) (ret []*v1alpha1.NatsTarget, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.NatsTarget)) + }) + return ret, err +} + +// NatsTargets returns an object that can list and get NatsTargets. +func (s *natsTargetLister) NatsTargets(namespace string) NatsTargetNamespaceLister { + return natsTargetNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// NatsTargetNamespaceLister helps list and get NatsTargets. +// All objects returned here must be treated as read-only. +type NatsTargetNamespaceLister interface { + // List lists all NatsTargets in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.NatsTarget, err error) + // Get retrieves the NatsTarget from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.NatsTarget, error) + NatsTargetNamespaceListerExpansion +} + +// natsTargetNamespaceLister implements the NatsTargetNamespaceLister +// interface. +type natsTargetNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all NatsTargets in the indexer for a given namespace. +func (s natsTargetNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.NatsTarget, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.NatsTarget)) + }) + return ret, err +} + +// Get retrieves the NatsTarget from the indexer for a given namespace and name. +func (s natsTargetNamespaceLister) Get(name string) (*v1alpha1.NatsTarget, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("natstarget"), name) + } + return obj.(*v1alpha1.NatsTarget), nil +} diff --git a/pkg/targets/adapter/natstarget/adapter.go b/pkg/targets/adapter/natstarget/adapter.go new file mode 100644 index 00000000..fc410c7b --- /dev/null +++ b/pkg/targets/adapter/natstarget/adapter.go @@ -0,0 +1,97 @@ +package natstarget + +import ( + "context" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/zeiss/typhoon/pkg/apis/targets" + "go.uber.org/zap" + pkgadapter "knative.dev/eventing/pkg/adapter/v2" + "knative.dev/pkg/logging" +) + +// EnvAccessorCtor for configuration parameters +func EnvAccessorCtor() pkgadapter.EnvConfigAccessor { + return &envAccessor{} +} + +type envAccessor struct { + pkgadapter.EnvConfig + + url string `envconfig:"NATS_URL"` + subject string `envconfig:"NATS_SUBJECT"` +} + +var _ pkgadapter.Adapter = (*natsAdapter)(nil) + +type natsAdapter struct { + client cloudevents.Client + js jetstream.JetStream + conn *nats.Conn + + subject string + + logger *zap.SugaredLogger + mt *pkgadapter.MetricTag +} + +// NewTarget adapter implementation +func NewTarget(ctx context.Context, envAcc pkgadapter.EnvConfigAccessor, client cloudevents.Client) pkgadapter.Adapter { + logger := logging.FromContext(ctx) + + mt := &pkgadapter.MetricTag{ + ResourceGroup: targets.KafkaTargetResource.String(), + Namespace: envAcc.GetNamespace(), + Name: envAcc.GetName(), + } + + env := envAcc.(*envAccessor) + + nc, err := nats.Connect(env.url) + if err != nil { + logger.Panicw("failed to connect to NATS", zap.Error(err)) + } + + js, err := jetstream.New(nc) + if err != nil { + logger.Panicw("failed to connect to JetStream", zap.Error(err)) + } + + return &natsAdapter{ + conn: nc, + mt: mt, + js: js, + client: client, + subject: env.subject, + } +} + +// Start is the main entrypoint for the adapter +func (a *natsAdapter) Start(ctx context.Context) error { + a.logger.Info("starting NATS.io adapter") + + defer func() { + a.conn.Close() + }() + + return a.client.StartReceiver(ctx, a.dispatch) +} + +func (a *natsAdapter) dispatch(event cloudevents.Event) cloudevents.Result { + msg := event.Data() + + f, err := a.js.PublishAsync(a.subject, msg) + if err != nil { + return err + } + + select { + case <-f.Err(): + return err + case <-f.Ok(): + return cloudevents.ResultACK + + } +}