Skip to content

Commit

Permalink
feat: add timeout support to filestore
Browse files Browse the repository at this point in the history
resolve #11
  • Loading branch information
criyle committed Apr 18, 2021
1 parent 222eb97 commit ba8d392
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 46 deletions.
3 changes: 2 additions & 1 deletion README.cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@

沙箱相关:

- 默认同时运行任务数为 `4` ,使用 `-parallelism` 指定
- 默认同时运行任务数为和 CPU 数量相同,使用 `-parallelism` 指定
- 默认文件存储在内存里,使用 `-dir` 指定本地目录为文件存储
- 默认 cgroup 的前缀为 `executor_server` ,使用 `-cgroup-prefix` 指定
- 默认没有磁盘文件复制限制,使用 `-src-prefix` 限制 copyIn 操作文件目录前缀(需要绝对路径)
Expand All @@ -66,6 +66,7 @@
- 使用 `-seccomp-conf` 指定 `seecomp` 过滤器(需要编译标志 `seccomp`,默认不开启)(仅 Linux)
- 使用 `-pre-fork` 指定启动时创建的容器数量
- 使用 `-tmp-fs-param` 指定容器内 `tmpfs` 的挂载参数(仅 Linux)
- 使用 `-file-timeout` 指定文件存储文件最大时间。超出时间的文件将会删除。(举例 `30m`

### 环境变量

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Server:

Sandbox:

- The default concurrency is `4`, Can be specified with `-parallelism` flag.
- The default concurrency equal to number of CPU, Can be specified with `-parallelism` flag.
- The default file store is in memory, local cache can be specified with `-dir` flag.
- The default CGroup prefix is `executor_server`, Can be specified with `-cgroup-prefix` flag.
- `-src-prefix` to restrict `src` copyIn path (need to be absolute path)
Expand All @@ -69,6 +69,7 @@ Sandbox:
- the program killed by seccomp filter will have status `Dangerous Syscall`
- `-pre-fork` specifies number of container to create when server starts
- `-tmp-fs-param` specifies the tmpfs parameter for `/w` and `/tmp` when using default mounting (Linux only)
- `-file-timeout` specifies maximum TTL for file created in file store (e.g. `30m`)

### Environment Variables

Expand Down
7 changes: 6 additions & 1 deletion cmd/executorserver/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"os"
"runtime"
"time"

"github.com/criyle/go-judge/envexec"
Expand All @@ -17,7 +18,7 @@ type Config struct {
NetShare bool `flagUsage:"share net namespace with host"`
MountConf string `flagUsage:"specifies mount configuration file" default:"mount.yaml"`
SeccompConf string `flagUsage:"specifies seccomp filter" default:"seccomp.yaml"`
Parallelism int `flagUsage:"control the # of concurrency execution" default:"4"`
Parallelism int `flagUsage:"control the # of concurrency execution (default equal to number of cpu)"`
CgroupPrefix string `flagUsage:"control cgroup prefix" default:"executor_server"`
ContainerCredStart int `flagUsage:"control the start uid&gid for container" default:"10000"`

Expand All @@ -33,6 +34,7 @@ type Config struct {
Cpuset string `flagUsage:"control the usage of cpuset for all containerd process"`
EnableCPURate bool `flagUsage:"enable cpu cgroup rate control"`
CPUCfsPeriod time.Duration `flagUsage:"set cpu.cfs_period" default:"100ms"`
FileTimeout time.Duration `flagUsage:"specified timeout for filestore files"`

// server config
HTTPAddr string `flagUsage:"specifies the http binding address" default:":5050"`
Expand Down Expand Up @@ -63,5 +65,8 @@ func (c *Config) Load() error {
if os.Getpid() == 1 {
c.Release = true
}
if c.Parallelism <= 0 {
c.Parallelism = runtime.NumCPU()
}
return cl.Load(c)
}
12 changes: 10 additions & 2 deletions cmd/executorserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
logger.Sugar().Infof("config loaded: %+v", conf)

// Init environment pool
fs := newFilsStore(conf.Dir)
fs := newFilsStore(conf.Dir, conf.FileTimeout, conf.EnableMetrics)
b := newEnvBuilder(conf)
envPool := pool.NewPool(b)
prefork(envPool, conf.PreFork)
Expand Down Expand Up @@ -275,14 +275,22 @@ func grpcTokenAuth(token string) func(context.Context) (context.Context, error)
}
}

func newFilsStore(dir string) filestore.FileStore {
func newFilsStore(dir string, fileTimeout time.Duration, enableMetrics bool) filestore.FileStore {
const timeoutCheckInterval = 15 * time.Second

var fs filestore.FileStore
if dir == "" {
fs = filestore.NewFileMemoryStore()
} else {
os.MkdirAll(dir, 0755)
fs = filestore.NewFileLocalStore(dir)
}
if enableMetrics {
fs = newMetricsFileStore(fs)
}
if fileTimeout > 0 {
fs = filestore.NewTimeout(fs, fileTimeout, timeoutCheckInterval)
}
return fs
}

Expand Down
89 changes: 87 additions & 2 deletions cmd/executorserver/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"sync"
"time"

"github.com/criyle/go-judge/filestore"
"github.com/criyle/go-judge/worker"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -18,8 +20,10 @@ var (
0.4, 0.6, 0.8, 1.0, 1.5, 2, 5, 10,
}

// 4k (1<<12) -> 1g (1<<30)
memoryBucket = prometheus.ExponentialBuckets(1<<12, 2, 19)
// 4k (1<<12) -> 4g (1<<32)
memoryBucket = prometheus.ExponentialBuckets(1<<12, 2, 21)
// 256 byte (1<<8) -> 256m (1<<28)
fileSizeBucket = prometheus.ExponentialBuckets(1<<8, 2, 20)

metricsSummaryQuantile = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

Expand Down Expand Up @@ -56,12 +60,39 @@ var (
Help: "Summary for the memory",
Objectives: metricsSummaryQuantile,
}, []string{"status"})

fsSizeHist = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "file_size_bytes",
Help: "Histgram for the file size in the file store",
Buckets: fileSizeBucket,
})

