Skip to content

Commit

Permalink
Improve shared resource handling mechanism on MCI dynamic creation
Browse files Browse the repository at this point in the history
* Remove distributed lock
* Check if there is a Shared Resource before creation
* Execute getVmReqFromDynamicReq() for the first request (i.e., vmRequest[0])
  - This is necessary so that Shared Resources are created safely.
* Then process the remaining requests (i.e., vmRequest[1:]) in parallel
  • Loading branch information
yunkon-kim committed Nov 27, 2024
1 parent 463964c commit b65287f
Showing 1 changed file with 98 additions and 113 deletions.
211 changes: 98 additions & 113 deletions src/core/infra/provisioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
validator "github.com/go-playground/validator/v10"
"github.com/go-resty/resty/v2"
"github.com/rs/zerolog/log"
"go.etcd.io/etcd/client/v3/concurrency"
"golang.org/x/net/context"
)

// TbMciReqStructLevelValidation is func to validate fields in TbMciReqStruct
Expand Down Expand Up @@ -958,54 +956,16 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep
return emptyMci, err
}

// Create a persistent session for distributed lock
ctx := context.TODO()
session, err := kvstore.NewSession(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to create a session for dist-lock")
}
defer func() {
err := session.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close a session for dist-lock")
}
}()
log.Debug().Msgf("Created a session for dist-lock: %v", session)

//If not, generate default resources dynamically.
// Parallel processing of VM requests
var wg sync.WaitGroup
var mutex sync.Mutex
errChan := make(chan error, len(vmRequest)) // Error channel to collect errors

for _, k := range vmRequest {
wg.Add(1)

// Launch a goroutine for each VM request
go func(vmReq model.TbVmDynamicReq) {
defer wg.Done()

req, err := getVmReqFromDynamicReq(ctx, session, reqID, nsId, &vmReq)
// req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq)
if err != nil {
log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation")
errChan <- err
return
}

// Safely append to the shared mciReq.Vm slice
mutex.Lock()
mciReq.Vm = append(mciReq.Vm, *req)
mutex.Unlock()
}(k)
}

// Wait for all goroutines to complete
wg.Wait()
close(errChan) // Close the error channel after processing
/*
* [NOTE]
* 1. Generate default resources first
* 2. And then, parallel processing of VM requests
*/

// Check for any errors from the goroutines
for err := range errChan {
// Check if vmRequest has elements
if len(vmRequest) > 0 {
// Process the first vmRequest[0] synchronously
req, err := getVmReqFromDynamicReq(reqID, nsId, &vmRequest[0])
if err != nil {
// Rollback if any error occurs
log.Info().Msg("Rolling back created default resources")
Expand All @@ -1017,6 +977,54 @@ func CreateMciDynamic(reqID string, nsId string, req *model.TbMciDynamicReq, dep
return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err)
}
}
mciReq.Vm = append(mciReq.Vm, *req)

// Process the rest of vmRequest[1:] in goroutines
if len(vmRequest) > 1 {
var wg sync.WaitGroup
var mutex sync.Mutex
errChan := make(chan error, len(vmRequest)-1) // Error channel to collect errors

for _, k := range vmRequest[1:] {
wg.Add(1)

// Launch a goroutine for each VM request
go func(vmReq model.TbVmDynamicReq) {
defer wg.Done()

req, err := getVmReqFromDynamicReq(reqID, nsId, &vmReq)
if err != nil {
log.Error().Err(err).Msg("Failed to prepare resources for dynamic MCI creation")
errChan <- err
return
}

// Safely append to the shared mciReq.Vm slice
mutex.Lock()
mciReq.Vm = append(mciReq.Vm, *req)
mutex.Unlock()
}(k)
}

// Wait for all goroutines to complete
wg.Wait()
close(errChan) // Close the error channel after processing

// Check for any errors from the goroutines
for err := range errChan {
if err != nil {
// Rollback if any error occurs
log.Info().Msg("Rolling back created default resources")
time.Sleep(5 * time.Second)
if rollbackResult, rollbackErr := resource.DelAllSharedResources(nsId); rollbackErr != nil {
return emptyMci, fmt.Errorf("failed in rollback operation: %w", rollbackErr)
} else {
ids := strings.Join(rollbackResult.IdList, ", ")
return emptyMci, fmt.Errorf("rollback results [%s]: %w", ids, err)
}
}
}
}
}

// Log the prepared MCI request and update the progress
Expand Down Expand Up @@ -1052,9 +1060,7 @@ func CreateMciVmDynamic(nsId string, mciId string, req *model.TbVmDynamicReq) (*
return emptyMci, err
}

