From 8736c6129dc6d2b05e434736c5097e1b470ba122 Mon Sep 17 00:00:00 2001 From: danaelhe Date: Thu, 22 Aug 2024 11:37:38 -0400 Subject: [PATCH] Databases: Bring back Logsink Support --- databases.go | 89 +++++++++++++++++++- databases_test.go | 210 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 298 insertions(+), 1 deletion(-) diff --git a/databases.go b/databases.go index 2d3c187..8c40655 100644 --- a/databases.go +++ b/databases.go @@ -38,6 +38,8 @@ const ( databaseEvents = databaseBasePath + "/%s/events" databaseIndexesPath = databaseBasePath + "/%s/indexes" databaseIndexPath = databaseBasePath + "/%s/indexes/%s" + databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s" + databaseLogsinksPath = databaseBasePath + "/%s/logsink" ) // SQL Mode constants allow for MySQL-specific SQL flavor configuration. @@ -163,6 +165,11 @@ type DatabasesService interface { ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error) ListIndexes(context.Context, string, *ListOptions) ([]DatabaseIndex, *Response, error) DeleteIndex(context.Context, string, string) (*Response, error) + CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) + GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) + ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) + UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) + DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) } // DatabasesServiceOp handles communication with the Databases related methods @@ -491,7 +498,7 @@ type DatabaseCreateLogsinkRequest struct { Config *DatabaseLogsinkConfig `json:"config"` } -// DatabaseUpdateLogsinkRequest ... +// DatabaseUpdateLogsinkRequest is used to update logsink for a database cluster type DatabaseUpdateLogsinkRequest struct { Config *DatabaseLogsinkConfig `json:"config"` } @@ -1660,3 +1667,83 @@ func (svc *DatabasesServiceOp) DeleteIndex(ctx context.Context, databaseID, name } return resp, nil } + +// CreateLogsink creates a new logsink for a database +func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinksPath, databaseID) + req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink) + if err != nil { + return nil, nil, err + } + + root := new(DatabaseLogsink) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root, resp, nil +} + +// GetLogsink gets a logsink for a database +func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + + root := new(DatabaseLogsink) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root, resp, nil +} + +// ListTopics returns all topics for a given kafka cluster +func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) { + path := fmt.Sprintf(databaseLogsinksPath, databaseID) + path, err := addOptions(path, opts) + if err != nil { + return nil, nil, err + } + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + root := new(databaseLogsinksRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Sinks, resp, nil +} + +// UpdateLogsink updates a logsink for a database cluster +func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink) + if err != nil { + return nil, err + } + + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} + +// DeleteLogsink deletes a logsink for a database cluster +func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) { + path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID) + req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil) + if err != nil { + return nil, err + } + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} diff --git a/databases_test.go b/databases_test.go index 737f63b..aae02f3 100644 --- a/databases_test.go +++ b/databases_test.go @@ -3307,3 +3307,213 @@ func TestDatabases_DeleteIndexes(t *testing.T) { _, err := client.Databases.DeleteIndex(ctx, dbID, indexName) require.NoError(t, err) } + +func TestDatabases_CreateLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + ) + + want := &DatabaseLogsink{ + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &DatabaseLogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + } + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + fmt.Fprint(w, body) + }) + + log, _, err := client.Databases.CreateLogsink(ctx, dbID, &DatabaseCreateLogsinkRequest{ + Name: "logs-sink", + Type: "opensearch", + Config: &DatabaseLogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }) + + require.NoError(t, err) + + require.Equal(t, want, log) +} + +func TestDatabases_GetLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + want := &DatabaseLogsink{ + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &DatabaseLogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + } + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.GetLogsink(ctx, dbID, logsinkID) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestDatabases_UpdateLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + body := `{ + "sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPut) + fmt.Fprint(w, body) + }) + + _, err := client.Databases.UpdateLogsink(ctx, dbID, logsinkID, &DatabaseUpdateLogsinkRequest{ + Config: &DatabaseLogsinkConfig{ + Server: "192.168.0.1", + Port: 514, + TLS: false, + Format: "rfc3164", + }, + }) + + require.NoError(t, err) +} + +func TestDatabases_ListLogsinks(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + ) + + want := []DatabaseLogsink{ + { + ID: "deadbeef-dead-4aa5-beef-deadbeef347d", + Name: "logs-sink", + Type: "opensearch", + Config: &DatabaseLogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }, + { + ID: "d6e95157-5f58-48d0-9023-8cfb409d102a", + Name: "logs-sink-2", + Type: "opensearch", + Config: &DatabaseLogsinkConfig{ + URL: "https://user:passwd@192.168.0.1:25060", + IndexPrefix: "opensearch-logs", + }, + }} + + body := `{ + "sinks": [ + { + "sink_id": "deadbeef-dead-4aa5-beef-deadbeef347d", + "sink_name": "logs-sink", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + }, + { + "sink_id": "d6e95157-5f58-48d0-9023-8cfb409d102a", + "sink_name": "logs-sink-2", + "sink_type": "opensearch", + "config": { + "url": "https://user:passwd@192.168.0.1:25060", + "index_prefix": "opensearch-logs" + } + } + ] + }` + + path := fmt.Sprintf("/v2/databases/%s/logsink", dbID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, body) + }) + + got, _, err := client.Databases.ListLogsinks(ctx, dbID, &ListOptions{}) + require.NoError(t, err) + require.Equal(t, want, got) +} + +func TestDatabases_DeleteLogsink(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6" + ) + + path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodDelete) + }) + + _, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID) + require.NoError(t, err) +}