Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: unify sql data driver implementation #873

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions .github/workflows/server-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,6 @@ jobs:
mysql database: 'kubevela'
mysql root password: 'kubevelaSQL123'

- name: Set up OpenGauss
uses: lgj101/opengauss-action@v1.6
with:
GS_VERSION: '5.0.0'
GS_DB: 'kubevela'
GS_USERNAME: 'kubevela'
GS_PASSWORD: 'Kubevela-123'
HOST_PORT: 15432
CONTAINER_PORT: 5432

- name: Set up Postgres
uses: Harmon758/postgresql-action@v1
with:
Expand Down
23 changes: 0 additions & 23 deletions pkg/server/infrastructure/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,6 @@ func initPostgresTestDs() (datastore.DataStore, error) {
return postgresDriver, nil
}

// initOpenGaussTestDs Postgres Driver is also compatible with OpenGaussian databases
func initOpenGaussTestDs() (datastore.DataStore, error) {
db, err := gorm.Open(postgresorm.Open("postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1"), &gorm.Config{})
if err != nil {
return nil, err
}
for _, v := range model.GetRegisterModels() {
err := db.Migrator().DropTable(&v)
if err != nil {
return nil, err
}
}
openGaussDriver, err := postgres.New(context.TODO(), datastore.Config{
URL: "postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1",
Database: "kubevela",
})
if err != nil {
return nil, err
}
return openGaussDriver, nil
}

func initKubeapiTestDs() (datastore.DataStore, error) {
var testScheme = runtime.NewScheme()
testEnv := &envtest.Environment{
Expand Down Expand Up @@ -196,7 +174,6 @@ var _ = Describe("Test datastore methods", func() {
DriverTest(initMysqlTestDs)
DriverTest(initMongodbTestDs)
DriverTest(initKubeapiTestDs)
DriverTest(initOpenGaussTestDs)
DriverTest(initPostgresTestDs)
})

Expand Down
269 changes: 5 additions & 264 deletions pkg/server/infrastructure/datastore/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,19 @@ package mysql

import (
"context"
"errors"
"fmt"
"strings"
"time"

mysqlgorm "gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"k8s.io/klog/v2"

"github.com/kubevela/velaux/pkg/server/domain/model"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer"
)

type mysql struct {
client gorm.DB
database string
sql.Driver
}

// New new mysql datastore instance
Expand All @@ -56,262 +50,9 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error)
}

m := &mysql{
client: *db.WithContext(ctx),
database: cfg.Database,
Driver: sql.Driver{
Client: *db.WithContext(ctx),
},
}
return m, nil
}

// Add add data model
func (m *mysql) Add(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
entity.SetCreateTime(time.Now())
entity.SetUpdateTime(time.Now())

if dbAdd := m.client.WithContext(ctx).Create(entity); dbAdd.Error != nil {
if match := errors.Is(dbAdd.Error, gorm.ErrDuplicatedKey); match {
return datastore.ErrRecordExist
}
return datastore.NewDBError(dbAdd.Error)
}
return nil
}

// BatchAdd batch add entity, this operation has some atomicity.
func (m *mysql) BatchAdd(ctx context.Context, entities []datastore.Entity) error {
notRollback := make(map[string]bool)
for i, saveEntity := range entities {
if err := m.Add(ctx, saveEntity); err != nil {
if errors.Is(err, datastore.ErrRecordExist) {
notRollback[saveEntity.PrimaryKey()] = true
}
for _, deleteEntity := range entities[:i] {
if _, exit := notRollback[deleteEntity.PrimaryKey()]; !exit {
if err := m.Delete(ctx, deleteEntity); err != nil {
if !errors.Is(err, datastore.ErrRecordNotExist) {
klog.Errorf("rollback delete entity failure %w", err)
}
}
}
}
return datastore.NewDBError(fmt.Errorf("save entities occur error, %w", err))
}
}
return nil
}

// Get get data model
func (m *mysql) Get(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}

if dbGet := m.client.WithContext(ctx).First(entity); dbGet.Error != nil {
if errors.Is(dbGet.Error, gorm.ErrRecordNotFound) {
return datastore.ErrRecordNotExist
}
return datastore.NewDBError(dbGet.Error)
}
return nil
}

// Put update data model
func (m *mysql) Put(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
entity.SetUpdateTime(time.Now())
if dbPut := m.client.WithContext(ctx).Model(entity).Updates(entity); dbPut.Error != nil {
if errors.Is(dbPut.Error, gorm.ErrRecordNotFound) {
return datastore.ErrRecordNotExist
}
return datastore.NewDBError(dbPut.Error)
}
return nil
}

