Skip to content

Commit

Permalink
Merge pull request whywaita#27 from site0801/fix/fix-revert-v0.0.9-v0…
Browse files Browse the repository at this point in the history
….0.11

Fix bug from v0.0.9 to v0.0.11
  • Loading branch information
whywaita authored Oct 24, 2023
2 parents 65ac707 + eb9adbe commit f301cfd
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 31 deletions.
2 changes: 1 addition & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/whywaita/shoes-lxd-multi/server
go 1.19

require (
github.com/docker/go-units v0.4.0
github.com/docker/go-units v0.5.0
github.com/lxc/lxd v0.0.0-20211202222358-a293da71aeb0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.12.1
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
68 changes: 67 additions & 1 deletion server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/whywaita/shoes-lxd-multi/server/pkg/api"
"github.com/whywaita/shoes-lxd-multi/server/pkg/config"
"github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient"
"github.com/whywaita/shoes-lxd-multi/server/pkg/metric"
)

Expand All @@ -26,13 +27,22 @@ func main() {
}

func run() error {
hostConfigs, mapping, listenPort, overCommitPercent, err := config.Load()
hostConfigs, mapping, periodSec, listenPort, overCommitPercent, err := config.Load()
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

go serveMetrics(context.Background(), hostConfigs)

// lxd resource cache
t := time.NewTicker(time.Duration(periodSec) * time.Second)
var hcs []config.HostConfig
hostConfigs.Range(func(key string, value config.HostConfig) bool {
hcs = append(hcs, value)
return true
})
go setLXDResourceCacheWithTicker(hcs, t)

server, err := api.New(hostConfigs, mapping, overCommitPercent)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
Expand Down Expand Up @@ -70,3 +80,59 @@ func serveMetrics(ctx context.Context, hostConfigs *config.HostConfigMap) {
log.Fatal(err)
}
}

func setLXDResourceCacheWithTicker(hcs []config.HostConfig, ticker *time.Ticker) {
for {
_ = <-ticker.C

log.Print("LXD cache is updating")
if err := setLXDResourceCache(hcs); err != nil {
log.Fatal(err)
}
}
}

func setLXDResourceCache(hcs []config.HostConfig) error {
hosts, err := lxdclient.ConnectLXDs(hcs)
if err != nil {
return fmt.Errorf("failed to connect LXD hosts: %s", err)
}

for _, host := range hosts {
if err := setLXDHostResourceCache(&host); err != nil {
return err
}
}
return nil
}

func setLXDHostResourceCache(host *lxdclient.LXDHost) error {
allCPU, allMemory, hostname, err := lxdclient.ScrapeLXDHostResources(host.Client)
if err != nil {
return fmt.Errorf("failed to scrape lxd resources: %s", err)
}

instances, err := lxdclient.GetAnyInstances(host.Client)
if err != nil {
return fmt.Errorf("failed to retrieve list of instance (host: %s): %s", hostname, err)
}

allocatedCPU, allocatedMemory, err := lxdclient.ScrapeLXDHostAllocatedResources(instances)
if err != nil {
return fmt.Errorf("failed to scrape instance info: %s", err)
}

s := lxdclient.LXDStatus{
Resource: lxdclient.Resource{
CPUTotal: allCPU,
MemoryTotal: allMemory,
CPUUsed: allocatedCPU,
MemoryUsed: allocatedMemory,
},
HostConfig: host.HostConfig,
}
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, s); err != nil {
return fmt.Errorf("failed to set status cache: %s", err)
}
return nil
}
4 changes: 4 additions & 0 deletions server/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"
"net"
"sync"

myshoespb "github.com/whywaita/myshoes/api/proto.go"
pb "github.com/whywaita/shoes-lxd-multi/proto.go"
Expand All @@ -20,6 +21,8 @@ type ShoesLXDMultiServer struct {
resourceMapping map[myshoespb.ResourceType]config.Mapping

overCommitPercent uint64

mu sync.Mutex
}

// New create gRPC server
Expand All @@ -28,6 +31,7 @@ func New(hostConfigs *config.HostConfigMap, mapping map[myshoespb.ResourceType]c
hostConfigs: hostConfigs,
resourceMapping: mapping,
overCommitPercent: overCommitPercent,
mu: sync.Mutex{},
}, nil
}

Expand Down
70 changes: 47 additions & 23 deletions server/pkg/api/server_add_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"math/rand"
"net/url"
"sort"
"strconv"
"strings"
"sync"
Expand All @@ -23,6 +22,8 @@ import (
pb "github.com/whywaita/shoes-lxd-multi/proto.go"
"github.com/whywaita/shoes-lxd-multi/server/pkg/lxdclient"

"github.com/docker/go-units"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -51,22 +52,12 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
}

var client lxd.InstanceServer
var reqInstance api.InstancesPost
if errors.Is(err, ErrInstanceIsNotFound) {
host, err = s.scheduleHost(targetLXDHosts)
host, reqInstance, err = s.setLXDStatusCache(targetLXDHosts, instanceName, instanceSource, req)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
log.Printf("AddInstance scheduled host: %s, runnerName: %s\n", host.HostConfig.LxdHost, instanceName)

reqInstance := api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
return nil, err
}

client = host.Client
op, err := client.CreateInstance(reqInstance)
if err != nil {
Expand Down Expand Up @@ -105,6 +96,47 @@ func (s *ShoesLXDMultiServer) AddInstance(ctx context.Context, req *pb.AddInstan
}, nil
}

