Skip to content

Commit

Permalink
Remove GetClusterN1QLStore which creates un-closable connection in …
Browse files Browse the repository at this point in the history
…some cases, and replace with explicit connection in `InitializeDatabase`
  • Loading branch information
bbrks committed Oct 10, 2024
1 parent 1dc68b9 commit c1189ff
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 109 deletions.
93 changes: 28 additions & 65 deletions base/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type BootstrapConnection interface {
// Returns exists=false if key is not found, returns error for any other error.
GetDocument(ctx context.Context, bucket, docID string, rv interface{}) (exists bool, err error)

// Returns the bootstrap connection's cluster connection as N1QLStore for the specified bucket/scope/collection.
// Does NOT establish a bucket connection, the bucketName/scopeName/collectionName is for query scoping only
GetClusterN1QLStore(bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error)
// Close releases any long-lived connections
Close()
}
Expand Down Expand Up @@ -149,54 +146,23 @@ var _ BootstrapConnection = &CouchbaseCluster{}
func NewCouchbaseCluster(ctx context.Context, server, username, password,
x509CertPath, x509KeyPath, caCertPath string,
forcePerBucketAuth bool, perBucketCreds PerBucketCredentialsConfig,
tlsSkipVerify *bool, useXattrConfig *bool, bucketMode BucketConnectionMode) (*CouchbaseCluster, error) {
opts := newCouchbaseClusterOpts{
server: server,
username: username,
password: password,
x509CertPath: x509CertPath,
x509KeyPath: x509KeyPath,
caCertPath: caCertPath,
forcePerBucketAuth: forcePerBucketAuth,
perBucketCreds: perBucketCreds,
tlsSkipVerify: tlsSkipVerify,
useXattrConfig: useXattrConfig,
bucketMode: bucketMode,
}
return NewCouchbaseClusterWithOpts(ctx, opts)
}

type newCouchbaseClusterOpts struct {
server string
username string
password string
x509CertPath string
x509KeyPath string
caCertPath string
forcePerBucketAuth bool
perBucketCreds PerBucketCredentialsConfig
tlsSkipVerify *bool
useXattrConfig *bool
bucketMode BucketConnectionMode
}