fsSizeSummary = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: metricsNamespace,
Name: "file_size",
Help: "Summary for the file size in the file store",
Objectives: metricsSummaryQuantile,
})

fsTotalCount = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "file_current_total",
Help: "Total number of current files in the file store",
})

fsTotalSize = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "file_size_current_total",
Help: "Total size of current files in the file store",
})
)

func init() {
prometheus.MustRegister(execErrorCount)
prometheus.MustRegister(execTimeHist, execTimeSummary)
prometheus.MustRegister(execMemHist, execMemSummary)
prometheus.MustRegister(fsSizeHist, fsSizeSummary, fsTotalSize)
}

func execObserve(res worker.Response) {
Expand All @@ -79,3 +110,57 @@ func execObserve(res worker.Response) {
execMemSummary.WithLabelValues(status).Observe(mob)
}
}

var _ filestore.FileStore = &metricsFileStore{}

type metricsFileStore struct {
mu sync.Mutex
filestore.FileStore
fileSize map[string]int
}

func newMetricsFileStore(fs filestore.FileStore) filestore.FileStore {
return &metricsFileStore{
FileStore: fs,
fileSize: make(map[string]int),
}
}

func (m *metricsFileStore) Add(name string, content []byte) (string, error) {
id, err := m.FileStore.Add(name, content)
if err != nil {
return "", err
}

m.mu.Lock()
defer m.mu.Unlock()

s := len(content)
m.fileSize[id] = s

sf := float64(s)
fsSizeHist.Observe(sf)
fsSizeSummary.Observe(sf)
fsTotalSize.Add(sf)
fsTotalCount.Inc()

return id, nil
}

