Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .codespellrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[codespell]
ignore-words-list = NotIn,notin,AfterAll,ND,aks
ignore-words-list = NotIn,notin,AfterAll,ND,aks,deriver
skip = *.svg,*.mod,*.sum
43 changes: 43 additions & 0 deletions cmd/thv-operator/api/v1alpha1/mcpregistry_types.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package v1alpha1

import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
Expand Down Expand Up @@ -425,6 +428,46 @@ func (r *MCPRegistry) DeriveOverallPhase() MCPRegistryPhase {
return MCPRegistryPhasePending
}

// IsEqualTo reports whether newStatus matches the receiver on every field that
// is relevant for deciding if a status update has to be written. It is used to
// avoid unnecessary status updates.
//
// A full reflect.DeepEqual over the whole status is deliberately not used.
// Only these fields are compared: Phase, Message, SyncStatus.Phase,
// APIStatus.Phase, StorageRef and Conditions. The first difference found is
// logged at verbosity level 1 with the logger taken from ctx, and false is
// returned.
//
// A SyncStatus or APIStatus that is set on one side and nil on the other is
// treated as a difference, so a sub-status that appears or disappears is not
// mistaken for "no change".
func (r *MCPRegistryStatus) IsEqualTo(ctx context.Context, newStatus MCPRegistryStatus) bool {
	ctxLogger := log.FromContext(ctx)

	if r.Phase != newStatus.Phase {
		ctxLogger.V(1).Info("Phase difference", "current", r.Phase, "updated", newStatus.Phase)
		return false
	}
	if r.Message != newStatus.Message {
		ctxLogger.V(1).Info("Message difference", "current", r.Message, "updated", newStatus.Message)
		return false
	}

	// Presence must match before the phases can be compared; otherwise a
	// nil -> set (or set -> nil) transition would be reported as equal.
	if (r.SyncStatus == nil) != (newStatus.SyncStatus == nil) {
		ctxLogger.V(1).Info("SyncStatus presence difference",
			"currentSet", r.SyncStatus != nil, "updatedSet", newStatus.SyncStatus != nil)
		return false
	}
	if r.SyncStatus != nil && r.SyncStatus.Phase != newStatus.SyncStatus.Phase {
		ctxLogger.V(1).Info("SyncStatus.Phase difference",
			"current", r.SyncStatus.Phase, "updated", newStatus.SyncStatus.Phase)
		return false
	}

	if (r.APIStatus == nil) != (newStatus.APIStatus == nil) {
		ctxLogger.V(1).Info("APIStatus presence difference",
			"currentSet", r.APIStatus != nil, "updatedSet", newStatus.APIStatus != nil)
		return false
	}
	if r.APIStatus != nil && r.APIStatus.Phase != newStatus.APIStatus.Phase {
		ctxLogger.V(1).Info("APIStatus.Phase difference",
			"current", r.APIStatus.Phase, "updated", newStatus.APIStatus.Phase)
		return false
	}

	if !reflect.DeepEqual(r.StorageRef, newStatus.StorageRef) {
		ctxLogger.V(1).Info("StorageRef difference", "current", r.StorageRef, "updated", newStatus.StorageRef)
		return false
	}
	if !reflect.DeepEqual(r.Conditions, newStatus.Conditions) {
		ctxLogger.V(1).Info("Conditions difference", "current", r.Conditions, "updated", newStatus.Conditions)
		return false
	}

	return true
}

// init registers the MCPRegistry and MCPRegistryList types with SchemeBuilder.
func init() {
	SchemeBuilder.Register(
		&MCPRegistry{},
		&MCPRegistryList{},
	)
}
142 changes: 71 additions & 71 deletions cmd/thv-operator/controllers/mcpregistry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -81,7 +81,7 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mcpRegistry := &mcpv1alpha1.MCPRegistry{}
err := r.Get(ctx, req.NamespacedName, mcpRegistry)
if err != nil {
if errors.IsNotFound(err) {
if kerrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Return and don't requeue
ctxLogger.Info("MCPRegistry resource not found. Ignoring since object must be deleted")
Expand Down Expand Up @@ -148,17 +148,38 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// 3. Create status collector for batched updates
statusCollector := mcpregistrystatus.NewCollector(mcpRegistry)
// 3. Create status manager for batched updates with separation of concerns
statusManager := mcpregistrystatus.NewStatusManager(mcpRegistry)
statusDeriver := mcpregistrystatus.NewDefaultStatusDeriver()

// 4. Reconcile sync operation
result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusCollector)
result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusManager)