ctx := context.TODO()

vmReq, err := getVmReqFromDynamicReq(ctx, nil, "", nsId, req)
vmReq, err := getVmReqFromDynamicReq("", nsId, req)
if err != nil {
log.Error().Err(err).Msg("")
return emptyMci, err
Expand Down Expand Up @@ -1111,7 +1117,7 @@ func checkCommonResAvailableForVmDynamicReq(req *model.TbVmDynamicReq, nsId stri
}

// getVmReqFromDynamicReq is func to build a TbVmReq from a TbVmDynamicReq
func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) {
func getVmReqFromDynamicReq(reqID string, nsId string, req *model.TbVmDynamicReq) (*model.TbVmReq, error) {

onDemand := true

Expand Down Expand Up @@ -1158,39 +1164,6 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
}
}

/*
* [Critical Section]
* - Verify and create vNets in parallel
* - Use distributed-lock, considering running multiple cb-tumblebugs.
*/

var lock *concurrency.Mutex
if session != nil {
// Generate a resource key for vNet
vNetKey := common.GenResourceKey(nsId, model.StrVNet, resourceName)
lockKey := "/dist-lock/" + vNetKey

lock, err = kvstore.NewLock(ctx, session, lockKey)
if err != nil {
log.Error().Err(err).Msg("Failed to get a dist-lock")
}
log.Debug().Msgf("Created a dist-lock: %v", lock)

// Lock
err = lock.Lock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to acquire a dist-lock")
}
// Unlock the lock when the function exits
defer func() {
err := lock.Unlock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to release a dist-lock")
}
}()
log.Debug().Msgf("Acquired a dist-lock: %v", lock)
}

common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting vNet:" + resourceName, Time: time.Now()})

vmReq.VNetId = resourceName
Expand All @@ -1202,27 +1175,25 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default vNet:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default vNet: " + vmReq.VNetId)

// Check if the default vNet exists
_, err := resource.GetResource(nsId, model.StrVNet, vmReq.ConnectionName)
log.Debug().Err(err).Msg("checked if the default vNet does NOT exist")
// Create a new default vNet if it does not exist
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrVNet, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default vNet " + vmReq.VNetId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default vNet: " + vmReq.VNetId)
}
}
} else {
log.Info().Msg("Found and utilize default vNet: " + vmReq.VNetId)
}
vmReq.SubnetId = resourceName

if session != nil {
// Unlock the lock
err := lock.Unlock(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to release the dist-lock")
}
log.Debug().Msgf("Released the dist-lock: %v", lock)
}

common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Setting SSHKey:" + resourceName, Time: time.Now()})
vmReq.SshKeyId = resourceName
_, err = resource.GetResource(nsId, model.StrSSHKey, vmReq.SshKeyId)
Expand All @@ -1233,12 +1204,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default SSHKey:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId)

// Check if the default SSHKey exists
_, err := resource.GetResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
log.Debug().Err(err).Msg("checked if the default SSHKey does NOT exist")
// Create a new default SSHKey if it does not exist
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrSSHKey, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default SSHKey " + vmReq.SshKeyId + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default SSHKey: " + vmReq.VNetId)
}
}
} else {
log.Info().Msg("Found and utilize default SSHKey: " + vmReq.VNetId)
Expand All @@ -1255,12 +1233,19 @@ func getVmReqFromDynamicReq(ctx context.Context, session *concurrency.Session, r
return &model.TbVmReq{}, err
}
common.UpdateRequestProgress(reqID, common.ProgressInfo{Title: "Loading default securityGroup:" + resourceName, Time: time.Now()})
err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default securityGroup: " + securityGroup)

// Check if the default security group exists
_, err := resource.GetResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
// Create a new default security group if it does not exist
log.Debug().Err(err).Msg("checked if the default security group does NOT exist")
if err != nil && strings.Contains(err.Error(), "does not exist") {
err2 := resource.CreateSharedResource(nsId, model.StrSecurityGroup, vmReq.ConnectionName)
if err2 != nil {
log.Error().Err(err2).Msg("Failed to create new default securityGroup " + securityGroup + " from " + vmReq.ConnectionName)
return &model.TbVmReq{}, err2
} else {
log.Info().Msg("Created new default securityGroup: " + securityGroup)
}
}
} else {
log.Info().Msg("Found and utilize default securityGroup: " + securityGroup)
Expand Down

0 comments on commit b65287f

Please sign in to comment.