Skip to content

Commit de4fbe9

Browse files
authored
feat: impl GenerateConnectionURL func (#12)
* fix lint
* feat: impl GenerateConnectionURL func
1 parent 38fffa6 commit de4fbe9

File tree

8 files changed

+45
-32
lines changed

8 files changed

+45
-32
lines changed

cmd/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func main() {
167167
Scheme: mgr.GetScheme(),
168168
Scheduler: scheduler,
169169
WorkerGenerator: &worker.WorkerGenerator{
170-
PodTemplate: &config.Worker,
170+
WorkerConfig: &config.Worker,
171171
},
172172
}).SetupWithManager(mgr); err != nil {
173173
setupLog.Error(err, "unable to create controller", "controller", "TensorFusionConnection")
@@ -185,7 +185,7 @@ func main() {
185185

186186
// nolint:goconst
187187
if os.Getenv("ENABLE_WEBHOOKS") != "false" {
188-
if err = webhookcorev1.SetupPodWebhookWithManager(mgr, &config.PodMutator); err != nil {
188+
if err = webhookcorev1.SetupPodWebhookWithManager(mgr, &config.PodMutation); err != nil {
189189
setupLog.Error(err, "unable to create webhook", "webhook", "Pod")
190190
os.Exit(1)
191191
}

internal/config/config.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,17 @@ import (
99
)
1010

1111
type Config struct {
12-
Worker corev1.PodTemplate `json:"worker"`
13-
PodMutator PodMutator `json:"podMutator"`
12+
Worker Worker `json:"worker"`
13+
PodMutation PodMutation `json:"podMutation"`
1414
}
1515

16-
type PodMutator struct {
16+
type Worker struct {
17+
corev1.PodTemplate
18+
SendPort int16 `json:"sendPort"`
19+
ReceivePort int16 `json:"receivePort"`
20+
}
21+
22+
type PodMutation struct {
1723
PatchToPod map[string]any `json:"patchToPod"`
1824
PatchToContainer map[string]any `json:"patchToContainer"`
1925
}
@@ -33,21 +39,25 @@ func LoadConfig(filename string) (*Config, error) {
3339

3440
func NewDefaultConfig() *Config {
3541
return &Config{
36-
Worker: corev1.PodTemplate{
37-
Template: corev1.PodTemplateSpec{
38-
Spec: corev1.PodSpec{
39-
TerminationGracePeriodSeconds: ptr.To[int64](0),
40-
Containers: []corev1.Container{
41-
{
42-
Name: "tensorfusion-worker",
43-
Image: "busybox:stable-glibc",
44-
Command: []string{"sleep", "infinity"},
42+
Worker: Worker{
43+
SendPort: 1234,
44+
ReceivePort: 4321,
45+
PodTemplate: corev1.PodTemplate{
46+
Template: corev1.PodTemplateSpec{
47+
Spec: corev1.PodSpec{
48+
TerminationGracePeriodSeconds: ptr.To[int64](0),
49+
Containers: []corev1.Container{
50+
{
51+
Name: "tensorfusion-worker",
52+
Image: "busybox:stable-glibc",
53+
Command: []string{"sleep", "infinity"},
54+
},
4555
},
4656
},
4757
},
4858
},
4959
},
50-
PodMutator: PodMutator{
60+
PodMutation: PodMutation{
5161
PatchToPod: map[string]any{
5262
"spec": map[string]any{
5363
"initContainers": []corev1.Container{

internal/controller/tensorfusionconnection_controller.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
117117
}
118118

119119
// Start worker job
120-
phase, err := r.tryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace})
120+
workerPod, err := r.tryStartWorker(ctx, connection, types.NamespacedName{Name: connection.Name, Namespace: connection.Namespace})
121121
if err != nil {
122122
log.Error(err, "Failed to start worker pod")
123123
return ctrl.Result{}, err
124124
}
125125

126-
if phase == corev1.PodRunning {
126+
if workerPod.Status.Phase == corev1.PodRunning {
127127
connection.Status.Phase = tfv1.TensorFusionConnectionRunning
128-
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection)
128+
connection.Status.ConnectionURL = r.WorkerGenerator.GenerateConnectionURL(gpu, connection, workerPod)
129129
}
130130
// TODO: Handle PodFailure
131131

@@ -141,23 +141,23 @@ func (r *TensorFusionConnectionReconciler) Reconcile(ctx context.Context, req ct
141141
return ctrl.Result{}, nil
142142
}
143143

144-
func (r *TensorFusionConnectionReconciler) tryStartWorker(ctx context.Context, connection *tfv1.TensorFusionConnection, namespacedName types.NamespacedName) (corev1.PodPhase, error) {
144+
func (r *TensorFusionConnectionReconciler) tryStartWorker(ctx context.Context, connection *tfv1.TensorFusionConnection, namespacedName types.NamespacedName) (*corev1.Pod, error) {
145145
// Try to get the Pod
146146
pod := &corev1.Pod{}
147147
if err := r.Get(ctx, namespacedName, pod); err != nil {
148148
if errors.IsNotFound(err) {
149149
// Pod doesn't exist, create a new one
150150
pod = r.WorkerGenerator.GenerateWorkerPod(connection, namespacedName)
151151
if err := ctrl.SetControllerReference(connection, pod, r.Scheme); err != nil {
152-
return "", fmt.Errorf("set owner reference %w", err)
152+
return nil, fmt.Errorf("set owner reference %w", err)
153153
}
154154
if err := r.Create(ctx, pod); err != nil {
155-
return "", fmt.Errorf("create pod %w", err)
155+
return nil, fmt.Errorf("create pod %w", err)
156156
}
157-
return corev1.PodPending, nil
157+
return pod, nil
158158
}
159159
}
160-
return pod.Status.Phase, nil
160+
return pod, nil
161161
}
162162

163163
// handleDeletion handles cleanup of external dependencies

internal/controller/tensorfusionconnection_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ var _ = Describe("TensorFusionConnection Controller", func() {
7575
Client: k8sClient,
7676
Scheme: k8sClient.Scheme(),
7777
WorkerGenerator: &worker.WorkerGenerator{
78-
PodTemplate: &config.Worker,
78+
WorkerConfig: &config.Worker,
7979
},
8080
}
8181
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{

internal/webhook/v1/pod_webhook.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import (
3737
)
3838

3939
// SetupPodWebhookWithManager registers the webhook for Pod in the manager.
40-
func SetupPodWebhookWithManager(mgr ctrl.Manager, config *config.PodMutator) error {
40+
func SetupPodWebhookWithManager(mgr ctrl.Manager, config *config.PodMutation) error {
4141
webhookServer := mgr.GetWebhookServer()
4242

4343
webhookServer.Register("/mutate-v1-pod",
@@ -53,7 +53,7 @@ func SetupPodWebhookWithManager(mgr ctrl.Manager, config *config.PodMutator) err
5353

5454
type TensorFusionPodMutator struct {
5555
Client client.Client
56-
Config *config.PodMutator
56+
Config *config.PodMutation
5757
decoder admission.Decoder
5858
}
5959

internal/webhook/v1/pod_webhook_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ var _ = Describe("TensorFusionPodMutator", func() {
5757
config := config.NewDefaultConfig()
5858
mutator = &TensorFusionPodMutator{
5959
Client: client,
60-
Config: &config.PodMutator,
60+
Config: &config.PodMutation,
6161
}
6262
Expect(mutator.InjectDecoder(decoder)).To(Succeed())
6363
})

internal/webhook/v1/webhook_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ var _ = BeforeSuite(func() {
119119
Expect(err).NotTo(HaveOccurred())
120120

121121
conf := config.NewDefaultConfig()
122-
err = SetupPodWebhookWithManager(mgr, &conf.PodMutator)
122+
err = SetupPodWebhookWithManager(mgr, &conf.PodMutation)
123123
Expect(err).NotTo(HaveOccurred())
124124

125125
// +kubebuilder:scaffold:webhook

internal/worker/worker.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
package worker
22

33
import (
4+
"fmt"
5+
46
tfv1 "github.com/NexusGPU/tensor-fusion-operator/api/v1"
7+
"github.com/NexusGPU/tensor-fusion-operator/internal/config"
58
corev1 "k8s.io/api/core/v1"
69
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
710
"k8s.io/apimachinery/pkg/types"
811
)
912

1013
type WorkerGenerator struct {
11-
PodTemplate *corev1.PodTemplate
14+
WorkerConfig *config.Worker
1215
}
1316

14-
func (wg *WorkerGenerator) GenerateConnectionURL(_gpu *tfv1.GPU, _connection *tfv1.TensorFusionConnection) string {
15-
return "TODO://"
17+
func (wg *WorkerGenerator) GenerateConnectionURL(_gpu *tfv1.GPU, connection *tfv1.TensorFusionConnection, pod *corev1.Pod) string {
18+
return fmt.Sprintf("native+%s+%d+%d", pod.Status.PodIP, wg.WorkerConfig.SendPort, wg.WorkerConfig.ReceivePort)
1619
}
1720

1821
func (wg *WorkerGenerator) GenerateWorkerPod(
@@ -24,6 +27,6 @@ func (wg *WorkerGenerator) GenerateWorkerPod(
2427
Name: namespacedName.Name,
2528
Namespace: namespacedName.Namespace,
2629
},
27-
Spec: wg.PodTemplate.Template.Spec,
30+
Spec: wg.WorkerConfig.Template.Spec,
2831
}
2932
}

0 commit comments

Comments
 (0)