Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/pvc — Support for multi-node scheduling and S3-backed working directories (S3 Mountpoint CSI) #1107

Merged
merged 9 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v5
with:
go-version: 1.21
go-version: 1.22

- name: Check out code
uses: actions/checkout@v2
Expand All @@ -23,6 +23,7 @@ jobs:
key: ${{ runner.os }}-funnel-bin-${{ hashFiles('**/go.sum') }}-${{ github.ref }}
restore-keys: |
${{ runner.os }}-funnel-bin-${{ github.ref }}
${{ runner.os }}-funnel-bin-

- name: Build Funnel (if cache doesn't exist)
run: |
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ jobs:
- name: Deploy Funnel
run: |
helm repo add ohsu https://ohsu-comp-bio.github.io/helm-charts
# 'local-path' is a k3d specific storage class
# Ref: https://k3d.io/v5.7.4/usage/k3s/#local-path-provisioner

# 'local-path' is a k3d specific storage class used to automatically create a PersistentVolume
# - Ref: https://k3d.io/v5.7.4/usage/k3s/#local-path-provisioner
helm upgrade --install funnel ohsu/funnel --set storage.className=local-path --set storage.provisioner=local-path

# Wait for the Deployment to be available
Expand All @@ -45,6 +46,9 @@ jobs:
# Port-forward the service
kubectl port-forward svc/funnel 8000:8000 &

- name: Setup tmate session
uses: mxschmitt/action-tmate@v3

- name: Submit Task
run: |
export PATH="$PATH:$(pwd)"
Expand Down
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ RUN --mount=type=cache,target=/root/.cache/go-build make build
# final stage
FROM alpine
WORKDIR /opt/funnel
VOLUME /opt/funnel/funnel-work-dir
EXPOSE 8000 9090
ENV PATH="/app:${PATH}"
COPY --from=build-env /go/src/github.com/ohsu-comp-bio/funnel/funnel /app/
Expand Down
25 changes: 20 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
[![Build Status](https://img.shields.io/github/actions/workflow/status/ohsu-comp-bio/funnel/tests.yaml)](https://github.com/ohsu-comp-bio/funnel/actions/workflows/tests.yaml)
[![Compliance Tests Status](https://img.shields.io/github/actions/workflow/status/ohsu-comp-bio/funnel/compliance-test.yaml?label=Compliance%20Tests)](https://github.com/ohsu-comp-bio/funnel/actions/workflows/compliance-test.yaml)
[![Gitter](https://badges.gitter.im/ohsu-comp-bio/funnel.svg)](https://gitter.im/ohsu-comp-bio/funnel)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Godoc](https://img.shields.io/badge/godoc-ref-blue.svg)](http://godoc.org/github.com/ohsu-comp-bio/funnel)
[![Build Status][build-badge]][build]
[![Compliance Tests Status][compliance-tests-badge]][compliance-tests]
[![Gitter][gitter-badge]][gitter]
[![License: MIT][license-badge]][license]
[![Godoc][godoc-badge]][godoc]

[build-badge]: https://img.shields.io/github/actions/workflow/status/ohsu-comp-bio/funnel/tests.yaml
[build]: https://github.com/ohsu-comp-bio/funnel/actions/workflows/tests.yaml

[compliance-tests]: https://github.com/ohsu-comp-bio/funnel/actions/workflows/compliance-test.yaml
[compliance-tests-badge]: https://img.shields.io/github/actions/workflow/status/ohsu-comp-bio/funnel/compliance-test.yaml?label=Compliance%20Tests

[gitter-badge]: https://badges.gitter.im/ohsu-comp-bio/funnel.svg
[gitter]: https://gitter.im/ohsu-comp-bio/funnel

[license-badge]: https://img.shields.io/badge/License-MIT-yellow.svg
[license]: https://opensource.org/licenses/MIT

[godoc-badge]: https://img.shields.io/badge/godoc-ref-blue.svg
[godoc]: http://godoc.org/github.com/ohsu-comp-bio/funnel

<a title="Funnel Homepage" href="https://ohsu-comp-bio.github.io/funnel">
<img title="Funnel Logo" src="https://github.com/user-attachments/assets/f51cf06b-d802-4e20-bde1-bcd1fc5657e6" />
Expand Down
148 changes: 104 additions & 44 deletions compute/kubernetes/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -68,13 +67,17 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly
}

b := &Backend{
client: clientset.BatchV1().Jobs(conf.Namespace),
namespace: conf.Namespace,
template: conf.Template,
event: writer,
database: reader,
log: log,
config: kubeconfig,
bucket: conf.Bucket,
region: conf.Region,
client: clientset.BatchV1().Jobs(conf.Namespace),
namespace: conf.Namespace,
template: conf.Template,
pvTemplate: conf.PVTemplate,
pvcTemplate: conf.PVCTemplate,
event: writer,
database: reader,
log: log,
config: kubeconfig,
}

if !conf.DisableReconciler {
Expand All @@ -87,9 +90,13 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly

// Backend represents the local backend.
type Backend struct {
bucket string
region string
client batchv1.JobInterface
namespace string
template string
pvTemplate string
pvcTemplate string
event events.Writer
database tes.ReadOnlyServer
log *logger.Logger
Expand Down Expand Up @@ -133,7 +140,7 @@ func (b *Backend) Close() {
//TODO: close database?
}

// Create the Funnel Worker job
// Create the Funnel Worker job from kubernetes-template.yaml
// Executor job is created in worker/kubernetes.go#Run
func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
submitTpl, err := template.New(task.Id).Parse(b.template)
Expand All @@ -155,7 +162,7 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
"DiskGb": res.GetDiskGb(),
})
if err != nil {
return nil, fmt.Errorf("executing template: %v", err)
return nil, fmt.Errorf("executing Worker template: %v", err)
}

decode := scheme.Codecs.UniversalDeserializer().Decode
Expand All @@ -171,43 +178,74 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
return job, nil
}

func (b *Backend) createPVC(ctx context.Context, taskID string, resources *tes.Resources) error {
clientset, err := kubernetes.NewForConfig(b.config) // You'll need to store the config during NewBackend
// Create the Worker/Executor PVC from config/kubernetes-pvc.yaml
// TODO: Move this config file to Helm Charts so users can see/customize it
func (b *Backend) createPVC(task *tes.Task) (*corev1.PersistentVolumeClaim, error) {
// Load templates
pvcTpl, err := template.New(task.Id).Parse(b.pvcTemplate)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
return nil, fmt.Errorf("parsing template: %v", err)
}

// Template parameters
var buf bytes.Buffer
err = pvcTpl.Execute(&buf, map[string]interface{}{
"TaskId": task.Id,
"Namespace": b.namespace,
"Bucket": b.bucket,
"Region": b.region,
})
if err != nil {
return nil, fmt.Errorf("executing PVC template: %v", err)
}

storageSize := resource.NewQuantity(1024*1024*1024, resource.BinarySI) // 1Gi default
if resources != nil && resources.DiskGb > 0 {
storageSize = resource.NewQuantity(int64(resources.DiskGb*1024*1024*1024), resource.BinarySI)
}

pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("funnel-pvc-%s", taskID),
Labels: map[string]string{
"app": "funnel",
"taskId": taskID,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: *storageSize,
},
},
},
}

_, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(ctx, pvc, metav1.CreateOptions{})
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(buf.Bytes(), nil, nil)
if err != nil {
return fmt.Errorf("creating shared PVC: %v", err)
return nil, fmt.Errorf("decoding PVC spec: %v", err)
}

return nil
fmt.Println("PVC spec: ", string(buf.Bytes()))
pvc, ok := obj.(*corev1.PersistentVolumeClaim)
if !ok {
return nil, fmt.Errorf("failed to decode PVC spec")
}
return pvc, nil
}

// Create the Worker/Executor PV from config/kubernetes-pv.yaml
// TODO: Move this config file to Helm Charts so users can see/customize it
func (b *Backend) createPV(task *tes.Task) (*corev1.PersistentVolume, error) {
// Load templates
pvTpl, err := template.New(task.Id).Parse(b.pvTemplate)
if err != nil {
return nil, fmt.Errorf("parsing template: %v", err)
}

// Template parameters
var buf bytes.Buffer
err = pvTpl.Execute(&buf, map[string]interface{}{
"TaskId": task.Id,
"Namespace": b.namespace,
"Bucket": b.bucket,
"Region": b.region,
})
if err != nil {
return nil, fmt.Errorf("executing PV template: %v", err)
}

decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(buf.Bytes(), nil, nil)
if err != nil {
return nil, fmt.Errorf("decoding PV spec: %v", err)
}

fmt.Println("PV spec: ", string(buf.Bytes()))
pv, ok := obj.(*corev1.PersistentVolume)
if !ok {
return nil, fmt.Errorf("failed to decode PV spec")
}
return pv, nil
}

// Add this helper function for PVC cleanup
Expand Down Expand Up @@ -235,13 +273,35 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
// Create a new background context instead of inheriting from the potentially canceled one
submitCtx := context.Background()

// TODO: Update this so that a PVC is only created if the task has inputs or outputs
// TODO: Update this so that a PVC/PV is only created if the task has inputs or outputs
// If the task has either inputs or outputs, then create a PVC
// shared between the Funnel Worker and the Executor
// e.g. `if len(task.Inputs) > 0 || len(task.Outputs) > 0 {}`
err := b.createPVC(submitCtx, task.Id, task.GetResources())
pvc, err := b.createPVC(task)
if err != nil {
return fmt.Errorf("creating shared storage PVC: %v", err)
}

pv, err := b.createPV(task)
if err != nil {
return fmt.Errorf("creating shared storage PV: %v", err)
}

clientset, err := kubernetes.NewForConfig(b.config)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
}

// Create PVC
pvc, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(context.Background(), pvc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("creating PVC: %v", err)
}

// Create PV
pv, err = clientset.CoreV1().PersistentVolumes().Create(context.Background(), pv, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("creating shared storage: %v", err)
return fmt.Errorf("creating PV: %v", err)
}

// Create the worker job
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func (h FTPStorage) Valid() bool {

// Kubernetes describes the configuration for the Kubernetes compute backend.
type Kubernetes struct {
// The bucket to use for the task's Working Directory
Bucket string
// The region to use for the task's Bucket
Region string
// The executor used to execute tasks. Available executors: docker, kubernetes
Executor string
// Turn off task state reconciler. When enabled, Funnel communicates with Kuberenetes
Expand All @@ -428,6 +432,10 @@ type Kubernetes struct {
ExecutorTemplate string
// ExecutorTemplateFile is the path to the executor template.
ExecutorTemplateFile string
// Worker/Executor PV job template.
PVTemplate string
// Worker/Executor PVC job template.
PVCTemplate string
// Path to the Kubernetes configuration file, otherwise assumes the Funnel server is running in a pod and
// attempts to use https://godoc.org/k8s.io/client-go/rest#InClusterConfig to infer configuration.
ConfigFile string
Expand Down
2 changes: 1 addition & 1 deletion config/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ AWSBatch:
# Kubernetes describes the configuration for the Kubernetes compute backend.
Kubernetes:
# The executor used to execute tasks. Available executors: docker, kubernetes
Executor: "kubernetes"
Executor: "docker"
# Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes
# to find tasks that are stuck in a queued state or errored and
# updates the task state accordingly.
Expand Down
6 changes: 6 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,17 @@ func DefaultConfig() Config {

kubernetesTemplate := intern.MustAsset("config/kubernetes-template.yaml")
executorTemplate := intern.MustAsset("config/kubernetes-executor-template.yaml")
pvTemplate := intern.MustAsset("config/kubernetes-pv.yaml")
pvcTemplate := intern.MustAsset("config/kubernetes-pvc.yaml")
c.Kubernetes.Executor = "docker"
c.Kubernetes.Namespace = "default"
c.Kubernetes.ServiceAccount = "funnel-sa"
c.Kubernetes.Template = string(kubernetesTemplate)
c.Kubernetes.ExecutorTemplate = string(executorTemplate)
c.Kubernetes.Bucket = ""
c.Kubernetes.Region = ""
c.Kubernetes.PVTemplate = string(pvTemplate)
c.Kubernetes.PVCTemplate = string(pvcTemplate)
c.Kubernetes.ReconcileRate = reconcile

return c
Expand Down
Loading
Loading