// IsExist determine whether data exists.
func (m *mysql) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) {
if entity.PrimaryKey() == "" {
return false, datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return false, datastore.ErrTableNameEmpty
}

if dbExist := m.client.WithContext(ctx).First(entity); dbExist.Error != nil {
if errors.Is(dbExist.Error, gorm.ErrRecordNotFound) {
return false, nil
}
return false, datastore.NewDBError(dbExist.Error)
}

return true, nil
}

// Delete delete data
func (m *mysql) Delete(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
// check entity is exist
if err := m.Get(ctx, entity); err != nil {
return err
}

if dbDelete := m.client.WithContext(ctx).Model(entity).Delete(entity); dbDelete.Error != nil {
klog.Errorf("delete document failure %w", dbDelete.Error)
return datastore.NewDBError(dbDelete.Error)
}

return nil
}

// _toColumnName converts keys of the models to lowercase as the column name are in lowercase in the database
func _toColumnName(columnName string) string {
return strings.ToLower(columnName)
}

func _applyFilterOptions(clauses []clause.Expression, filterOptions datastore.FilterOptions) []clause.Expression {
for _, queryOp := range filterOptions.Queries {
clauses = append(clauses, clause.Like{
Column: _toColumnName(queryOp.Key),
Value: fmt.Sprintf("%%%s%%", queryOp.Query),
})
}
for _, queryOp := range filterOptions.In {
values := make([]interface{}, len(queryOp.Values))
for i, v := range queryOp.Values {
values[i] = v
}
clauses = append(clauses, clause.IN{
Column: _toColumnName(queryOp.Key),
Values: values,
})
}
for _, queryOp := range filterOptions.IsNotExist {
clauses = append(clauses, clause.Eq{
Column: _toColumnName(queryOp.Key),
Value: "",
})
}
return clauses
}

// List list entity function
func (m *mysql) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) {
if entity.TableName() == "" {
return nil, datastore.ErrTableNameEmpty
}
var (
clauses []clause.Expression
exprs []clause.Expression
limit int
offset int
)
if op != nil && op.PageSize > 0 && op.Page > 0 {
limit = op.PageSize
offset = op.PageSize * (op.Page - 1)
clauses = append(clauses, clause.Limit{
Limit: &limit,
Offset: offset,
})
}
for k, v := range entity.Index() {
exprs = append(exprs, clause.Eq{
Column: strings.ToLower(k),
Value: v,
})
}
if op != nil {
exprs = _applyFilterOptions(exprs, op.FilterOptions)
}
if len(exprs) > 0 {
clauses = append(clauses, clause.Where{
Exprs: exprs,
})
}
if op != nil && op.SortBy != nil {
var sortOption []clause.OrderByColumn
for _, v := range op.SortBy {
sortOption = append(sortOption, clause.OrderByColumn{
Column: clause.Column{
Name: strings.ToLower(v.Key),
},
Desc: v.Order == datastore.SortOrderDescending,
})
}
clauses = append(clauses, clause.OrderBy{
Columns: sortOption,
})
}
var list []datastore.Entity
rows, err := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Rows()
if err != nil {
return nil, datastore.NewDBError(err)
}
defer func() {
if err := rows.Close(); err != nil {
klog.Warningf("close rows failure %s", err.Error())
}
}()
for rows.Next() {
item, err := datastore.NewEntity(entity)
if err != nil {
return nil, datastore.NewDBError(err)
}
err = m.client.WithContext(ctx).ScanRows(rows, &item)
if err != nil {
return nil, datastore.NewDBError(fmt.Errorf("row scan failure %w", err))
}
list = append(list, item)
}
if err := rows.Err(); err != nil {
return nil, datastore.NewDBError(err)
}
return list, nil
}

// Count counts entities
func (m *mysql) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) {
if entity.TableName() == "" {
return 0, datastore.ErrTableNameEmpty
}
var (
count int64
exprs []clause.Expression
clauses []clause.Expression
)
for k, v := range entity.Index() {
exprs = append(exprs, clause.Eq{
Column: strings.ToLower(k),
Value: v,
})
}
if filterOptions != nil {
exprs = _applyFilterOptions(exprs, *filterOptions)
}
if len(exprs) > 0 {
clauses = append(clauses, clause.Where{
Exprs: exprs,
})
}
if dbCount := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Count(&count); dbCount.Error != nil {
return 0, datastore.NewDBError(dbCount.Error)
}
return count, nil
}
Loading
Loading