Skip to content

Commit

Permalink
Merge pull request #121 from swanchain/fix-fcp-ubi-failed
Browse files Browse the repository at this point in the history
Fix the bug that FCP fails to do zk-task
  • Loading branch information
Normalnoise authored Aug 2, 2024
2 parents a880b61 + 86105dc commit db2a3cb
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 129 deletions.
10 changes: 5 additions & 5 deletions build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ var CurrentCommit string

var NetWorkTag string

const BuildVersion = "0.6.2"
const BuildVersion = "0.6.3"

const UBITaskImageIntelCpu = "filswan/ubi-worker-cpu-intel:v2.0"
const UBITaskImageIntelGpu = "filswan/ubi-worker-gpu-intel:v2.0"
const UBITaskImageAmdCpu = "filswan/ubi-worker-cpu-amd:v2.0"
const UBITaskImageAmdGpu = "filswan/ubi-worker-gpu-amd:v2.0"
const UBITaskImageIntelCpu = "filswan/ubi-worker-cpu-intel:latest"
const UBITaskImageIntelGpu = "filswan/ubi-worker-gpu-intel:latest"
const UBITaskImageAmdCpu = "filswan/ubi-worker-cpu-amd:latest"
const UBITaskImageAmdGpu = "filswan/ubi-worker-gpu-amd:latest"
const UBIResourceExporterDockerImage = "filswan/resource-exporter:v11.2.8"

func UserVersion() string {
Expand Down
2 changes: 1 addition & 1 deletion config.toml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ WalletBlackList = "" # CP re
UbiEnginePk = "0xB5aeb540B4895cd024c1625E146684940A849ED9" # UBI Engine's public key, CP only accept the task from this UBI engine
EnableSequencer = true # Enable aa Sequencer to receive zk-task proofs
AutoChainProof = true # Sequencer insufficient balance or service unavailable, use chain to submit proof
SequencerUrl = "http://sequencer.swanchain.io" # Sequencer service's API address
SequencerUrl = "https://sequencer.swanchain.io" # Sequencer service's API address

[LOG]
CrtFile = "/YOUR_DOMAIN_NAME_CRT_PATH/server.crt" # Your domain name SSL .crt file path
Expand Down
25 changes: 14 additions & 11 deletions internal/computing/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package computing
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -42,6 +41,7 @@ func (task *CronTask) RunTask() {
task.watchExpiredTask()
task.getUbiTaskReward()
task.checkJobReward()
task.cleanImageResource()
}

func checkJobStatus() {
Expand Down Expand Up @@ -111,6 +111,18 @@ func (task *CronTask) watchNameSpaceForDeleted() {
}
}
}
})
c.Start()
}