func (s *ShoesLXDMultiServer) setLXDStatusCache(targetLXDHosts []lxdclient.LXDHost, instanceName string, instanceSource *api.InstanceSource, req *pb.AddInstanceRequest) (*lxdclient.LXDHost, api.InstancesPost, error) {
s.mu.Lock()
defer s.mu.Unlock()

host, err := s.scheduleHost(targetLXDHosts)
if err != nil {
return nil, api.InstancesPost{}, status.Errorf(codes.InvalidArgument, "failed to schedule host: %+v", err)
}
log.Printf("AddInstance scheduled host: %s, runnerName: %s\n", host.HostConfig.LxdHost, instanceName)

reqInstance := api.InstancesPost{
InstancePut: api.InstancePut{
Config: s.getInstanceConfig(req.SetupScript, req.ResourceType),
Devices: s.getInstanceDevices(),
},
Name: instanceName,
Source: *instanceSource,
}

cpu, err := strconv.ParseUint(reqInstance.InstancePut.Config["limits.cpu"], 10, 64)
if err != nil {
return nil, api.InstancesPost{}, fmt.Errorf("failde to parse limits.cpu: %w", err)
}

memory, err := units.FromHumanSize(reqInstance.InstancePut.Config["limits.memory"])
if err != nil {
return nil, api.InstancesPost{}, fmt.Errorf("failde to parse limits.memory: %w", err)
}

cache, err := lxdclient.GetStatusCache(host.HostConfig.LxdHost)
if err != nil {
return nil, api.InstancesPost{}, err
}
cache.Resource.CPUUsed += cpu
cache.Resource.MemoryUsed += uint64(memory)
if err := lxdclient.SetStatusCache(host.HostConfig.LxdHost, cache); err != nil {
return nil, api.InstancesPost{}, fmt.Errorf("failed to set status cache: %s", err)
}
return host, reqInstance, nil
}

func (s *ShoesLXDMultiServer) getInstanceConfig(setupScript string, rt myshoespb.ResourceType) map[string]string {
rawLXCConfig := `lxc.apparmor.profile = unconfined
lxc.cgroup.devices.allow = a
Expand Down Expand Up @@ -209,15 +241,7 @@ func schedule(targets []targetHost, limitOverCommit uint64) (*targetHost, error)
return nil, ErrNoValidHost
}

// 1. use lowest over-commit instance
// 2. check limit of over-commit
sort.SliceStable(schedulableTargets, func(i, j int) bool {
// lowest percentOverCommit is first
return schedulableTargets[i].percentOverCommit < schedulableTargets[j].percentOverCommit
})

index := rand.Intn(len(schedulableTargets))
return &schedulableTargets[index], nil
return &schedulableTargets[rand.Intn(len(schedulableTargets))], nil
}

// parseAlias parse user input
Expand Down
25 changes: 19 additions & 6 deletions server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (

// EnvLXDResourceTypeMapping is mapping resource in lxd
EnvLXDResourceTypeMapping = "LXD_MULTI_RESOURCE_TYPE_MAPPING"
// EnvLXDResourceCachePeriodSec is period of setting LXD resource cache
EnvLXDResourceCachePeriodSec = "LXD_MULTI_RESOURCE_CACHE_PERIOD_SEC"
// EnvPort will listen port
EnvPort = "LXD_MULTI_PORT"
// EnvOverCommit will set percent of over commit in CPU
Expand All @@ -30,18 +32,29 @@ type Mapping struct {
}

// Load load config from Environment values
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int, uint64, error) {
func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int64, int, uint64, error) {
hostConfigs, err := loadHostConfigs()
if err != nil {
return nil, nil, -1, 0, fmt.Errorf("failed to load host config: %w", err)
return nil, nil, 0, -1, 0, fmt.Errorf("failed to load host config: %w", err)
}

envMappingJSON := os.Getenv(EnvLXDResourceTypeMapping)
var m map[myshoespb.ResourceType]Mapping
if envMappingJSON != "" {
m, err = readResourceTypeMapping(envMappingJSON)
if err != nil {
return nil, nil, -1, 0, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
return nil, nil, 0, -1, 0, fmt.Errorf("failed to read %s: %w", EnvLXDResourceTypeMapping, err)
}
}

envPeriodSec := os.Getenv(EnvLXDResourceCachePeriodSec)
var periodSec int64
if envPeriodSec == "" {
periodSec = 10
} else {
periodSec, err = strconv.ParseInt(envPeriodSec, 10, 64)
if err != nil {
return nil, nil, 0, -1, 0, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}

Expand All @@ -52,7 +65,7 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int, uint64, er
} else {
port, err = strconv.Atoi(envPort)
if err != nil {
return nil, nil, -1, 0, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
return nil, nil, 0, -1, 0, fmt.Errorf("failed to parse %s, need to int: %w", EnvPort, err)
}
}

Expand All @@ -63,11 +76,11 @@ func Load() (*HostConfigMap, map[myshoespb.ResourceType]Mapping, int, uint64, er
} else {
overCommitPercent, err = strconv.ParseUint(envOCP, 10, 64)
if err != nil {
return nil, nil, -1, 0, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
return nil, nil, 0, -1, 0, fmt.Errorf("failed to parse %s, need to uint: %w", EnvOverCommit, err)
}
}

return hostConfigs, m, port, overCommitPercent, nil
return hostConfigs, m, periodSec, port, overCommitPercent, nil
}

func readResourceTypeMapping(env string) (map[myshoespb.ResourceType]Mapping, error) {
Expand Down

0 comments on commit f301cfd

Please sign in to comment.