// 5. Reconcile API service (deployment and service, independent of sync status)
if syncErr == nil {
if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry, statusCollector); apiErr != nil {
if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry); apiErr != nil {
ctxLogger.Error(apiErr, "Failed to reconcile API service")
// Set API status with detailed error message from structured error
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseError, apiErr.Message, "")
statusManager.API().SetAPIReadyCondition(apiErr.ConditionReason, apiErr.Message, metav1.ConditionFalse)
err = apiErr
} else {
// API reconciliation successful - check readiness and set appropriate status
isReady := r.registryAPIManager.IsAPIReady(ctx, mcpRegistry)
if isReady {
// TODO: Get actual service endpoint - for now, construct it
endpoint := fmt.Sprintf("http://%s.%s.svc.cluster.local:8080",
mcpRegistry.GetAPIResourceName(), mcpRegistry.Namespace)
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseReady,
"Registry API is ready and serving requests", endpoint)
statusManager.API().SetAPIReadyCondition("APIReady",
"Registry API is ready and serving requests", metav1.ConditionTrue)
} else {
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseDeploying,
"Registry API deployment is not ready yet", "")
statusManager.API().SetAPIReadyCondition("APINotReady",
"Registry API deployment is not ready yet", metav1.ConditionFalse)
}
}
}

Expand All @@ -171,10 +192,10 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// 7. Derive overall phase and message from sync and API status
r.deriveOverallStatus(ctx, mcpRegistry, statusCollector)
r.deriveOverallStatus(ctx, mcpRegistry, statusManager, statusDeriver)

// 8. Apply all status changes in a single batch update
if statusUpdateErr := statusCollector.Apply(ctx, r.Client); statusUpdateErr != nil {
if statusUpdateErr := statusManager.Apply(ctx, r.Client); statusUpdateErr != nil {
ctxLogger.Error(statusUpdateErr, "Failed to apply batched status update")
// Return the status update error only if there was no main reconciliation error
if syncErr == nil {
Expand Down Expand Up @@ -207,6 +228,10 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
"requeueAfter", result.RequeueAfter)
}

if result.RequeueAfter > 0 {
ctxLogger.Info("Resetting error to nil because of requeue")
err = nil
}
return result, err
}

Expand All @@ -227,7 +252,7 @@ func (*MCPRegistryReconciler) preserveExistingSyncData(mcpRegistry *mcpv1alpha1.
//
//nolint:gocyclo // Complex reconciliation logic requires multiple conditions
func (r *MCPRegistryReconciler) reconcileSync(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusManager mcpregistrystatus.StatusManager,
) (ctrl.Result, error) {
ctxLogger := log.FromContext(ctx)

Expand Down Expand Up @@ -255,7 +280,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
if currentSyncPhase != mcpv1alpha1.SyncPhaseIdle || currentMessage != "No sync required" {
// Preserve existing sync data when no sync is needed
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseIdle, "No sync required", 0, lastSyncTime, lastSyncHash, serverCount)
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseIdle, "No sync required", 0, lastSyncTime, lastSyncHash, serverCount)
}

// Schedule next reconciliation if we have a sync policy
Expand All @@ -273,40 +298,51 @@ func (r *MCPRegistryReconciler) reconcileSync(
if syncReason == sync.ReasonManualNoChanges {
// Preserve existing sync data for manual sync with no changes
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(
statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseComplete, "Manual sync completed (no data changes)", 0,
lastSyncTime, lastSyncHash, serverCount)
return r.syncManager.UpdateManualSyncTriggerOnly(ctx, mcpRegistry)
}

// Set sync status to syncing before starting the operation
// Clear sync data when starting sync operation
statusCollector.SetSyncStatus(
statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseSyncing, "Synchronizing registry data",
getCurrentAttemptCount(mcpRegistry)+1, nil, "", 0)

// Perform the sync - the sync manager will handle core registry field updates
result, syncResult, err := r.syncManager.PerformSync(ctx, mcpRegistry)
result, syncResult, syncErr := r.syncManager.PerformSync(ctx, mcpRegistry)

if err != nil {
if syncErr != nil {
// Sync failed - set sync status to failed
ctxLogger.Error(err, "Sync failed, scheduling retry")
ctxLogger.Info("Sync failed, scheduling retry", "error", syncErr.Error())
// Preserve existing sync data when sync fails
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
fmt.Sprintf("Sync failed: %v", err), getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)

// Set sync status with detailed error message from SyncError
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
syncErr.Message, getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)
// Set the appropriate condition based on the error type
statusManager.Sync().SetSyncCondition(metav1.Condition{
Type: syncErr.ConditionType,
Status: metav1.ConditionFalse,
Reason: syncErr.ConditionReason,
Message: syncErr.Message,
LastTransitionTime: metav1.Now(),
})

// Use a shorter retry interval instead of the full sync interval
retryAfter := time.Minute * 5 // Default retry interval
if result.RequeueAfter > 0 {
// If PerformSync already set a retry interval, use it
retryAfter = result.RequeueAfter
}
return ctrl.Result{RequeueAfter: retryAfter}, err
return ctrl.Result{RequeueAfter: retryAfter}, syncErr
}