func (task *CronTask) cleanImageResource() {
c := cron.New(cron.WithSeconds())
c.AddFunc("@every 24h", func() {
defer func() {
if err := recover(); err != nil {
logs.GetLogger().Errorf("cleanImageResource catch panic error: %+v", err)
}
}()
NewDockerService().CleanResource()
})
c.Start()
Expand Down Expand Up @@ -339,7 +351,7 @@ func (task *CronTask) checkJobReward() {

func (task *CronTask) getUbiTaskReward() {
c := cron.New(cron.WithSeconds())
c.AddFunc("* * 0/1 * ?", func() {
c.AddFunc("* 0/10 * * ?", func() {
defer func() {
if err := recover(); err != nil {
logs.GetLogger().Errorf("task job: [GetUbiTaskReward], error: %+v", err)
Expand Down Expand Up @@ -463,15 +475,6 @@ func checkFcpJobInfoInChain(job *models.JobEntity) {

}

func checkHealth(url string) bool {
response, err := http.Get(url)
if err != nil {
return false
}
defer response.Body.Close()
return response.StatusCode == 200
}

type TaskGroup struct {
Items []*models.TaskEntity
Ids []int64
Expand Down
100 changes: 0 additions & 100 deletions internal/computing/docker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"github.com/filswan/go-mcs-sdk/mcs/api/common/logs"
"github.com/swanchain/go-computing-provider/build"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
Expand Down Expand Up @@ -70,104 +68,6 @@ func ExtractExposedPort(dockerfilePath string) (string, error) {

return exposedPort, nil
}
func RunContainer(imageName, dockerfilePath string) string {
exposedPort, err := ExtractExposedPort(dockerfilePath)
if err != nil {
log.Printf("Failed to extract exposed port: %v", err)
return ""
}

portMapping := exposedPort + ":" + exposedPort
err = RemoveContainerIfExists(imageName)
if err != nil {
log.Printf("Failed to remove existing container: %v", err)
return ""
}
var stdout bytes.Buffer
var stderr bytes.Buffer
cmd := exec.Command("docker", "run", "-d", "-p", portMapping, imageName)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err = cmd.Run()
if err != nil {
log.Printf("run container error: %v\n%s", err, stderr.String())
return ""
}

containerID := strings.TrimSpace(stdout.String())

// Clear the stdout buffer
stdout.Reset()

cmd = exec.Command("docker", "port", containerID)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err = cmd.Run()
if err != nil {
log.Printf("get container port error: %v\n%s", err, stderr.String())
return ""
}

portMapping = strings.TrimSpace(stdout.String())
fmt.Printf("Port mapping: %s\n", portMapping)

re := regexp.MustCompile(`0\.0\.0\.0:(\d+)`)
match := re.FindStringSubmatch(portMapping)
if len(match) < 2 {
log.Printf("unexpected port mapping format: %s", portMapping)
return ""
}

hostPort := match[1]

// Replace "0.0.0.0" with the desired IP address (e.g., "127.0.0.1")
hostIP := "127.0.0.1"

url := "http://" + hostIP + ":" + hostPort
return url
}

func RemoveContainerIfExists(imageName string) error {
var stdout bytes.Buffer
var stderr bytes.Buffer

cmd := exec.Command("docker", "ps", "-a", "-q", "--filter", "ancestor="+imageName)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Run()
if err != nil {
log.Printf("list containers error: %v\n%s", err, stderr.String())
return err
}

containerIDs := strings.Split(strings.TrimSpace(stdout.String()), "\n")
if len(containerIDs) == 0 || containerIDs[0] == "" {
log.Printf("No container with image %s found.", imageName)
return nil
}

for _, containerID := range containerIDs {
stdout.Reset()
stderr.Reset()

cmd = exec.Command("docker", "rm", "-f", containerID)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err = cmd.Run()
if err != nil {
log.Printf("remove container error: %v\n%s", err, stderr.String())
return err
}

log.Printf("Removed container with ID %s", containerID)
}

return nil
}

func (ds *DockerService) BuildImage(buildPath, imageName string) error {
// Create a buffer
Expand Down
48 changes: 38 additions & 10 deletions internal/computing/ubi.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
)

func DoUbiTaskForK8s(c *gin.Context) {
if !conf.GetConfig().UBI.EnableSequencer && !conf.GetConfig().UBI.AutoChainProof {
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.RejectZkTaskError))
return
}

var ubiTask models.UBITaskReq
if err := c.ShouldBindJSON(&ubiTask); err != nil {
Expand Down Expand Up @@ -280,6 +284,12 @@ func DoUbiTaskForK8s(c *gin.Context) {
execCommand := []string{"ubi-bench", "c2"}
JobName := strings.ToLower(models.UbiTaskTypeStr(ubiTask.Type)) + "-" + strconv.Itoa(ubiTask.ID)
filC2Param := envVars["FIL_PROOFS_PARAMETER_CACHE"]

if len(strings.TrimSpace(filC2Param)) == 0 {
logs.GetLogger().Warnf("task_id: %d, `FIL_PROOFS_PARAMETER_CACHE` variable is not configured", ubiTask.ID)
return
}

if gpuFlag == "0" {
delete(envVars, "RUST_GPU_TOOLS_CUSTOM_GPU")
envVars["BELLMAN_NO_GPU"] = "1"
Expand Down Expand Up @@ -348,7 +358,7 @@ func DoUbiTaskForK8s(c *gin.Context) {
},
},
},
RestartPolicy: "Never",
RestartPolicy: v1.RestartPolicyNever,
},
},
BackoffLimit: new(int32),
Expand All @@ -357,7 +367,7 @@ func DoUbiTaskForK8s(c *gin.Context) {
}

*job.Spec.BackoffLimit = 1
*job.Spec.TTLSecondsAfterFinished = 120
*job.Spec.TTLSecondsAfterFinished = 300

if _, err = k8sService.k8sClient.BatchV1().Jobs(namespace).Create(context.TODO(), job, metaV1.CreateOptions{}); err != nil {
logs.GetLogger().Errorf("Failed creating ubi task job: %v", err)
Expand All @@ -369,9 +379,14 @@ func DoUbiTaskForK8s(c *gin.Context) {
LabelSelector: fmt.Sprintf("job-name=%s", JobName),
})
if err != nil {
logs.GetLogger().Errorf("failed get pod, taskId: %d, error: %v,retrying", ubiTask.ID, err)
return false, err
}

if len(pods.Items) == 0 {
return false, nil
}

for _, p := range pods.Items {
for _, condition := range p.Status.Conditions {
if condition.Type != coreV1.PodReady && condition.Status != coreV1.ConditionTrue {
Expand Down Expand Up @@ -400,8 +415,10 @@ func DoUbiTaskForK8s(c *gin.Context) {
break
}
if podName == "" {
logs.GetLogger().Errorf("failed get pod name, taskId: %d", ubiTask.ID)
return
}
logs.GetLogger().Infof("successfully get pod name, taskId: %d, podName: %s", ubiTask.ID, podName)

req := k8sService.k8sClient.CoreV1().Pods(namespace).GetLogs(podName, &v1.PodLogOptions{
Container: "",
Expand Down Expand Up @@ -469,6 +486,11 @@ func ReceiveUbiProof(c *gin.Context) {
}

func DoUbiTaskForDocker(c *gin.Context) {
if !conf.GetConfig().UBI.EnableSequencer && !conf.GetConfig().UBI.AutoChainProof {
c.JSON(http.StatusInternalServerError, util.CreateErrorResponse(util.RejectZkTaskError))
return
}

var ubiTask models.UBITaskReq
if err := c.ShouldBindJSON(&ubiTask); err != nil {
c.JSON(http.StatusBadRequest, util.CreateErrorResponse(util.JsonError))
Expand Down Expand Up @@ -652,6 +674,10 @@ func DoUbiTaskForDocker(c *gin.Context) {
if !ok {
filC2Param = "/var/tmp/filecoin-proof-parameters"
}
if len(strings.TrimSpace(filC2Param)) == 0 {
logs.GetLogger().Warnf("task_id: %d, `FIL_PROOFS_PARAMETER_CACHE` variable is not configured", ubiTask.ID)
return
}

hostConfig := &container.HostConfig{
Binds: []string{fmt.Sprintf("%s:/var/tmp/filecoin-proof-parameters", filC2Param)},
Expand All @@ -673,7 +699,7 @@ func DoUbiTaskForDocker(c *gin.Context) {

dockerService := NewDockerService()
if err = dockerService.ContainerCreateAndStart(containerConfig, hostConfig, containerName); err != nil {
logs.GetLogger().Errorf("create ubi task container failed, error: %v", err)
logs.GetLogger().Errorf("failed to create ubi task container, task_id: %d, error: %v", ubiTask.ID, err)
return
}

Expand Down Expand Up @@ -1048,7 +1074,7 @@ func reportClusterResourceForDocker() {

func CronTaskForEcp() {
go func() {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(24 * time.Hour)
for range ticker.C {
NewDockerService().CleanResource()
}
Expand Down Expand Up @@ -1139,10 +1165,14 @@ func syncTaskStatusForSequencerService() error {
}
} else {
switch t.Status {
case "verified":
status = models.TASK_VERIFIED_STATUS
case "received":
status = models.TASK_RECEIVED_STATUS
case "running":
status = models.TASK_RUNNING_STATUS
case "submitted":
status = models.TASK_SUBMITTED_STATUS
case "verified":
status = models.TASK_VERIFIED_STATUS
case "rewarded":
status = models.TASK_REWARDED_STATUS
case "invalid":
Expand All @@ -1153,6 +1183,8 @@ func syncTaskStatusForSequencerService() error {
status = models.TASK_TIMEOUT_STATUS
case "verifyFailed":
status = models.TASK_VERIFYFAILED_STATUS
case "failed":
status = models.TASK_FAILED_STATUS
default:
status = models.TASK_UNKNOWN_STATUS
}
Expand Down Expand Up @@ -1341,10 +1373,6 @@ func submitTaskToSequencer(proof string, task *models.TaskEntity, timeOut int64,
}

func checkBalance(cpAccountAddress string) (bool, error) {
if !conf.GetConfig().UBI.EnableSequencer && !conf.GetConfig().UBI.AutoChainProof {
return false, fmt.Errorf("do not accept tasks")
}

chainUrl, err := conf.GetRpcByNetWorkName()
if err != nil {
return false, fmt.Errorf("failed to get rpc url, cpAccount: %s, error: %v", cpAccountAddress, err)
Expand Down
19 changes: 18 additions & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package db

import (
_ "embed"
"fmt"
"github.com/swanchain/go-computing-provider/internal/models"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"os"
"path"
"time"
)

const cpDBName = "provider.db"
Expand All @@ -15,7 +19,13 @@ var DB *gorm.DB
func InitDb(cpRepoPath string) {
var err error

DB, err = gorm.Open(sqlite.Open(path.Join(cpRepoPath, cpDBName)), &gorm.Config{})
DB, err = gorm.Open(sqlite.Open(path.Join(cpRepoPath, cpDBName)), &gorm.Config{
Logger: logger.New(myLog{}, logger.Config{
SlowThreshold: 5 * time.Second, // Slow SQL threshold
LogLevel: logger.Error, // Log level
Colorful: false, // Disable color
}),
})
if err != nil {
panic("failed to connect database")
}
Expand All @@ -31,3 +41,10 @@ func InitDb(cpRepoPath string) {
func NewDbService() *gorm.DB {
return DB
}

type myLog struct {
}

func (c myLog) Printf(format string, args ...interface{}) {
fmt.Fprintf(os.Stdout, format, args...)
}
2 changes: 1 addition & 1 deletion ubi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ curl -fsSL https://raw.githubusercontent.com/swanchain/go-computing-provider/rel
## Install ECP and Init CP Account
- Download `computing-provider`
```bash
wget https://github.com/swanchain/go-computing-provider/releases/download/v0.6.2/computing-provider
wget https://github.com/swanchain/go-computing-provider/releases/download/v0.6.3/computing-provider
```

- Initialize ECP repo
Expand Down
Loading

0 comments on commit db2a3cb

Please sign in to comment.