Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3026: Use DatabaseInitManager in all GSI cases #7121

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
50 changes: 40 additions & 10 deletions base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,53 @@ func NewCouchbaseCluster(ctx context.Context, server, username, password,
x509CertPath, x509KeyPath, caCertPath string,
forcePerBucketAuth bool, perBucketCreds PerBucketCredentialsConfig,
tlsSkipVerify *bool, useXattrConfig *bool, bucketMode BucketConnectionMode) (*CouchbaseCluster, error) {
opts := newCouchbaseClusterOpts{
server: server,
username: username,
password: password,
x509CertPath: x509CertPath,
x509KeyPath: x509KeyPath,
caCertPath: caCertPath,
forcePerBucketAuth: forcePerBucketAuth,
perBucketCreds: perBucketCreds,
tlsSkipVerify: tlsSkipVerify,
useXattrConfig: useXattrConfig,
bucketMode: bucketMode,
}
return NewCouchbaseClusterWithOpts(ctx, opts)
}

type newCouchbaseClusterOpts struct {
server string
username string
password string
x509CertPath string
x509KeyPath string
caCertPath string
forcePerBucketAuth bool
perBucketCreds PerBucketCredentialsConfig
tlsSkipVerify *bool
useXattrConfig *bool
bbrks marked this conversation as resolved.
Show resolved Hide resolved
bucketMode BucketConnectionMode
}

