Skip to content

Commit f81e6ac

Browse files
authored
Report failed status (#48)
1 parent d90a9c5 commit f81e6ac

17 files changed

+96
-54
lines changed

deploy/subscriptions.crd.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ spec:
1111
singular: subscription
1212
shortNames:
1313
- sub
14+
- subs
1415
preserveUnknownFields: false
1516
scope: Namespaced
1617
versions:
@@ -53,6 +54,9 @@ spec:
5354
ready:
5455
description: Whether the subscription is ready to be consumed.
5556
type: boolean
57+
failed:
58+
description: Indicates that the operator was unable to deploy a pipeline for this subscription.
59+
type: boolean
5660
message:
5761
description: Error or success message, for information only.
5862
type: string

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Access control rule (colloquially, an Acl)
3232
*/
3333
@ApiModel(description = "Access control rule (colloquially, an Acl)")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3535
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* AclList is a list of Acl
3333
*/
3434
@ApiModel(description = "AclList is a list of Acl")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3636
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* A set of related ACL rules.
3030
*/
3131
@ApiModel(description = "A set of related ACL rules.")
32-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
32+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3333
public class V1alpha1AclSpec {
3434
/**
3535
* The resource access method.

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* The resource being controlled.
2929
*/
3030
@ApiModel(description = "The resource being controlled.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3232
public class V1alpha1AclSpecResource {
3333
public static final String SERIALIZED_NAME_KIND = "kind";
3434
@SerializedName(SERIALIZED_NAME_KIND)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Status, as set by the operator.
2929
*/
3030
@ApiModel(description = "Status, as set by the operator.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3232
public class V1alpha1AclStatus {
3333
public static final String SERIALIZED_NAME_MESSAGE = "message";
3434
@SerializedName(SERIALIZED_NAME_MESSAGE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Kafka Topic
3232
*/
3333
@ApiModel(description = "Kafka Topic")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3535
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* KafkaTopicList is a list of KafkaTopic
3333
*/
3434
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3636
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* Desired Kafka topic configuration.
3434
*/
3535
@ApiModel(description = "Desired Kafka topic configuration.")
36-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
36+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3737
public class V1alpha1KafkaTopicSpec {
3838
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
3939
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* V1alpha1KafkaTopicSpecClientConfigs
3030
*/
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicSpecClientConfigs {
3333
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
3434
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Reference to a ConfigMap to use for AdminClient configuration.
2929
*/
3030
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicSpecConfigMapRef {
3333
public static final String SERIALIZED_NAME_NAME = "name";
3434
@SerializedName(SERIALIZED_NAME_NAME)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Current state of the topic.
2929
*/
3030
@ApiModel(description = "Current state of the topic.")
31-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
31+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3232
public class V1alpha1KafkaTopicStatus {
3333
public static final String SERIALIZED_NAME_MESSAGE = "message";
3434
@SerializedName(SERIALIZED_NAME_MESSAGE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Hoptimator Subscription
3232
*/
3333
@ApiModel(description = "Hoptimator Subscription")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3535
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
3636
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3737
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* SubscriptionList is a list of Subscription
3333
*/
3434
@ApiModel(description = "SubscriptionList is a list of Subscription")
35-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
35+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3636
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
3737
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
3838
@SerializedName(SERIALIZED_NAME_API_VERSION)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* Subscription spec
3232
*/
3333
@ApiModel(description = "Subscription spec")
34-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
34+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3535
public class V1alpha1SubscriptionSpec {
3636
public static final String SERIALIZED_NAME_DATABASE = "database";
3737
@SerializedName(SERIALIZED_NAME_DATABASE)

hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@
3030
* Filled in by the operator.
3131
*/
3232
@ApiModel(description = "Filled in by the operator.")
33-
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-21T22:04:16.918Z[Etc/UTC]")
33+
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-08-25T02:17:32.460Z[Etc/UTC]")
3434
public class V1alpha1SubscriptionStatus {
35+
public static final String SERIALIZED_NAME_FAILED = "failed";
36+
@SerializedName(SERIALIZED_NAME_FAILED)
37+
private Boolean failed;
38+
3539
public static final String SERIALIZED_NAME_MESSAGE = "message";
3640
@SerializedName(SERIALIZED_NAME_MESSAGE)
3741
private String message;
@@ -49,6 +53,29 @@ public class V1alpha1SubscriptionStatus {
4953
private String sql;
5054

5155

56+
public V1alpha1SubscriptionStatus failed(Boolean failed) {
57+
58+
this.failed = failed;
59+
return this;
60+
}
61+
62+
/**
63+
* Indicates that the operator was unable to deploy a pipeline for this subscription.
64+
* @return failed
65+
**/
66+
@javax.annotation.Nullable
67+
@ApiModelProperty(value = "Indicates that the operator was unable to deploy a pipeline for this subscription.")
68+
69+
public Boolean getFailed() {
70+
return failed;
71+
}
72+
73+
74+
public void setFailed(Boolean failed) {
75+
this.failed = failed;
76+
}
77+
78+
5279
public V1alpha1SubscriptionStatus message(String message) {
5380

5481
this.message = message;
@@ -158,22 +185,24 @@ public boolean equals(Object o) {
158185
return false;
159186
}
160187
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
161-
return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
188+
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
189+
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
162190
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
163191
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
164192
Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
165193
}
166194

167195
@Override
168196
public int hashCode() {
169-
return Objects.hash(message, ready, resources, sql);
197+
return Objects.hash(failed, message, ready, resources, sql);
170198
}
171199

172200

173201
@Override
174202
public String toString() {
175203
StringBuilder sb = new StringBuilder();
176204
sb.append("class V1alpha1SubscriptionStatus {\n");
205+
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
177206
sb.append(" message: ").append(toIndentedString(message)).append("\n");
178207
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
179208
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");

hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -86,39 +86,49 @@ public Result reconcile(Request request) {
8686
// Phase 1
8787
log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());
8888

89-
Pipeline pipeline = pipeline(object);
90-
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
91-
.orElse(environment);
92-
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);
93-
94-
// For sink resources, also expose hints.
95-
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
96-
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));
97-
98-
// Render resources related to all source tables.
99-
List<String> upstreamResources = pipeline.upstreamResources().stream()
100-
.map(x -> x.render(templateFactory))
101-
.collect(Collectors.toList());
102-
103-
// Render the SQL job
104-
String sqlJob = pipeline.sqlJob().render(templateFactory);
105-
106-
// Render resources related to the sink table. For these resources, we pass along any
107-
// "hints" as part of the environment.
108-
List<String> downstreamResources = pipeline.downstreamResources().stream()
109-
.map(x -> x.render(sinkTemplateFactory))
110-
.collect(Collectors.toList());
111-
112-
List<String> combined = new ArrayList<>();
113-
combined.addAll(upstreamResources);
114-
combined.add(sqlJob);
115-
combined.addAll(downstreamResources);
116-
117-
status.setResources(combined);
118-
119-
status.setSql(object.getSpec().getSql());
120-
status.setReady(null); // null indicates that pipeline needs to be deployed
121-
status.setMessage("Planned.");
89+
try {
90+
Pipeline pipeline = pipeline(object);
91+
Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline)
92+
.orElse(environment);
93+
Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv);
94+
95+
// For sink resources, also expose hints.
96+
Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv
97+
.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints()))));
98+
99+
// Render resources related to all source tables.
100+
List<String> upstreamResources = pipeline.upstreamResources().stream()
101+
.map(x -> x.render(templateFactory))
102+
.collect(Collectors.toList());
103+
104+
// Render the SQL job
105+
String sqlJob = pipeline.sqlJob().render(templateFactory);
106+
107+
// Render resources related to the sink table. For these resources, we pass along any
108+
// "hints" as part of the environment.
109+
List<String> downstreamResources = pipeline.downstreamResources().stream()
110+
.map(x -> x.render(sinkTemplateFactory))
111+
.collect(Collectors.toList());
112+
113+
List<String> combined = new ArrayList<>();
114+
combined.addAll(upstreamResources);
115+
combined.add(sqlJob);
116+
combined.addAll(downstreamResources);
117+
118+
status.setResources(combined);
119+
120+
status.setSql(object.getSpec().getSql());
121+
status.setReady(null); // null indicates that pipeline needs to be deployed
122+
status.setFailed(null);
123+
status.setMessage("Planned.");
124+
} catch (Exception e) {
125+
log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name,
126+
object.getSpec().getSql(), e);
127+
128+
// Mark the Subscription as failed.
129+
status.setFailed(true);
130+
status.setMessage("Error: " + e.getMessage());
131+
}
122132
} else if (status.getReady() == null && status.getResources() != null) {
123133
// Phase 2
124134
log.info("Deploying pipeline for {}/{}...", kind, name);
@@ -128,6 +138,7 @@ public Result reconcile(Request request) {
128138

129139
if (deployed) {
130140
status.setReady(false);
141+
status.setFailed(false);
131142
status.setMessage("Deployed.");
132143
} else {
133144
return new Result(true, operator.failureRetryDuration());
@@ -140,11 +151,13 @@ public Result reconcile(Request request) {
140151

141152
if (ready) {
142153
status.setReady(true);
154+
status.setFailed(false);
143155
status.setMessage("Ready.");
144156
log.info("{}/{} is ready.", kind, name);
145157
result = new Result(false);
146158
} else {
147159
status.setReady(false);
160+
status.setFailed(false);
148161
status.setMessage("Deployed.");
149162
log.info("Pipeline for {}/{} is NOT ready.", kind, name);
150163
}
@@ -277,10 +290,6 @@ public static Controller controller(Operator operator, HoptimatorPlanner.Factory
277290
.withReconciler(reconciler)
278291
.withName("subscription-controller")
279292
.withWorkerCount(1)
280-
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
281-
// cache has synced up
282-
//.withWorkQueue(resourceWorkQueue)
283-
//.watch()
284293
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build())
285294
.build();
286295
}

0 commit comments

Comments
 (0)