Skip to content

Commit

Permalink
[databases]: update structs for logsinks by sink type
Browse files Browse the repository at this point in the history
  • Loading branch information
loosla committed Oct 7, 2024
1 parent dfe74ef commit 89b600b
Show file tree
Hide file tree
Showing 2 changed files with 412 additions and 243 deletions.
275 changes: 222 additions & 53 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,16 @@ type DatabasesService interface {
ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error)
ListIndexes(context.Context, string, *ListOptions) ([]DatabaseIndex, *Response, error)
DeleteIndex(context.Context, string, string) (*Response, error)
CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error)
GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error)
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error)
UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error)
GetRsyslogLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseRsyslogLogsink, *Response, error)
CreateRsyslogLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateRsyslogLogsinkRequest) (*DatabaseRsyslogLogsink, *Response, error)
UpdateRsyslogLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateRsyslogLogsinkRequest) (*Response, error)
GetElasticsearchLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseElasticsearchLogsink, *Response, error)
CreateElasticsearchLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateElasticsearchLogsinkRequest) (*DatabaseElasticsearchLogsink, *Response, error)
UpdateElasticsearchLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateElasticsearchLogsinkRequest) (*Response, error)
GetOpensearchLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseOpensearchLogsink, *Response, error)
CreateOpensearchLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateOpensearchLogsinkRequest) (*DatabaseOpensearchLogsink, *Response, error)
UpdateOpensearchLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateOpensearchLogsinkRequest) (*Response, error)
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]interface{}, *Response, error)
DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error)
}

Expand Down Expand Up @@ -350,14 +356,6 @@ type DatabaseTopic struct {
Config *TopicConfig `json:"config,omitempty"`
}

// DatabaseLogsink represents a logsink
type DatabaseLogsink struct {
ID string `json:"sink_id"`
Name string `json:"sink_name,omitempty"`
Type string `json:"sink_type,omitempty"`
Config *DatabaseLogsinkConfig `json:"config,omitempty"`
}

