
Commit 6c484ee

feat: add support for customizing connect service (#2709)
* Add new field .spec.server.service to SparkConnect
* Add support for customizing Spark connect server service
* Set Kubernetes client to suppress warnings in REST config

Signed-off-by: Yi Chen <github@chenyicn.net>
1 parent de5a1e2 commit 6c484ee

File tree

9 files changed (+1309, -8 lines)

api/v1alpha1/sparkconnect_types.go

Lines changed: 4 additions & 0 deletions
@@ -87,6 +87,10 @@ type SparkConnectSpec struct {
 // ServerSpec is specification of the Spark connect server.
 type ServerSpec struct {
     SparkPodSpec `json:",inline"`
+
+    // Service exposes the Spark connect server.
+    // +optional
+    Service *corev1.Service `json:"service,omitempty"`
 }

 // ExecutorSpec is specification of the executor.
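
For illustration only (not part of this commit's diff), here is a rough Go sketch of how a SparkConnect object might use the new field to expose the connect server through a NodePort service. The object name, namespace, and service type are invented; as the reconciler change further down shows, a custom metadata.name on the embedded service is honored while its metadata.namespace is ignored.

package sketch

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/kubeflow/spark-operator/v2/api/v1alpha1"
)

// newConnectWithNodePort sketches a SparkConnect whose server service is
// exposed as a NodePort through the new .spec.server.service field.
// The name and namespace are hypothetical.
func newConnectWithNodePort() *v1alpha1.SparkConnect {
    return &v1alpha1.SparkConnect{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "example",
            Namespace: "spark",
        },
        Spec: v1alpha1.SparkConnectSpec{
            SparkVersion: "4.0.0",
            Server: v1alpha1.ServerSpec{
                Service: &corev1.Service{
                    Spec: corev1.ServiceSpec{
                        // The reconciler keeps the service in the SparkConnect's
                        // namespace and fills in default ports on creation;
                        // spec-level fields such as the type come from here.
                        Type: corev1.ServiceTypeNodePort,
                    },
                },
            },
        },
    }
}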

api/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Generated file; diff not rendered by default.

charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkconnects.yaml

Lines changed: 531 additions & 0 deletions
Large diffs are not rendered by default.

cmd/operator/controller/start.go

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,7 @@ import (
     // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
     // to ensure that exec-entrypoint and run can make use of them.
     _ "k8s.io/client-go/plugin/pkg/client/auth"
+    "k8s.io/client-go/rest"

     "github.com/spf13/cobra"
     "github.com/spf13/viper"
@@ -204,6 +205,7 @@ func start() {

     // Create the client rest config. Use kubeConfig if given, otherwise assume in-cluster.
     cfg, err := ctrl.GetConfig()
+    cfg.WarningHandler = rest.NoWarnings{}
     if err != nil {
         logger.Error(err, "failed to get kube config")
         os.Exit(1)
config/crd/bases/sparkoperator.k8s.io_sparkconnects.yaml

Lines changed: 531 additions & 0 deletions
Large diffs are not rendered by default.

internal/controller/sparkconnect/reconciler.go

Lines changed: 15 additions & 8 deletions
@@ -447,16 +447,18 @@ func (r *Reconciler) mutateServerPod(_ context.Context, conn *v1alpha1.SparkConn
 // createOrUpdateServerService creates or updates the server service for the SparkConnect resource.
 func (r *Reconciler) createOrUpdateServerService(ctx context.Context, conn *v1alpha1.SparkConnect) error {
     logger := ctrl.LoggerFrom(ctx)
-    logger.V(1).Info("Create or update server service")

-    svc := &corev1.Service{
-        ObjectMeta: metav1.ObjectMeta{
-            Name:      GetServerServiceName(conn),
-            Namespace: conn.Namespace,
-        },
+    // Use the service specified in the server spec if provided.
+    svc := conn.Spec.Server.Service
+    if svc == nil {
+        svc = &corev1.Service{}
     }
+    svc.Name = GetServerServiceName(conn)
+    // Namespace provided by user will be ignored.
+    svc.Namespace = conn.Namespace

-    _, err := controllerutil.CreateOrUpdate(ctx, r.client, svc, func() error {
+    // Create or update server service.
+    opResult, err := controllerutil.CreateOrUpdate(ctx, r.client, svc, func() error {
         if err := r.mutateServerService(ctx, conn, svc); err != nil {
             return fmt.Errorf("failed to mutate server service: %v", err)
         }
@@ -465,6 +467,12 @@ func (r *Reconciler) createOrUpdateServerService(ctx context.Context, conn *v1al
     if err != nil {
         return fmt.Errorf("failed to create or update server service: %v", err)
     }
+    switch opResult {
+    case controllerutil.OperationResultCreated:
+        logger.Info("Server service created")
+    case controllerutil.OperationResultUpdated:
+        logger.Info("Server service updated")
+    }

     // Update SparkConnect status.
     conn.Status.Server.ServiceName = svc.Name
@@ -475,7 +483,6 @@ func (r *Reconciler) createOrUpdateServerService(ctx context.Context, conn *v1al
 // mutateServerService mutates the server service for the SparkConnect resource.
 func (r *Reconciler) mutateServerService(_ context.Context, conn *v1alpha1.SparkConnect, svc *corev1.Service) error {
     if svc.CreationTimestamp.IsZero() {
-        svc.Spec.Type = corev1.ServiceTypeClusterIP
         svc.Spec.Ports = []corev1.ServicePort{
             {
                 Name: "driver-rpc",
internal/controller/sparkconnect (new Ginkgo test suite file)

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+Copyright 2025 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sparkconnect
+
+import (
+    "fmt"
+    "path/filepath"
+    "runtime"
+    "testing"
+
+    "github.com/kubeflow/spark-operator/v2/api/v1alpha1"
+    "github.com/kubeflow/spark-operator/v2/api/v1beta2"
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
+
+    "k8s.io/client-go/kubernetes/scheme"
+    "k8s.io/client-go/rest"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/envtest"
+    "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/log/zap"
+    // +kubebuilder:scaffold:imports
+)
+
+// These tests use Ginkgo (BDD-style Go testing framework). Refer to
+// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
+
+var cfg *rest.Config
+var k8sClient client.Client
+var testEnv *envtest.Environment
+
+func TestSparkConnectController(t *testing.T) {
+    RegisterFailHandler(Fail)
+
+    RunSpecs(t, "SparkConnect Controller Suite")
+}
+
+var _ = BeforeSuite(func() {
+    log.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+
+    By("bootstrapping test environment")
+    testEnv = &envtest.Environment{
+        CRDDirectoryPaths:     []string{filepath.Join("..", "..", "..", "config", "crd", "bases")},
+        ErrorIfCRDPathMissing: true,
+
+        // The BinaryAssetsDirectory is only required if you want to run the tests directly
+        // without call the makefile target test. If not informed it will look for the
+        // default path defined in controller-runtime which is /usr/local/kubebuilder/.
+        // Note that you must have the required binaries setup under the bin directory to perform
+        // the tests directly. When we run make test it will be setup and used automatically.
+        BinaryAssetsDirectory: filepath.Join("..", "..", "..", "bin", "k8s",
+            fmt.Sprintf("1.32.0-%s-%s", runtime.GOOS, runtime.GOARCH)),
+    }
+
+    var err error
+    // cfg is defined in this file globally.
+    cfg, err = testEnv.Start()
+    Expect(err).NotTo(HaveOccurred())
+    Expect(cfg).NotTo(BeNil())
+
+    Expect(v1alpha1.AddToScheme(scheme.Scheme)).Should(Succeed())
+    Expect(v1beta2.AddToScheme(scheme.Scheme)).Should(Succeed())
+    // +kubebuilder:scaffold:scheme
+
+    k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
+    Expect(err).NotTo(HaveOccurred())
+    Expect(k8sClient).NotTo(BeNil())
+
+})
+
+var _ = AfterSuite(func() {
+    By("tearing down the test environment")
+    err := testEnv.Stop()
+    Expect(err).NotTo(HaveOccurred())
+})

internal/controller/sparkconnect/util.go

Lines changed: 7 additions & 0 deletions
@@ -65,6 +65,13 @@ func GetServerPodName(conn *v1alpha1.SparkConnect) string {

 // GetServerServiceName returns the name of the server service for SparkConnect.
 func GetServerServiceName(conn *v1alpha1.SparkConnect) string {
+    // Use the service specified in the server spec if provided.
+    svc := conn.Spec.Server.Service
+    if svc != nil {
+        return svc.Name
+    }
+
+    // Otherwise, use the default service name.
     return fmt.Sprintf("%s-server", conn.Name)
 }

internal/controller/sparkconnect (new unit tests for util.go)

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
+/*
+Copyright 2025 The kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sparkconnect
+
+import (
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
+
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/kubeflow/spark-operator/v2/api/v1alpha1"
+    "github.com/kubeflow/spark-operator/v2/pkg/common"
+)
+
+var _ = Describe("Util functions", func() {
+    var conn *v1alpha1.SparkConnect
+
+    BeforeEach(func() {
+        conn = &v1alpha1.SparkConnect{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      "test-spark-connect",
+                Namespace: "test-namespace",
+            },
+            Spec: v1alpha1.SparkConnectSpec{
+                SparkVersion: "4.0.0",
+                Server: v1alpha1.ServerSpec{
+                    SparkPodSpec: v1alpha1.SparkPodSpec{},
+                },
+                Executor: v1alpha1.ExecutorSpec{
+                    SparkPodSpec: v1alpha1.SparkPodSpec{},
+                },
+            },
+        }
+    })
+
+    Context("GetCommonLabels", func() {
+        It("should return correct common labels", func() {
+            labels := GetCommonLabels(conn)
+            Expect(labels).To(HaveLen(2))
+            Expect(labels).To(HaveKeyWithValue(common.LabelCreatedBySparkOperator, "true"))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkConnectName, "test-spark-connect"))
+        })
+    })
+
+    Context("GetServerSelectorLabels", func() {
+        It("should return correct server selector labels", func() {
+            labels := GetServerSelectorLabels(conn)
+            Expect(labels).To(HaveLen(4))
+            Expect(labels).To(HaveKeyWithValue(common.LabelLaunchedBySparkOperator, "true"))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkConnectName, "test-spark-connect"))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkRole, common.SparkRoleConnectServer))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkVersion, "4.0.0"))
+        })
+    })
+
+    Context("GetExecutorSelectorLabels", func() {
+        It("should return correct executor selector labels", func() {
+            labels := GetExecutorSelectorLabels(conn)
+            Expect(labels).To(HaveLen(3))
+            Expect(labels).To(HaveKeyWithValue(common.LabelLaunchedBySparkOperator, "true"))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkConnectName, "test-spark-connect"))
+            Expect(labels).To(HaveKeyWithValue(common.LabelSparkRole, common.SparkRoleExecutor))
+        })
+    })
+
+    Context("GetConfigMapName", func() {
+        It("should return correct config map name", func() {
+            name := GetConfigMapName(conn)
+            Expect(name).To(Equal("test-spark-connect-conf"))
+        })
+    })
+
+    Context("GetServerPodName", func() {
+        It("should return correct server pod name", func() {
+            name := GetServerPodName(conn)
+            Expect(name).To(Equal("test-spark-connect-server"))
+        })
+    })
+
+    Context("GetServerServiceName", func() {
+        When("service is not specified in server spec", func() {
+            It("should return default server service name", func() {
+                name := GetServerServiceName(conn)
+                Expect(name).To(Equal("test-spark-connect-server"))
+            })
+        })
+
+        When("service is specified in server spec", func() {
+            BeforeEach(func() {
+                conn.Spec.Server.Service = &corev1.Service{
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name: "custom-service-name",
+                    },
+                }
+            })
+
+            It("should return the specified service name", func() {
+                name := GetServerServiceName(conn)
+                Expect(name).To(Equal("custom-service-name"))
+            })
+        })
+    })
+
+    Context("GetServerServiceHost", func() {
+        It("should return correct server service host", func() {
+            host := GetServerServiceHost(conn)
+            Expect(host).To(Equal("test-spark-connect-server.test-namespace.svc.cluster.local"))
+        })
+    })
+})
