Skip to content

Commit

Permalink
Update GODO to include new Openseach index crud changes (#710)
Browse files Browse the repository at this point in the history
* Add opensearch index crud to godo

* Add testcases for opensearch index crud

---------

Co-authored-by: Rahul Bhardwaj <rahulbhardwaj@digitalocean.com>
  • Loading branch information
bhardwajRahul and Rahul Bhardwaj authored Aug 21, 2024
1 parent 0c07670 commit deb17ad
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 240 deletions.
96 changes: 34 additions & 62 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ const (
databaseTopicsPath = databaseBasePath + "/%s/topics"
databaseMetricsCredentialsPath = databaseBasePath + "/metrics/credentials"
databaseEvents = databaseBasePath + "/%s/events"
databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s"
databaseLogsinksPath = databaseBasePath + "/%s/logsink"
databaseIndexesPath = databaseBasePath + "/%s/indexes"
databaseIndexPath = databaseBasePath + "/%s/indexes/%s"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
Expand Down Expand Up @@ -161,11 +161,8 @@ type DatabasesService interface {
GetMetricsCredentials(context.Context) (*DatabaseMetricsCredentials, *Response, error)
UpdateMetricsCredentials(context.Context, *DatabaseUpdateMetricsCredentialsRequest) (*Response, error)
ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error)
CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error)
GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error)
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error)
UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error)
DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error)
ListIndexes(context.Context, string, *ListOptions) ([]DatabaseIndex, *Response, error)
DeleteIndex(context.Context, string, string) (*Response, error)
}

// DatabasesServiceOp handles communication with the Databases related methods
Expand Down Expand Up @@ -781,6 +778,28 @@ type ListDatabaseEventsRoot struct {
Events []DatabaseEvent `json:"events"`
}

type DatabaseIndex struct {
IndexName string `json:"index_name"`
NumberofShards uint64 `json:"number_of_shards"`
NumberofReplicas uint64 `json:"number_of_replicas"`
Size int64 `json:"size,omitempty"`
Health string `json:"health,omitempty"`
Status string `json:"status,omitempty"`
Docs int64 `json:"docs,omitempty"`
CreateTime string `json:"create_time"`
Replication *IndexReplication `json:"replication,omitempty"`
}

type IndexReplication struct {
LeaderIndex string `json:"leader_index,omitempty"`
LeaderProject string `json:"leader_project,omitempty"`
LeaderService string `json:"leader_service,omitempty"`
}

type databaseIndexesRoot struct {
Indexes []DatabaseIndex `json:"indexes"`
}

// URN returns a URN identifier for the database
func (d Database) URN() string {
return ToURN("dbaas", d.ID)
Expand Down Expand Up @@ -1609,41 +1628,9 @@ func (svc *DatabasesServiceOp) ListDatabaseEvents(ctx context.Context, databaseI
return root.Events, resp, nil
}

// CreateLogsink creates a new logsink for a database
func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// GetLogsink gets a logsink for a database
func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
// ListIndexes returns all indexes for a given opensearch cluster
func (svc *DatabasesServiceOp) ListIndexes(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseIndex, *Response, error) {
path := fmt.Sprintf(databaseIndexesPath, databaseID)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
Expand All @@ -1652,32 +1639,17 @@ func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID stri
if err != nil {
return nil, nil, err
}
root := new(databaseLogsinksRoot)
root := new(databaseIndexesRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Sinks, resp, nil
}

// UpdateLogsink updates a logsink for a database cluster
func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
return nil, err
}

resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
return root.Indexes, resp, nil
}

// DeleteLogsink deletes a logsink for a database cluster
func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
// DeleteIndex will delete an existing opensearch index
func (svc *DatabasesServiceOp) DeleteIndex(ctx context.Context, databaseID, name string) (*Response, error) {
path := fmt.Sprintf(databaseIndexPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
Expand Down
228 changes: 50 additions & 178 deletions databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3226,212 +3226,84 @@ func TestDatabases_ListDatabaseEvents(t *testing.T) {
require.Equal(t, want, got)
}

func TestDatabases_CreateLogsink(t *testing.T) {
func TestDatabases_ListIndexes(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPost)
fmt.Fprint(w, body)
})

log, _, err := client.Databases.CreateLogsink(ctx, dbID, &DatabaseCreateLogsinkRequest{
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
})

require.NoError(t, err)

require.Equal(t, want, log)
}

func TestDatabases_GetLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.GetLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_UpdateLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
fmt.Fprint(w, body)
})

_, err := client.Databases.UpdateLogsink(ctx, dbID, logsinkID, &DatabaseUpdateLogsinkRequest{
Config: &DatabaseLogsinkConfig{
Server: "192.168.0.1",
Port: 514,
TLS: false,
Format: "rfc3164",
},
})

require.NoError(t, err)
}

func TestDatabases_ListLogsinks(t *testing.T) {
setup()
defer teardown()
dbID := "deadbeef-dead-4aa5-beef-deadbeef347d"

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)
path := fmt.Sprintf("/v2/databases/%s/indexes", dbID)

want := []DatabaseLogsink{
want := []DatabaseIndex{
{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
IndexName: "sample_index",
NumberofShards: uint64(1),
NumberofReplicas: uint64(0),
CreateTime: "2020-10-29T15:57:38Z",
Health: "green",
Size: int64(5314),
Status: "open",
Docs: int64(64811),
},
{
ID: "d6e95157-5f58-48d0-9023-8cfb409d102a",
Name: "logs-sink-2",
Type: "opensearch",
Config: &DatabaseLogsinkConfig{
URL: "https://user:passwd@192.168.0.1:25060",
IndexPrefix: "opensearch-logs",
},
}}
IndexName: "sample_index_2",
NumberofShards: uint64(1),
NumberofReplicas: uint64(0),
CreateTime: "2020-10-30T15:57:38Z",
Health: "red",
Size: int64(6105247),
Status: "close",
Docs: int64(64801),
},
}

body := `{
"sinks": [
{
"sink_id": "deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
},
{
"sink_id": "d6e95157-5f58-48d0-9023-8cfb409d102a",
"sink_name": "logs-sink-2",
"sink_type": "opensearch",
"config": {
"url": "https://user:passwd@192.168.0.1:25060",
"index_prefix": "opensearch-logs"
}
}
]
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)
"indexes": [
{
"create_time": "2020-10-29T15:57:38Z",
"docs": 64811,
"health": "green",
"index_name": "sample_index",
"number_of_replica": 0,
"number_of_shards": 1,
"size": 5314,
"status": "open"
},
{
"create_time": "2020-10-30T15:57:38Z",
"docs": 64801,
"health": "red",
"index_name": "sample_index_2",
"number_of_replica": 0,
"number_of_shards": 1,
"size": 6105247,
"status": "close"
}]
} `

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.ListLogsinks(ctx, dbID, &ListOptions{})
got, _, err := client.Databases.ListIndexes(ctx, dbID, &ListOptions{})
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_DeleteLogsink(t *testing.T) {
func TestDatabases_DeleteIndexes(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)
dbID := "deadbeef-dead-4aa5-beef-deadbeef347d"
indexName := "sample_index"

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)
path := fmt.Sprintf("/v2/databases/%s/indexes/%s", dbID, indexName)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodDelete)
})

_, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID)
_, err := client.Databases.DeleteIndex(ctx, dbID, indexName)
require.NoError(t, err)
}

0 comments on commit deb17ad

Please sign in to comment.