// TopicPartition represents the state of a Kafka topic partition
type TopicPartition struct {
EarliestOffset uint64 `json:"earliest_offset,omitempty"`
Expand Down Expand Up @@ -507,33 +505,95 @@ type DatabaseFirewallRule struct {
CreatedAt time.Time `json:"created_at"`
}

// DatabaseCreateLogsinkRequest is used to create logsink for a database cluster
type DatabaseCreateLogsinkRequest struct {
Name string `json:"sink_name"`
Type string `json:"sink_type"`
Config *DatabaseLogsinkConfig `json:"config"`
// DatabaseRsyslogLogsink represents a rsyslog logsink.
type DatabaseRsyslogLogsink struct {
ID string `json:"sink_id"`
Name string `json:"sink_name,required"`
Type string `json:"sink_type,required"`
Config *RsyslogLogsinkConfig `json:"config,required"`
}

// DatabaseCreateRsyslogLogsinkRequest is used to create rsyslog logsink for a database cluster.
type DatabaseCreateRsyslogLogsinkRequest struct {
Name string `json:"sink_name"`
Type string `json:"sink_type"`
Config *RsyslogLogsinkConfig `json:"config"`
}

// DatabaseUpdateRsyslogLogsinkRequest is used to update rsyslog logsink for a database cluster.
type DatabaseUpdateRsyslogLogsinkRequest struct {
Config *RsyslogLogsinkConfig `json:"config"`
}

// RsyslogLogsinkConfig represents rsyslog logsink configuration.
type RsyslogLogsinkConfig struct {
Server string `json:"server,required"`
Port int `json:"port,required"`
TLS bool `json:"tls,required"`
Format string `json:"format,required"`
Logline string `json:"logline,omitempty"`
SD string `json:"sd,omitempty"`
CA string `json:"ca,omitempty"`
Key string `json:"key,omitempty"`
Cert string `json:"cert,omitempty"`
}

// DatabaseUpdateLogsinkRequest is used to update logsink for a database cluster
type DatabaseUpdateLogsinkRequest struct {
Config *DatabaseLogsinkConfig `json:"config"`
// DatabaseElasticsearchLogsink represents an elasticsearch logsink.
type DatabaseElasticsearchLogsink struct {
ID string `json:"sink_id"`
Name string `json:"sink_name,required"`
Type string `json:"sink_type,required"`
Config *ElasticsearchLogsinkConfig `json:"config,required"`
}

// DatabaseLogsinkConfig represents one of the configurable options (rsyslog_logsink, elasticsearch_logsink, or opensearch_logsink) for a logsink.
type DatabaseLogsinkConfig struct {
URL string `json:"url,omitempty"`
IndexPrefix string `json:"index_prefix,omitempty"`
// DatabaseCreateElasticsearchLogsinkRequest is used to create elasticsearch logsink for a database cluster.
type DatabaseCreateElasticsearchLogsinkRequest struct {
Name string `json:"sink_name"`
Type string `json:"sink_type"`
Config *ElasticsearchLogsinkConfig `json:"config"`
}

// DatabaseUpdateElasticsearchLogsinkRequest is used to update elasticsearch logsink for a database cluster.
type DatabaseUpdateElasticsearchLogsinkRequest struct {
Config *ElasticsearchLogsinkConfig `json:"config"`
}

// ElasticsearchLogsinkConfig represents elasticsearch logsink configuration.
type ElasticsearchLogsinkConfig struct {
URL string `json:"url,required"`
IndexPrefix string `json:"index_prefix,required"`
IndexDaysMax int `json:"index_days_max,omitempty"`
Timeout float32 `json:"timeout,omitempty"`
CA string `json:"ca,omitempty"`
}

// DatabaseOpensearchLogsink represents an opensearch logsink.
type DatabaseOpensearchLogsink struct {
ID string `json:"sink_id"`
Name string `json:"sink_name,required"`
Type string `json:"sink_type,required"`
Config *OpensearchLogsinkConfig `json:"config,required"`
}

// DatabaseCreateOpensearchLogsinkRequest is used to create opensearch logsink for a database cluster.
type DatabaseCreateOpensearchLogsinkRequest struct {
Name string `json:"sink_name"`
Type string `json:"sink_type"`
Config *OpensearchLogsinkConfig `json:"config"`
}

// DatabaseUpdateOpensearchLogsinkRequest is used to update opensearch logsink for a database cluster.
type DatabaseUpdateOpensearchLogsinkRequest struct {
Config *OpensearchLogsinkConfig `json:"config"`
}

// OpensearchLogsinkConfig represents opensearch logsink configuration.
type OpensearchLogsinkConfig struct {
URL string `json:"url,required"`
IndexPrefix string `json:"index_prefix,required"`
IndexDaysMax int `json:"index_days_max,omitempty"`
Timeout float32 `json:"timeout,omitempty"`
Server string `json:"server,omitempty"`
Port int `json:"port,omitempty"`
TLS bool `json:"tls,omitempty"`
Format string `json:"format,omitempty"`
Logline string `json:"logline,omitempty"`
SD string `json:"sd,omitempty"`
CA string `json:"ca,omitempty"`
Key string `json:"key,omitempty"`
Cert string `json:"cert,omitempty"`
}

// PostgreSQLConfig holds advanced configurations for PostgreSQL database clusters.
Expand Down Expand Up @@ -828,8 +888,20 @@ type databaseTopicsRoot struct {
Topics []DatabaseTopic `json:"topics"`
}

type databaseRsyslogLogsinkRoot struct {
Sink DatabaseRsyslogLogsink `json:"sink"`
}

type databaseElasticsearchLogsinkRoot struct {
Sink DatabaseElasticsearchLogsink `json:"sink"`
}

type databaseOpensearchLogsinkRoot struct {
Sink DatabaseOpensearchLogsink `json:"sink"`
}

type databaseLogsinksRoot struct {
Sinks []DatabaseLogsink `json:"sinks"`
Sinks []interface{} `json:"sinks"`
}

type databaseMetricsCredentialsRoot struct {
Expand Down Expand Up @@ -1878,59 +1950,122 @@ func (svc *DatabasesServiceOp) DeleteIndex(ctx context.Context, databaseID, name
return resp, nil
}

// CreateLogsink creates a new logsink for a database
func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) {
// ListTopics returns all topics for a given kafka cluster.
func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]interface{}, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseLogsinksRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
return root.Sinks, resp, nil
}

// DeleteLogsink deletes a logsink for a database cluster.
func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}

// GetLogsink gets a logsink for a database
func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) {
// GetRsyslogLogsink gets a logsink for a database.
func (svc *DatabasesServiceOp) GetRsyslogLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseRsyslogLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
root := new(databaseRsyslogLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
return &root.Sink, resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) {
// CreateRsyslogLogsink creates a new logsink for a database.
func (svc *DatabasesServiceOp) CreateRsyslogLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateRsyslogLogsinkRequest) (*DatabaseRsyslogLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
path, err := addOptions(path, opts)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(databaseRsyslogLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}

return &root.Sink, resp, nil
}

// UpdateRsyslogLogsink updates a logsink for a database cluster.
func (svc *DatabasesServiceOp) UpdateRsyslogLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateRsyslogLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
return nil, err
}

resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}

// GetElasticsearchLogsink gets a logsink for a database.
func (svc *DatabasesServiceOp) GetElasticsearchLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseElasticsearchLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseLogsinksRoot)

root := new(databaseElasticsearchLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Sinks, resp, nil
return &root.Sink, resp, nil
}

// CreateElasticsearchLogsink creates a new logsink for a database.
func (svc *DatabasesServiceOp) CreateElasticsearchLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateElasticsearchLogsinkRequest) (*DatabaseElasticsearchLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(databaseElasticsearchLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}

return &root.Sink, resp, nil
}

// UpdateLogsink updates a logsink for a database cluster
func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) {
// UpdateElasticsearchLogsink updates a logsink for a database cluster.
func (svc *DatabasesServiceOp) UpdateElasticsearchLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateElasticsearchLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
Expand All @@ -1944,13 +2079,47 @@ func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID str
return resp, nil
}

// DeleteLogsink deletes a logsink for a database cluster
func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) {
// GetOpensearchLogsink gets a logsink for a database.
func (svc *DatabasesServiceOp) GetOpensearchLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseOpensearchLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(databaseOpensearchLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return &root.Sink, resp, nil
}

// CreateOpensearchLogsink creates a new logsink for a database.
func (svc *DatabasesServiceOp) CreateOpensearchLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateOpensearchLogsinkRequest) (*DatabaseOpensearchLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(databaseOpensearchLogsinkRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}

return &root.Sink, resp, nil
}

// UpdateOpensearchLogsink updates a logsink for a database cluster.
func (svc *DatabasesServiceOp) UpdateOpensearchLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateOpensearchLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
return nil, err
}

resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
Expand Down
Loading

0 comments on commit 89b600b

Please sign in to comment.