Skip to content

Commit

Permalink
feat: fetch machine specs from bacalhau (#371)
Browse files Browse the repository at this point in the history
* feat: fetch machine specs from bacalhau

* feat: add disk available

* typo fix

Co-authored-by: Brian Ginsburg <7957636+bgins@users.noreply.github.com>

---------

Co-authored-by: Brian Ginsburg <7957636+bgins@users.noreply.github.com>
  • Loading branch information
walkah and bgins authored Sep 25, 2024
1 parent aaf5d9a commit d867292
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 3 deletions.
13 changes: 13 additions & 0 deletions pkg/data/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,24 @@ type MachineSpec struct {
// let's not use a float and fix the precision to 1/1000
GPU int `json:"gpu"`

GPUs []GPUSpec `json:"gpus"`

// Milli-CPU
CPU int `json:"cpu"`

// Megabytes
RAM int `json:"ram"`

// Disk space available
Disk int `json:"disk"`
}

type GPUSpec struct {
Name string `json:"name"`

Vendor string `json:"vendor"`

VRAM int `json:"vram"`
}

// this is what is loaded from the template file in the git repo
Expand Down
26 changes: 26 additions & 0 deletions pkg/executor/bacalhau/bacalhau.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,32 @@ func (executor *BacalhauExecutor) IsAvailable() (bool, error) {
return true, nil
}

func (executor *BacalhauExecutor) GetMachineSpecs() ([]data.MachineSpec, error) {
var specs []data.MachineSpec
result, err := executor.bacalhauClient.getNodes()
if err != nil {
return specs, err
}

for _, node := range result.Nodes {
spec := data.MachineSpec{
CPU: int(node.Info.ComputeNodeInfo.MaxCapacity.CPU) * 1000, // convert float to "milli-CPU"
RAM: int(node.Info.ComputeNodeInfo.MaxCapacity.Memory),
GPU: int(node.Info.ComputeNodeInfo.MaxCapacity.GPU),
Disk: int(node.Info.ComputeNodeInfo.MaxCapacity.Disk),
}
for _, gpu := range node.Info.ComputeNodeInfo.MaxCapacity.GPUs {
spec.GPUs = append(spec.GPUs, data.GPUSpec{
Name: gpu.Name,
Vendor: string(gpu.Vendor),
VRAM: int(gpu.Memory),
})
}
specs = append(specs, spec)
}
return specs, nil
}

func (executor *BacalhauExecutor) RunJob(
deal data.DealContainer,
module data.Module,
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/bacalhau/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (client *BacalhauClient) getVersion() (apimodels.GetVersionResponse, error)
return getRequest[apimodels.GetVersionResponse](client, "agent/version")
}

func (client *BacalhauClient) getNodes() (apimodels.ListNodesResponse, error) {
return getRequest[apimodels.ListNodesResponse](client, "orchestrator/nodes")
}

func getRequest[ResultType any](client *BacalhauClient, requestPath string) (ResultType, error) {
var result ResultType
url := fmt.Sprintf("%s/%s", client.getBaseUrl(), requestPath)
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/noop/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (executor *NoopExecutor) IsAvailable() (bool, error) {
return true, nil
}

func (executor *NoopExecutor) GetMachineSpecs() ([]data.MachineSpec, error) {
return []data.MachineSpec{}, nil
}

func (executor *NoopExecutor) RunJob(
deal data.DealContainer,
module data.Module,
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ExecutorResults struct {
type Executor interface {
Id() (string, error)
IsAvailable() (bool, error)
GetMachineSpecs() ([]data.MachineSpec, error)
// run the given job and return a local folder
// that contains the results
RunJob(
Expand Down
13 changes: 10 additions & 3 deletions pkg/resourceprovider/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (controller *ResourceProviderController) solve(ctx context.Context) error {
*/

/*
Ensure resource offers are posted to the solve
Ensure resource offers are posted to the solver
*/

func (controller *ResourceProviderController) getResourceOffer(index int, spec data.MachineSpec) data.ResourceOffer {
Expand Down Expand Up @@ -274,8 +274,15 @@ func (controller *ResourceProviderController) ensureResourceOffers() error {

addResourceOffers := []data.ResourceOffer{}

// map over the specs we have in the config
for index, spec := range controller.options.Offers.Specs {
// get the specs from our available compute node(s)
computeNodes, err := controller.executor.GetMachineSpecs()
if err != nil {
controller.log.Error("error getting machine specs", err)
return err
}

// map over the specs we have
for index, spec := range computeNodes {

// check if the resource offer already exists
// if it does then we need to update it
Expand Down

0 comments on commit d867292

Please sign in to comment.