func (m *metricsFileStore) Remove(id string) bool {
success := m.FileStore.Remove(id)

m.mu.Lock()
defer m.mu.Unlock()

s, ok := m.fileSize[id]
if !ok {
return success
}

sf := float64(s)
fsTotalSize.Sub(sf)
fsTotalCount.Dec()

return success
}
4 changes: 2 additions & 2 deletions filestore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

const randIDLength = 12

var errUniqueIDNotGenerated = errors.New("Unique id does not exists after tried 50 times")
var errUniqueIDNotGenerated = errors.New("unique id does not exists after tried 50 times")

// FileStore defines interface to store file
type FileStore interface {
Expand All @@ -31,7 +31,7 @@ func generateID() (string, error) {
if _, err := base32.NewEncoder(base32.StdEncoding, &buf).Write(b); err != nil {
return "", err
}
return string(buf.Bytes()), nil
return buf.String(), nil
}

func generateUniqueID(isExists func(string) (bool, error)) (string, error) {
Expand Down
133 changes: 133 additions & 0 deletions filestore/timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package filestore

import (
"container/heap"
"sync"
"time"

"github.com/criyle/go-judge/envexec"
)

var (
_ FileStore = &Timeout{}
_ heap.Interface = &Timeout{}
)

// Timeout is a file system with a maximun TTL
type Timeout struct {
mu sync.Mutex
FileStore
timeout time.Duration
files []timeoutFile
idToIndex map[string]int
}

type timeoutFile struct {
id string
time time.Time
}

// NewTimeout creates a timeout file system with maximun TTL for a file
func NewTimeout(fs FileStore, timeout time.Duration, checkInterval time.Duration) FileStore {
t := &Timeout{
FileStore: fs,
timeout: timeout,
files: make([]timeoutFile, 0),
idToIndex: make(map[string]int),
}
go t.checkTimeoutLoop(checkInterval)
return t
}

func (t *Timeout) checkTimeoutLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
for {
t.checkTimeoutAndRemove()
<-ticker.C
}
}

func (t *Timeout) checkTimeoutAndRemove() {
t.mu.Lock()
defer t.mu.Unlock()

now := time.Now()
for len(t.files) > 0 && t.files[0].time.Add(t.timeout).Before(now) {
f := t.files[0]
t.FileStore.Remove(f.id)
heap.Pop(t)
}
}

func (t *Timeout) Len() int {
return len(t.files)
}

func (t *Timeout) Less(i, j int) bool {
return t.files[i].time.Before(t.files[j].time)
}

func (t *Timeout) Swap(i, j int) {
t.files[i], t.files[j] = t.files[j], t.files[i]
t.idToIndex[t.files[i].id] = i
t.idToIndex[t.files[j].id] = j
}

func (t *Timeout) Push(x interface{}) {
e := x.(timeoutFile)
t.files = append(t.files, e)
t.idToIndex[e.id] = len(t.files) - 1
}

func (t *Timeout) Pop() interface{} {
e := t.files[len(t.files)-1]
t.files = t.files[:len(t.files)-1]
delete(t.idToIndex, e.id)
return e
}

func (t *Timeout) Add(name string, content []byte) (string, error) {
// try add to file store underlying
id, err := t.FileStore.Add(name, content)
if err != nil {
return "", err
}

t.mu.Lock()
defer t.mu.Unlock()

f := timeoutFile{id, time.Now()}
heap.Push(t, f)

return id, nil
}

func (t *Timeout) Remove(id string) bool {
success := t.FileStore.Remove(id)

t.mu.Lock()
defer t.mu.Unlock()

index, ok := t.idToIndex[id]
if !ok {
return success
}
heap.Remove(t, index)
return success
}

func (t *Timeout) Get(id string) (string, envexec.File) {
name, file := t.FileStore.Get(id)

t.mu.Lock()
defer t.mu.Unlock()

index, ok := t.idToIndex[id]
if !ok {
return name, file
}
t.files[index].time = time.Now()
heap.Fix(t, index)

return name, file
}
Loading

0 comments on commit ba8d392

Please sign in to comment.