From 5ea102ace49887d7edfca413d21fa8a854083334 Mon Sep 17 00:00:00 2001 From: Qiaozp Date: Wed, 6 Sep 2023 17:38:35 +0800 Subject: [PATCH 1/3] Chore: unify sql data driver implementation Signed-off-by: Qiaozp --- .github/workflows/server-test.yml | 10 - .../datastore/datastore_test.go | 23 -- .../infrastructure/datastore/mysql/mysql.go | 253 +-------------- .../datastore/postgres/postgres.go | 253 +-------------- .../infrastructure/datastore/sql/driver.go | 290 ++++++++++++++++++ 5 files changed, 322 insertions(+), 507 deletions(-) create mode 100644 pkg/server/infrastructure/datastore/sql/driver.go diff --git a/.github/workflows/server-test.yml b/.github/workflows/server-test.yml index 45765f28..8c489060 100644 --- a/.github/workflows/server-test.yml +++ b/.github/workflows/server-test.yml @@ -72,16 +72,6 @@ jobs: mysql database: 'kubevela' mysql root password: 'kubevelaSQL123' - - name: Set up OpenGauss - uses: lgj101/opengauss-action@v1.6 - with: - GS_VERSION: '5.0.0' - GS_DB: 'kubevela' - GS_USERNAME: 'kubevela' - GS_PASSWORD: 'Kubevela-123' - HOST_PORT: 15432 - CONTAINER_PORT: 5432 - - name: Set up Postgres uses: Harmon758/postgresql-action@v1 with: diff --git a/pkg/server/infrastructure/datastore/datastore_test.go b/pkg/server/infrastructure/datastore/datastore_test.go index a2e8d186..8ff27d30 100644 --- a/pkg/server/infrastructure/datastore/datastore_test.go +++ b/pkg/server/infrastructure/datastore/datastore_test.go @@ -90,28 +90,6 @@ func initPostgresTestDs() (datastore.DataStore, error) { return postgresDriver, nil } -// initOpenGaussTestDs Postgres Driver is also compatible with OpenGaussian databases -func initOpenGaussTestDs() (datastore.DataStore, error) { - db, err := gorm.Open(postgresorm.Open("postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1"), &gorm.Config{}) - if err != nil { - return nil, err - } - for _, v := range model.GetRegisterModels() { - err := db.Migrator().DropTable(&v) - if err != nil { - return nil, err - } - } - openGaussDriver, err := postgres.New(context.TODO(), datastore.Config{ - URL: "postgres://gaussdb:Kubevela-123@127.0.0.1:15432/kubevela?sslmode=disable&client_encoding=UTF-8&connect_timeout=1", - Database: "kubevela", - }) - if err != nil { - return nil, err - } - return openGaussDriver, nil -} - func initKubeapiTestDs() (datastore.DataStore, error) { var testScheme = runtime.NewScheme() testEnv := &envtest.Environment{ @@ -196,7 +174,6 @@ var _ = Describe("Test datastore methods", func() { DriverTest(initMysqlTestDs) DriverTest(initMongodbTestDs) DriverTest(initKubeapiTestDs) - DriverTest(initOpenGaussTestDs) DriverTest(initPostgresTestDs) }) diff --git a/pkg/server/infrastructure/datastore/mysql/mysql.go b/pkg/server/infrastructure/datastore/mysql/mysql.go index 5bb0d008..e28f911e 100644 --- a/pkg/server/infrastructure/datastore/mysql/mysql.go +++ b/pkg/server/infrastructure/datastore/mysql/mysql.go @@ -18,25 +18,17 @@ package mysql import ( "context" - "errors" - "fmt" - "strings" - "time" - - mysqlgorm "gorm.io/driver/mysql" - "gorm.io/gorm" - "gorm.io/gorm/clause" - "gorm.io/gorm/logger" - "k8s.io/klog/v2" - "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore" + "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer" + mysqlgorm "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" ) type mysql struct { - client gorm.DB - database string + delegate sql.Driver } // New new mysql datastore instance @@ -56,262 +48,49 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) } m := &mysql{ - client: *db.WithContext(ctx), - database: cfg.Database, + delegate: sql.Driver{ + Client: *db.WithContext(ctx), + }, } return m, nil } // Add add data model func (m *mysql) Add(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - entity.SetCreateTime(time.Now()) - entity.SetUpdateTime(time.Now()) - - if dbAdd := m.client.WithContext(ctx).Create(entity); dbAdd.Error != nil { - if match := errors.Is(dbAdd.Error, gorm.ErrDuplicatedKey); match { - return datastore.ErrRecordExist - } - return datastore.NewDBError(dbAdd.Error) - } - return nil + return m.delegate.Add(ctx, entity) } // BatchAdd batch add entity, this operation has some atomicity. func (m *mysql) BatchAdd(ctx context.Context, entities []datastore.Entity) error { - notRollback := make(map[string]bool) - for i, saveEntity := range entities { - if err := m.Add(ctx, saveEntity); err != nil { - if errors.Is(err, datastore.ErrRecordExist) { - notRollback[saveEntity.PrimaryKey()] = true - } - for _, deleteEntity := range entities[:i] { - if _, exit := notRollback[deleteEntity.PrimaryKey()]; !exit { - if err := m.Delete(ctx, deleteEntity); err != nil { - if !errors.Is(err, datastore.ErrRecordNotExist) { - klog.Errorf("rollback delete entity failure %w", err) - } - } - } - } - return datastore.NewDBError(fmt.Errorf("save entities occur error, %w", err)) - } - } - return nil + return m.delegate.BatchAdd(ctx, entities) } // Get get data model func (m *mysql) Get(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - - if dbGet := m.client.WithContext(ctx).First(entity); dbGet.Error != nil { - if errors.Is(dbGet.Error, gorm.ErrRecordNotFound) { - return datastore.ErrRecordNotExist - } - return datastore.NewDBError(dbGet.Error) - } - return nil + return m.delegate.Get(ctx, entity) } // Put update data model func (m *mysql) Put(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - entity.SetUpdateTime(time.Now()) - if dbPut := m.client.WithContext(ctx).Model(entity).Updates(entity); dbPut.Error != nil { - if errors.Is(dbPut.Error, gorm.ErrRecordNotFound) { - return datastore.ErrRecordNotExist - } - return datastore.NewDBError(dbPut.Error) - } - return nil + return m.delegate.Put(ctx, entity) } // IsExist determine whether data exists. func (m *mysql) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) { - if entity.PrimaryKey() == "" { - return false, datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return false, datastore.ErrTableNameEmpty - } - - if dbExist := m.client.WithContext(ctx).First(entity); dbExist.Error != nil { - if errors.Is(dbExist.Error, gorm.ErrRecordNotFound) { - return false, nil - } - return false, datastore.NewDBError(dbExist.Error) - } - - return true, nil + return m.delegate.IsExist(ctx, entity) } // Delete delete data func (m *mysql) Delete(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - // check entity is exist - if err := m.Get(ctx, entity); err != nil { - return err - } - - if dbDelete := m.client.WithContext(ctx).Model(entity).Delete(entity); dbDelete.Error != nil { - klog.Errorf("delete document failure %w", dbDelete.Error) - return datastore.NewDBError(dbDelete.Error) - } - - return nil -} - -// _toColumnName converts keys of the models to lowercase as the column name are in lowercase in the database -func _toColumnName(columnName string) string { - return strings.ToLower(columnName) -} - -func _applyFilterOptions(clauses []clause.Expression, filterOptions datastore.FilterOptions) []clause.Expression { - for _, queryOp := range filterOptions.Queries { - clauses = append(clauses, clause.Like{ - Column: _toColumnName(queryOp.Key), - Value: fmt.Sprintf("%%%s%%", queryOp.Query), - }) - } - for _, queryOp := range filterOptions.In { - values := make([]interface{}, len(queryOp.Values)) - for i, v := range queryOp.Values { - values[i] = v - } - clauses = append(clauses, clause.IN{ - Column: _toColumnName(queryOp.Key), - Values: values, - }) - } - for _, queryOp := range filterOptions.IsNotExist { - clauses = append(clauses, clause.Eq{ - Column: _toColumnName(queryOp.Key), - Value: "", - }) - } - return clauses + return m.delegate.Delete(ctx, entity) } // List list entity function func (m *mysql) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) { - if entity.TableName() == "" { - return nil, datastore.ErrTableNameEmpty - } - var ( - clauses []clause.Expression - exprs []clause.Expression - limit int - offset int - ) - if op != nil && op.PageSize > 0 && op.Page > 0 { - limit = op.PageSize - offset = op.PageSize * (op.Page - 1) - clauses = append(clauses, clause.Limit{ - Limit: &limit, - Offset: offset, - }) - } - for k, v := range entity.Index() { - exprs = append(exprs, clause.Eq{ - Column: strings.ToLower(k), - Value: v, - }) - } - if op != nil { - exprs = _applyFilterOptions(exprs, op.FilterOptions) - } - if len(exprs) > 0 { - clauses = append(clauses, clause.Where{ - Exprs: exprs, - }) - } - if op != nil && op.SortBy != nil { - var sortOption []clause.OrderByColumn - for _, v := range op.SortBy { - sortOption = append(sortOption, clause.OrderByColumn{ - Column: clause.Column{ - Name: strings.ToLower(v.Key), - }, - Desc: v.Order == datastore.SortOrderDescending, - }) - } - clauses = append(clauses, clause.OrderBy{ - Columns: sortOption, - }) - } - var list []datastore.Entity - rows, err := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Rows() - if err != nil { - return nil, datastore.NewDBError(err) - } - defer func() { - if err := rows.Close(); err != nil { - klog.Warningf("close rows failure %s", err.Error()) - } - }() - for rows.Next() { - item, err := datastore.NewEntity(entity) - if err != nil { - return nil, datastore.NewDBError(err) - } - err = m.client.WithContext(ctx).ScanRows(rows, &item) - if err != nil { - return nil, datastore.NewDBError(fmt.Errorf("row scan failure %w", err)) - } - list = append(list, item) - } - if err := rows.Err(); err != nil { - return nil, datastore.NewDBError(err) - } - return list, nil + return m.delegate.List(ctx, entity, op) } // Count counts entities func (m *mysql) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) { - if entity.TableName() == "" { - return 0, datastore.ErrTableNameEmpty - } - var ( - count int64 - exprs []clause.Expression - clauses []clause.Expression - ) - for k, v := range entity.Index() { - exprs = append(exprs, clause.Eq{ - Column: strings.ToLower(k), - Value: v, - }) - } - if filterOptions != nil { - exprs = _applyFilterOptions(exprs, *filterOptions) - } - if len(exprs) > 0 { - clauses = append(clauses, clause.Where{ - Exprs: exprs, - }) - } - if dbCount := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Count(&count); dbCount.Error != nil { - return 0, datastore.NewDBError(dbCount.Error) - } - return count, nil + return m.delegate.Count(ctx, entity, filterOptions) } diff --git a/pkg/server/infrastructure/datastore/postgres/postgres.go b/pkg/server/infrastructure/datastore/postgres/postgres.go index 08e0fdd7..58ccd016 100644 --- a/pkg/server/infrastructure/datastore/postgres/postgres.go +++ b/pkg/server/infrastructure/datastore/postgres/postgres.go @@ -18,25 +18,17 @@ package postgres import ( "context" - "errors" - "fmt" - "strings" - "time" - - postgresorm "gorm.io/driver/postgres" - "gorm.io/gorm" - "gorm.io/gorm/clause" - "gorm.io/gorm/logger" - "k8s.io/klog/v2" - "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore" + "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer" + postgresorm "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/logger" ) type postgres struct { - client gorm.DB - database string + delegate sql.Driver } // New postgres datastore instance @@ -59,262 +51,49 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) } m := &postgres{ - client: *db.WithContext(ctx), - database: cfg.Database, + delegate: sql.Driver{ + Client: *db.WithContext(ctx), + }, } return m, nil } // Add add data model func (m *postgres) Add(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - entity.SetCreateTime(time.Now()) - entity.SetUpdateTime(time.Now()) - - if dbAdd := m.client.WithContext(ctx).Create(entity); dbAdd.Error != nil { - if match := errors.Is(dbAdd.Error, gorm.ErrDuplicatedKey); match { - return datastore.ErrRecordExist - } - return datastore.NewDBError(dbAdd.Error) - } - return nil + return m.delegate.Add(ctx, entity) } // BatchAdd batch add entity, this operation has some atomicity. func (m *postgres) BatchAdd(ctx context.Context, entities []datastore.Entity) error { - notRollback := make(map[string]bool) - for i, saveEntity := range entities { - if err := m.Add(ctx, saveEntity); err != nil { - if errors.Is(err, datastore.ErrRecordExist) { - notRollback[saveEntity.PrimaryKey()] = true - } - for _, deleteEntity := range entities[:i] { - if _, exit := notRollback[deleteEntity.PrimaryKey()]; !exit { - if err := m.Delete(ctx, deleteEntity); err != nil { - if !errors.Is(err, datastore.ErrRecordNotExist) { - klog.Errorf("rollback delete entity failure %w", err) - } - } - } - } - return datastore.NewDBError(fmt.Errorf("save entities occur error, %w", err)) - } - } - return nil + return m.delegate.BatchAdd(ctx, entities) } // Get get data model func (m *postgres) Get(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - - if dbGet := m.client.WithContext(ctx).First(entity); dbGet.Error != nil { - if errors.Is(dbGet.Error, gorm.ErrRecordNotFound) { - return datastore.ErrRecordNotExist - } - return datastore.NewDBError(dbGet.Error) - } - return nil + return m.delegate.Get(ctx, entity) } // Put update data model func (m *postgres) Put(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - entity.SetUpdateTime(time.Now()) - if dbPut := m.client.WithContext(ctx).Model(entity).Updates(entity); dbPut.Error != nil { - if errors.Is(dbPut.Error, gorm.ErrRecordNotFound) { - return datastore.ErrRecordNotExist - } - return datastore.NewDBError(dbPut.Error) - } - return nil + return m.delegate.Put(ctx, entity) } // IsExist determine whether data exists. func (m *postgres) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) { - if entity.PrimaryKey() == "" { - return false, datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return false, datastore.ErrTableNameEmpty - } - - if dbExist := m.client.WithContext(ctx).First(entity); dbExist.Error != nil { - if errors.Is(dbExist.Error, gorm.ErrRecordNotFound) { - return false, nil - } - return false, datastore.NewDBError(dbExist.Error) - } - - return true, nil + return m.delegate.IsExist(ctx, entity) } // Delete delete data func (m *postgres) Delete(ctx context.Context, entity datastore.Entity) error { - if entity.PrimaryKey() == "" { - return datastore.ErrPrimaryEmpty - } - if entity.TableName() == "" { - return datastore.ErrTableNameEmpty - } - // check entity is existed - if err := m.Get(ctx, entity); err != nil { - return err - } - - if dbDelete := m.client.WithContext(ctx).Model(entity).Delete(entity); dbDelete.Error != nil { - klog.Errorf("delete document failure %w", dbDelete.Error) - return datastore.NewDBError(dbDelete.Error) - } - - return nil -} - -// _toColumnName converts keys of the models to lowercase as the column name are in lowercase in the database -func _toColumnName(columnName string) string { - return strings.ToLower(columnName) -} - -func _applyFilterOptions(clauses []clause.Expression, filterOptions datastore.FilterOptions) []clause.Expression { - for _, queryOp := range filterOptions.Queries { - clauses = append(clauses, clause.Like{ - Column: _toColumnName(queryOp.Key), - Value: fmt.Sprintf("%%%s%%", queryOp.Query), - }) - } - for _, queryOp := range filterOptions.In { - values := make([]interface{}, len(queryOp.Values)) - for i, v := range queryOp.Values { - values[i] = v - } - clauses = append(clauses, clause.IN{ - Column: _toColumnName(queryOp.Key), - Values: values, - }) - } - for _, queryOp := range filterOptions.IsNotExist { - clauses = append(clauses, clause.Eq{ - Column: _toColumnName(queryOp.Key), - Value: "", - }) - } - return clauses + return m.delegate.Delete(ctx, entity) } // List list entity function func (m *postgres) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) { - if entity.TableName() == "" { - return nil, datastore.ErrTableNameEmpty - } - var ( - clauses []clause.Expression - exprs []clause.Expression - limit int - offset int - ) - if op != nil && op.PageSize > 0 && op.Page > 0 { - limit = op.PageSize - offset = op.PageSize * (op.Page - 1) - clauses = append(clauses, clause.Limit{ - Limit: &limit, - Offset: offset, - }) - } - for k, v := range entity.Index() { - exprs = append(exprs, clause.Eq{ - Column: strings.ToLower(k), - Value: v, - }) - } - if op != nil { - exprs = _applyFilterOptions(exprs, op.FilterOptions) - } - if len(exprs) > 0 { - clauses = append(clauses, clause.Where{ - Exprs: exprs, - }) - } - if op != nil && op.SortBy != nil { - var sortOption []clause.OrderByColumn - for _, v := range op.SortBy { - sortOption = append(sortOption, clause.OrderByColumn{ - Column: clause.Column{ - Name: strings.ToLower(v.Key), - }, - Desc: v.Order == datastore.SortOrderDescending, - }) - } - clauses = append(clauses, clause.OrderBy{ - Columns: sortOption, - }) - } - var list []datastore.Entity - rows, err := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Rows() - if err != nil { - return nil, datastore.NewDBError(err) - } - defer func() { - if err := rows.Close(); err != nil { - klog.Warningf("close rows failure %s", err.Error()) - } - }() - for rows.Next() { - item, err := datastore.NewEntity(entity) - if err != nil { - return nil, datastore.NewDBError(err) - } - err = m.client.WithContext(ctx).ScanRows(rows, &item) - if err != nil { - return nil, datastore.NewDBError(fmt.Errorf("row scan failure %w", err)) - } - list = append(list, item) - } - if err := rows.Err(); err != nil { - return nil, datastore.NewDBError(err) - } - return list, nil + return m.delegate.List(ctx, entity, op) } // Count counts entities func (m *postgres) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) { - if entity.TableName() == "" { - return 0, datastore.ErrTableNameEmpty - } - var ( - count int64 - exprs []clause.Expression - clauses []clause.Expression - ) - for k, v := range entity.Index() { - exprs = append(exprs, clause.Eq{ - Column: strings.ToLower(k), - Value: v, - }) - } - if filterOptions != nil { - exprs = _applyFilterOptions(exprs, *filterOptions) - } - if len(exprs) > 0 { - clauses = append(clauses, clause.Where{ - Exprs: exprs, - }) - } - if dbCount := m.client.WithContext(ctx).Model(entity).Clauses(clauses...).Count(&count); dbCount.Error != nil { - return 0, datastore.NewDBError(dbCount.Error) - } - return count, nil + return m.delegate.Count(ctx, entity, filterOptions) } diff --git a/pkg/server/infrastructure/datastore/sql/driver.go b/pkg/server/infrastructure/datastore/sql/driver.go new file mode 100644 index 00000000..f472c644 --- /dev/null +++ b/pkg/server/infrastructure/datastore/sql/driver.go @@ -0,0 +1,290 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sql + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + "k8s.io/klog/v2" + + "github.com/kubevela/velaux/pkg/server/infrastructure/datastore" +) + +// Driver is a unified implementation of SQL driver of datastore +type Driver struct { + Client gorm.DB +} + +// Add data model +func (m *Driver) Add(ctx context.Context, entity datastore.Entity) error { + if entity.PrimaryKey() == "" { + return datastore.ErrPrimaryEmpty + } + if entity.TableName() == "" { + return datastore.ErrTableNameEmpty + } + entity.SetCreateTime(time.Now()) + entity.SetUpdateTime(time.Now()) + + if dbAdd := m.Client.WithContext(ctx).Create(entity); dbAdd.Error != nil { + if match := errors.Is(dbAdd.Error, gorm.ErrDuplicatedKey); match { + return datastore.ErrRecordExist + } + return datastore.NewDBError(dbAdd.Error) + } + return nil +} + +// BatchAdd batch add entity, this operation has some atomicity. +func (m *Driver) BatchAdd(ctx context.Context, entities []datastore.Entity) error { + notRollback := make(map[string]bool) + for i, saveEntity := range entities { + if err := m.Add(ctx, saveEntity); err != nil { + if errors.Is(err, datastore.ErrRecordExist) { + notRollback[saveEntity.PrimaryKey()] = true + } + for _, deleteEntity := range entities[:i] { + if _, exit := notRollback[deleteEntity.PrimaryKey()]; !exit { + if err := m.Delete(ctx, deleteEntity); err != nil { + if !errors.Is(err, datastore.ErrRecordNotExist) { + klog.Errorf("rollback delete entity failure %w", err) + } + } + } + } + return datastore.NewDBError(fmt.Errorf("save entities occur error, %w", err)) + } + } + return nil +} + +// Get get data model +func (m *Driver) Get(ctx context.Context, entity datastore.Entity) error { + if entity.PrimaryKey() == "" { + return datastore.ErrPrimaryEmpty + } + if entity.TableName() == "" { + return datastore.ErrTableNameEmpty + } + + if dbGet := m.Client.WithContext(ctx).First(entity); dbGet.Error != nil { + if errors.Is(dbGet.Error, gorm.ErrRecordNotFound) { + return datastore.ErrRecordNotExist + } + return datastore.NewDBError(dbGet.Error) + } + return nil +} + +// Put update data model +func (m *Driver) Put(ctx context.Context, entity datastore.Entity) error { + if entity.PrimaryKey() == "" { + return datastore.ErrPrimaryEmpty + } + if entity.TableName() == "" { + return datastore.ErrTableNameEmpty + } + entity.SetUpdateTime(time.Now()) + if dbPut := m.Client.WithContext(ctx).Model(entity).Updates(entity); dbPut.Error != nil { + if errors.Is(dbPut.Error, gorm.ErrRecordNotFound) { + return datastore.ErrRecordNotExist + } + return datastore.NewDBError(dbPut.Error) + } + return nil +} + +// IsExist determine whether data exists. +func (m *Driver) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) { + if entity.PrimaryKey() == "" { + return false, datastore.ErrPrimaryEmpty + } + if entity.TableName() == "" { + return false, datastore.ErrTableNameEmpty + } + + if dbExist := m.Client.WithContext(ctx).First(entity); dbExist.Error != nil { + if errors.Is(dbExist.Error, gorm.ErrRecordNotFound) { + return false, nil + } + return false, datastore.NewDBError(dbExist.Error) + } + + return true, nil +} + +// Delete delete data +func (m *Driver) Delete(ctx context.Context, entity datastore.Entity) error { + if entity.PrimaryKey() == "" { + return datastore.ErrPrimaryEmpty + } + if entity.TableName() == "" { + return datastore.ErrTableNameEmpty + } + // check entity is existed + if err := m.Get(ctx, entity); err != nil { + return err + } + + if dbDelete := m.Client.WithContext(ctx).Model(entity).Delete(entity); dbDelete.Error != nil { + klog.Errorf("delete document failure %w", dbDelete.Error) + return datastore.NewDBError(dbDelete.Error) + } + + return nil +} + +// _toColumnName converts keys of the models to lowercase as the column name are in lowercase in the database +func _toColumnName(columnName string) string { + return strings.ToLower(columnName) +} + +func _applyFilterOptions(clauses []clause.Expression, filterOptions datastore.FilterOptions) []clause.Expression { + for _, queryOp := range filterOptions.Queries { + clauses = append(clauses, clause.Like{ + Column: _toColumnName(queryOp.Key), + Value: fmt.Sprintf("%%%s%%", queryOp.Query), + }) + } + for _, queryOp := range filterOptions.In { + values := make([]interface{}, len(queryOp.Values)) + for i, v := range queryOp.Values { + values[i] = v + } + clauses = append(clauses, clause.IN{ + Column: _toColumnName(queryOp.Key), + Values: values, + }) + } + for _, queryOp := range filterOptions.IsNotExist { + clauses = append(clauses, clause.Eq{ + Column: _toColumnName(queryOp.Key), + Value: "", + }) + } + return clauses +} + +// List list entity function +func (m *Driver) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) { + if entity.TableName() == "" { + return nil, datastore.ErrTableNameEmpty + } + var ( + clauses []clause.Expression + exprs []clause.Expression + limit int + offset int + ) + if op != nil && op.PageSize > 0 && op.Page > 0 { + limit = op.PageSize + offset = op.PageSize * (op.Page - 1) + clauses = append(clauses, clause.Limit{ + Limit: &limit, + Offset: offset, + }) + } + for k, v := range entity.Index() { + exprs = append(exprs, clause.Eq{ + Column: strings.ToLower(k), + Value: v, + }) + } + if op != nil { + exprs = _applyFilterOptions(exprs, op.FilterOptions) + } + if len(exprs) > 0 { + clauses = append(clauses, clause.Where{ + Exprs: exprs, + }) + } + if op != nil && op.SortBy != nil { + var sortOption []clause.OrderByColumn + for _, v := range op.SortBy { + sortOption = append(sortOption, clause.OrderByColumn{ + Column: clause.Column{ + Name: strings.ToLower(v.Key), + }, + Desc: v.Order == datastore.SortOrderDescending, + }) + } + clauses = append(clauses, clause.OrderBy{ + Columns: sortOption, + }) + } + var list []datastore.Entity + rows, err := m.Client.WithContext(ctx).Model(entity).Clauses(clauses...).Rows() + if err != nil { + return nil, datastore.NewDBError(err) + } + defer func() { + if err := rows.Close(); err != nil { + klog.Warningf("close rows failure %s", err.Error()) + } + }() + for rows.Next() { + item, err := datastore.NewEntity(entity) + if err != nil { + return nil, datastore.NewDBError(err) + } + err = m.Client.WithContext(ctx).ScanRows(rows, &item) + if err != nil { + return nil, datastore.NewDBError(fmt.Errorf("row scan failure %w", err)) + } + list = append(list, item) + } + if err := rows.Err(); err != nil { + return nil, datastore.NewDBError(err) + } + return list, nil +} + +// Count counts entities +func (m *Driver) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) { + if entity.TableName() == "" { + return 0, datastore.ErrTableNameEmpty + } + var ( + count int64 + exprs []clause.Expression + clauses []clause.Expression + ) + for k, v := range entity.Index() { + exprs = append(exprs, clause.Eq{ + Column: strings.ToLower(k), + Value: v, + }) + } + if filterOptions != nil { + exprs = _applyFilterOptions(exprs, *filterOptions) + } + if len(exprs) > 0 { + clauses = append(clauses, clause.Where{ + Exprs: exprs, + }) + } + if dbCount := m.Client.WithContext(ctx).Model(entity).Clauses(clauses...).Count(&count); dbCount.Error != nil { + return 0, datastore.NewDBError(dbCount.Error) + } + return count, nil +} From 2069fbd6294057d27789de2c710c96493e6c06b7 Mon Sep 17 00:00:00 2001 From: Qiaozp Date: Thu, 7 Sep 2023 10:03:49 +0800 Subject: [PATCH 2/3] lint Signed-off-by: Qiaozp --- pkg/server/infrastructure/datastore/mysql/mysql.go | 8 +++++--- pkg/server/infrastructure/datastore/postgres/postgres.go | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/server/infrastructure/datastore/mysql/mysql.go b/pkg/server/infrastructure/datastore/mysql/mysql.go index e28f911e..dfc0a13b 100644 --- a/pkg/server/infrastructure/datastore/mysql/mysql.go +++ b/pkg/server/infrastructure/datastore/mysql/mysql.go @@ -18,13 +18,15 @@ package mysql import ( "context" + + mysqlgorm "gorm.io/driver/mysql" + "gorm.io/gorm" + "gorm.io/gorm/logger" + "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer" - mysqlgorm "gorm.io/driver/mysql" - "gorm.io/gorm" - "gorm.io/gorm/logger" ) type mysql struct { diff --git a/pkg/server/infrastructure/datastore/postgres/postgres.go b/pkg/server/infrastructure/datastore/postgres/postgres.go index 58ccd016..b71ed114 100644 --- a/pkg/server/infrastructure/datastore/postgres/postgres.go +++ b/pkg/server/infrastructure/datastore/postgres/postgres.go @@ -18,13 +18,15 @@ package postgres import ( "context" + + postgresorm "gorm.io/driver/postgres" + "gorm.io/gorm" + "gorm.io/gorm/logger" + "github.com/kubevela/velaux/pkg/server/domain/model" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sql" "github.com/kubevela/velaux/pkg/server/infrastructure/datastore/sqlnamer" - postgresorm "gorm.io/driver/postgres" - "gorm.io/gorm" - "gorm.io/gorm/logger" ) type postgres struct { From 5b385a5b9b2d8105857a3db9ae771dcd5ca36b71 Mon Sep 17 00:00:00 2001 From: Qiaozp Date: Thu, 7 Sep 2023 10:27:28 +0800 Subject: [PATCH 3/3] inline datastore interface Signed-off-by: Qiaozp --- .../infrastructure/datastore/mysql/mysql.go | 44 +---------------- .../datastore/postgres/postgres.go | 48 ++----------------- 2 files changed, 6 insertions(+), 86 deletions(-) diff --git a/pkg/server/infrastructure/datastore/mysql/mysql.go b/pkg/server/infrastructure/datastore/mysql/mysql.go index dfc0a13b..9ccd3419 100644 --- a/pkg/server/infrastructure/datastore/mysql/mysql.go +++ b/pkg/server/infrastructure/datastore/mysql/mysql.go @@ -30,7 +30,7 @@ import ( ) type mysql struct { - delegate sql.Driver + sql.Driver } // New new mysql datastore instance @@ -50,49 +50,9 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) } m := &mysql{ - delegate: sql.Driver{ + Driver: sql.Driver{ Client: *db.WithContext(ctx), }, } return m, nil } - -// Add add data model -func (m *mysql) Add(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Add(ctx, entity) -} - -// BatchAdd batch add entity, this operation has some atomicity. -func (m *mysql) BatchAdd(ctx context.Context, entities []datastore.Entity) error { - return m.delegate.BatchAdd(ctx, entities) -} - -// Get get data model -func (m *mysql) Get(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Get(ctx, entity) -} - -// Put update data model -func (m *mysql) Put(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Put(ctx, entity) -} - -// IsExist determine whether data exists. -func (m *mysql) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) { - return m.delegate.IsExist(ctx, entity) -} - -// Delete delete data -func (m *mysql) Delete(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Delete(ctx, entity) -} - -// List list entity function -func (m *mysql) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) { - return m.delegate.List(ctx, entity, op) -} - -// Count counts entities -func (m *mysql) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) { - return m.delegate.Count(ctx, entity, filterOptions) -} diff --git a/pkg/server/infrastructure/datastore/postgres/postgres.go b/pkg/server/infrastructure/datastore/postgres/postgres.go index b71ed114..da65643f 100644 --- a/pkg/server/infrastructure/datastore/postgres/postgres.go +++ b/pkg/server/infrastructure/datastore/postgres/postgres.go @@ -30,7 +30,7 @@ import ( ) type postgres struct { - delegate sql.Driver + sql.Driver } // New postgres datastore instance @@ -52,50 +52,10 @@ func New(ctx context.Context, cfg datastore.Config) (datastore.DataStore, error) } } - m := &postgres{ - delegate: sql.Driver{ + p := &postgres{ + Driver: sql.Driver{ Client: *db.WithContext(ctx), }, } - return m, nil -} - -// Add add data model -func (m *postgres) Add(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Add(ctx, entity) -} - -// BatchAdd batch add entity, this operation has some atomicity. -func (m *postgres) BatchAdd(ctx context.Context, entities []datastore.Entity) error { - return m.delegate.BatchAdd(ctx, entities) -} - -// Get get data model -func (m *postgres) Get(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Get(ctx, entity) -} - -// Put update data model -func (m *postgres) Put(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Put(ctx, entity) -} - -// IsExist determine whether data exists. -func (m *postgres) IsExist(ctx context.Context, entity datastore.Entity) (bool, error) { - return m.delegate.IsExist(ctx, entity) -} - -// Delete delete data -func (m *postgres) Delete(ctx context.Context, entity datastore.Entity) error { - return m.delegate.Delete(ctx, entity) -} - -// List list entity function -func (m *postgres) List(ctx context.Context, entity datastore.Entity, op *datastore.ListOptions) ([]datastore.Entity, error) { - return m.delegate.List(ctx, entity, op) -} - -// Count counts entities -func (m *postgres) Count(ctx context.Context, entity datastore.Entity, filterOptions *datastore.FilterOptions) (int64, error) { - return m.delegate.Count(ctx, entity, filterOptions) + return p, nil }