Skip to content

Commit

Permalink
removed spew, fixed loop bug, added env variables, fixed translator bug
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindi committed Dec 6, 2024
1 parent 3365f61 commit 3c3564d
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 118 deletions.
6 changes: 5 additions & 1 deletion .local.dev
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ WEB3_PAYMENTS_ADDRESS=0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6
WEB3_POW_ADDRESS_=0x4ed7c70F96B99c776995fB64377f0d4aB3B0e1C1
WEB3_STORAGE_ADDRESS_=0xB7f8BC63BbcaD18155201308C8f3540b07f84F5e
WEB3_TOKEN_ADDRESS_=0xa513E6E4b8f2a923D98304ec87F64353C4D5C853
WEB3_USERS_ADDRESS=0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82
WEB3_USERS_ADDRESS=0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82
BACALHAU_API_HOST=localhost
BACALHAU_API_PORT=20000
JOB_STATUS_POLL_INTERVAL=5

9 changes: 1 addition & 8 deletions cmd/lilypad/mediator.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package lilypad

import (
"fmt"

"github.com/lilypad-tech/lilypad/pkg/executor/bacalhau"
"github.com/lilypad-tech/lilypad/pkg/ipfs"
"github.com/lilypad-tech/lilypad/pkg/mediator"
optionsfactory "github.com/lilypad-tech/lilypad/pkg/options"
"github.com/lilypad-tech/lilypad/pkg/system"
Expand Down Expand Up @@ -48,12 +45,8 @@ func runMediator(cmd *cobra.Command, options mediator.MediatorOptions) error {
return err
}

ipfsClient, err := ipfs.NewClient(commandCtx.Ctx, options.IPFS.Addr)
if err != nil {
return fmt.Errorf("error creating IPFS client: %s", err.Error())
}

executor, err := bacalhau.NewBacalhauExecutor(options.Bacalhau, ipfsClient)
executor, err := bacalhau.NewBacalhauExecutor(options.Bacalhau)
if err != nil {
return err
}
Expand Down
10 changes: 1 addition & 9 deletions cmd/lilypad/resource-provider.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package lilypad

import (
"fmt"

"github.com/lilypad-tech/lilypad/pkg/executor/bacalhau"
"github.com/lilypad-tech/lilypad/pkg/ipfs"
optionsfactory "github.com/lilypad-tech/lilypad/pkg/options"
"github.com/lilypad-tech/lilypad/pkg/resourceprovider"
"github.com/lilypad-tech/lilypad/pkg/system"
Expand Down Expand Up @@ -53,12 +50,7 @@ func runResourceProvider(cmd *cobra.Command, options resourceprovider.ResourcePr
return err
}

ipfsClient, err := ipfs.NewClient(commandCtx.Ctx, options.IPFS.Addr)
if err != nil {
return fmt.Errorf("error creating IPFS client: %s", err.Error())
}

executor, err := bacalhau.NewBacalhauExecutor(options.Bacalhau, ipfsClient)
executor, err := bacalhau.NewBacalhauExecutor(options.Bacalhau)
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions cmd/lilypad/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ func runJob(cmd *cobra.Command, options jobcreator.JobCreatorOptions, network st
os.Exit(1)
}

// UPDATE FUNCTION
// fmt.Printf("evOffer: %s --------------------------------------\n")
// spew.Dump(evOffer)


})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ toolchain go1.23.2
require (
github.com/BurntSushi/toml v0.3.1
github.com/bacalhau-project/bacalhau v1.5.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/ethereum/go-ethereum v1.13.4
github.com/fatih/color v1.16.0
github.com/go-git/go-git/v5 v5.10.0
Expand Down Expand Up @@ -63,6 +62,7 @@ require (
github.com/crackcomm/go-gitignore v0.0.0-20231225121904-e25f5bc08668 // indirect
github.com/crate-crypto/go-kzg-4844 v0.3.0 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
89 changes: 42 additions & 47 deletions pkg/executor/bacalhau/bacalhau.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (
"path/filepath"
"strings"
"time"

"github.com/lilypad-tech/lilypad/pkg/data"
executorlib "github.com/lilypad-tech/lilypad/pkg/executor"
"github.com/lilypad-tech/lilypad/pkg/ipfs"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
Expand All @@ -35,47 +32,33 @@ const RESULTS_DIR = "bacalhau-results"

// BacalhauExecutorOptions holds the settings the executor uses to reach the
// Bacalhau API and to poll it for job status.
type BacalhauExecutorOptions struct {
	// ApiHost is the host name or address of the Bacalhau API server.
	ApiHost string
	// ApiPort is the port of the Bacalhau API server. It is a string because
	// it is interpolated directly into the API URL.
	ApiPort string
	// JobStatusPollInterval is the number of seconds to wait between two
	// consecutive job status polls.
	JobStatusPollInterval uint64
}

// BacalhauExecutor runs jobs on a Bacalhau node through its API client.
type BacalhauExecutor struct {
	// Options are the settings the executor was constructed with.
	Options BacalhauExecutorOptions

	// bacalhauClient is the API client the executor methods use to talk to
	// Bacalhau.
	bacalhauClient BacalhauClient
}

func NewBacalhauExecutor(options BacalhauExecutorOptions, ipfsClient *ipfs.Client) (*BacalhauExecutor, error) {
func NewBacalhauExecutor(options BacalhauExecutorOptions) (*BacalhauExecutor, error) {

// apiHost := ""
// if options.ApiHost != "DO_NOT_SET" {
// apiHost = fmt.Sprintf("BACALHAU_API_HOST=%s", options.ApiHost)
// }
// fmt.Printf("OPTIONS: %s\n", options)
// bacalhauEnv := []string{
// apiHost,
// fmt.Sprintf("HOME=%s", os.Getenv("HOME")),
// }
apiHost := fmt.Sprintf("http://%s:%s", options.ApiHost, options.ApiPort)

// log.Debug().Msgf("bacalhauEnv: %s", bacalhauEnv)
apiHost := fmt.Sprintf("http://%s:%s", "localhost", "20000")

client, err := NewBacalhauClient(apiHost)
client, err := newBacalhauClient(apiHost)
if err != nil {
return nil, err
}

return &BacalhauExecutor{
Options: options,

bacalhauClient: *client,
ipfsClient: ipfsClient,
}, nil
}

func (executor *BacalhauExecutor) Id() (string, error) {
id, err := executor.bacalhauClient.GetID()
id, err := executor.bacalhauClient.getID()
if err != nil {
return "", fmt.Errorf("error getting bacalhau ID %s", err.Error())
}
Expand All @@ -84,26 +67,46 @@ func (executor *BacalhauExecutor) Id() (string, error) {

// Checks that Bacalhau is installed, correctly configured, and available
func (executor *BacalhauExecutor) IsAvailable() (bool, error) {
alive, err := executor.bacalhauClient.Alive()
alive, err := executor.bacalhauClient.alive()
if !alive || err != nil {
return false, fmt.Errorf("Bacalhau is not currently available. Please ensure that Bacalhau is running, then try again. %w", err)
}

// Check that we have the right version of Bacalhau
version, err := executor.bacalhauClient.GetVersion()
version, err := executor.bacalhauClient.getVersion()
if err != nil {
return false, fmt.Errorf("error getting bacalhau version %s", err.Error())
}
// TODO: we may want to relax this
if version != "v1.5.1" {

if version < "v1.5.1" {
return false, errors.New("Bacalhau version must be v1.5.1")
}

return true, nil
}

// GetMachineSpecs returns one MachineSpec per node reported by Bacalhau,
// built from each node's maximum compute capacity.
//
// It returns a nil slice and the underlying error when the node list cannot
// be fetched.
func (executor *BacalhauExecutor) GetMachineSpecs() ([]data.MachineSpec, error) {
	nodes, err := executor.bacalhauClient.getNodes()
	if err != nil {
		return nil, err
	}

	var specs []data.MachineSpec
	for _, node := range nodes {
		capacity := node.Info.ComputeNodeInfo.MaxCapacity
		spec := data.MachineSpec{
			// Scale to milli-CPU before converting to int so fractional CPUs
			// survive (0.5 CPU -> 500); the +0.5 rounds to the nearest unit.
			CPU:  int(capacity.CPU*1000 + 0.5),
			RAM:  int(capacity.Memory),
			GPU:  int(capacity.GPU),
			Disk: int(capacity.Disk),
		}
		for _, gpu := range capacity.GPUs {
			spec.GPUs = append(spec.GPUs, data.GPUSpec{
				Name:   gpu.Name,
				Vendor: string(gpu.Vendor),
				VRAM:   int(gpu.Memory),
			})
		}
		specs = append(specs, spec)
	}
	return specs, nil
}

func (executor *BacalhauExecutor) RunJob(
Expand All @@ -118,7 +121,7 @@ func (executor *BacalhauExecutor) RunJob(

var jobExecutions []*models.Execution
for {
jobInfo, err := executor.bacalhauClient.GetJob(jobId)
jobInfo, err := executor.bacalhauClient.getJob(jobId)
if err != nil {
return nil, fmt.Errorf("error getting job %s: %s", jobId, err.Error())
}
Expand All @@ -130,20 +133,18 @@ func (executor *BacalhauExecutor) RunJob(
jobExecutions = jobInfo.Executions.Items

if len(jobExecutions) > 0 {
if jobInfo.Job.State.StateType != models.JobStateTypeCompleted {
return nil, fmt.Errorf("job %s did not complete yet: %s", jobId, jobInfo.Job.State.StateType.String())
if jobInfo.Job.State.StateType == models.JobStateTypeCompleted {
break
}
break
}

time.Sleep(5 * time.Second)
time.Sleep(time.Duration(executor.Options.JobStatusPollInterval) * time.Second)
}

system.EnsureDataDir(RESULTS_DIR)
resultsDir := system.GetDataDir(filepath.Join(RESULTS_DIR, deal.ID))


cid,resultsDir, err := executor.prepareResults(jobId,jobExecutions[0].ID)
cid, resultsDir, err := executor.prepareResults(jobId, jobExecutions[0].ID)
if err != nil {
return nil, fmt.Errorf("error preparing results: %s", err.Error())
}
Expand All @@ -158,18 +159,17 @@ func (executor *BacalhauExecutor) RunJob(
}

func (executor *BacalhauExecutor) prepareResults(jobId string, executionId string) (cid string, resultsDir string, err error) {

home, err := os.UserHomeDir()
if err != nil {
return "", "", fmt.Errorf("error getting home directory: %s", err.Error())
}

resultsPath := filepath.Join(home, ".bacalhau", "compute", "results", "local-publisher", fmt.Sprintf("%s.tar.gz", executionId))



gzipFile, err := os.Open(resultsPath)
if err != nil {
return "", "", fmt.Errorf("error opening gzip file: %s", err.Error())
return "", "", fmt.Errorf("error opening gzip file: %s", err.Error())
}
defer gzipFile.Close()

Expand All @@ -179,7 +179,6 @@ func (executor *BacalhauExecutor) prepareResults(jobId string, executionId strin
}
defer gzipReader.Close()


tarPath := strings.TrimSuffix(resultsPath, ".gz")
tarFile, err := os.Create(tarPath)
if err != nil {
Expand Down Expand Up @@ -216,13 +215,11 @@ func GenerateCID(path string) (string, error) {
}
defer file.Close()


ds := dsync.MutexWrap(datastore.NewNullDatastore())
bs := blockstore.NewBlockstore(ds)
bs = blockstore.NewIdStore(bs)
bsrv := blockservice.New(bs, offline.Exchange(bs))
dsrv := merkledag.NewDAGService(bsrv)


defer func() {
if err := bsrv.Close(); err != nil {
Expand All @@ -242,9 +239,8 @@ func GenerateCID(path string) (string, error) {
NoCopy: false,
}


bufReader := bufio.NewReader(file)

ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bufReader, chunker.DefaultBlockSize))
if err != nil {
return "", fmt.Errorf("failed to create UnixFS builder: %w", err)
Expand All @@ -258,12 +254,11 @@ func GenerateCID(path string) (string, error) {
return nd.Cid().String(), nil
}


func (executor *BacalhauExecutor) getJobID(
deal data.DealContainer,
module data.Module,
) (string, error) {
putJobResponse, err := executor.bacalhauClient.PostJob(module.Job)
putJobResponse, err := executor.bacalhauClient.postJob(module.Job)
if err != nil {
return "", fmt.Errorf("error creating job %s -> %s", deal.ID, err.Error())
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/executor/bacalhau/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type BacalhauClient struct {
api clientv2.API
}

func NewBacalhauClient(apiHost string) (*BacalhauClient, error) {
func newBacalhauClient(apiHost string) (*BacalhauClient, error) {
client := clientv2.NewHTTPClient(apiHost)
apiClient := clientv2.NewAPI(client)
return &BacalhauClient{api: apiClient}, nil
}

func (c *BacalhauClient) GetID() (string, error) {
func (c *BacalhauClient) getID() (string, error) {

getNodeRequest := apimodels.GetAgentNodeRequest{}
response, err := c.api.Agent().Node(context.Background(), &getNodeRequest)
Expand All @@ -35,7 +35,7 @@ func (c *BacalhauClient) GetID() (string, error) {

}

func (c *BacalhauClient) Alive() (bool, error) {
func (c *BacalhauClient) alive() (bool, error) {

response, err := c.api.Agent().Alive(context.Background())
if err != nil {
Expand All @@ -45,7 +45,7 @@ func (c *BacalhauClient) Alive() (bool, error) {
return response.IsReady(), nil
}

func (c *BacalhauClient) GetVersion() (string, error) {
func (c *BacalhauClient) getVersion() (string, error) {
response, err := c.api.Agent().Version(context.Background())
if err != nil {
return "", fmt.Errorf("error getting bacalhau version %s", err.Error())
Expand All @@ -54,7 +54,7 @@ func (c *BacalhauClient) GetVersion() (string, error) {
return response.BuildVersionInfo.GitVersion, nil
}

func (c *BacalhauClient) PostJob(job bacalhau.Job) (*apimodels.PutJobResponse, error) {
func (c *BacalhauClient) postJob(job bacalhau.Job) (*apimodels.PutJobResponse, error) {

translatedJob := translateJob(job)

Expand All @@ -65,7 +65,7 @@ func (c *BacalhauClient) PostJob(job bacalhau.Job) (*apimodels.PutJobResponse, e
return c.api.Jobs().Put(context.Background(), &putJobRequest)
}

func (c *BacalhauClient) GetJob(jobID string) (*apimodels.GetJobResponse, error) {
func (c *BacalhauClient) getJob(jobID string) (*apimodels.GetJobResponse, error) {
getJobRequest := apimodels.GetJobRequest{
JobID: jobID,
Include: "executions",
Expand All @@ -79,7 +79,7 @@ func (c *BacalhauClient) GetJob(jobID string) (*apimodels.GetJobResponse, error)
return response, nil
}

func (c *BacalhauClient) GetJobResult(jobId string ) (string, error) {
func (c *BacalhauClient) getJobResult(jobId string ) (string, error) {
getJobResultsRequest := apimodels.ListJobResultsRequest{
JobID: jobId,
}
Expand All @@ -97,7 +97,7 @@ func (c *BacalhauClient) GetJobResult(jobId string ) (string, error) {



func (c *BacalhauClient) GetNodes() ([]*models.NodeState, error) {
func (c *BacalhauClient) getNodes() ([]*models.NodeState, error) {

getNodesRequest := apimodels.ListNodesRequest{}
response, err := c.api.Nodes().List(context.Background(), &getNodesRequest)
Expand All @@ -107,8 +107,8 @@ func (c *BacalhauClient) GetNodes() ([]*models.NodeState, error) {
return response.Nodes, nil
}

func (c *BacalhauClient) GetMachineSpecs() ([]data.MachineSpec, error) {
nodes, err := c.GetNodes()
func (c *BacalhauClient) getMachineSpecs() ([]data.MachineSpec, error) {
nodes, err := c.getNodes()
var specs []data.MachineSpec
for _, node := range nodes {
spec := data.MachineSpec{
Expand Down Expand Up @@ -173,6 +173,7 @@ func translateJob(job bacalhau.Job) *models.Job {
ExecutionTimeout: job.Spec.Timeout,
// TODO: add queue timeout and total timeout
},
ResultPaths: translateOutputSources(job.Spec.Outputs),
},
},
State: models.NewJobState(models.JobStateTypePending),
Expand Down
Loading

0 comments on commit 3c3564d

Please sign in to comment.