From 9fe130b48e9da8e98a674ba71fe296e4dd8c0cb6 Mon Sep 17 00:00:00 2001 From: danaelhe <42972711+danaelhe@users.noreply.github.com> Date: Tue, 20 Aug 2024 11:50:02 -0400 Subject: [PATCH] Databases: Add Logsinks CRUD support (#711) --- databases.go | 128 ++++++++++++++++++++++++++++ databases_test.go | 210 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 338 insertions(+) diff --git a/databases.go b/databases.go index 4e926b7..3460948 100644 --- a/databases.go +++ b/databases.go @@ -36,6 +36,8 @@ const ( databaseTopicsPath = databaseBasePath + "/%s/topics" databaseMetricsCredentialsPath = databaseBasePath + "/metrics/credentials" databaseEvents = databaseBasePath + "/%s/events" + databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s" + databaseLogsinksPath = databaseBasePath + "/%s/logsink" ) // SQL Mode constants allow for MySQL-specific SQL flavor configuration. @@ -159,6 +161,11 @@ type DatabasesService interface { GetMetricsCredentials(context.Context) (*DatabaseMetricsCredentials, *Response, error) UpdateMetricsCredentials(context.Context, *DatabaseUpdateMetricsCredentialsRequest) (*Response, error) ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error) + CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) + GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) + ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) + UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) + DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) } // DatabasesServiceOp handles communication with the Databases related methods @@ -323,6 +330,14 @@ type DatabaseTopic struct { Config *TopicConfig `json:"config,omitempty"` } +// DatabaseLogsink represents a logsink +type DatabaseLogsink struct { + ID string `json:"sink_id"` + Name string `json:"sink_name,omitempty"` + Type string `json:"sink_type,omitempty"` + Config *LogsinkConfig `json:"config,omitempty"` +} + // TopicPartition represents the state of a Kafka topic partition type TopicPartition struct { EarliestOffset uint64 `json:"earliest_offset,omitempty"` @@ -472,6 +487,35 @@ type DatabaseFirewallRule struct { CreatedAt time.Time `json:"created_at"` } +// DatabaseCreateLogsinkRequest is used to create logsink for a database cluster +type DatabaseCreateLogsinkRequest struct { + Name string `json:"sink_name"` + Type string `json:"sink_type"` + Config *LogsinkConfig `json:"config"` +} + +// DatabaseUpdateLogsinkRequest ... +type DatabaseUpdateLogsinkRequest struct { + Config *LogsinkConfig `json:"config"` +} + +// LogsinkConfig represents one of the configurable options (rsyslog_logsink, elasticsearch_logsink, or opensearch_logsink) for a logsink. +type LogsinkConfig struct { + URL string `json:"url,omitempty"` + IndexPrefix string `json:"index_prefix,omitempty"` + IndexDaysMax string `json:"index_days_max,omitempty"` + Timeout string `json:"timeout,omitempty"` + Server string `json:"server,omitempty"` + Port int `json:"port,omitempty"` + TLS bool `json:"tls,omitempty"` + Format string `json:"format,omitempty"` + Logline string `json:"logline,omitempty"` + SD string `json:"sd,omitempty"` + CA string `json:"ca,omitempty"` + Key string `json:"key,omitempty"` + Cert string `json:"cert,omitempty"` +} + // PostgreSQLConfig holds advanced configurations for PostgreSQL database clusters. type PostgreSQLConfig struct { AutovacuumFreezeMaxAge *int `json:"autovacuum_freeze_max_age,omitempty"` @@ -680,6 +724,10 @@ type databaseTopicsRoot struct { Topics []DatabaseTopic `json:"topics"` } +type databaseLogsinksRoot struct { + Sinks []DatabaseLogsink `json:"sinks"` +} + type databaseMetricsCredentialsRoot struct { Credentials *DatabaseMetricsCredentials `json:"credentials"` } @@ -1560,3 +1608,83 @@ func (svc *DatabasesServiceOp) ListDatabaseEvents(ctx context.Context, databaseI return root.Events, resp, nil } + +// CreateLogsink creates a new logsink for a database +func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinksPath, databaseID) + req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink) + if err != nil { + return nil, nil, err + } + + root := new(DatabaseLogsink) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root, resp, nil +} + +// GetLogsink gets a logsink for a database +func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + + root := new(DatabaseLogsink) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root, resp, nil +} + +// ListTopics returns all topics for a given kafka cluster +func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinksPath, databaseID) + path, err := addOptions(path, opts) + if err != nil { + return nil, nil, err + } + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + root := new(databaseLogsinksRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Sinks, resp, nil +} + +// UpdateLogsink updates a logsink for a database cluster +func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink) + if err != nil { + return nil, err + } + + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} + +// DeleteLogsink deletes a logsink for a database cluster +func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil) + if err != nil { + return nil, err + } + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} diff --git a/databases_test.go b/databases_test.go index 395d58b..58ef301 100644 --- a/databases_test.go +++ b/databases_test.go @@ -3225,3 +3225,213 @@ func TestDatabases_ListDatabaseEvents(t *testing.T) { require.NoError(t, err) require.Equal(t, want, got) } + +func TestDatabases_CreateLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + ) + + want := &DatabaseLogsink{ + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &LogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + } + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + fmt.Fprint(w, body) + }) + + log, _, err := client.Databases.CreateLogsink(ctx, dbID, &DatabaseCreateLogsinkRequest{ + Name: "logs-sink", + Type: "opensearch", + Config: &LogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }) + + require.NoError(t, err) + + require.Equal(t, want, log) +} + +func TestDatabases_GetLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + want := &DatabaseLogsink{ + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &LogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + } + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.GetLogsink(ctx, dbID, logsinkID) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestDatabases_UpdateLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPut) + fmt.Fprint(w, body) + }) + + _, err := client.Databases.UpdateLogsink(ctx, dbID, logsinkID, &DatabaseUpdateLogsinkRequest{ + Config: &LogsinkConfig{ + Server: "192.168.0.1", + Port: 514, + TLS: false, + Format: "rfc3164", + }, + }) + + require.NoError(t, err) +} + +func TestDatabases_ListLogsinks(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + ) + + want := []DatabaseLogsink{ + { + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &LogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }, + { + ID: "d6e95157-5f58-48d0-9023-8cfb409d102a", + Name: "logs-sink-2", + Type: "opensearch", + Config: &LogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }} + + body := `{ + "sinks": [ + { + "sink_id": "deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }, + { + "sink_id": "d6e95157-5f58-48d0-9023-8cfb409d102a", + "sink_name": "logs-sink-2", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + } + ] + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.ListLogsinks(ctx, dbID, &ListOptions{}) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestDatabases_DeleteLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodDelete) + }) + + _, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID) + require.NoError(t, err) +}