Skip to content

Commit

Permalink
Chore: unify sql data driver implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Qiaozp <qiaozhongpei.qzp@alibaba-inc.com>
  • Loading branch information
chivalryq committed Sep 6, 2023
1 parent 7937d09 commit 5ea102a
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 507 deletions.
10 changes: 0 additions & 10 deletions .github/workflows/server-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,6 @@ jobs:
mysql database: 'kubevela'
mysql root password: 'kubevelaSQL123'

- name: Set up OpenGauss
uses: lgj101/opengauss-action@v1.6
with:
GS_VERSION: '5.0.0'
GS_DB: 'kubevela'
GS_USERNAME: 'kubevela'
GS_PASSWORD: 'Kubevela-123'
HOST_PORT: 15432
CONTAINER_PORT: 5432

- name: Set up Postgres
uses: Harmon758/postgresql-action@v1
with:
Expand Down
23 changes: 0 additions & 23 deletions pkg/server/infrastructure/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,6 @@ func initPostgresTestDs() (datastore.DataStore, error) {
return postgresDriver, nil
}

// initOpenGaussTestDs Postgres Driver is also compatible with OpenGaussian databases
func initOpenGaussTestDs() (datastore.DataStore, error) {
db, err := gorm.Open(postgresorm.Open("postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1"), &gorm.Config{})
if err != nil {
return nil, err
}
for _, v := range model.GetRegisterModels() {
err := db.Migrator().DropTable(&v)
if err != nil {
return nil, err
}
}
openGaussDriver, err := postgres.New(context.TODO(), datastore.Config{
URL: "postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1",
Database: "kubevela",
})
if err != nil {
return nil, err
}
return openGaussDriver, nil
}

func initKubeapiTestDs() (datastore.DataStore, error) {
var testScheme = runtime.NewScheme()
testEnv := &envtest.Environment{
Expand Down Expand Up @@ -196,7 +174,6 @@ var _ = Describe("Test datastore methods", func() {
DriverTest(initMysqlTestDs)
DriverTest(initMongodbTestDs)
DriverTest(initKubeapiTestDs)
DriverTest(initOpenGaussTestDs)
DriverTest(initPostgresTestDs)
})

Expand Down
253 changes: 16 additions & 237 deletions pkg/server/infrastructure/datastore/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,17 @@ package mysql

import (
"context"
"errors"
"fmt"
"strings"
"time"

mysqlgorm "gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"k8s.io/klog/v2"

"github.com/kubevela/velaux/pkg/server/domain/model"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql"
"github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer"
mysqlgorm "gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)

type mysql struct {
client gorm.DB
database string
delegate sql.Driver
}

// New new mysql datastore instance
Expand All @@ -56,262 +48,49 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error)
}

m := &mysql{
client: *db.WithContext(ctx),
database: cfg.Database,
delegate: sql.Driver{
Client: *db.WithContext(ctx),
},
}
return m, nil
}

// Add add data model
func (m *mysql) Add(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
entity.SetCreateTime(time.Now())
entity.SetUpdateTime(time.Now())

if dbAdd := m.client.WithContext(ctx).Create(entity); dbAdd.Error != nil {
if match := errors.Is(dbAdd.Error, gorm.ErrDuplicatedKey); match {
return datastore.ErrRecordExist
}
return datastore.NewDBError(dbAdd.Error)
}
return nil
return m.delegate.Add(ctx, entity)

Check warning on line 60 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L60

Added line #L60 was not covered by tests
}

// BatchAdd batch add entity, this operation has some atomicity.
func (m *mysql) BatchAdd(ctx context.Context, entities []datastore.Entity) error {
notRollback := make(map[string]bool)
for i, saveEntity := range entities {
if err := m.Add(ctx, saveEntity); err != nil {
if errors.Is(err, datastore.ErrRecordExist) {
notRollback[saveEntity.PrimaryKey()] = true
}
for _, deleteEntity := range entities[:i] {
if _, exit := notRollback[deleteEntity.PrimaryKey()]; !exit {
if err := m.Delete(ctx, deleteEntity); err != nil {
if !errors.Is(err, datastore.ErrRecordNotExist) {
klog.Errorf("rollback delete entity failure %w", err)
}
}
}
}
return datastore.NewDBError(fmt.Errorf("save entities occur error, %w", err))
}
}
return nil
return m.delegate.BatchAdd(ctx, entities)

Check warning on line 65 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L65

Added line #L65 was not covered by tests
}

// Get get data model
func (m *mysql) Get(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}

if dbGet := m.client.WithContext(ctx).First(entity); dbGet.Error != nil {
if errors.Is(dbGet.Error, gorm.ErrRecordNotFound) {
return datastore.ErrRecordNotExist
}
return datastore.NewDBError(dbGet.Error)
}
return nil
return m.delegate.Get(ctx, entity)

Check warning on line 70 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L70

Added line #L70 was not covered by tests
}

// Put update data model
func (m *mysql) Put(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
entity.SetUpdateTime(time.Now())
if dbPut := m.client.WithContext(ctx).Model(entity).Updates(entity); dbPut.Error != nil {
if errors.Is(dbPut.Error, gorm.ErrRecordNotFound) {
return datastore.ErrRecordNotExist
}
return datastore.NewDBError(dbPut.Error)
}
return nil
return m.delegate.Put(ctx, entity)

Check warning on line 75 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L75

Added line #L75 was not covered by tests
}