securityConfig, err := GoCBv2SecurityConfig(ctx, tlsSkipVerify, caCertPath)
func NewCouchbaseClusterWithOpts(ctx context.Context, opts newCouchbaseClusterOpts) (*CouchbaseCluster, error) {
securityConfig, err := GoCBv2SecurityConfig(ctx, opts.tlsSkipVerify, opts.caCertPath)
if err != nil {
return nil, err
}

clusterAuthConfig, err := GoCBv2Authenticator(
username, password,
x509CertPath, x509KeyPath,
opts.username, opts.password,
opts.x509CertPath, opts.x509KeyPath,
)
if err != nil {
return nil, err
}

// Populate individual bucket credentials
perBucketAuth := make(map[string]*gocb.Authenticator, len(perBucketCreds))
for bucket, credentials := range perBucketCreds {
perBucketAuth := make(map[string]*gocb.Authenticator, len(opts.perBucketCreds))
for bucket, credentials := range opts.perBucketCreds {
authenticator, err := GoCBv2Authenticator(
credentials.Username, credentials.Password,
credentials.X509CertPath, credentials.X509KeyPath,
Expand All @@ -185,19 +215,19 @@ func NewCouchbaseCluster(ctx context.Context, server, username, password,
}

cbCluster := &CouchbaseCluster{
server: server,
forcePerBucketAuth: forcePerBucketAuth,
server: opts.server,
forcePerBucketAuth: opts.forcePerBucketAuth,
perBucketAuth: perBucketAuth,
clusterOptions: clusterOptions,
bucketConnectionMode: bucketMode,
bucketConnectionMode: opts.bucketMode,
}

if bucketMode == CachedClusterConnections {
if opts.bucketMode == CachedClusterConnections {
cbCluster.cachedBucketConnections = cachedBucketConnections{buckets: make(map[string]*cachedBucket)}
}

cbCluster.configPersistence = &DocumentBootstrapPersistence{}
if useXattrConfig != nil && *useXattrConfig == true {
if BoolDefault(opts.useXattrConfig, false) {
cbCluster.configPersistence = &XattrBootstrapPersistence{}
}

Expand Down
4 changes: 4 additions & 0 deletions base/cluster_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, colle

}

func (cl *ClusterOnlyN1QLStore) Close() error {
return cl.cluster.Close(nil)
}

func (cl *ClusterOnlyN1QLStore) GetName() string {
return cl.bucketName
}
Expand Down
1 change: 1 addition & 0 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type DatabaseContext struct {
RequireResync base.ScopeAndCollectionNames // Collections requiring resync before database can go online
CORS *auth.CORSConfig // CORS configuration
EnableMou bool // Write _mou xattr when performing metadata-only update. Set based on bucket capability on connect
WasInitializedSynchronously bool // true if the database was initialized synchronously
}

type Scope struct {
Expand Down
61 changes: 33 additions & 28 deletions rest/database_init_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type CollectionInitData map[base.ScopeAndCollectionName]db.CollectionIndexesType

// Initializes the database. Will establish a new cluster connection using the provided server config. Establishes a new
// cluster-only N1QLStore based on the startup config to perform initialization.
func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupConfig *StartupConfig, dbConfig *DatabaseConfig) (doneChan chan error, err error) {
func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, isServerless bool, dbConfig *DatabaseConfig) (doneChan chan error, err error) {
m.workersLock.Lock()
defer m.workersLock.Unlock()
if m.workers == nil {
Expand All @@ -58,7 +58,8 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupCon
base.InfofCtx(ctx, base.KeyAll, "Initializing database %s ...",
base.MD(dbConfig.Name))
dbInitWorker, ok := m.workers[dbConfig.Name]
collectionSet := m.buildCollectionIndexData(dbConfig)

collectionSet := buildCollectionIndexData(dbConfig)
if ok {
// If worker exists for the database and the collection sets match, add watcher to the existing worker
if dbInitWorker.collectionsEqual(collectionSet) {
Expand All @@ -72,9 +73,14 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupCon
delete(m.workers, dbConfig.Name)
}

base.InfofCtx(ctx, base.KeyAll, "Starting new async initialization for database %s ...",
opts := bootstrapConnectionOptsFromDbConfig(dbConfig.DbConfig)
opts.bucketConnectionMode = base.PerUseClusterConnections
opts.isServerless = isServerless

base.InfofCtx(ctx, base.KeyAll, "Starting new initialization for database %s ...",
base.MD(dbConfig.Name))
couchbaseCluster, err := CreateBootstrapConnectionFromStartupConfig(ctx, startupConfig, base.PerUseClusterConnections)

couchbaseCluster, err := createBootstrapConnectionWithOpts(ctx, opts)
if err != nil {
return nil, err
}
Expand All @@ -84,14 +90,13 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupCon
bucketName = *dbConfig.Bucket
}

// Initialize ClusterN1QLStore for the bucket. Scope and collection name are set
// per-operation
// Initialize ClusterN1QLStore for the bucket. Scope and collection name are set per-operation
n1qlStore, err := couchbaseCluster.GetClusterN1QLStore(bucketName, "", "")
if err != nil {
return nil, err
}

indexOptions := m.BuildIndexOptions(startupConfig, dbConfig)
indexOptions := m.BuildIndexOptions(isServerless, dbConfig)

// Create new worker and add this caller as a watcher
worker := NewDatabaseInitWorker(ctx, dbConfig.Name, n1qlStore, collectionSet, indexOptions, m.collectionCompleteCallback)
Expand All @@ -100,6 +105,7 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupCon

// Start a goroutine to perform the initialization
go func() {
defer func() { _ = n1qlStore.Close() }()
bbrks marked this conversation as resolved.
Show resolved Hide resolved
defer couchbaseCluster.Close()
// worker.Run blocks until completion, and returns any error on doneChan.
worker.Run()
Expand All @@ -116,7 +122,6 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupCon

func (m *DatabaseInitManager) HasActiveInitialization(dbName string) bool {
if m == nil {
// When not using persistent config, DatabaseInitManager will be nil
return false
}
m.workersLock.Lock()
Expand All @@ -125,15 +130,15 @@ func (m *DatabaseInitManager) HasActiveInitialization(dbName string) bool {
return ok
}

func (m *DatabaseInitManager) BuildIndexOptions(startupConfig *StartupConfig, dbConfig *DatabaseConfig) db.InitializeIndexOptions {
func (m *DatabaseInitManager) BuildIndexOptions(isServerless bool, dbConfig *DatabaseConfig) db.InitializeIndexOptions {
numReplicas := DefaultNumIndexReplicas
if dbConfig.NumIndexReplicas != nil {
numReplicas = *dbConfig.NumIndexReplicas
}
return db.InitializeIndexOptions{
WaitForIndexesOnlineOption: base.WaitForIndexesInfinite,
NumReplicas: numReplicas,
Serverless: startupConfig.IsServerless(),
Serverless: isServerless,
UseXattrs: dbConfig.UseXattrs(),
}
}
Expand All @@ -156,27 +161,27 @@ func (m *DatabaseInitManager) Cancel(dbName string) {

// buildCollectionIndexData determines the set of indexes required for each collection in the config, including
// the metadata collection
func (m *DatabaseInitManager) buildCollectionIndexData(config *DatabaseConfig) CollectionInitData {
collectionInitData := make(CollectionInitData, 0)
if len(config.Scopes) > 0 {
hasDefaultCollection := false
for scopeName, scopeConfig := range config.Scopes {
for collectionName, _ := range scopeConfig.Collections {
metadataIndexOption := db.IndexesWithoutMetadata
if base.IsDefaultCollection(scopeName, collectionName) {
hasDefaultCollection = true
metadataIndexOption = db.IndexesAll
}
scName := base.ScopeAndCollectionName{Scope: scopeName, Collection: collectionName}
collectionInitData[scName] = metadataIndexOption
func buildCollectionIndexData(config *DatabaseConfig) CollectionInitData {
if len(config.Scopes) == 0 {
return CollectionInitData{base.DefaultScopeAndCollectionName(): db.IndexesAll}
}

defaultScopeAndCollectionMetadataIndexes := db.IndexesMetadataOnly

collectionInitData := make(CollectionInitData)
bbrks marked this conversation as resolved.
Show resolved Hide resolved
for scopeName, scopeConfig := range config.Scopes {
for collectionName := range scopeConfig.Collections {
scName := base.ScopeAndCollectionName{Scope: scopeName, Collection: collectionName}
if scName.IsDefault() {
defaultScopeAndCollectionMetadataIndexes = db.IndexesAll
continue
}
collectionInitData[scName] = db.IndexesWithoutMetadata
}
if !hasDefaultCollection {
collectionInitData[base.DefaultScopeAndCollectionName()] = db.IndexesMetadataOnly
}
} else {
collectionInitData[base.DefaultScopeAndCollectionName()] = db.IndexesAll
}

collectionInitData[base.DefaultScopeAndCollectionName()] = defaultScopeAndCollectionMetadataIndexes

return collectionInitData
}

Expand Down
Loading
Loading