// Sync successful - set sync status to complete using data from sync result
now := metav1.Now()
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
&now, syncResult.Hash, syncResult.ServerCount)

ctxLogger.Info("Registry data sync completed successfully")
Expand All @@ -326,7 +362,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
ctxLogger.Info("Sync successful, no automatic sync policy configured")
}

return result, err
return result, nil
}

// finalizeMCPRegistry performs the finalizer logic for the MCPRegistry
Expand Down Expand Up @@ -356,61 +392,25 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
}

// deriveOverallStatus determines the overall MCPRegistry phase and message based on sync and API status
func (r *MCPRegistryReconciler) deriveOverallStatus(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector) {
func (*MCPRegistryReconciler) deriveOverallStatus(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
statusManager mcpregistrystatus.StatusManager, statusDeriver mcpregistrystatus.StatusDeriver) {
ctxLogger := log.FromContext(ctx)

// Create a temporary copy with current collected status to derive phase
tempRegistry := mcpRegistry.DeepCopy()

// Apply the collected status changes to temp registry for phase calculation
// Note: This is a simulation - we can't actually access the collected values directly
// Instead, we'll use the DeriveOverallPhase method which works with current status
// The controller will need to be smart about when sync/API status get updated

// For now, let's derive phase based on current MCPRegistry status since
// the status collector changes haven't been applied yet
derivedPhase := tempRegistry.DeriveOverallPhase()
derivedMessage := r.deriveMessage(derivedPhase, tempRegistry)
// Use the StatusDeriver to determine the overall phase and message
// based on current sync and API statuses
derivedPhase, derivedMessage := statusDeriver.DeriveOverallStatus(
statusManager.Sync().Status(),
statusManager.API().Status(),
)

// Only update phase and message if they've changed
if mcpRegistry.Status.Phase != derivedPhase {
statusCollector.SetPhase(derivedPhase)
ctxLogger.Info("Updated overall phase", "oldPhase", mcpRegistry.Status.Phase, "newPhase", derivedPhase)
}

if mcpRegistry.Status.Message != derivedMessage {
statusCollector.SetMessage(derivedMessage)
ctxLogger.Info("Updated overall message", "message", derivedMessage)
}
}

// deriveMessage creates an appropriate message based on the overall phase and registry state
func (*MCPRegistryReconciler) deriveMessage(phase mcpv1alpha1.MCPRegistryPhase, mcpRegistry *mcpv1alpha1.MCPRegistry) string {
switch phase {
case mcpv1alpha1.MCPRegistryPhasePending:
if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseComplete {
return "Registry data synced, API deployment in progress"
}
return "Registry initialization in progress"
case mcpv1alpha1.MCPRegistryPhaseReady:
return "Registry is ready and API is serving requests"
case mcpv1alpha1.MCPRegistryPhaseFailed:
// Return more specific error message if available
if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseFailed {
return fmt.Sprintf("Sync failed: %s", mcpRegistry.Status.SyncStatus.Message)
}
if mcpRegistry.Status.APIStatus != nil && mcpRegistry.Status.APIStatus.Phase == mcpv1alpha1.APIPhaseError {
return fmt.Sprintf("API deployment failed: %s", mcpRegistry.Status.APIStatus.Message)
}
return "Registry operation failed"
case mcpv1alpha1.MCPRegistryPhaseSyncing:
return "Registry data synchronization in progress"
case mcpv1alpha1.MCPRegistryPhaseTerminating:
return "Registry is being terminated"
default:
return "Registry status unknown"
}
statusManager.SetOverallStatus(derivedPhase, derivedMessage)
ctxLogger.Info("Updated overall status",
"oldPhase", mcpRegistry.Status.Phase,
"newPhase", derivedPhase,
"oldMessage", mcpRegistry.Status.Message,
"newMessage", derivedMessage)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading
Loading