Skip to content

Commit

Permalink
PTEUDO-1562 rewrite migration logic of controller (#324)
Browse files Browse the repository at this point in the history
* PTEUDO-1562 rewrite migration logic of controller

    The state machine was not being used effectively to go from the
    new cloud db step to the migration step. The controller now
    reconciles fully after a database is created and requeues. On
    next requeue, controller performs a migration to the new database.

    I attempted to identify issues storing, retrieiving and using db
    credentials. In all places possible, a uri dsn is now used. There
    is still significant cleanup in this regard. The usage of storing
    state in requestinfo needs to be eliminated. RequestInfo makes the
    code perform in a spaghetti fashion where you follow logic only to
    find it calling other states non-recurisvely or putting the dbc in
    a broken state when random errors occur.

    - fix grant/revoke superuser
    - support gcp alloydb roles
    - pass cloud into dbclient pkg calls
    - ignore sql revoke errors on non-cloud dbs
    - separate kubectl client for finding cloud based master credentials
    - push sql updatepassword when credentials are denied
    - always set status when existing reconcileNewDB
    - persist source user creds to secret at end of newdb for migration
    - bind psql containers to host IP so they can talk to each other
    - refactor ways credentials are divined from cluster
    - move mock sql roles into testdb runner
    - systemfunctions cleanup logic and logs
    - ignore errors during dbproxy shutdown
    - fix invalid claim tests
    - verify crossplane CR is named correctly in test
    - test webhook

* fix source dsn parsing

* webhooks were looking at claim class in addition to pod class

* github actions is quite slow, check for present of obj before moving on
  • Loading branch information
drewwells authored Oct 5, 2024
1 parent 62c14f7 commit ccd29f3
Show file tree
Hide file tree
Showing 39 changed files with 1,929 additions and 879 deletions.
19 changes: 19 additions & 0 deletions api/v1/databaseclaim_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1

import (
"errors"
"fmt"
"net/url"
"strings"
Expand Down Expand Up @@ -341,6 +342,24 @@ func init() {
SchemeBuilder.Register(&DatabaseClaim{}, &DatabaseClaimList{})
}

// Validate checks for basic errors in the DSN string
func (c *DatabaseClaimConnectionInfo) Validate() error {
if c.Host == "" {
return errors.New("dsn has an empty host")
}
if c.Username == "" {
return errors.New("dsn has an empty user")
}
if c.Password == "" {
return errors.New("dsn has an empty password")
}
if c.DatabaseName == "" {
return errors.New("dsn has an empty database name")
}

return nil
}

func (c *DatabaseClaimConnectionInfo) Uri() string {
if c == nil {
return ""
Expand Down
9 changes: 4 additions & 5 deletions cmd/config/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
env: "testenv"
athena-shared:
masterUsername: root
authSource: secret
Expand Down Expand Up @@ -41,9 +42,7 @@ passwordConfig:
passwordRotationPeriod: 60

systemFunctions:
ib_realm: "{{ .Values.ib.realm }}"
ib_env: "{{ .Values.env }}"
ib_lifecycle: "{{ .Values.lifecycle }}"


ib_realm: "ib_realm"
ib_env: "ib_env"
ib_lifecycle: "ib_lifecycle"

30 changes: 19 additions & 11 deletions dbproxy/cmd/dbproxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strconv"
"syscall"

"github.com/go-logr/logr"
"github.com/infobloxopen/db-controller/dbproxy"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
Expand All @@ -35,6 +36,8 @@ func init() {
flag.StringVar(&addr, "addr", "0.0.0.0:5432", "address to listen to clients on")
}

var logger logr.Logger

func main() {

opts := zap.Options{
Expand All @@ -44,7 +47,8 @@ func main() {

flag.Parse()

dbproxy.SetLogger(zap.New(zap.UseFlagOptions(&opts)))
logger = zap.New(zap.UseFlagOptions(&opts))
dbproxy.SetLogger(logger)
mgr, err := dbproxy.New(context.TODO(), dbproxy.Config{
DBCredentialPath: dbCredentialPath,
PGCredentialPath: pbCredentialPath,
Expand All @@ -62,7 +66,9 @@ func main() {
catch(cancel)

// Blocking call
log.Fatal(mgr.Start(ctx))
if err := mgr.Start(ctx); err != nil {
logger.Error(err, "failed to start dbproxy")
}
}

func catch(cancel func()) {
Expand All @@ -76,33 +82,35 @@ func catch(cancel func()) {
go func() {
sig := <-sigs
cancel()
fmt.Println("Received signal:", sig)
logger.Info("Received signal", "signal", sig)
// Path set in ini file
bs, err := ioutil.ReadFile(filepath.Join("pgbouncer.pid"))
if err != nil {
log.Fatal(err)
logger.Error(err, "failed to read pgbouncer pid file")
os.Exit(1)
}
pid, err := strconv.Atoi(string(bytes.TrimSpace(bs)))
if err != nil {
log.Fatal(err)
logger.Error(err, "failed to convert pid to int")
os.Exit(1)
}
fmt.Println("terminating pgbouncer pid:", pid)
logger.Info("terminating pgbouncer pid", "pid", pid)
// Terminate pgbouncer
cmd := exec.Command("sh", "-c", fmt.Sprintf("kill -s 9 %d", pid))
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Fatal(err)
logger.Error(err, "failed to kill pgbouncer")
}
fmt.Println(stdoutStderr)
logger.Info("pgbouncer stop executed", "output", stdoutStderr)

// Capture log pgbouncer.log and write to stdout
cmd = exec.Command("sh", "-c", fmt.Sprintf("cat %s", "pgbouncer.log"))
stdoutStderr, err = cmd.CombinedOutput()
if err != nil {
log.Fatalf("failed to cat log: %s", err)
logger.Error(err, "failed to cat log", "output", string(stdoutStderr))
} else {
logger.Info("pgbouncer.log", "output", string(stdoutStderr))
}
log.Println("pgbouncer.log")
fmt.Println(string(stdoutStderr))

os.Exit(0)
}()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ metadata:
annotations:
# ensure this runs on the new db-controller, not the existing one
"helm.sh/hook": "test"
"helm.sh/hook-weight": "-5"
spec:
class: {{ .Values.dbController.class }}
databaseName: mydb
Expand Down Expand Up @@ -159,10 +160,10 @@ spec:
echo "Connection successful!"
exit 0
fi
echo "Failed to connect. Retrying in 5 seconds..."
sleep 5
echo "Failed to connect. Retrying in 1 second..."
sleep 1
done
echo "Failed to connect after 20 attempts. Exiting."
echo "Failed to connect after 50 attempts. Exiting."
exit 1
volumeMounts:
- name: dsn-volume
Expand All @@ -187,8 +188,7 @@ spec:
kill -s INT $(pidof /usr/bin/dbproxy)
exit 0
shareProcessNamespace: true
# Sidecar has a liveness probe, allow it to restart
restartPolicy: Never
restartPolicy: OnFailure
volumes:
- name: dsn-volume
secret:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ metadata:
persistance.atlas.infoblox.com/dsnexec-config: {{ .Release.Name }}-dsnexec-config
annotations:
helm.sh/hook: test
helm.sh/hook-delete-policy: "before-hook-creation,hook-succeeded"
# helm.sh/hook-delete-policy: "before-hook-creation,hook-succeeded"
spec:
serviceAccountName: {{ .Release.Name }}-dbproxy-test
initContainers:
Expand Down Expand Up @@ -66,14 +66,22 @@ spec:
# Loop for 1 minute (60 seconds)
while [ $(($(date +%s) - start_time)) -lt 60 ]; do
echo >2 "list processes"
ls /proc | grep '^[0-9]'
# Fallback to grep when pidof fails
if check_table_exists; then
echo "Table $TABLE_NAME exists!"
kill -s INT $(pidof dsnexec)
PID=$(pidof dsnexec)
if [ -n "$PID" ]; then
echo "Killing dsnexec..."
kill -s INT $PID
else
echo "dsnexec not running..."
fi
exit 0
else
echo "Table $TABLE_NAME does not exist. Checking again in 5 seconds..."
sleep 5
echo "Table $TABLE_NAME does not exist. Checking again in 2 seconds..."
sleep 1
fi
done
Expand All @@ -83,8 +91,7 @@ spec:
mountPath: /etc/secrets
readOnly: true
shareProcessNamespace: true
# Sidecar has a liveness probe, allow it to restart
restartPolicy: Never
restartPolicy: OnFailure
volumes:
- name: dsn-volume
secret:
Expand Down
12 changes: 2 additions & 10 deletions internal/controller/databaseclaim_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

persistancev1 "github.com/infobloxopen/db-controller/api/v1"
Expand Down Expand Up @@ -63,18 +62,11 @@ func (r *DatabaseClaimReconciler) Reconciler() *databaseclaim.DatabaseClaimRecon
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.2/pkg/reconcile
func (r *DatabaseClaimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)

res, err := r.reconciler.Reconcile(ctx, req)
logger.V(debugLevel).Info("reconcile_done", "err", err, "res", res)
return res, err
return r.reconciler.Reconcile(ctx, req)
}

func (r *DatabaseClaimReconciler) Setup() {
r.reconciler = &databaseclaim.DatabaseClaimReconciler{
Client: r.Client,
Config: r.Config,
}
r.reconciler = databaseclaim.New(r.Client, r.Config)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var hasOperationalTag = databaseclaim.HasOperationalTag

var _ = Describe("Tagging", Ordered, func() {

var logger = NewGinkgoLogger()
// define and create objects in the test cluster

name := fmt.Sprintf("test-%s-%s", namespace, rand.String(5))
Expand Down
109 changes: 108 additions & 1 deletion internal/controller/databaseclaim_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@ package controller

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/url"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

crossplaneaws "github.com/crossplane-contrib/provider-aws/apis/rds/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/webhook"

persistancev1 "github.com/infobloxopen/db-controller/api/v1"
mutating "github.com/infobloxopen/db-controller/internal/webhook"
"github.com/infobloxopen/db-controller/pkg/hostparams"
)

var _ = Describe("DatabaseClaim Controller", func() {
Expand Down Expand Up @@ -58,7 +66,7 @@ var _ = Describe("DatabaseClaim Controller", func() {
By("ensuring the resource does not exist")
Expect(k8sClient.Get(ctx, typeNamespacedName, claim)).To(HaveOccurred())

By("creating the custom resource for the Kind DatabaseClaim")
By(fmt.Sprintf("Creating dbc: %s", resourceName))
parsedDSN, err := url.Parse(testDSN)
Expect(err).NotTo(HaveOccurred())
password, ok := parsedDSN.User.Password()
Expand Down Expand Up @@ -132,6 +140,10 @@ var _ = Describe("DatabaseClaim Controller", func() {
})

It("Should succeed to reconcile DB Claim missing dbVersion", func() {
By("Verify environment")
viper := controllerReconciler.Config.Viper
Expect(viper.Get("env")).To(Equal(env))

By("Reconciling the created resource")

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
Expand All @@ -154,6 +166,101 @@ var _ = Describe("DatabaseClaim Controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(resource.Status.Error).To(Equal(""))

var instance crossplaneaws.DBInstance
viper := controllerReconciler.Config.Viper
hostParams, err := hostparams.New(viper, resource)
Expect(err).ToNot(HaveOccurred())

instanceName := fmt.Sprintf("%s-%s-%s", env, resourceName, hostParams.Hash())

By(fmt.Sprintf("Check dbinstance is created: %s", instanceName))
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{Name: instanceName}, &instance)
}).Should(Succeed())
})

It("Should succeed with no error status to reconcile CR with DBVersion", func() {
By("Updating CR with a DB Version")

resource := &persistancev1.DatabaseClaim{}
Expect(k8sClient.Get(ctx, typeNamespacedName, resource)).NotTo(HaveOccurred())
resource.Spec.DBVersion = "13.3"
Expect(k8sClient.Update(ctx, resource)).NotTo(HaveOccurred())

Expect(k8sClient.Get(ctx, typeNamespacedName, resource)).NotTo(HaveOccurred())
Expect(resource.Spec.DBVersion).To(Equal("13.3"))

_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
Expect(resource.Status.Error).To(Equal(""))

})

It("dbproxy mutated pod", func() {

class := "testenv"
// FIXME: can this be done without standing up
// a full manager?
mgr, err := manager.New(cfg, manager.Options{
Scheme: k8sClient.Scheme(),
WebhookServer: webhook.NewServer(webhook.Options{
Port: testEnv.WebhookInstallOptions.LocalServingPort,
Host: testEnv.WebhookInstallOptions.LocalServingHost,
CertDir: testEnv.WebhookInstallOptions.LocalServingCertDir,
TLSOpts: []func(*tls.Config){func(config *tls.Config) {}},
}),
})
Expect(err).NotTo(HaveOccurred())

Expect(mutating.SetupWebhookWithManager(mgr, mutating.SetupConfig{
Namespace: namespace,
Class: class,
DBProxyImg: "test-db-proxy:latest",
DSNExecImg: "test-dsn-exec:latest",
})).To(Succeed())

mgrCtx, cancel := context.WithCancel(context.Background())
go func() {
Expect(mgr.Start(mgrCtx)).To(Succeed())
}()
DeferCleanup(func() {
cancel()
<-mgrCtx.Done()
Expect(errors.Is(mgrCtx.Err(), context.Canceled)).To(BeTrue())
})

pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
Labels: map[string]string{
mutating.LabelCheckProxy: "enabled",
mutating.LabelClaim: resourceName,
mutating.LabelClass: class,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Image: "busybox",
Command: []string{
"sleep",
"3600",
},
},
},
},
}
Expect(k8sClient.Create(ctx, &pod)).To(Succeed())
Expect(pod.Spec.Volumes).To(HaveLen(1))
Expect(pod.Spec.Volumes[0].VolumeSource.Secret.SecretName).To(Equal(secretName))
Expect(pod.Annotations[mutating.AnnotationInjectedProxy]).To(Equal("true"))
Expect(pod.Spec.Containers).To(HaveLen(2))
Expect(pod.Spec.Containers[1].Env).To(HaveLen(1))
envvar := pod.Spec.Containers[1].Env[0]
Expect(envvar.Name).To(Equal("DBPROXY_CREDENTIAL"))
Expect(envvar.Value).To(Equal("/dbproxy/uri_dsn.txt"))
})
})
})
Loading

0 comments on commit ccd29f3

Please sign in to comment.