Skip to content

Commit

Permalink
[FLINK-37229] Add/record DELETING/DELETED lifecycle states
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed Jan 28, 2025
1 parent d5d027e commit 379c690
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public enum ResourceLifecycleState {
STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"),
ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"),
ROLLED_BACK(true, "The resource is deployed with the last stable spec"),
FAILED(true, "The job terminally failed");
FAILED(true, "The job terminally failed"),
DELETING(false, "The resource is being deleted"),
DELETED(true, "The resource is deleted");

@JsonIgnore private final boolean terminal;
@JsonIgnore @Getter private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
public abstract ReconciliationStatus<SPEC> getReconciliationStatus();

public ResourceLifecycleState getLifecycleState() {
if (ResourceLifecycleState.DELETING == lifecycleState
|| ResourceLifecycleState.DELETED == lifecycleState) {
return lifecycleState;
}

var reconciliationStatus = getReconciliationStatus();

if (reconciliationStatus.isBeforeFirstDeployment()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
Expand Down Expand Up @@ -104,6 +105,7 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {
"Cleaning up FlinkDeployment",
josdkContext.getClient());
statusRecorder.updateStatusFromCache(flinkApp);
flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETING);
var ctx = ctxFactory.getResourceContext(flinkApp, josdkContext);
try {
observerFactory.getOrCreate(flinkApp).observe(ctx);
Expand All @@ -113,7 +115,8 @@ public DeleteControl cleanup(FlinkDeployment flinkApp, Context josdkContext) {

var deleteControl = reconcilerFactory.getOrCreate(flinkApp).cleanup(ctx);
if (deleteControl.isRemoveFinalizer()) {
statusRecorder.removeCachedStatus(flinkApp);
flinkApp.getStatus().setLifecycleState(ResourceLifecycleState.DELETED);
statusRecorder.cleanupForDeletion(flinkApp);
ctxFactory.cleanup(flinkApp);
} else {
statusRecorder.patchAndCacheStatus(flinkApp, ctx.getKubernetesClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.health.CanaryResourceManager;
Expand Down Expand Up @@ -145,6 +146,8 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {
EventRecorder.Component.Operator,
"Cleaning up FlinkSessionJob",
josdkContext.getClient());
statusRecorder.updateStatusFromCache(sessionJob);
sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETING);
var ctx = ctxFactory.getResourceContext(sessionJob, josdkContext);
try {
observer.observe(ctx);
Expand All @@ -154,8 +157,9 @@ public DeleteControl cleanup(FlinkSessionJob sessionJob, Context josdkContext) {

var deleteControl = reconciler.cleanup(ctx);
if (deleteControl.isRemoveFinalizer()) {
sessionJob.getStatus().setLifecycleState(ResourceLifecycleState.DELETED);
ctxFactory.cleanup(sessionJob);
statusRecorder.removeCachedStatus(sessionJob);
statusRecorder.cleanupForDeletion(sessionJob);
} else {
statusRecorder.patchAndCacheStatus(sessionJob, ctx.getKubernetesClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,7 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) {
return;
}

Class<?> statusClass;
if (resource instanceof FlinkDeployment) {
statusClass = FlinkDeploymentStatus.class;
} else if (resource instanceof FlinkSessionJob) {
statusClass = FlinkSessionJobStatus.class;
} else if (resource instanceof FlinkStateSnapshot) {
statusClass = FlinkStateSnapshotStatus.class;
} else {
throw new RuntimeException(
String.format("Resource is unknown class: %s", resource.getClass()));
}

var prevStatus = (STATUS) objectMapper.convertValue(previousStatusNode, statusClass);
var prevStatus = convertPreviousStatus(resource, previousStatusNode);

Exception err = null;
for (int i = 0; i < 3; i++) {
Expand All @@ -134,6 +122,21 @@ public void patchAndCacheStatus(CR resource, KubernetesClient client) {
metricManager.onUpdate(resource);
}

private STATUS convertPreviousStatus(CR resource, ObjectNode previousStatusNode) {
Class<?> statusClass;
if (resource instanceof FlinkDeployment) {
statusClass = FlinkDeploymentStatus.class;
} else if (resource instanceof FlinkSessionJob) {
statusClass = FlinkSessionJobStatus.class;
} else if (resource instanceof FlinkStateSnapshot) {
statusClass = FlinkStateSnapshotStatus.class;
} else {
throw new RuntimeException(
String.format("Resource is unknown class: %s", resource.getClass()));
}
return (STATUS) objectMapper.convertValue(previousStatusNode, statusClass);
}

private void replaceStatus(CR resource, STATUS prevStatus, KubernetesClient client)
throws JsonProcessingException {
int retries = 0;
Expand Down Expand Up @@ -240,13 +243,15 @@ public void updateStatusFromCache(CR resource) {
}

/**
* Remove cached status for Flink resource.
* Clean up resource after deletion and send a last status update.
*
* @param resource Flink resource.
*/
public void removeCachedStatus(CR resource) {
statusCache.remove(ResourceID.fromResource(resource));
public void cleanupForDeletion(CR resource) {
var prevJson = statusCache.remove(ResourceID.fromResource(resource));
var prevStatus = convertPreviousStatus(resource, prevJson);
metricManager.onRemove(resource);
statusUpdateListener.accept(resource, prevStatus);
}

public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.configuration.DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH;
import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.IMAGE;
Expand Down Expand Up @@ -934,13 +933,4 @@ private PodTemplateSpec createTestPodWithContainers() {
TestUtils.getTestPodTemplate("hostname", List.of(mainContainer, sideCarContainer));
return pod;
}

private static Stream<KubernetesConfigOptions.ServiceExposedType> serviceExposedTypes() {
return Stream.of(
null,
KubernetesConfigOptions.ServiceExposedType.ClusterIP,
KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP,
KubernetesConfigOptions.ServiceExposedType.NodePort);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,17 @@ public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception
assertEquals(
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());

testController.cleanup(appCluster, context);
// Make sure status is recorded and sent out at the end of cleanup
assertEquals(
ResourceLifecycleState.DELETED,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETED, appCluster.getStatus().getLifecycleState());
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
Expand Down Expand Up @@ -652,6 +653,14 @@ public void testCancelJobNotFound() throws Exception {

assertEquals(CANCELLING, sessionJob.getStatus().getJobStatus().getState());
assertFalse(deleteControl.isRemoveFinalizer());
assertEquals(
ResourceLifecycleState.DELETING,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETING, sessionJob.getStatus().getLifecycleState());
assertEquals(
configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(),
deleteControl.getScheduleDelay().get());
Expand All @@ -660,6 +669,14 @@ public void testCancelJobNotFound() throws Exception {
flinkService.setFlinkJobNotFound(true);
deleteControl = testController.cleanup(sessionJob, context);
assertTrue(deleteControl.isRemoveFinalizer());
assertEquals(
ResourceLifecycleState.DELETED,
testController
.getStatusUpdateCounter()
.currentResource
.getStatus()
.getLifecycleState());
assertEquals(ResourceLifecycleState.DELETED, sessionJob.getStatus().getLifecycleState());
}

private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob sessionJob)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class TestingFlinkDeploymentController

@Getter private ReconcilerFactory reconcilerFactory;
private FlinkDeploymentController flinkDeploymentController;
private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
@Getter private StatusUpdateCounter statusUpdateCounter = new StatusUpdateCounter();
private FlinkResourceEventCollector flinkResourceEventCollector =
new FlinkResourceEventCollector();

Expand Down Expand Up @@ -174,11 +174,12 @@ public Queue<Event> flinkResourceEvents() {
return flinkResourceEventCollector.events;
}

private static class StatusUpdateCounter
/** Test status consumer. */
protected static class StatusUpdateCounter
implements BiConsumer<FlinkDeployment, FlinkDeploymentStatus> {

private FlinkDeployment currentResource;
private int counter;
FlinkDeployment currentResource;
int counter;

@Override
public void accept(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ public class TestingFlinkSessionJobController

@Getter private CanaryResourceManager<FlinkSessionJob> canaryResourceManager;
private FlinkSessionJobController flinkSessionJobController;

@Getter
private TestingFlinkSessionJobController.StatusUpdateCounter statusUpdateCounter =
new TestingFlinkSessionJobController.StatusUpdateCounter();

private FlinkResourceEventCollector flinkResourceEventCollector =
new FlinkResourceEventCollector();
private EventRecorder eventRecorder;
Expand Down Expand Up @@ -161,10 +164,11 @@ public Queue<Event> events() {
return flinkResourceEventCollector.events;
}

private static class StatusUpdateCounter
/** Test status consumer. */
protected static class StatusUpdateCounter
implements BiConsumer<FlinkSessionJob, FlinkSessionJobStatus> {

private FlinkSessionJob currentResource;
FlinkSessionJob currentResource;
private int counter;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10378,6 +10378,8 @@ spec:
lifecycleState:
enum:
- CREATED
- DELETED
- DELETING
- DEPLOYED
- FAILED
- ROLLED_BACK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ spec:
lifecycleState:
enum:
- CREATED
- DELETED
- DELETING
- DEPLOYED
- FAILED
- ROLLED_BACK
Expand Down

0 comments on commit 379c690

Please sign in to comment.