// IsExist determine whether data exists.
func (m *mysql) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) {
if entity.PrimaryKey() == "" {
return false, datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return false, datastore.ErrTableNameEmpty
}

if dbExist := m.client.WithContext(ctx).First(entity); dbExist.Error != nil {
if errors.Is(dbExist.Error, gorm.ErrRecordNotFound) {
return false, nil
}
return false, datastore.NewDBError(dbExist.Error)
}

return true, nil
return m.delegate.IsExist(ctx, entity)

Check warning on line 80 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L80

Added line #L80 was not covered by tests
}

// Delete delete data
func (m *mysql) Delete(ctx context.Context, entity datastore.Entity) error {
if entity.PrimaryKey() == "" {
return datastore.ErrPrimaryEmpty
}
if entity.TableName() == "" {
return datastore.ErrTableNameEmpty
}
// check entity is exist
if err := m.Get(ctx, entity); err != nil {
return err
}

if dbDelete := m.client.WithContext(ctx).Model(entity).Delete(entity); dbDelete.Error != nil {
klog.Errorf("delete document failure %w", dbDelete.Error)
return datastore.NewDBError(dbDelete.Error)
}

return nil
}

// _toColumnName converts keys of the models to lowercase as the column name are in lowercase in the database
func _toColumnName(columnName string) string {
return strings.ToLower(columnName)
}

func _applyFilterOptions(clauses []clause.Expression, filterOptions datastore.FilterOptions) []clause.Expression {
for _, queryOp := range filterOptions.Queries {
clauses = append(clauses, clause.Like{
Column: _toColumnName(queryOp.Key),
Value: fmt.Sprintf("%%%s%%", queryOp.Query),
})
}
for _, queryOp := range filterOptions.In {
values := make([]interface{}, len(queryOp.Values))
for i, v := range queryOp.Values {
values[i] = v
}
clauses = append(clauses, clause.IN{
Column: _toColumnName(queryOp.Key),
Values: values,
})
}
for _, queryOp := range filterOptions.IsNotExist {
clauses = append(clauses, clause.Eq{
Column: _toColumnName(queryOp.Key),
Value: "",
})
}
return clauses
return m.delegate.Delete(ctx, entity)

Check warning on line 85 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L85

Added line #L85 was not covered by tests
}

// List list entity function
func (m *mysql) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) {
if entity.TableName() == "" {
return nil, datastore.ErrTableNameEmpty
}
var (
clauses []clause.Expression
exprs []clause.Expression
limit int
offset int
)
if op != nil && op.PageSize > 0 && op.Page > 0 {
limit = op.PageSize
offset = op.PageSize * (op.Page - 1)
clauses = append(clauses, clause.Limit{
Limit: &limit,
Offset: offset,
})
}
for k, v := range entity.Index() {
exprs = append(exprs, clause.Eq{
Column: strings.ToLower(k),
Value: v,
})
}
if op != nil {
exprs = _applyFilterOptions(exprs, op.FilterOptions)
}
if len(exprs) > 0 {
clauses = append(clauses, clause.Where{
Exprs: exprs,
})
}
if op != nil && op.SortBy != nil {
var sortOption []clause.OrderByColumn
for _, v := range op.SortBy {
sortOption = append(sortOption, clause.OrderByColumn{
Column: clause.Column{
Name: strings.ToLower(v.Key),
},
Desc: v.Order == datastore.SortOrderDescending,
})
}
clauses = append(clauses, clause.OrderBy{
Columns: sortOption,
})
}
var list []datastore.Entity
rows, err := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Rows()
if err != nil {
return nil, datastore.NewDBError(err)
}
defer func() {
if err := rows.Close(); err != nil {
klog.Warningf("close rows failure %s", err.Error())
}
}()
for rows.Next() {
item, err := datastore.NewEntity(entity)
if err != nil {
return nil, datastore.NewDBError(err)
}
err = m.client.WithContext(ctx).ScanRows(rows, &item)
if err != nil {
return nil, datastore.NewDBError(fmt.Errorf("row scan failure %w", err))
}
list = append(list, item)
}
if err := rows.Err(); err != nil {
return nil, datastore.NewDBError(err)
}
return list, nil
return m.delegate.List(ctx, entity, op)

Check warning on line 90 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L90

Added line #L90 was not covered by tests
}

// Count counts entities
func (m *mysql) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) {
if entity.TableName() == "" {
return 0, datastore.ErrTableNameEmpty
}
var (
count int64
exprs []clause.Expression
clauses []clause.Expression
)
for k, v := range entity.Index() {
exprs = append(exprs, clause.Eq{
Column: strings.ToLower(k),
Value: v,
})
}
if filterOptions != nil {
exprs = _applyFilterOptions(exprs, *filterOptions)
}
if len(exprs) > 0 {
clauses = append(clauses, clause.Where{
Exprs: exprs,
})
}
if dbCount := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Count(&count); dbCount.Error != nil {
return 0, datastore.NewDBError(dbCount.Error)
}
return count, nil
return m.delegate.Count(ctx, entity, filterOptions)

Check warning on line 95 in pkg/server/infrastructure/datastore/mysql/mysql.go

View check run for this annotation

Codecov / codecov/patch

pkg/server/infrastructure/datastore/mysql/mysql.go#L95

Added line #L95 was not covered by tests
}
Loading

0 comments on commit 5ea102a

Please sign in to comment.