From 96fa4cdeb633034724955813653c8f2c16d92dcd Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Fri, 21 Apr 2023 11:00:11 +0530 Subject: [PATCH 1/4] tables crd --- PROJECT | 17 ++- api/v1beta1/pinottable_types.go | 57 ++++++++++ api/v1beta1/zz_generated.deepcopy.go | 89 +++++++++++++++ cmd/main.go | 8 ++ config/crd/kustomization.yaml | 3 + .../patches/cainjection_in_pinotschemas.yaml | 2 +- .../patches/cainjection_in_pinottables.yaml | 7 ++ .../crd/patches/webhook_in_pinotschemas.yaml | 2 +- .../crd/patches/webhook_in_pinottables.yaml | 16 +++ config/default/kustomization.yaml | 4 +- config/manager/kustomization.yaml | 4 +- config/rbac/pinottable_editor_role.yaml | 31 ++++++ config/rbac/pinottable_viewer_role.yaml | 27 +++++ .../datainfra.io_v1beta1_pinottable.yaml | 12 +++ config/samples/kustomization.yaml | 1 + internal/pinot_controller/pinot_controller.go | 2 +- .../pinotschema_controller.go | 3 +- .../table_controller/pinottable_controller.go | 101 ++++++++++++++++++ internal/table_controller/reconciler.go | 27 +++++ internal/table_controller/suite_test.go | 80 ++++++++++++++ 20 files changed, 480 insertions(+), 13 deletions(-) create mode 100644 api/v1beta1/pinottable_types.go create mode 100644 config/crd/patches/cainjection_in_pinottables.yaml create mode 100644 config/crd/patches/webhook_in_pinottables.yaml create mode 100644 config/rbac/pinottable_editor_role.yaml create mode 100644 config/rbac/pinottable_viewer_role.yaml create mode 100644 config/samples/datainfra.io_v1beta1_pinottable.yaml create mode 100644 internal/table_controller/pinottable_controller.go create mode 100644 internal/table_controller/reconciler.go create mode 100644 internal/table_controller/suite_test.go diff --git a/PROJECT b/PROJECT index f71d80b..08419f9 100644 --- a/PROJECT +++ b/PROJECT @@ -5,8 +5,8 @@ domain: datainfra.io layout: - go.kubebuilder.io/v4-alpha -projectName: pinot-operator -repo: github.com/datainfrahq/pinot-operator +projectName: pinot-control-plane-k8s +repo: github.com/datainfrahq/pinot-control-plane-k8s resources: - api: crdVersion: v1 @@ -15,7 +15,7 @@ resources: domain: datainfra.io group: datainfra.io kind: Pinot - path: github.com/datainfrahq/pinot-operator/api/v1beta1 + path: github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1 version: v1beta1 - api: crdVersion: v1 @@ -24,6 +24,15 @@ resources: domain: datainfra.io group: datainfra.io kind: PinotSchema - path: github.com/datainfrahq/pinot-operator/api/v1beta1 + path: github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1 + version: v1beta1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: datainfra.io + group: datainfra.io + kind: PinotTable + path: github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1 version: v1beta1 version: "3" diff --git a/api/v1beta1/pinottable_types.go b/api/v1beta1/pinottable_types.go new file mode 100644 index 0000000..5c4ab05 --- /dev/null +++ b/api/v1beta1/pinottable_types.go @@ -0,0 +1,57 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// PinotTableSpec defines the desired state of PinotTable +type PinotTableSpec struct { + Foo string `json:"foo,omitempty"` +} + +// PinotTableStatus defines the observed state of PinotTable +type PinotTableStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status + +// PinotTable is the Schema for the pinottables API +type PinotTable struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec PinotTableSpec `json:"spec,omitempty"` + Status PinotTableStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// PinotTableList contains a list of PinotTable +type PinotTableList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []PinotTable `json:"items"` +} + +func init() { + SchemeBuilder.Register(&PinotTable{}, &PinotTableList{}) +} diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 2b9d557..addf123 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -427,6 +427,95 @@ func (in *PinotStatus) DeepCopy() *PinotStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PinotTable) DeepCopyInto(out *PinotTable) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTable. +func (in *PinotTable) DeepCopy() *PinotTable { + if in == nil { + return nil + } + out := new(PinotTable) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PinotTable) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PinotTableList) DeepCopyInto(out *PinotTableList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PinotTable, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTableList. +func (in *PinotTableList) DeepCopy() *PinotTableList { + if in == nil { + return nil + } + out := new(PinotTableList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PinotTableList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PinotTableSpec) DeepCopyInto(out *PinotTableSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTableSpec. +func (in *PinotTableSpec) DeepCopy() *PinotTableSpec { + if in == nil { + return nil + } + out := new(PinotTableSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PinotTableStatus) DeepCopyInto(out *PinotTableStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTableStatus. +func (in *PinotTableStatus) DeepCopy() *PinotTableStatus { + if in == nil { + return nil + } + out := new(PinotTableStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StorageConfig) DeepCopyInto(out *StorageConfig) { *out = *in diff --git a/cmd/main.go b/cmd/main.go index 8c36a0e..5142e22 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -34,6 +34,7 @@ import ( datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" pinotcontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/pinot_controller" schemacontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/schema_controller" + tablecontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/table_controller" //+kubebuilder:scaffold:imports ) @@ -98,6 +99,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "PinotSchemaController") os.Exit(1) } + if err = (&tablecontroller.PinotTableReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "PinotTable") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 2e5fd2d..b6e1801 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -4,6 +4,7 @@ resources: - bases/datainfra.io_pinots.yaml - bases/datainfra.io_pinotschemas.yaml +- bases/datainfra.io_pinottables.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -11,12 +12,14 @@ patchesStrategicMerge: # patches here are for enabling the conversion webhook for each CRD #- patches/webhook_in_pinots.yaml #- patches/webhook_in_pinotschemas.yaml +#- patches/webhook_in_pinottables.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. # patches here are for enabling the CA injection for each CRD #- patches/cainjection_in_pinots.yaml #- patches/cainjection_in_pinotschemas.yaml +#- patches/cainjection_in_pinottables.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/config/crd/patches/cainjection_in_pinotschemas.yaml b/config/crd/patches/cainjection_in_pinotschemas.yaml index 9c89496..ae28983 100644 --- a/config/crd/patches/cainjection_in_pinotschemas.yaml +++ b/config/crd/patches/cainjection_in_pinotschemas.yaml @@ -4,4 +4,4 @@ kind: CustomResourceDefinition metadata: annotations: cert-manager.io/inject-ca-from: CERTIFICATE_NAMESPACE/CERTIFICATE_NAME - name: pinotschemas.datainfra.io.datainfra.io + name: pinotschemas.datainfra.io diff --git a/config/crd/patches/cainjection_in_pinottables.yaml b/config/crd/patches/cainjection_in_pinottables.yaml new file mode 100644 index 0000000..0cb9164 --- /dev/null +++ b/config/crd/patches/cainjection_in_pinottables.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: CERTIFICATE_NAMESPACE/CERTIFICATE_NAME + name: pinottables.datainfra.io diff --git a/config/crd/patches/webhook_in_pinotschemas.yaml b/config/crd/patches/webhook_in_pinotschemas.yaml index f5778d0..392b224 100644 --- a/config/crd/patches/webhook_in_pinotschemas.yaml +++ b/config/crd/patches/webhook_in_pinotschemas.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: - name: pinotschemas.datainfra.io.datainfra.io + name: pinotschemas.datainfra.io spec: conversion: strategy: Webhook diff --git a/config/crd/patches/webhook_in_pinottables.yaml b/config/crd/patches/webhook_in_pinottables.yaml new file mode 100644 index 0000000..4332430 --- /dev/null +++ b/config/crd/patches/webhook_in_pinottables.yaml @@ -0,0 +1,16 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: pinottables.datainfra.io +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert + conversionReviewVersions: + - v1 diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index c9f1f71..69e602c 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -1,12 +1,12 @@ # Adds namespace to all resources. -namespace: pinot-operator-system +namespace: pinot-control-plane # Value of this field is prepended to the # names of all resources, e.g. a deployment named # "wordpress" becomes "alices-wordpress". # Note that it should also match with the prefix (text before '-') of the namespace # field above. -namePrefix: pinot-operator- +namePrefix: pinot-control-plane- # Labels to add to all resources and selectors. #labels: diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 5fd7ab1..0342c0d 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: datainfrahq/pinot-operator - newTag: v0.0.1 + newName: datainfrahq/pinot-control-plane + newTag: v0.0.3 diff --git a/config/rbac/pinottable_editor_role.yaml b/config/rbac/pinottable_editor_role.yaml new file mode 100644 index 0000000..a8b34d9 --- /dev/null +++ b/config/rbac/pinottable_editor_role.yaml @@ -0,0 +1,31 @@ +# permissions for end users to edit pinottables. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: pinottable-editor-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: pinot-operator + app.kubernetes.io/part-of: pinot-operator + app.kubernetes.io/managed-by: kustomize + name: pinottable-editor-role +rules: +- apiGroups: + - datainfra.io.datainfra.io + resources: + - pinottables + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - datainfra.io.datainfra.io + resources: + - pinottables/status + verbs: + - get diff --git a/config/rbac/pinottable_viewer_role.yaml b/config/rbac/pinottable_viewer_role.yaml new file mode 100644 index 0000000..05e0b55 --- /dev/null +++ b/config/rbac/pinottable_viewer_role.yaml @@ -0,0 +1,27 @@ +# permissions for end users to view pinottables. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/name: clusterrole + app.kubernetes.io/instance: pinottable-viewer-role + app.kubernetes.io/component: rbac + app.kubernetes.io/created-by: pinot-operator + app.kubernetes.io/part-of: pinot-operator + app.kubernetes.io/managed-by: kustomize + name: pinottable-viewer-role +rules: +- apiGroups: + - datainfra.io.datainfra.io + resources: + - pinottables + verbs: + - get + - list + - watch +- apiGroups: + - datainfra.io.datainfra.io + resources: + - pinottables/status + verbs: + - get diff --git a/config/samples/datainfra.io_v1beta1_pinottable.yaml b/config/samples/datainfra.io_v1beta1_pinottable.yaml new file mode 100644 index 0000000..e73c98e --- /dev/null +++ b/config/samples/datainfra.io_v1beta1_pinottable.yaml @@ -0,0 +1,12 @@ +apiVersion: datainfra.io/v1beta1 +kind: PinotTable +metadata: + labels: + app.kubernetes.io/name: pinottable + app.kubernetes.io/instance: pinottable-sample + app.kubernetes.io/part-of: pinot-control-plane-k8s + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: pinot-control-plane-k8s + name: pinottable-sample +spec: + # TODO(user): Add fields here diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 3871230..b4ea598 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -2,4 +2,5 @@ resources: - datainfra.io_v1beta1_pinot.yaml - datainfra.io_v1beta1_pinotschema.yaml +- datainfra.io_v1beta1_pinottable.yaml #+kubebuilder:scaffold:manifestskustomizesamples diff --git a/internal/pinot_controller/pinot_controller.go b/internal/pinot_controller/pinot_controller.go index df31094..791952b 100644 --- a/internal/pinot_controller/pinot_controller.go +++ b/internal/pinot_controller/pinot_controller.go @@ -50,7 +50,7 @@ func NewPinotReconciler(mgr ctrl.Manager) *PinotReconciler { Log: initLogger, Scheme: mgr.GetScheme(), ReconcileWait: lookupReconcileTime(initLogger), - Recorder: mgr.GetEventRecorderFor("pinot-operator"), + Recorder: mgr.GetEventRecorderFor("pinot-control-plane"), } } diff --git a/internal/schema_controller/pinotschema_controller.go b/internal/schema_controller/pinotschema_controller.go index 647170b..cf161c6 100644 --- a/internal/schema_controller/pinotschema_controller.go +++ b/internal/schema_controller/pinotschema_controller.go @@ -50,7 +50,7 @@ func NewPinotReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { Log: initLogger, Scheme: mgr.GetScheme(), ReconcileWait: lookupReconcileTime(initLogger), - Recorder: mgr.GetEventRecorderFor("pinot-operator"), + Recorder: mgr.GetEventRecorderFor("pinot-control-plane"), } } @@ -59,7 +59,6 @@ func NewPinotReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { //+kubebuilder:rbac:groups=datainfra.io,resources=pinotschemas/finalizers,verbs=update func (r *PinotSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) logr := log.FromContext(ctx) diff --git a/internal/table_controller/pinottable_controller.go b/internal/table_controller/pinottable_controller.go new file mode 100644 index 0000000..e108bdc --- /dev/null +++ b/internal/table_controller/pinottable_controller.go @@ -0,0 +1,101 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tablecontroller + +import ( + "context" + "os" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" + datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" + "github.com/go-logr/logr" +) + +// PinotTableReconciler reconciles a PinotTable object +type PinotTableReconciler struct { + client.Client + Log logr.Logger + Scheme *runtime.Scheme + // reconcile time duration, defaults to 10s + ReconcileWait time.Duration + Recorder record.EventRecorder +} + +func NewPinotTableReconciler(mgr ctrl.Manager) *PinotTableReconciler { + initLogger := ctrl.Log.WithName("controllers").WithName("pinot") + return &PinotTableReconciler{ + Client: mgr.GetClient(), + Log: initLogger, + Scheme: mgr.GetScheme(), + ReconcileWait: lookupReconcileTime(initLogger), + Recorder: mgr.GetEventRecorderFor("pinot-control-plane"), + } +} + +//+kubebuilder:rbac:groups=datainfra.io,resources=pinottables,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=datainfra.io,resources=pinottables/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=datainfra.io,resources=pinottables/finalizers,verbs=update + +func (r *PinotTableReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logr := log.FromContext(ctx) + + pinotTableCR := &v1beta1.PinotTable{} + err := r.Get(context.TODO(), req.NamespacedName, pinotTableCR) + if err != nil { + if errors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + if err := r.do(ctx, pinotTableCR); err != nil { + logr.Error(err, err.Error()) + return ctrl.Result{}, err + } else { + return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil + } +} + +// SetupWithManager sets up the controller with the Manager. +func (r *PinotTableReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&datainfraiov1beta1.PinotTable{}). + Complete(r) +} + +func lookupReconcileTime(log logr.Logger) time.Duration { + val, exists := os.LookupEnv("RECONCILE_WAIT") + if !exists { + return time.Second * 10 + } else { + v, err := time.ParseDuration(val) + if err != nil { + log.Error(err, err.Error()) + // Exit Program if not valid + os.Exit(1) + } + return v + } +} diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go new file mode 100644 index 0000000..15124ef --- /dev/null +++ b/internal/table_controller/reconciler.go @@ -0,0 +1,27 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tablecontroller + +import ( + "context" + + "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" +) + +func (r *PinotTableReconciler) do(ctx context.Context, schema *v1beta1.PinotTable) error { + return nil +} diff --git a/internal/table_controller/suite_test.go b/internal/table_controller/suite_test.go new file mode 100644 index 0000000..851a4ef --- /dev/null +++ b/internal/table_controller/suite_test.go @@ -0,0 +1,80 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tablecontroller + +import ( + "path/filepath" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" + //+kubebuilder:scaffold:imports +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. + +var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecs(t, "Controller Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + + var err error + // cfg is defined in this file globally. + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + err = datainfraiov1beta1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) From 69ea0ef1d0b62f0b802a543dcbfd5660f71c8bb2 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Fri, 21 Apr 2023 15:14:12 +0530 Subject: [PATCH 2/4] table crud --- README.md | 2 +- api/v1beta1/pinotschema_types.go | 8 +- api/v1beta1/pinottable_types.go | 26 ++- cmd/main.go | 10 +- .../crd/bases/datainfra.io_pinotschemas.yaml | 7 +- .../crd/bases/datainfra.io_pinottables.yaml | 69 ++++++ config/rbac/role.yaml | 26 +++ examples/pinotschema-simple.yaml | 4 +- examples/pinottable-simple.yaml | 44 ++++ .../pinotschema_controller.go | 2 +- internal/schema_controller/reconciler.go | 130 ++++++----- internal/table_controller/reconciler.go | 215 +++++++++++++++++- 12 files changed, 454 insertions(+), 89 deletions(-) create mode 100644 config/crd/bases/datainfra.io_pinottables.yaml create mode 100644 examples/pinottable-simple.yaml diff --git a/README.md b/README.md index 43fc0b8..878d27d 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ Control Plane for deploying and managing heterogenous apache pinot kubernetes cl ### Getting Started ``` -export STORAGE_CLASS_NAME=civo-volume +export STORAGE_CLASS_NAME=standard make helm-install-pinot-control-plane make helm-install-zk-operator envsubst < examples/pinot-simple.yaml | kubectl apply -f - -n pinot diff --git a/api/v1beta1/pinotschema_types.go b/api/v1beta1/pinotschema_types.go index fb0124d..54cb88c 100644 --- a/api/v1beta1/pinotschema_types.go +++ b/api/v1beta1/pinotschema_types.go @@ -22,8 +22,10 @@ import ( // PinotSchemaSpec defines the desired state of PinotSchema type PinotSchemaSpec struct { - ClusterName string `json:"clusterName"` - SchemaJson string `json:"schema.json,omitempty"` + // +required + PinotCluster string `json:"pinotCluster"` + // +required + PinotSchemaJson string `json:"schema.json"` } // PinotSchemaStatus defines the observed state of PinotSchema @@ -33,7 +35,7 @@ type PinotSchemaStatus struct { // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" -// +kubebuilder:printcolumn:name="Pinot_Cluster",type="string",JSONPath=".spec.clusterName" +// +kubebuilder:printcolumn:name="Pinot_Cluster",type="string",JSONPath=".spec.pinotCluster" // PinotSchema is the Schema for the pinotschemas API type PinotSchema struct { metav1.TypeMeta `json:",inline"` diff --git a/api/v1beta1/pinottable_types.go b/api/v1beta1/pinottable_types.go index 5c4ab05..18d1d76 100644 --- a/api/v1beta1/pinottable_types.go +++ b/api/v1beta1/pinottable_types.go @@ -20,20 +20,34 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type PinotTableType string + +const ( + RealTimeTable PinotTableType = "realtime" + OfflineTimeTable PinotTableType = "offline" +) + // PinotTableSpec defines the desired state of PinotTable type PinotTableSpec struct { - Foo string `json:"foo,omitempty"` + // +required + PinotCluster string `json:"pinotCluster"` + // +required + PinotSchema string `json:"pinotSchema"` + // +required + PinotTableType PinotTableType `json:"pinotTableType"` + // +required + PinotTablesJson string `json:"tables.json"` } // PinotTableStatus defines the observed state of PinotTable type PinotTableStatus struct { - // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster - // Important: Run "make" to regenerate code after modifying this file } -//+kubebuilder:object:root=true -//+kubebuilder:subresource:status - +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" +// +kubebuilder:printcolumn:name="Pinot_Cluster",type="string",JSONPath=".spec.pinotCluster" +// +kubebuilder:printcolumn:name="Pinot_Schema",type="string",JSONPath=".spec.pinotSchema" // PinotTable is the Schema for the pinottables API type PinotTable struct { metav1.TypeMeta `json:",inline"` diff --git a/cmd/main.go b/cmd/main.go index 5142e22..1eeea7e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -95,17 +95,15 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "PinotController") os.Exit(1) } - if err = (schemacontroller.NewPinotReconciler(mgr)).SetupWithManager(mgr); err != nil { + if err = (schemacontroller.NewPinotSchemaReconciler(mgr)).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PinotSchemaController") os.Exit(1) } - if err = (&tablecontroller.PinotTableReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "PinotTable") + if err = (tablecontroller.NewPinotTableReconciler(mgr)).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "PinotTableController") os.Exit(1) } + //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/config/crd/bases/datainfra.io_pinotschemas.yaml b/config/crd/bases/datainfra.io_pinotschemas.yaml index 64af925..1a32a5a 100644 --- a/config/crd/bases/datainfra.io_pinotschemas.yaml +++ b/config/crd/bases/datainfra.io_pinotschemas.yaml @@ -19,7 +19,7 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date - - jsonPath: .spec.clusterName + - jsonPath: .spec.pinotCluster name: Pinot_Cluster type: string name: v1beta1 @@ -42,12 +42,13 @@ spec: spec: description: PinotSchemaSpec defines the desired state of PinotSchema properties: - clusterName: + pinotCluster: type: string schema.json: type: string required: - - clusterName + - pinotCluster + - schema.json type: object status: description: PinotSchemaStatus defines the observed state of PinotSchema diff --git a/config/crd/bases/datainfra.io_pinottables.yaml b/config/crd/bases/datainfra.io_pinottables.yaml new file mode 100644 index 0000000..c5ce24f --- /dev/null +++ b/config/crd/bases/datainfra.io_pinottables.yaml @@ -0,0 +1,69 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.1 + creationTimestamp: null + name: pinottables.datainfra.io +spec: + group: datainfra.io + names: + kind: PinotTable + listKind: PinotTableList + plural: pinottables + singular: pinottable + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + - jsonPath: .spec.pinotCluster + name: Pinot_Cluster + type: string + - jsonPath: .spec.pinotSchema + name: Pinot_Schema + type: string + name: v1beta1 + schema: + openAPIV3Schema: + description: PinotTable is the Schema for the pinottables API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: PinotTableSpec defines the desired state of PinotTable + properties: + pinotCluster: + type: string + pinotSchema: + type: string + pinotTableType: + type: string + tables.json: + type: string + required: + - pinotCluster + - pinotSchema + - pinotTableType + - tables.json + type: object + status: + description: PinotTableStatus defines the observed state of PinotTable + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ac14fb9..3687bad 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -139,3 +139,29 @@ rules: - get - patch - update +- apiGroups: + - datainfra.io + resources: + - pinottables + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - datainfra.io + resources: + - pinottables/finalizers + verbs: + - update +- apiGroups: + - datainfra.io + resources: + - pinottables/status + verbs: + - get + - patch + - update diff --git a/examples/pinotschema-simple.yaml b/examples/pinotschema-simple.yaml index 58cf586..0402e13 100644 --- a/examples/pinotschema-simple.yaml +++ b/examples/pinotschema-simple.yaml @@ -3,7 +3,7 @@ kind: PinotSchema metadata: name: airlinestats spec: - clusterName: pinot-simple + pinotCluster: pinot-simple schema.json: |- { "metricFieldSpecs": [ @@ -339,5 +339,5 @@ spec: "granularity": "1:DAYS" } ], - "schemaName": "airlinestats" + "schemaName": "airlineStats" } diff --git a/examples/pinottable-simple.yaml b/examples/pinottable-simple.yaml new file mode 100644 index 0000000..03eb140 --- /dev/null +++ b/examples/pinottable-simple.yaml @@ -0,0 +1,44 @@ +apiVersion: datainfra.io/v1beta1 +kind: PinotTable +metadata: + name: airlinestats +spec: + pinotCluster: pinot-simple + pinotSchema: airlinestats + pinotTableType: REALTIME + tables.json: |- + { + "tableName": "airlineStats", + "tableType": "REALTIME", + "segmentsConfig": { + "timeColumnName": "DaysSinceEpoch", + "timeType": "DAYS", + "retentionTimeUnit": "DAYS", + "retentionTimeValue": "5650", + "segmentPushType": "APPEND", + "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", + "schemaName": "airlineStats", + "replication": "1", + "replicasPerPartition": "1" + }, + "tenants": {}, + "tableIndexConfig": { + "loadMode": "MMAP", + "streamConfigs": { + "streamType": "kafka", + "stream.kafka.consumer.type": "simple", + "stream.kafka.topic.name": "flights-realtime", + "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", + "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", + "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181", + "stream.kafka.zk.broker.url": "kafka-zookeeper:2181", + "stream.kafka.broker.list": "kafka-0.kafka-headless.pinot.svc.cluster.local:9092", + "realtime.segment.flush.threshold.time": "3600000", + "realtime.segment.flush.threshold.size": "100", + "stream.kafka.consumer.prop.auto.offset.reset": "smallest" + } + }, + "metadata": { + "customConfigs": {} + } + } diff --git a/internal/schema_controller/pinotschema_controller.go b/internal/schema_controller/pinotschema_controller.go index cf161c6..321d23d 100644 --- a/internal/schema_controller/pinotschema_controller.go +++ b/internal/schema_controller/pinotschema_controller.go @@ -43,7 +43,7 @@ type PinotSchemaReconciler struct { Recorder record.EventRecorder } -func NewPinotReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { +func NewPinotSchemaReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { initLogger := ctrl.Log.WithName("controllers").WithName("pinot") return &PinotSchemaReconciler{ Client: mgr.GetClient(), diff --git a/internal/schema_controller/reconciler.go b/internal/schema_controller/reconciler.go index 8dda171..c99253d 100644 --- a/internal/schema_controller/reconciler.go +++ b/internal/schema_controller/reconciler.go @@ -40,25 +40,17 @@ const ( PinotSchemaControllerDeleteSuccess = "PinotSchemaControllerDeleteSuccess" PinotSchemaControllerDeleteFail = "PinotSchemaControllerDeleteFail" PinotSchemaControllerFinalizer = "pinotschema.datainfra.io/finalizer" + PinotControllerPort = "9000" ) func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSchema) error { - listOpts := []client.ListOption{ - client.InNamespace(schema.Namespace), - client.MatchingLabels(map[string]string{ - "custom_resource": schema.Spec.ClusterName, - "nodeType": "controller", - }), - } - svcList := &v1.ServiceList{} - if err := r.Client.List(ctx, svcList, listOpts...); err != nil { + + svcName, err := r.getControllerSvcUrl(schema.Namespace, schema.Spec.PinotCluster) + if err != nil { return err } - var svcName string - for range svcList.Items { - svcName = svcList.Items[0].Name - } + fmt.Println(makeControllerCreateSchemaPath(svcName)) getOwnerRef := makeOwnerRef( schema.APIVersion, @@ -66,7 +58,7 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch schema.Name, schema.UID, ) - cm := r.makeSchemaConfigMap(schema, getOwnerRef, schema.Spec.SchemaJson) + cm := r.makeSchemaConfigMap(schema, getOwnerRef, schema.Spec.PinotSchemaJson) build := builder.NewBuilder( builder.ToNewBuilderConfigMap([]builder.BuilderConfigMap{*cm}), @@ -82,63 +74,45 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch return err } - if resp == controllerutil.OperationResultCreated { - if schema.Spec.SchemaJson != "" { - - http := internalHTTP.NewHTTPClient(http.MethodPost, makeControllerUrl(svcName, schema.Namespace)+"/schemas", http.Client{}, []byte(schema.Spec.SchemaJson)) - resp, err := http.Do() - if err != nil { - build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateFail) - return err - } + switch resp { + case controllerutil.OperationResultCreated: + http := internalHTTP.NewHTTPClient(http.MethodPost, makeControllerCreateSchemaPath(svcName), http.Client{}, []byte(schema.Spec.PinotSchemaJson)) + resp, err := http.Do() + if err != nil { + build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateFail) + return err + } - if getRespCode(resp) != "200" && getRespCode(resp) != "" { - build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateFail) - } else { - build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateSuccess) - } + if getRespCode(resp) != "200" && getRespCode(resp) != "" { + build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateFail) + } else { + build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerCreateSuccess) } - } else if resp == controllerutil.OperationResultUpdated { - if schema.Spec.SchemaJson != "" { - schemaName, err := getSchemaName(schema.Spec.SchemaJson) - if err != nil { - return err - } - http := internalHTTP.NewHTTPClient(http.MethodPut, makeControllerUrl(svcName, schema.Namespace)+"/schemas/"+schemaName, http.Client{}, []byte(schema.Spec.SchemaJson)) - resp, err := http.Do() - if err != nil { - build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateFail) - return err - } - if getRespCode(resp) != "200" && getRespCode(resp) != "" { - build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateFail) - } else { - build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateSuccess) - } + case controllerutil.OperationResultUpdated: + schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson) + if err != nil { + return err } - } - - if schema.ObjectMeta.DeletionTimestamp.IsZero() { - // The object is not being deleted, so if it does not have our finalizer, - // then lets add the finalizer and update the object. This is equivalent - // registering our finalizer. - if !controllerutil.ContainsFinalizer(schema, PinotSchemaControllerFinalizer) { - controllerutil.AddFinalizer(schema, PinotSchemaControllerFinalizer) - if err := r.Update(ctx, schema); err != nil { - return err - } + http := internalHTTP.NewHTTPClient(http.MethodPut, makeControllerUpdateDeleteSchemaPath(svcName, schemaName), http.Client{}, []byte(schema.Spec.PinotSchemaJson)) + resp, err := http.Do() + if err != nil { + build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateFail) + return err } - } else { - // The object is being deleted + if getRespCode(resp) != "200" && getRespCode(resp) != "" { + build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateFail) + } else { + build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerUpdateSuccess) + } + default: if controllerutil.ContainsFinalizer(schema, PinotSchemaControllerFinalizer) { // our finalizer is present, so lets handle any external dependency - fmt.Println("Deleting") - schemaName, err := getSchemaName(schema.Spec.SchemaJson) + schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson) if err != nil { return err } - http := internalHTTP.NewHTTPClient(http.MethodDelete, makeControllerUrl(svcName, schema.Namespace)+"/schemas/"+schemaName, http.Client{}, []byte{}) + http := internalHTTP.NewHTTPClient(http.MethodDelete, makeControllerUpdateDeleteSchemaPath(svcName, schemaName), http.Client{}, []byte{}) resp, err := http.Do() if err != nil { build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotSchemaControllerDeleteFail) @@ -156,7 +130,6 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch return err } } - return nil } return nil @@ -199,10 +172,6 @@ func makeOwnerRef(apiVersion, kind, name string, uid types.UID) *metav1.OwnerRef } } -func makeControllerUrl(name, namespace string) string { - return "http://" + name + "." + namespace + ".svc.cluster.local:9000" -} - func getSchemaName(schemaJson string) (string, error) { var err error @@ -214,6 +183,35 @@ func getSchemaName(schemaJson string) (string, error) { return utils.TrimQuote(string(schema["schemaName"])), nil } +func makeControllerCreateSchemaPath(svcName string) string { return svcName + "/schemas" } + +func makeControllerUpdateDeleteSchemaPath(svcName, schemaName string) string { + return svcName + "/schemas/" + schemaName +} + +func (r *PinotSchemaReconciler) getControllerSvcUrl(namespace, pinotClusterName string) (string, error) { + listOpts := []client.ListOption{ + client.InNamespace(namespace), + client.MatchingLabels(map[string]string{ + "custom_resource": pinotClusterName, + "nodeType": "controller", + }), + } + svcList := &v1.ServiceList{} + if err := r.Client.List(context.Background(), svcList, listOpts...); err != nil { + return "", err + } + var svcName string + + for range svcList.Items { + svcName = svcList.Items[0].Name + } + + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + + return newName, nil +} + func getRespCode(resp []byte) string { var err error diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go index 15124ef..83a2914 100644 --- a/internal/table_controller/reconciler.go +++ b/internal/table_controller/reconciler.go @@ -18,10 +18,223 @@ package tablecontroller import ( "context" + "encoding/json" + "fmt" + "net/http" + "github.com/datainfrahq/operator-runtime/builder" "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" + internalHTTP "github.com/datainfrahq/pinot-control-plane-k8s/internal/http" + "github.com/datainfrahq/pinot-control-plane-k8s/internal/utils" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) -func (r *PinotTableReconciler) do(ctx context.Context, schema *v1beta1.PinotTable) error { +const ( + PinotTableControllerCreateSuccess = "PinotSchemaControllerCreateSuccess" + PinotTableControllerCreateFail = "PinotTableControllerCreateFail" + PinotTableControllerUpdateSuccess = "PinotTableControllerUpdateSuccess" + PinotTableControllerUpdateFail = "PinotTableControllerUpdateFail" + PinotTableControllerDeleteSuccess = "PinotTableControllerDeleteSuccess" + PinotTableControllerDeleteFail = "PinotTableControllerDeleteFail" + PinotTableControllerFinalizer = "pinottable.datainfra.io/finalizer" + PinotControllerPort = "9000" +) + +func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable) error { + + svcName, err := r.getControllerSvcUrl(table.Namespace, table.Spec.PinotCluster) + if err != nil { + return err + } + + getOwnerRef := makeOwnerRef( + table.APIVersion, + table.Kind, + table.Name, + table.UID, + ) + cm := r.makeTableConfigMap(table, getOwnerRef, table.Spec.PinotTablesJson) + + build := builder.NewBuilder( + builder.ToNewBuilderConfigMap([]builder.BuilderConfigMap{*cm}), + builder.ToNewBuilderRecorder(builder.BuilderRecorder{Recorder: r.Recorder, ControllerName: "PinotTableController"}), + builder.ToNewBuilderContext(builder.BuilderContext{Context: ctx}), + builder.ToNewBuilderStore( + *builder.NewStore(r.Client, map[string]string{"table": table.Name}, table.Namespace, table), + ), + ) + + resp, err := build.ReconcileConfigMap() + if err != nil { + return err + } + + switch resp { + case controllerutil.OperationResultCreated: + + http := internalHTTP.NewHTTPClient(http.MethodPost, makeControllerCreateTablePath(svcName), http.Client{}, []byte(table.Spec.PinotTablesJson)) + resp, err := http.Do() + if err != nil { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerCreateFail) + return err + } + + if getRespCode(resp) != "200" && getRespCode(resp) != "" { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerCreateFail) + } else { + build.Recorder.GenericEvent(table, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerCreateSuccess) + } + case controllerutil.OperationResultUpdated: + tableName, err := getTableName(table.Spec.PinotTablesJson) + if err != nil { + return err + } + + http := internalHTTP.NewHTTPClient(http.MethodPut, makeControllerUpdateDeleteTablePath(svcName, tableName), http.Client{}, []byte(table.Spec.PinotTablesJson)) + resp, err := http.Do() + if err != nil { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerUpdateFail) + return err + } + + if getRespCode(resp) != "200" && getRespCode(resp) != "" { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerUpdateFail) + } else { + build.Recorder.GenericEvent(table, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerUpdateSuccess) + } + + default: + if table.ObjectMeta.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. This is equivalent + // registering our finalizer. + if !controllerutil.ContainsFinalizer(table, PinotTableControllerFinalizer) { + controllerutil.AddFinalizer(table, PinotTableControllerFinalizer) + if err := r.Update(ctx, table); err != nil { + return err + } + } + } else { + // The object is being deleted + if controllerutil.ContainsFinalizer(table, PinotTableControllerFinalizer) { + // our finalizer is present, so lets handle any external dependency + tableName, err := getTableName(table.Spec.PinotTablesJson) + if err != nil { + return err + } + http := internalHTTP.NewHTTPClient(http.MethodDelete, makeControllerUpdateDeleteTablePath(svcName, tableName), http.Client{}, []byte{}) + resp, err := http.Do() + if err != nil { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerDeleteFail) + return err + } + if getRespCode(resp) != "200" && getRespCode(resp) != "" { + build.Recorder.GenericEvent(table, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerDeleteFail) + } else { + build.Recorder.GenericEvent(table, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp)), PinotTableControllerDeleteSuccess) + } + + // remove our finalizer from the list and update it. + controllerutil.RemoveFinalizer(table, PinotTableControllerFinalizer) + if err := r.Update(ctx, table); err != nil { + return err + } + } + return nil + } + } + return nil } + +func (r *PinotTableReconciler) makeTableConfigMap( + table *v1beta1.PinotTable, + ownerRef *metav1.OwnerReference, + data interface{}, +) *builder.BuilderConfigMap { + + configMap := &builder.BuilderConfigMap{ + CommonBuilder: builder.CommonBuilder{ + ObjectMeta: metav1.ObjectMeta{ + Name: table.GetName() + "-" + "table", + Namespace: table.GetNamespace(), + }, + Client: r.Client, + CrObject: table, + OwnerRef: *ownerRef, + }, + Data: map[string]string{ + "tables.json": data.(string), + }, + } + + return configMap +} + +// create owner ref ie pinot table controller +func makeOwnerRef(apiVersion, kind, name string, uid types.UID) *metav1.OwnerReference { + controller := true + + return &metav1.OwnerReference{ + APIVersion: apiVersion, + Kind: kind, + Name: name, + UID: uid, + Controller: &controller, + } +} + +func getTableName(tablesJson string) (string, error) { + var err error + + schema := make(map[string]json.RawMessage) + if err = json.Unmarshal([]byte(tablesJson), &schema); err != nil { + return "", err + } + + return utils.TrimQuote(string(schema["tableName"])), nil +} + +func getRespCode(resp []byte) string { + var err error + + respMap := make(map[string]json.RawMessage) + if err = json.Unmarshal(resp, &respMap); err != nil { + return "" + } + + return utils.TrimQuote(string(respMap["code"])) +} + +func makeControllerCreateTablePath(svcName string) string { return svcName + "/tables" } + +func makeControllerUpdateDeleteTablePath(svcName, tableName string) string { + return svcName + "/tables/" + tableName +} + +func (r *PinotTableReconciler) getControllerSvcUrl(namespace, pinotClusterName string) (string, error) { + listOpts := []client.ListOption{ + client.InNamespace(namespace), + client.MatchingLabels(map[string]string{ + "custom_resource": pinotClusterName, + "nodeType": "controller", + }), + } + svcList := &v1.ServiceList{} + if err := r.Client.List(context.Background(), svcList, listOpts...); err != nil { + return "", err + } + var svcName string + + for range svcList.Items { + svcName = svcList.Items[0].Name + } + + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + + return newName, nil +} From a30c7c5cea22fdb3105f39d4372ffa9e460bf31e Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Fri, 21 Apr 2023 15:17:03 +0530 Subject: [PATCH 3/4] update table name --- internal/table_controller/reconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go index 83a2914..b842b90 100644 --- a/internal/table_controller/reconciler.go +++ b/internal/table_controller/reconciler.go @@ -34,7 +34,7 @@ import ( ) const ( - PinotTableControllerCreateSuccess = "PinotSchemaControllerCreateSuccess" + PinotTableControllerCreateSuccess = "PinotTableControllerCreateSuccess" PinotTableControllerCreateFail = "PinotTableControllerCreateFail" PinotTableControllerUpdateSuccess = "PinotTableControllerUpdateSuccess" PinotTableControllerUpdateFail = "PinotTableControllerUpdateFail" From e8a072e5cdcf59cde3373911d9616015e81f9c9e Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Fri, 21 Apr 2023 16:48:55 +0530 Subject: [PATCH 4/4] v0.0.4 gandasa --- Makefile | 2 +- README.md | 54 ++ config/rbac/pinottable_editor_role.yaml | 4 +- config/rbac/pinottable_viewer_role.yaml | 4 +- examples/pinot/pinot-realtime-kafka.yaml | 793 +----------------- helm/pinot-control-plane/Chart.yaml | 4 +- .../crds/datainfra.io_pinotschemas.yaml | 7 +- .../crds/datainfra.io_pinottables.yaml | 69 ++ .../templates/rbac_manager.yaml | 18 + 9 files changed, 153 insertions(+), 802 deletions(-) create mode 100644 helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml diff --git a/Makefile b/Makefile index 45c6867..98849ac 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # Image URL to use all building/pushing image targets -IMG ?= datainfrahq/pinot-control-plane:v0.0.3 +IMG ?= datainfrahq/pinot-control-plane:v0.0.4 # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. ENVTEST_K8S_VERSION = 1.26.0 diff --git a/README.md b/README.md index 878d27d..f95e155 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,66 @@ Control Plane for deploying and managing heterogenous apache pinot kubernetes cl ### Getting Started +- Export your StorageClassName ``` export STORAGE_CLASS_NAME=standard +``` + +- Install Pinot Control Plane +``` make helm-install-pinot-control-plane +``` + +- Install Zookeeper Opoerator and CR +``` make helm-install-zk-operator +``` + +- Install Pinot Cluster +``` envsubst < examples/pinot-simple.yaml | kubectl apply -f - -n pinot ``` + +- Deploy Kafka Cluster and Create Topics +``` +# Add Kafka +helm repo add kafka https://charts.bitnami.com/bitnami +# Deploy kafka +helm install -n pinot kafka kafka/kafka --set replicas=1,zookeeper.image.tag=latest +# Create topics +kubectl -n pinot exec kafka-0 -- kafka-topics.sh --bootstrap-server kafka-0:9092 --topic flights-realtime --create --partitions 1 --replication-factor 1 +kubectl -n pinot exec kafka-0 -- kafka-topics.sh --bootstrap-server kafka-0:9092 --topic flights-realtime-avro --create --partitions 1 --replication-factor 1 +``` + +- Create Schema +``` +kubectl apply -f examples/pinotschema-simple.yaml -n pinot +``` + +- Create Table +``` +kubectl apply -f examples/pinottable-simple.yaml -n pinot +``` + +- Check All Custom Resources created by the control plane +``` +kubectl get pinot -A +kubectl get pinotSchema -A +kubectl get pinottable -A +``` + +- Load Data Into Kafka +``` +kubectl apply -f examples/pinot/pinot-realtime-kafka.yaml +``` + +- Port-forward and query on console +``` +``` +kubectl port-forward pinot-controller-controller-0 -n pinot 9000 +``` +``` + ### Getting Started With DeepStorage Minio - An e2e getting started from kafka > pinot > minio s3. diff --git a/config/rbac/pinottable_editor_role.yaml b/config/rbac/pinottable_editor_role.yaml index a8b34d9..5361570 100644 --- a/config/rbac/pinottable_editor_role.yaml +++ b/config/rbac/pinottable_editor_role.yaml @@ -12,7 +12,7 @@ metadata: name: pinottable-editor-role rules: - apiGroups: - - datainfra.io.datainfra.io + - datainfra.io resources: - pinottables verbs: @@ -24,7 +24,7 @@ rules: - update - watch - apiGroups: - - datainfra.io.datainfra.io + - datainfra.io resources: - pinottables/status verbs: diff --git a/config/rbac/pinottable_viewer_role.yaml b/config/rbac/pinottable_viewer_role.yaml index 05e0b55..0025efe 100644 --- a/config/rbac/pinottable_viewer_role.yaml +++ b/config/rbac/pinottable_viewer_role.yaml @@ -12,7 +12,7 @@ metadata: name: pinottable-viewer-role rules: - apiGroups: - - datainfra.io.datainfra.io + - datainfra.io resources: - pinottables verbs: @@ -20,7 +20,7 @@ rules: - list - watch - apiGroups: - - datainfra.io.datainfra.io + - datainfra.io resources: - pinottables/status verbs: diff --git a/examples/pinot/pinot-realtime-kafka.yaml b/examples/pinot/pinot-realtime-kafka.yaml index 0200029..4229a23 100644 --- a/examples/pinot/pinot-realtime-kafka.yaml +++ b/examples/pinot/pinot-realtime-kafka.yaml @@ -1,798 +1,7 @@ -apiVersion: v1 -kind: ConfigMap -metadata: - name: examples - namespace: pinot -data: - airlineStats_realtime_table_config.json: |- - { - "tableName": "airlineStats", - "tableType": "REALTIME", - "segmentsConfig": { - "timeColumnName": "DaysSinceEpoch", - "timeType": "DAYS", - "retentionTimeUnit": "DAYS", - "retentionTimeValue": "3650", - "segmentPushType": "APPEND", - "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", - "schemaName": "airlineStats", - "replication": "1", - "replicasPerPartition": "1" - }, - "tenants": {}, - "tableIndexConfig": { - "loadMode": "MMAP", - "streamConfigs": { - "streamType": "kafka", - "stream.kafka.consumer.type": "simple", - "stream.kafka.topic.name": "flights-realtime", - "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", - "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181", - "stream.kafka.zk.broker.url": "kafka-zookeeper:2181", - "stream.kafka.broker.list": "kafka-0.kafka-headless.pinot.svc.cluster.local:9092", - "realtime.segment.flush.threshold.time": "3600000", - "realtime.segment.flush.threshold.size": "100", - "stream.kafka.consumer.prop.auto.offset.reset": "smallest" - } - }, - "metadata": { - "customConfigs": {} - } - } - - airlineStatsAvro_realtime_table_config.json: |- - { - "tableName": "airlineStatsAvro", - "tableType": "REALTIME", - "segmentsConfig": { - "timeColumnName": "DaysSinceEpoch", - "timeType": "DAYS", - "retentionTimeUnit": "DAYS", - "retentionTimeValue": "3650", - "segmentPushType": "APPEND", - "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", - "schemaName": "airlineStatsAvro", - "replication": "1", - "replicasPerPartition": "1" - }, - "tenants": {}, - "tableIndexConfig": { - "loadMode": "MMAP", - "streamConfigs": { - "streamType": "kafka", - "stream.kafka.consumer.type": "simple", - "stream.kafka.topic.name": "flights-realtime-avro", - "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.stream.SimpleAvroMessageDecoder", - "stream.kafka.decoder.prop.schema": "{\"type\":\"record\",\"name\":\"Flight\",\"namespace\":\"pinot\",\"fields\":[{\"name\":\"DaysSinceEpoch\",\"type\":[\"int\"]},{\"name\":\"Year\",\"type\":[\"int\"]},{\"name\":\"Quarter\",\"type\":[\"int\"]},{\"name\":\"Month\",\"type\":[\"int\"]},{\"name\":\"DayofMonth\",\"type\":[\"int\"]},{\"name\":\"DayOfWeek\",\"type\":[\"int\"]},{\"name\":\"FlightDate\",\"type\":[\"string\"]},{\"name\":\"UniqueCarrier\",\"type\":[\"string\"]},{\"name\":\"AirlineID\",\"type\":[\"int\"]},{\"name\":\"Carrier\",\"type\":[\"string\"]},{\"name\":\"TailNum\",\"type\":[\"string\",\"null\"]},{\"name\":\"FlightNum\",\"type\":[\"int\"]},{\"name\":\"OriginAirportID\",\"type\":[\"int\"]},{\"name\":\"OriginAirportSeqID\",\"type\":[\"int\"]},{\"name\":\"OriginCityMarketID\",\"type\":[\"int\"]},{\"name\":\"Origin\",\"type\":[\"string\"]},{\"name\":\"OriginCityName\",\"type\":[\"string\"]},{\"name\":\"OriginState\",\"type\":[\"string\"]},{\"name\":\"OriginStateFips\",\"type\":[\"int\"]},{\"name\":\"OriginStateName\",\"type\":[\"string\"]},{\"name\":\"OriginWac\",\"type\":[\"int\"]},{\"name\":\"DestAirportID\",\"type\":[\"int\"]},{\"name\":\"DestAirportSeqID\",\"type\":[\"int\"]},{\"name\":\"DestCityMarketID\",\"type\":[\"int\"]},{\"name\":\"Dest\",\"type\":[\"string\"]},{\"name\":\"DestCityName\",\"type\":[\"string\"]},{\"name\":\"DestState\",\"type\":[\"string\"]},{\"name\":\"DestStateFips\",\"type\":[\"int\"]},{\"name\":\"DestStateName\",\"type\":[\"string\"]},{\"name\":\"DestWac\",\"type\":[\"int\"]},{\"name\":\"CRSDepTime\",\"type\":[\"int\"]},{\"name\":\"DepTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDelayMinutes\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepDel15\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepartureDelayGroups\",\"type\":[\"int\",\"null\"]},{\"name\":\"DepTimeBlk\",\"type\":[\"string\"]},{\"name\":\"TaxiOut\",\"type\":[\"int\",\"null\"]},{\"name\":\"WheelsOff\",\"type\":[\"int\",\"null\"]},{\"name\":\"WheelsOn\",\"type\":[\"int\",\"null\"]},{\"name\":\"TaxiIn\",\"type\":[\"int\",\"null\"]},{\"name\":\"CRSArrTime\",\"type\":[\"int\"]},{\"name\":\"ArrTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDelayMinutes\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrDel15\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrivalDelayGroups\",\"type\":[\"int\",\"null\"]},{\"name\":\"ArrTimeBlk\",\"type\":[\"string\"]},{\"name\":\"Cancelled\",\"type\":[\"int\"]},{\"name\":\"CancellationCode\",\"type\":[\"string\",\"null\"]},{\"name\":\"Diverted\",\"type\":[\"int\"]},{\"name\":\"CRSElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"ActualElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"AirTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"Flights\",\"type\":[\"int\"]},{\"name\":\"Distance\",\"type\":[\"int\"]},{\"name\":\"DistanceGroup\",\"type\":[\"int\"]},{\"name\":\"CarrierDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"WeatherDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"NASDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"SecurityDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"LateAircraftDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"FirstDepTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"TotalAddGTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"LongestAddGTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivAirportLandings\",\"type\":[\"int\"]},{\"name\":\"DivReachedDest\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivActualElapsedTime\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivArrDelay\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivDistance\",\"type\":[\"int\",\"null\"]},{\"name\":\"DivAirports\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"DivAirportIDs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivAirportSeqIDs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivWheelsOns\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivTotalGTimes\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivLongestGTimes\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivWheelsOffs\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"DivTailNums\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"RandomAirports\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}", - "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", - "stream.kafka.hlc.zk.connect.string": "kafka-zookeeper:2181", - "stream.kafka.zk.broker.url": "kafka-zookeeper:2181", - "stream.kafka.broker.list": "kafka:9092", - "realtime.segment.flush.threshold.time": "3600000", - "realtime.segment.flush.threshold.size": "100", - "stream.kafka.consumer.prop.auto.offset.reset": "smallest" - } - }, - "metadata": { - "customConfigs": {} - } - } - - airlineStats_schema.json: |- - { - "metricFieldSpecs": [ - ], - "dimensionFieldSpecs": [ - { - "dataType": "INT", - "name": "ActualElapsedTime" - }, - { - "dataType": "INT", - "name": "AirTime" - }, - { - "dataType": "INT", - "name": "AirlineID" - }, - { - "dataType": "INT", - "name": "ArrDel15" - }, - { - "dataType": "INT", - "name": "ArrDelay" - }, - { - "dataType": "INT", - "name": "ArrDelayMinutes" - }, - { - "dataType": "INT", - "name": "ArrTime" - }, - { - "dataType": "STRING", - "name": "ArrTimeBlk" - }, - { - "dataType": "INT", - "name": "ArrivalDelayGroups" - }, - { - "dataType": "INT", - "name": "CRSArrTime" - }, - { - "dataType": "INT", - "name": "CRSDepTime" - }, - { - "dataType": "INT", - "name": "CRSElapsedTime" - }, - { - "dataType": "STRING", - "name": "CancellationCode" - }, - { - "dataType": "INT", - "name": "Cancelled" - }, - { - "dataType": "STRING", - "name": "Carrier" - }, - { - "dataType": "INT", - "name": "CarrierDelay" - }, - { - "dataType": "INT", - "name": "DayOfWeek" - }, - { - "dataType": "INT", - "name": "DayofMonth" - }, - { - "dataType": "INT", - "name": "DepDel15" - }, - { - "dataType": "INT", - "name": "DepDelay" - }, - { - "dataType": "INT", - "name": "DepDelayMinutes" - }, - { - "dataType": "INT", - "name": "DepTime" - }, - { - "dataType": "STRING", - "name": "DepTimeBlk" - }, - { - "dataType": "INT", - "name": "DepartureDelayGroups" - }, - { - "dataType": "STRING", - "name": "Dest" - }, - { - "dataType": "INT", - "name": "DestAirportID" - }, - { - "dataType": "INT", - "name": "DestAirportSeqID" - }, - { - "dataType": "INT", - "name": "DestCityMarketID" - }, - { - "dataType": "STRING", - "name": "DestCityName" - }, - { - "dataType": "STRING", - "name": "DestState" - }, - { - "dataType": "INT", - "name": "DestStateFips" - }, - { - "dataType": "STRING", - "name": "DestStateName" - }, - { - "dataType": "INT", - "name": "DestWac" - }, - { - "dataType": "INT", - "name": "Distance" - }, - { - "dataType": "INT", - "name": "DistanceGroup" - }, - { - "dataType": "INT", - "name": "DivActualElapsedTime" - }, - { - "dataType": "INT", - "name": "DivAirportIDs", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivAirportLandings" - }, - { - "dataType": "INT", - "name": "DivAirportSeqIDs", - "singleValueField": false - }, - { - "dataType": "STRING", - "name": "DivAirports", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivArrDelay" - }, - { - "dataType": "INT", - "name": "DivDistance" - }, - { - "dataType": "INT", - "name": "DivLongestGTimes", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivReachedDest" - }, - { - "dataType": "STRING", - "name": "DivTailNums", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivTotalGTimes", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivWheelsOffs", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivWheelsOns", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "Diverted" - }, - { - "dataType": "INT", - "name": "FirstDepTime" - }, - { - "dataType": "STRING", - "name": "FlightDate" - }, - { - "dataType": "INT", - "name": "FlightNum" - }, - { - "dataType": "INT", - "name": "Flights" - }, - { - "dataType": "INT", - "name": "LateAircraftDelay" - }, - { - "dataType": "INT", - "name": "LongestAddGTime" - }, - { - "dataType": "INT", - "name": "Month" - }, - { - "dataType": "INT", - "name": "NASDelay" - }, - { - "dataType": "STRING", - "name": "Origin" - }, - { - "dataType": "INT", - "name": "OriginAirportID" - }, - { - "dataType": "INT", - "name": "OriginAirportSeqID" - }, - { - "dataType": "INT", - "name": "OriginCityMarketID" - }, - { - "dataType": "STRING", - "name": "OriginCityName" - }, - { - "dataType": "STRING", - "name": "OriginState" - }, - { - "dataType": "INT", - "name": "OriginStateFips" - }, - { - "dataType": "STRING", - "name": "OriginStateName" - }, - { - "dataType": "INT", - "name": "OriginWac" - }, - { - "dataType": "INT", - "name": "Quarter" - }, - { - "dataType": "STRING", - "name": "RandomAirports", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "SecurityDelay" - }, - { - "dataType": "STRING", - "name": "TailNum" - }, - { - "dataType": "INT", - "name": "TaxiIn" - }, - { - "dataType": "INT", - "name": "TaxiOut" - }, - { - "dataType": "INT", - "name": "Year" - }, - { - "dataType": "INT", - "name": "WheelsOn" - }, - { - "dataType": "INT", - "name": "WheelsOff" - }, - { - "dataType": "INT", - "name": "WeatherDelay" - }, - { - "dataType": "STRING", - "name": "UniqueCarrier" - }, - { - "dataType": "INT", - "name": "TotalAddGTime" - } - ], - "dateTimeFieldSpecs": [ - { - "name": "DaysSinceEpoch", - "dataType": "INT", - "format": "1:DAYS:EPOCH", - "granularity": "1:DAYS" - } - ], - "schemaName": "airlineStats" - } - - airlineStatsAvro_schema.json: |- - { - "metricFieldSpecs": [ - ], - "dimensionFieldSpecs": [ - { - "dataType": "INT", - "name": "ActualElapsedTime" - }, - { - "dataType": "INT", - "name": "AirTime" - }, - { - "dataType": "INT", - "name": "AirlineID" - }, - { - "dataType": "INT", - "name": "ArrDel15" - }, - { - "dataType": "INT", - "name": "ArrDelay" - }, - { - "dataType": "INT", - "name": "ArrDelayMinutes" - }, - { - "dataType": "INT", - "name": "ArrTime" - }, - { - "dataType": "STRING", - "name": "ArrTimeBlk" - }, - { - "dataType": "INT", - "name": "ArrivalDelayGroups" - }, - { - "dataType": "INT", - "name": "CRSArrTime" - }, - { - "dataType": "INT", - "name": "CRSDepTime" - }, - { - "dataType": "INT", - "name": "CRSElapsedTime" - }, - { - "dataType": "STRING", - "name": "CancellationCode" - }, - { - "dataType": "INT", - "name": "Cancelled" - }, - { - "dataType": "STRING", - "name": "Carrier" - }, - { - "dataType": "INT", - "name": "CarrierDelay" - }, - { - "dataType": "INT", - "name": "DayOfWeek" - }, - { - "dataType": "INT", - "name": "DayofMonth" - }, - { - "dataType": "INT", - "name": "DepDel15" - }, - { - "dataType": "INT", - "name": "DepDelay" - }, - { - "dataType": "INT", - "name": "DepDelayMinutes" - }, - { - "dataType": "INT", - "name": "DepTime" - }, - { - "dataType": "STRING", - "name": "DepTimeBlk" - }, - { - "dataType": "INT", - "name": "DepartureDelayGroups" - }, - { - "dataType": "STRING", - "name": "Dest" - }, - { - "dataType": "INT", - "name": "DestAirportID" - }, - { - "dataType": "INT", - "name": "DestAirportSeqID" - }, - { - "dataType": "INT", - "name": "DestCityMarketID" - }, - { - "dataType": "STRING", - "name": "DestCityName" - }, - { - "dataType": "STRING", - "name": "DestState" - }, - { - "dataType": "INT", - "name": "DestStateFips" - }, - { - "dataType": "STRING", - "name": "DestStateName" - }, - { - "dataType": "INT", - "name": "DestWac" - }, - { - "dataType": "INT", - "name": "Distance" - }, - { - "dataType": "INT", - "name": "DistanceGroup" - }, - { - "dataType": "INT", - "name": "DivActualElapsedTime" - }, - { - "dataType": "INT", - "name": "DivAirportIDs", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivAirportLandings" - }, - { - "dataType": "INT", - "name": "DivAirportSeqIDs", - "singleValueField": false - }, - { - "dataType": "STRING", - "name": "DivAirports", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivArrDelay" - }, - { - "dataType": "INT", - "name": "DivDistance" - }, - { - "dataType": "INT", - "name": "DivLongestGTimes", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivReachedDest" - }, - { - "dataType": "STRING", - "name": "DivTailNums", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivTotalGTimes", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivWheelsOffs", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "DivWheelsOns", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "Diverted" - }, - { - "dataType": "INT", - "name": "FirstDepTime" - }, - { - "dataType": "STRING", - "name": "FlightDate" - }, - { - "dataType": "INT", - "name": "FlightNum" - }, - { - "dataType": "INT", - "name": "Flights" - }, - { - "dataType": "INT", - "name": "LateAircraftDelay" - }, - { - "dataType": "INT", - "name": "LongestAddGTime" - }, - { - "dataType": "INT", - "name": "Month" - }, - { - "dataType": "INT", - "name": "NASDelay" - }, - { - "dataType": "STRING", - "name": "Origin" - }, - { - "dataType": "INT", - "name": "OriginAirportID" - }, - { - "dataType": "INT", - "name": "OriginAirportSeqID" - }, - { - "dataType": "INT", - "name": "OriginCityMarketID" - }, - { - "dataType": "STRING", - "name": "OriginCityName" - }, - { - "dataType": "STRING", - "name": "OriginState" - }, - { - "dataType": "INT", - "name": "OriginStateFips" - }, - { - "dataType": "STRING", - "name": "OriginStateName" - }, - { - "dataType": "INT", - "name": "OriginWac" - }, - { - "dataType": "INT", - "name": "Quarter" - }, - { - "dataType": "STRING", - "name": "RandomAirports", - "singleValueField": false - }, - { - "dataType": "INT", - "name": "SecurityDelay" - }, - { - "dataType": "STRING", - "name": "TailNum" - }, - { - "dataType": "INT", - "name": "TaxiIn" - }, - { - "dataType": "INT", - "name": "TaxiOut" - }, - { - "dataType": "INT", - "name": "Year" - }, - { - "dataType": "INT", - "name": "WheelsOn" - }, - { - "dataType": "INT", - "name": "WheelsOff" - }, - { - "dataType": "INT", - "name": "WeatherDelay" - }, - { - "dataType": "STRING", - "name": "UniqueCarrier" - }, - { - "dataType": "INT", - "name": "TotalAddGTime" - } - ], - "dateTimeFieldSpecs": [ - { - "name": "DaysSinceEpoch", - "dataType": "INT", - "format": "1:DAYS:EPOCH", - "granularity": "1:DAYS" - } - ], - "schemaName": "airlineStatsAvro" - } ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: pinot-realtime-quickstart-pinot-table-creation - namespace: pinot -spec: - template: - spec: - containers: - - name: pinot-add-example-realtime-table-json - image: apachepinot/pinot:latest - args: [ "AddTable", "-schemaFile", "/var/pinot/examples/airlineStats_schema.json", "-tableConfigFile", "/var/pinot/examples/airlineStats_realtime_table_config.json", "-controllerHost", "pinot-controller-controller-svc", "-controllerPort", "9000", "-exec" ] - env: - - name: JAVA_OPTS - value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true" - volumeMounts: - - name: examples - mountPath: /var/pinot/examples - - name: pinot-add-example-realtime-table-avro - image: apachepinot/pinot:latest - args: [ "AddTable", "-schemaFile", "/var/pinot/examples/airlineStatsAvro_schema.json", "-tableConfigFile", "/var/pinot/examples/airlineStatsAvro_realtime_table_config.json", "-controllerHost", "pinot-controller-controller-svc", "-controllerPort", "9000", "-exec" ] - env: - - name: JAVA_OPTS - value: "-Xms4G -Xmx4G -Dpinot.admin.system.exit=true" - volumeMounts: - - name: examples - mountPath: /var/pinot/examples - restartPolicy: OnFailure - volumes: - - name: examples - configMap: - name: examples - backoffLimit: 100 ---- apiVersion: batch/v1 kind: Job metadata: - name: pinot-realtime-quickstart-load-data-into-kafka + name: pinot-realtime-load-data-into-kafka namespace: pinot spec: template: diff --git a/helm/pinot-control-plane/Chart.yaml b/helm/pinot-control-plane/Chart.yaml index aa8f28c..e4c9e3e 100644 --- a/helm/pinot-control-plane/Chart.yaml +++ b/helm/pinot-control-plane/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: pinot-control-plane description: A Helm chart for Kubernetes type: application -version: 0.0.3 -appVersion: "v0.0.3" +version: 0.0.4 +appVersion: "v0.0.4" diff --git a/helm/pinot-control-plane/templates/crds/datainfra.io_pinotschemas.yaml b/helm/pinot-control-plane/templates/crds/datainfra.io_pinotschemas.yaml index 64af925..1a32a5a 100644 --- a/helm/pinot-control-plane/templates/crds/datainfra.io_pinotschemas.yaml +++ b/helm/pinot-control-plane/templates/crds/datainfra.io_pinotschemas.yaml @@ -19,7 +19,7 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date - - jsonPath: .spec.clusterName + - jsonPath: .spec.pinotCluster name: Pinot_Cluster type: string name: v1beta1 @@ -42,12 +42,13 @@ spec: spec: description: PinotSchemaSpec defines the desired state of PinotSchema properties: - clusterName: + pinotCluster: type: string schema.json: type: string required: - - clusterName + - pinotCluster + - schema.json type: object status: description: PinotSchemaStatus defines the observed state of PinotSchema diff --git a/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml b/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml new file mode 100644 index 0000000..c5ce24f --- /dev/null +++ b/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml @@ -0,0 +1,69 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.11.1 + creationTimestamp: null + name: pinottables.datainfra.io +spec: + group: datainfra.io + names: + kind: PinotTable + listKind: PinotTableList + plural: pinottables + singular: pinottable + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + - jsonPath: .spec.pinotCluster + name: Pinot_Cluster + type: string + - jsonPath: .spec.pinotSchema + name: Pinot_Schema + type: string + name: v1beta1 + schema: + openAPIV3Schema: + description: PinotTable is the Schema for the pinottables API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: PinotTableSpec defines the desired state of PinotTable + properties: + pinotCluster: + type: string + pinotSchema: + type: string + pinotTableType: + type: string + tables.json: + type: string + required: + - pinotCluster + - pinotSchema + - pinotTableType + - tables.json + type: object + status: + description: PinotTableStatus defines the observed state of PinotTable + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/helm/pinot-control-plane/templates/rbac_manager.yaml b/helm/pinot-control-plane/templates/rbac_manager.yaml index 3373ca2..dec2acc 100644 --- a/helm/pinot-control-plane/templates/rbac_manager.yaml +++ b/helm/pinot-control-plane/templates/rbac_manager.yaml @@ -246,6 +246,24 @@ rules: - pinotschemas/status verbs: - get +- apiGroups: + - datainfra.io + resources: + - pinottables + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - datainfra.io + resources: + - pinottables/status + verbs: + - get {{- end }} {{- $operatorName := (include "pinot-operator.fullname" .) -}}