
Commit 2f697e2

feat(opencurve#27): supports a common way for deployment waiting
Signed-off-by: Anur Ijuokarukas <anurnomeru@163.com>
1 parent ebde98e commit 2f697e2

File tree

pkg/chunkserver/chunkserver.go
pkg/chunkserver/spec.go
pkg/k8sutil/deployment.go

3 files changed: +127 −28 lines changed

pkg/chunkserver/chunkserver.go

Lines changed: 6 additions & 2 deletions
@@ -6,6 +6,7 @@ import (
 
 	"github.com/coreos/pkg/capnslog"
 	"github.com/pkg/errors"
+	v1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/types"
 
 	curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
 	logger.Info("create physical pool successed")
 
 	// 3. startChunkServers start all chunkservers for each device of every node
-	err = c.startChunkServers()
+	var chunkServers []*v1.Deployment
+	chunkServers, err = c.startChunkServers()
 	if err != nil {
 		return errors.Wrap(err, "failed to start chunkserver")
 	}
 
 	// 4. wait all chunkservers online before create logical pool
 	logger.Info("starting all chunkserver")
-	time.Sleep(30 * time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	// receiving from the returned channel is what blocks until all
+	// chunkservers are ready or the 30s deadline expires
+	<-k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...)
 
 	// 5. create logical pool
 	_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
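
For reference, here is a minimal, self-contained sketch of the new call pattern. It is illustrative only: the waitForChunkServers name, the clientset variable, and the curve-operator import path are assumptions, not part of this commit. Unlike the old fixed time.Sleep, the receive returns as soon as every deployment reports ready, while the context caps the wait at 30 seconds.

package example

import (
	"context"
	"log"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/opencurve/curve-operator/pkg/k8sutil"
)

// waitForChunkServers (hypothetical) blocks until all given deployments are
// ready, or until the 30-second deadline expires.
func waitForChunkServers(clientset kubernetes.Interface, deployments []*appsv1.Deployment) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if ok := <-k8sutil.WaitForDeploymentsToStart(ctx, clientset, 3*time.Second, deployments...); !ok {
		log.Println("not all chunkserver deployments became ready before the deadline")
	}
}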

pkg/chunkserver/spec.go

Lines changed: 27 additions & 26 deletions
@@ -6,7 +6,8 @@ import (
 
 	"github.com/pkg/errors"
 	apps "k8s.io/api/apps/v1"
-	v1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -15,20 +16,21 @@ import (
 )
 
 // startChunkServers start all chunkservers for each device of every node
-func (c *Cluster) startChunkServers() error {
+func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
+	results := make([]*v1.Deployment, 0)
 	if len(job2DeviceInfos) == 0 {
 		logger.Errorf("no job to format device and provision chunk file")
-		return nil
+		return results, nil
 	}
 
 	if len(chunkserverConfigs) == 0 {
 		logger.Errorf("no device need to start chunkserver")
-		return nil
+		return results, nil
 	}
 
 	if len(job2DeviceInfos) != len(chunkserverConfigs) {
 		logger.Errorf("no device need to start chunkserver")
-		return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
+		return results, errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
 	}
 
 	_ = c.createStartCSConfigMap()
@@ -41,18 +43,18 @@ func (c *Cluster) startChunkServers() error {
 
 		err := c.createConfigMap(csConfig)
 		if err != nil {
-			return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
+			return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
 		}
 
 		d, err := c.makeDeployment(&csConfig)
 		if err != nil {
-			return errors.Wrap(err, "failed to create chunkserver Deployment")
+			return results, errors.Wrap(err, "failed to create chunkserver Deployment")
 		}
 
 		newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
 		if err != nil {
 			if !kerrors.IsAlreadyExists(err) {
-				return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
+				return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
 			}
 			logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)
 
@@ -63,12 +65,11 @@ func (c *Cluster) startChunkServers() error {
 		} else {
 			logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
 			// TODO:wait for the new deployment
-			// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
+			results = append(results, newDeployment)
 		}
 		// update condition type and phase etc.
 	}
-
-	return nil
+	return results, nil
 }
 
 // createCSClientConfigMap create cs_client configmap
@@ -95,7 +96,7 @@ func (c *Cluster) createCSClientConfigMap() error {
 		config.CSClientConfigMapDataKey: replacedCsClientData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.CSClientConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -146,7 +147,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
 		config.S3ConfigMapDataKey: configMapData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.S3ConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -175,7 +176,7 @@ func (c *Cluster) createStartCSConfigMap() error {
 		startChunkserverScriptFileDataKey: script.START,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      startChunkserverConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -227,7 +228,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 		config.ChunkserverConfigMapDataKey: replacedChunkServerData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      csConfig.CurrentConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -254,19 +255,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment, error) {
 	vols, _ := c.createTopoAndToolVolumeAndMount()
 	volumes = append(volumes, vols...)
 
-	podSpec := v1.PodTemplateSpec{
+	podSpec := corev1.PodTemplateSpec{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:   csConfig.ResourceName,
 			Labels: c.getChunkServerPodLabels(csConfig),
 		},
-		Spec: v1.PodSpec{
-			Containers: []v1.Container{
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
 				c.makeCSDaemonContainer(csConfig),
 			},
 			NodeName:      csConfig.NodeName,
-			RestartPolicy: v1.RestartPolicyAlways,
+			RestartPolicy: corev1.RestartPolicyAlways,
 			HostNetwork:   true,
-			DNSPolicy:     v1.DNSClusterFirstWithHostNet,
+			DNSPolicy:     corev1.DNSClusterFirstWithHostNet,
 			Volumes:       volumes,
 		},
 	}
@@ -301,7 +302,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment, error) {
 }
 
 // makeCSDaemonContainer create chunkserver container
-func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
+func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {
 
 	privileged := true
 	runAsUser := int64(0)
@@ -321,7 +322,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
 	argsChunkserverPort := strconv.Itoa(csConfig.Port)
 	argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)
 
-	container := v1.Container{
+	container := corev1.Container{
 		Name: "chunkserver",
 		Command: []string{
 			"/bin/bash",
@@ -339,16 +340,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
 		Image:           c.spec.CurveVersion.Image,
 		ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
 		VolumeMounts:    volMounts,
-		Ports: []v1.ContainerPort{
+		Ports: []corev1.ContainerPort{
 			{
 				Name:          "listen-port",
 				ContainerPort: int32(csConfig.Port),
 				HostPort:      int32(csConfig.Port),
-				Protocol:      v1.ProtocolTCP,
+				Protocol:      corev1.ProtocolTCP,
 			},
 		},
-		Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
-		SecurityContext: &v1.SecurityContext{
+		Env: []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
+		SecurityContext: &corev1.SecurityContext{
 			Privileged:   &privileged,
 			RunAsUser:    &runAsUser,
 			RunAsNonRoot: &runAsNonRoot,
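
A side note on the import hunk at the top of this file: Go allows the same package to be imported under more than one alias, so spec.go now refers to k8s.io/api/apps/v1 as both apps and v1. The duplication compiles fine, although consolidating on a single alias would be tidier. A tiny illustrative sketch:

package example

import (
	apps "k8s.io/api/apps/v1" // pre-existing alias in spec.go
	v1 "k8s.io/api/apps/v1"   // second alias added by this commit; legal in Go
)

// Both aliases name the identical type, so values are interchangeable.
var d v1.Deployment
var same apps.Deployment = d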

pkg/k8sutil/deployment.go

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+package k8sutil
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+// WaitForDeploymentsToStart waits for all of the given deployments to start,
+// and returns a channel that reports whether every deployment became ready.
+//
+// tickDuration is the interval at which each deployment's status is checked.
+//
+// A buffered hub channel collects the per-deployment results; once every
+// deployment has reported, the aggregate result is sent on the returned
+// channel. This design lets WaitForDeploymentToStart and
+// WaitForDeploymentsToStart be consumed in the same way.
+func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	deployments ...*v1.Deployment) chan bool {
+	length := len(deployments)
+	// hub is buffered so every worker goroutine can deliver its result even
+	// if the collector below returns early on the first failure; it must not
+	// be closed while senders remain, it is simply garbage-collected
+	hub := make(chan bool, length)
+	for i := range deployments {
+		deployment := deployments[i]
+		go func() {
+			// forward success as well as failure, otherwise the collector
+			// would block forever when a deployment starts cleanly
+			hub <- <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, deployment)
+		}()
+	}
+
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		for i := 0; i < length; i++ {
+			if succeed := <-hub; !succeed {
+				chn <- false
+				return
+			}
+		}
+		chn <- true
+	}()
+	return chn
+}
+
+// WaitForDeploymentToStart waits for the given deployment to start, and
+// returns a channel that reports whether the deployment became ready.
+//
+// tickDuration is the interval at which the deployment's status is checked.
+func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	deployment *v1.Deployment) chan bool {
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		// the ticker must be stopped by the goroutine that uses it; stopping
+		// it in the enclosing function would end polling before the first tick
+		ticker := time.NewTicker(tickDuration)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				latest, err := clientSet.AppsV1().Deployments(deployment.GetNamespace()).Get(deployment.GetName(),
+					metav1.GetOptions{})
+				if err != nil {
+					// TODO: is returning the failure reason required?
+					logger.Errorf("failed to get deployment %s in cluster", deployment.GetName())
+					chn <- false
+					return
+				}
+				logger.Infof("waiting for deployment %s to start", latest.Name)
+				// started: the controller has observed the current generation
+				// and at least one updated replica is ready
+				if latest.Status.ObservedGeneration >= deployment.Generation &&
+					latest.Status.UpdatedReplicas > 0 &&
+					latest.Status.ReadyReplicas > 0 {
+					logger.Infof("deployment %s has been started", latest.Name)
+					chn <- true
+					return
+				}
+				// TODO: should the unready reason, e.g. conditions, be logged to help debugging?
+			case <-ctx.Done():
+				logger.Infof("stop waiting for deployment %s to start since context is done", deployment.GetName())
+				chn <- false
+				return
+			}
+		}
+	}()
+	return chn
+}
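
As a design note, the hand-rolled ticker-and-channel loop above could also be written with the standard apimachinery wait helpers. A minimal sketch under that assumption (the waitForDeploymentPoll name is hypothetical; the readiness condition mirrors the code above):

package k8sutil

import (
	"context"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
)

// waitForDeploymentPoll polls the deployment at the given interval until it is
// ready, a poll fails, or the context is cancelled.
func waitForDeploymentPoll(ctx context.Context, clientSet kubernetes.Interface,
	interval time.Duration, d *appsv1.Deployment) error {
	return wait.PollImmediateUntil(interval, func() (bool, error) {
		latest, err := clientSet.AppsV1().Deployments(d.GetNamespace()).Get(d.GetName(), metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return latest.Status.ObservedGeneration >= d.Generation &&
			latest.Status.UpdatedReplicas > 0 &&
			latest.Status.ReadyReplicas > 0, nil
	}, ctx.Done())
}

The channel-based version in this commit fans out over many deployments concurrently and yields a single boolean, while the polling helper returns a plain error, which composes more naturally with the errors.Wrap style used elsewhere in the operator.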