func NewCouchbaseClusterWithOpts(ctx context.Context, opts newCouchbaseClusterOpts) (*CouchbaseCluster, error) {
securityConfig, err := GoCBv2SecurityConfig(ctx, opts.tlsSkipVerify, opts.caCertPath)
tlsSkipVerify *bool, useXattrConfig bool, bucketMode BucketConnectionMode) (*CouchbaseCluster, error) {
securityConfig, err := GoCBv2SecurityConfig(ctx, tlsSkipVerify, caCertPath)
if err != nil {
return nil, err
}

clusterAuthConfig, err := GoCBv2Authenticator(
opts.username, opts.password,
opts.x509CertPath, opts.x509KeyPath,
username, password,
x509CertPath, x509KeyPath,
)
if err != nil {
return nil, err
}

// Populate individual bucket credentials
perBucketAuth := make(map[string]*gocb.Authenticator, len(opts.perBucketCreds))
for bucket, credentials := range opts.perBucketCreds {
perBucketAuth := make(map[string]*gocb.Authenticator, len(perBucketCreds))
for bucket, credentials := range perBucketCreds {
authenticator, err := GoCBv2Authenticator(
credentials.Username, credentials.Password,
credentials.X509CertPath, credentials.X509KeyPath,
Expand All @@ -215,19 +181,19 @@ func NewCouchbaseClusterWithOpts(ctx context.Context, opts newCouchbaseClusterOp
}

cbCluster := &CouchbaseCluster{
server: opts.server,
forcePerBucketAuth: opts.forcePerBucketAuth,
server: server,
forcePerBucketAuth: forcePerBucketAuth,
perBucketAuth: perBucketAuth,
clusterOptions: clusterOptions,
bucketConnectionMode: opts.bucketMode,
bucketConnectionMode: bucketMode,
}

if opts.bucketMode == CachedClusterConnections {
if bucketMode == CachedClusterConnections {
cbCluster.cachedBucketConnections = cachedBucketConnections{buckets: make(map[string]*cachedBucket)}
}

cbCluster.configPersistence = &DocumentBootstrapPersistence{}
if BoolDefault(opts.useXattrConfig, false) {
if useXattrConfig {
cbCluster.configPersistence = &XattrBootstrapPersistence{}
}

Expand Down Expand Up @@ -514,15 +480,6 @@ func (cc *CouchbaseCluster) Close() {
}
}

func (cc *CouchbaseCluster) GetClusterN1QLStore(bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error) {
gocbCluster, err := cc.getClusterConnection()
if err != nil {
return nil, err
}

return NewClusterOnlyN1QLStore(gocbCluster, bucketName, scopeName, collectionName)
}

func (cc *CouchbaseCluster) getBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {

if cc.bucketConnectionMode != CachedClusterConnections {
Expand Down Expand Up @@ -553,17 +510,30 @@ func (cc *CouchbaseCluster) getBucket(ctx context.Context, bucketName string) (b
return newBucket, teardownFn, nil
}

// connectToBucket establishes a new connection to a bucket, and returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {
var connection *gocb.Cluster
func (cc *CouchbaseCluster) GetClusterConnectionForBucket(ctx context.Context, bucketName string) (connection *gocb.Cluster, teardownFn func(), err error) {
if bucketAuth, set := cc.perBucketAuth[bucketName]; set {
connection, err = cc.connect(bucketAuth)
} else if cc.forcePerBucketAuth {
return nil, nil, fmt.Errorf("unable to get bucket %q since credentials are not defined in bucket_credentials", MD(bucketName).Redact())
} else {
connection, err = cc.connect(nil)
}
if err != nil {
return nil, nil, err
}

teardownFn = func() {
err := connection.Close(&gocb.ClusterCloseOptions{})
if err != nil {
WarnfCtx(ctx, "Failed to close cluster connection: %v", err)
}
}
return connection, teardownFn, nil
}

// connectToBucket establishes a new connection to a bucket, and returns the bucket after waiting for it to be ready.
func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName string) (b *gocb.Bucket, teardownFn func(), err error) {
connection, teardownFn, err := cc.GetClusterConnectionForBucket(ctx, bucketName)
if err != nil {
return nil, nil, err
}
Expand All @@ -575,7 +545,7 @@ func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName stri
ServiceTypes: []gocb.ServiceType{gocb.ServiceTypeKeyValue},
})
if err != nil {
_ = connection.Close(&gocb.ClusterCloseOptions{})
teardownFn()

if errors.Is(err, gocb.ErrAuthenticationFailure) {
return nil, nil, ErrAuthError
Expand All @@ -584,13 +554,6 @@ func (cc *CouchbaseCluster) connectToBucket(ctx context.Context, bucketName stri
return nil, nil, err
}

teardownFn = func() {
err := connection.Close(&gocb.ClusterCloseOptions{})
if err != nil {
WarnfCtx(ctx, "Failed to close cluster connection: %v", err)
}
}

return b, teardownFn, nil
}

Expand Down
2 changes: 1 addition & 1 deletion base/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestBootstrapRefCounting(t *testing.T) {
tlsSkipVerify := BoolPtr(TestTLSSkipVerify())
var perBucketCredentialsConfig map[string]*CredentialsConfig
ctx := TestCtx(t)
cluster, err := NewCouchbaseCluster(ctx, UnitTestUrl(), TestClusterUsername(), TestClusterPassword(), x509CertPath, x509KeyPath, caCertPath, forcePerBucketAuth, perBucketCredentialsConfig, tlsSkipVerify, BoolPtr(TestUseXattrs()), CachedClusterConnections)
cluster, err := NewCouchbaseCluster(ctx, UnitTestUrl(), TestClusterUsername(), TestClusterPassword(), x509CertPath, x509KeyPath, caCertPath, forcePerBucketAuth, perBucketCredentialsConfig, tlsSkipVerify, TestUseXattrs(), CachedClusterConnections)
require.NoError(t, err)
defer cluster.Close()
require.NotNil(t, cluster)
Expand Down
4 changes: 0 additions & 4 deletions base/cluster_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ func NewClusterOnlyN1QLStore(cluster *gocb.Cluster, bucketName, scopeName, colle

}

func (cl *ClusterOnlyN1QLStore) Close() error {
return cl.cluster.Close(nil)
}

func (cl *ClusterOnlyN1QLStore) GetName() string {
return cl.bucketName
}
Expand Down
4 changes: 0 additions & 4 deletions base/rosmar_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,3 @@ func (c *RosmarCluster) Close() {
}

func (c *RosmarCluster) SetConnectionStringServerless() error { return nil }

func (c *RosmarCluster) GetClusterN1QLStore(bucketName, scopeName, collectionName string) (*ClusterOnlyN1QLStore, error) {
return nil, errors.New("rosmar doesn't support a N1QL store")
}
22 changes: 16 additions & 6 deletions rest/database_init_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package rest
import (
"context"
"errors"
"fmt"
"sync"

"github.com/couchbase/sync_gateway/base"
Expand Down Expand Up @@ -49,7 +50,7 @@ type CollectionInitData map[base.ScopeAndCollectionName]db.CollectionIndexesType

// Initializes the database. Will establish a new cluster connection using the provided server config. Establishes a new
// cluster-only N1QLStore based on the startup config to perform initialization.
func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, isServerless bool, dbConfig *DatabaseConfig) (doneChan chan error, err error) {
func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, startupConfig *StartupConfig, dbConfig *DatabaseConfig) (doneChan chan error, err error) {
m.workersLock.Lock()
defer m.workersLock.Unlock()
if m.workers == nil {
Expand All @@ -73,9 +74,8 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, isServerle
delete(m.workers, dbConfig.Name)
}

opts := bootstrapConnectionOptsFromDbConfig(dbConfig.DbConfig)
opts := bootstrapConnectionOptsConfigs(startupConfig, dbConfig.DbConfig)
opts.bucketConnectionMode = base.PerUseClusterConnections
opts.isServerless = isServerless

base.InfofCtx(ctx, base.KeyAll, "Starting new initialization for database %s ...",
base.MD(dbConfig.Name))
Expand All @@ -90,13 +90,23 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, isServerle
bucketName = *dbConfig.Bucket
}

cc, ok := couchbaseCluster.(*base.CouchbaseCluster)
if !ok {
return nil, fmt.Errorf("DatabaseInitManager requires gocb.Cluster connection - had %T", couchbaseCluster)
}

connection, closeClusterConnection, err := cc.GetClusterConnectionForBucket(ctx, bucketName)
if err != nil {
return nil, err
}

// Initialize ClusterN1QLStore for the bucket. Scope and collection name are set per-operation
n1qlStore, err := couchbaseCluster.GetClusterN1QLStore(bucketName, "", "")
n1qlStore, err := base.NewClusterOnlyN1QLStore(connection, bucketName, "", "")
if err != nil {
return nil, err
}

indexOptions := m.BuildIndexOptions(isServerless, dbConfig)
indexOptions := m.BuildIndexOptions(startupConfig.IsServerless(), dbConfig)

// Create new worker and add this caller as a watcher
worker := NewDatabaseInitWorker(ctx, dbConfig.Name, n1qlStore, collectionSet, indexOptions, m.collectionCompleteCallback)
Expand All @@ -105,7 +115,7 @@ func (m *DatabaseInitManager) InitializeDatabase(ctx context.Context, isServerle

// Start a goroutine to perform the initialization
go func() {
defer func() { _ = n1qlStore.Close() }()
defer closeClusterConnection()
defer couchbaseCluster.Close()
// worker.Run blocks until completion, and returns any error on doneChan.
worker.Run()
Expand Down
24 changes: 12 additions & 12 deletions rest/database_init_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDatabaseInitManager(t *testing.T) {
dropAllNonPrimaryIndexes(t, tb.GetSingleDataStore())

// Async index creation
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)

select {
Expand Down Expand Up @@ -108,14 +108,14 @@ func TestDatabaseInitConfigChangeSameCollections(t *testing.T) {
require.NoError(t, dbConfig.setup(ctx, dbName, sc.Config.Bootstrap, nil, nil, false))

// Start first async index creation, blocks after first collection
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)

// Wait for first collection to be initialized
WaitForChannel(t, singleCollectionInitChannel, "first collection init")

// Make a duplicate call to initialize database, should reuse the existing agent
duplicateDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
duplicateDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)

// Unblock collection callback to process all remaining collections
Expand All @@ -132,7 +132,7 @@ func TestDatabaseInitConfigChangeSameCollections(t *testing.T) {
waitForWorkerDone(t, initMgr, "dbName")

// Rerun init, should start a new worker for the database and re-verify init for each collection
rerunDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
rerunDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)
WaitForChannel(t, rerunDoneChan, "repeated init done chan")
totalCount = atomic.LoadInt64(&collectionCount)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestDatabaseInitConfigChangeDifferentCollections(t *testing.T) {
require.NoError(t, dbConfig.setup(ctx, dbName, sc.Config.Bootstrap, nil, nil, false))

// Start first async index creation, should block after first collection
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
doneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)

// Wait for first collection to be initialized
Expand All @@ -203,7 +203,7 @@ func TestDatabaseInitConfigChangeDifferentCollections(t *testing.T) {
// Make a call to initialize database for the same db name, different collections
modifiedDbConfig := makeDbConfig(tb.GetName(), dbName, collection1and3ScopesConfig)
require.NoError(t, modifiedDbConfig.setup(ctx, dbName, sc.Config.Bootstrap, nil, nil, false))
modifiedDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), modifiedDbConfig.ToDatabaseConfig())
modifiedDoneChan, err := initMgr.InitializeDatabase(ctx, sc.Config, modifiedDbConfig.ToDatabaseConfig())
require.NoError(t, err)

// Unblock the first InitializeDatabase, should cancel
Expand Down Expand Up @@ -281,14 +281,14 @@ func TestDatabaseInitConcurrentDatabasesSameBucket(t *testing.T) {
require.NoError(t, db2Config.setup(ctx, db2Name, sc.Config.Bootstrap, nil, nil, false))

// Start first async index creation, should block after first collection
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), db1Config.ToDatabaseConfig())
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config, db1Config.ToDatabaseConfig())
require.NoError(t, err)

// Wait for first collection to be initialized
WaitForChannel(t, firstCollectionInitChannel, "first collection init")

// Start second async index creation for db2 while first is still running
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), db2Config.ToDatabaseConfig())
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config, db2Config.ToDatabaseConfig())
require.NoError(t, err)

// Unblock the first InitializeDatabase, should cancel
Expand Down Expand Up @@ -371,14 +371,14 @@ func TestDatabaseInitConcurrentDatabasesDifferentBuckets(t *testing.T) {
require.NoError(t, db2Config.setup(ctx, db2Name, sc.Config.Bootstrap, nil, nil, false))

// Start first async index creation, should block after first collection
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), db1Config.ToDatabaseConfig())
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config, db1Config.ToDatabaseConfig())
require.NoError(t, err)

// Wait for first collection to be initialized
WaitForChannel(t, firstCollectionInitChannel, "first collection init")

// Start second async index creation for db2 while first is still running
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), db2Config.ToDatabaseConfig())
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config, db2Config.ToDatabaseConfig())
require.NoError(t, err)

// Unblock the first InitializeDatabase, should cancel
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestDatabaseInitTeardownTiming(t *testing.T) {
if currentCount == 0 {
defer wg.Done()
log.Printf("invoking InitializeDatabase again during teardown")
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
doneChan2, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)
WaitForChannel(t, doneChan2, "done chan 2")
}
Expand All @@ -455,7 +455,7 @@ func TestDatabaseInitTeardownTiming(t *testing.T) {
}

// Start first async index creation, should block after first collection
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config.IsServerless(), dbConfig.ToDatabaseConfig())
doneChan1, err := initMgr.InitializeDatabase(ctx, sc.Config, dbConfig.ToDatabaseConfig())
require.NoError(t, err)

WaitForChannel(t, doneChan1, "done chan 1")
Expand Down
Loading

0 comments on commit c1189ff

Please sign in to comment.