Skip to content

Commit

Permalink
Databases: Bring back Logsink Support
Browse files Browse the repository at this point in the history
  • Loading branch information
danaelhe committed Aug 22, 2024
1 parent deb17ad commit 8736c61
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 1 deletion.
89 changes: 88 additions & 1 deletion databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const (
databaseEvents = databaseBasePath + "/%s/events"
databaseIndexesPath = databaseBasePath + "/%s/indexes"
databaseIndexPath = databaseBasePath + "/%s/indexes/%s"
databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s"
databaseLogsinksPath = databaseBasePath + "/%s/logsink"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
Expand Down Expand Up @@ -163,6 +165,11 @@ type DatabasesService interface {
ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error)
ListIndexes(context.Context, string, *ListOptions) ([]DatabaseIndex, *Response, error)
DeleteIndex(context.Context, string, string) (*Response, error)
CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error)
GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error)
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error)
UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error)
DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error)
}

// DatabasesServiceOp handles communication with the Databases related methods
Expand Down Expand Up @@ -491,7 +498,7 @@ type DatabaseCreateLogsinkRequest struct {
Config *DatabaseLogsinkConfig `json:"config"`
}

// DatabaseUpdateLogsinkRequest ...
// DatabaseUpdateLogsinkRequest is used to update logsink for a database cluster
type DatabaseUpdateLogsinkRequest struct {
Config *DatabaseLogsinkConfig `json:"config"`
}
Expand Down Expand Up @@ -1660,3 +1667,83 @@ func (svc *DatabasesServiceOp) DeleteIndex(ctx context.Context, databaseID, name
}
return resp, nil
}

// CreateLogsink creates a new logsink for a database
func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// GetLogsink gets a logsink for a database
func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
}
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseLogsinksRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Sinks, resp, nil
}

// UpdateLogsink updates a logsink for a database cluster
func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
return nil, err
}

resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}

// DeleteLogsink deletes a logsink for a database cluster
func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}
210 changes: 210 additions & 0 deletions databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3307,3 +3307,213 @@ func TestDatabases_DeleteIndexes(t *testing.T) {
_, err := client.Databases.DeleteIndex(ctx, dbID, indexName)
require.NoError(t, err)
}

func TestDatabases_CreateLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPost)
fmt.Fprint(w, body)
})

log, _, err := client.Databases.CreateLogsink(ctx, dbID, &DatabaseCreateLogsinkRequest{
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
})

require.NoError(t, err)

require.Equal(t, want, log)
}

func TestDatabases_GetLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.GetLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_UpdateLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
fmt.Fprint(w, body)
})

_, err := client.Databases.UpdateLogsink(ctx, dbID, logsinkID, &DatabaseUpdateLogsinkRequest{
Config: &DatabaseLogsinkConfig{
Server: "192.168.0.1",
Port: 514,
TLS: false,
Format: "rfc3164",
},
})

require.NoError(t, err)
}

func TestDatabases_ListLogsinks(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

want := []DatabaseLogsink{
{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
},
{
ID: "d6e95157-5f58-48d0-9023-8cfb409d102a",
Name: "logs-sink-2",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}}

body := `{
"sinks": [
{
"sink_id": "deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
},
{
"sink_id": "d6e95157-5f58-48d0-9023-8cfb409d102a",
"sink_name": "logs-sink-2",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}
]
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.ListLogsinks(ctx, dbID, &ListOptions{})
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_DeleteLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodDelete)
})

_, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
}

0 comments on commit 8736c61

Please sign in to comment.