Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .codespellrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[codespell]
ignore-words-list = NotIn,notin,AfterAll,ND,aks
ignore-words-list = NotIn,notin,AfterAll,ND,aks,deriver
skip = *.svg,*.mod,*.sum
138 changes: 67 additions & 71 deletions cmd/thv-operator/controllers/mcpregistry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -81,7 +81,7 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mcpRegistry := &mcpv1alpha1.MCPRegistry{}
err := r.Get(ctx, req.NamespacedName, mcpRegistry)
if err != nil {
if errors.IsNotFound(err) {
if kerrors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Return and don't requeue
ctxLogger.Info("MCPRegistry resource not found. Ignoring since object must be deleted")
Expand Down Expand Up @@ -148,17 +148,38 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// 3. Create status collector for batched updates
statusCollector := mcpregistrystatus.NewCollector(mcpRegistry)
// 3. Create status manager for batched updates with separation of concerns
statusManager := mcpregistrystatus.NewStatusManager(mcpRegistry)
statusDeriver := mcpregistrystatus.NewDefaultStatusDeriver()

// 4. Reconcile sync operation
result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusCollector)
result, syncErr := r.reconcileSync(ctx, mcpRegistry, statusManager)

// 5. Reconcile API service (deployment and service, independent of sync status)
if syncErr == nil {
if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry, statusCollector); apiErr != nil {
if apiErr := r.registryAPIManager.ReconcileAPIService(ctx, mcpRegistry); apiErr != nil {
ctxLogger.Error(apiErr, "Failed to reconcile API service")
// Set API status with detailed error message from structured error
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseError, apiErr.Message, "")
statusManager.API().SetAPIReadyCondition(apiErr.ConditionReason, apiErr.Message, metav1.ConditionFalse)
err = apiErr
} else {
// API reconciliation successful - check readiness and set appropriate status
isReady := r.registryAPIManager.IsAPIReady(ctx, mcpRegistry)
if isReady {
// TODO: Get actual service endpoint - for now, construct it
endpoint := fmt.Sprintf("http://%s.%s.svc.cluster.local:8080",
mcpRegistry.GetAPIResourceName(), mcpRegistry.Namespace)
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseReady,
"Registry API is ready and serving requests", endpoint)
statusManager.API().SetAPIReadyCondition("APIReady",
"Registry API is ready and serving requests", metav1.ConditionTrue)
} else {
statusManager.API().SetAPIStatus(mcpv1alpha1.APIPhaseDeploying,
"Registry API deployment is not ready yet", "")
statusManager.API().SetAPIReadyCondition("APINotReady",
"Registry API deployment is not ready yet", metav1.ConditionFalse)
}
}
}

Expand All @@ -171,10 +192,10 @@ func (r *MCPRegistryReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// 7. Derive overall phase and message from sync and API status
r.deriveOverallStatus(ctx, mcpRegistry, statusCollector)
r.deriveOverallStatus(ctx, mcpRegistry, statusManager, statusDeriver)

// 8. Apply all status changes in a single batch update
if statusUpdateErr := statusCollector.Apply(ctx, r.Client); statusUpdateErr != nil {
if statusUpdateErr := statusManager.Apply(ctx, r.Client); statusUpdateErr != nil {
ctxLogger.Error(statusUpdateErr, "Failed to apply batched status update")
// Return the status update error only if there was no main reconciliation error
if syncErr == nil {
Expand Down Expand Up @@ -227,7 +248,7 @@ func (*MCPRegistryReconciler) preserveExistingSyncData(mcpRegistry *mcpv1alpha1.
//
//nolint:gocyclo // Complex reconciliation logic requires multiple conditions
func (r *MCPRegistryReconciler) reconcileSync(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector,
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusManager mcpregistrystatus.StatusManager,
) (ctrl.Result, error) {
ctxLogger := log.FromContext(ctx)

Expand Down Expand Up @@ -255,7 +276,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
if currentSyncPhase != mcpv1alpha1.SyncPhaseIdle || currentMessage != "No sync required" {
// Preserve existing sync data when no sync is needed
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseIdle, "No sync required", 0, lastSyncTime, lastSyncHash, serverCount)
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseIdle, "No sync required", 0, lastSyncTime, lastSyncHash, serverCount)
}

// Schedule next reconciliation if we have a sync policy
Expand All @@ -273,40 +294,51 @@ func (r *MCPRegistryReconciler) reconcileSync(
if syncReason == sync.ReasonManualNoChanges {
// Preserve existing sync data for manual sync with no changes
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(
statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseComplete, "Manual sync completed (no data changes)", 0,
lastSyncTime, lastSyncHash, serverCount)
return r.syncManager.UpdateManualSyncTriggerOnly(ctx, mcpRegistry)
}

// Set sync status to syncing before starting the operation
// Clear sync data when starting sync operation
statusCollector.SetSyncStatus(
statusManager.Sync().SetSyncStatus(
mcpv1alpha1.SyncPhaseSyncing, "Synchronizing registry data",
getCurrentAttemptCount(mcpRegistry)+1, nil, "", 0)

// Perform the sync - the sync manager will handle core registry field updates
result, syncResult, err := r.syncManager.PerformSync(ctx, mcpRegistry)
result, syncResult, syncErr := r.syncManager.PerformSync(ctx, mcpRegistry)

if err != nil {
if syncErr != nil {
// Sync failed - set sync status to failed
ctxLogger.Error(err, "Sync failed, scheduling retry")
ctxLogger.Error(syncErr, "Sync failed, scheduling retry")
// Preserve existing sync data when sync fails
lastSyncTime, lastSyncHash, serverCount := r.preserveExistingSyncData(mcpRegistry)
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
fmt.Sprintf("Sync failed: %v", err), getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)

// Set sync status with detailed error message from SyncError
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseFailed,
syncErr.Message, getCurrentAttemptCount(mcpRegistry)+1, lastSyncTime, lastSyncHash, serverCount)
// Set the appropriate condition based on the error type
statusManager.Sync().SetSyncCondition(metav1.Condition{
Type: syncErr.ConditionType,
Status: metav1.ConditionFalse,
Reason: syncErr.ConditionReason,
Message: syncErr.Message,
LastTransitionTime: metav1.Now(),
})

// Use a shorter retry interval instead of the full sync interval
retryAfter := time.Minute * 5 // Default retry interval
if result.RequeueAfter > 0 {
// If PerformSync already set a retry interval, use it
retryAfter = result.RequeueAfter
}
return ctrl.Result{RequeueAfter: retryAfter}, err
return ctrl.Result{RequeueAfter: retryAfter}, syncErr
}

// Sync successful - set sync status to complete using data from sync result
now := metav1.Now()
statusCollector.SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
statusManager.Sync().SetSyncStatus(mcpv1alpha1.SyncPhaseComplete, "Registry data synchronized successfully", 0,
&now, syncResult.Hash, syncResult.ServerCount)

ctxLogger.Info("Registry data sync completed successfully")
Expand All @@ -326,7 +358,7 @@ func (r *MCPRegistryReconciler) reconcileSync(
ctxLogger.Info("Sync successful, no automatic sync policy configured")
}

return result, err
return result, nil
}

// finalizeMCPRegistry performs the finalizer logic for the MCPRegistry
Expand Down Expand Up @@ -356,61 +388,25 @@ func (r *MCPRegistryReconciler) finalizeMCPRegistry(ctx context.Context, registr
}

// deriveOverallStatus determines the overall MCPRegistry phase and message based on sync and API status
func (r *MCPRegistryReconciler) deriveOverallStatus(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry, statusCollector mcpregistrystatus.Collector) {
func (*MCPRegistryReconciler) deriveOverallStatus(
ctx context.Context, mcpRegistry *mcpv1alpha1.MCPRegistry,
statusManager mcpregistrystatus.StatusManager, statusDeriver mcpregistrystatus.StatusDeriver) {
ctxLogger := log.FromContext(ctx)

// Create a temporary copy with current collected status to derive phase
tempRegistry := mcpRegistry.DeepCopy()

// Apply the collected status changes to temp registry for phase calculation
// Note: This is a simulation - we can't actually access the collected values directly
// Instead, we'll use the DeriveOverallPhase method which works with current status
// The controller will need to be smart about when sync/API status get updated

// For now, let's derive phase based on current MCPRegistry status since
// the status collector changes haven't been applied yet
derivedPhase := tempRegistry.DeriveOverallPhase()
derivedMessage := r.deriveMessage(derivedPhase, tempRegistry)
// Use the StatusDeriver to determine the overall phase and message
// based on current sync and API statuses
derivedPhase, derivedMessage := statusDeriver.DeriveOverallStatus(
mcpRegistry.Status.SyncStatus,
mcpRegistry.Status.APIStatus,
)

// Only update phase and message if they've changed
if mcpRegistry.Status.Phase != derivedPhase {
statusCollector.SetPhase(derivedPhase)
ctxLogger.Info("Updated overall phase", "oldPhase", mcpRegistry.Status.Phase, "newPhase", derivedPhase)
}

if mcpRegistry.Status.Message != derivedMessage {
statusCollector.SetMessage(derivedMessage)
ctxLogger.Info("Updated overall message", "message", derivedMessage)
}
}

// deriveMessage creates an appropriate message based on the overall phase and registry state
func (*MCPRegistryReconciler) deriveMessage(phase mcpv1alpha1.MCPRegistryPhase, mcpRegistry *mcpv1alpha1.MCPRegistry) string {
switch phase {
case mcpv1alpha1.MCPRegistryPhasePending:
if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseComplete {
return "Registry data synced, API deployment in progress"
}
return "Registry initialization in progress"
case mcpv1alpha1.MCPRegistryPhaseReady:
return "Registry is ready and API is serving requests"
case mcpv1alpha1.MCPRegistryPhaseFailed:
// Return more specific error message if available
if mcpRegistry.Status.SyncStatus != nil && mcpRegistry.Status.SyncStatus.Phase == mcpv1alpha1.SyncPhaseFailed {
return fmt.Sprintf("Sync failed: %s", mcpRegistry.Status.SyncStatus.Message)
}
if mcpRegistry.Status.APIStatus != nil && mcpRegistry.Status.APIStatus.Phase == mcpv1alpha1.APIPhaseError {
return fmt.Sprintf("API deployment failed: %s", mcpRegistry.Status.APIStatus.Message)
}
return "Registry operation failed"
case mcpv1alpha1.MCPRegistryPhaseSyncing:
return "Registry data synchronization in progress"
case mcpv1alpha1.MCPRegistryPhaseTerminating:
return "Registry is being terminated"
default:
return "Registry status unknown"
}
statusManager.SetOverallStatus(derivedPhase, derivedMessage)
ctxLogger.Info("Updated overall status",
"oldPhase", mcpRegistry.Status.Phase,
"newPhase", derivedPhase,
"oldMessage", mcpRegistry.Status.Message,
"newMessage", derivedMessage)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
87 changes: 79 additions & 8 deletions cmd/thv-operator/pkg/mcpregistrystatus/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// StatusCollector collects status changes during reconciliation
// and applies them in a single batch update at the end.
// It implements the Collector interface.
// It implements the StatusManager interface.
type StatusCollector struct {
mcpRegistry *mcpv1alpha1.MCPRegistry
hasChanges bool
Expand All @@ -24,14 +24,36 @@ type StatusCollector struct {
syncStatus *mcpv1alpha1.SyncStatus
apiStatus *mcpv1alpha1.APIStatus
conditions map[string]metav1.Condition

// Component collectors
syncCollector *syncStatusCollector
apiCollector *apiStatusCollector
}

// syncStatusCollector implements SyncStatusCollector
type syncStatusCollector struct {
parent *StatusCollector
}

// apiStatusCollector implements APIStatusCollector
type apiStatusCollector struct {
parent *StatusCollector
}

// NewStatusManager creates a new StatusManager for the given MCPRegistry resource.
func NewStatusManager(mcpRegistry *mcpv1alpha1.MCPRegistry) StatusManager {
return newStatusCollector(mcpRegistry)
}

// NewCollector creates a new status update collector for the given MCPRegistry resource.
func NewCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) Collector {
return &StatusCollector{
// newStatusCollector creates the internal StatusCollector implementation
func newStatusCollector(mcpRegistry *mcpv1alpha1.MCPRegistry) *StatusCollector {
collector := &StatusCollector{
mcpRegistry: mcpRegistry,
conditions: make(map[string]metav1.Condition),
}
collector.syncCollector = &syncStatusCollector{parent: collector}
collector.apiCollector = &apiStatusCollector{parent: collector}
return collector
}

// SetPhase sets the phase to be updated.
Expand All @@ -46,17 +68,22 @@ func (s *StatusCollector) SetMessage(message string) {
s.hasChanges = true
}

// SetAPIReadyCondition adds or updates the API ready condition.
func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
s.conditions[mcpv1alpha1.ConditionAPIReady] = metav1.Condition{
Type: mcpv1alpha1.ConditionAPIReady,
// SetCondition sets a general condition with the specified type, reason, message, and status
func (s *StatusCollector) SetCondition(conditionType, reason, message string, status metav1.ConditionStatus) {
s.conditions[conditionType] = metav1.Condition{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not strictly cause by your patch but since we have multiple methods writing into s.conditions do you think we should use a SyncMap or similar?

Type: conditionType,
Status: status,
Reason: reason,
Message: message,
}
s.hasChanges = true
}

// SetAPIReadyCondition adds or updates the API ready condition.
func (s *StatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
s.SetCondition(mcpv1alpha1.ConditionAPIReady, reason, message, status)
}

// SetSyncStatus sets the detailed sync status.
func (s *StatusCollector) SetSyncStatus(
phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
Expand Down Expand Up @@ -148,3 +175,47 @@ func (s *StatusCollector) Apply(ctx context.Context, k8sClient client.Client) er

return nil
}

// StatusManager interface methods

// Sync returns the sync status collector
func (s *StatusCollector) Sync() SyncStatusCollector {
return s.syncCollector
}

// API returns the API status collector
func (s *StatusCollector) API() APIStatusCollector {
return s.apiCollector
}

// SetOverallStatus sets the overall phase and message explicitly (for special cases)
func (s *StatusCollector) SetOverallStatus(phase mcpv1alpha1.MCPRegistryPhase, message string) {
s.SetPhase(phase)
s.SetMessage(message)
}

// SyncStatusCollector implementation

// SetSyncCondition sets a sync-related condition
func (sc *syncStatusCollector) SetSyncCondition(condition metav1.Condition) {
sc.parent.conditions[condition.Type] = condition
sc.parent.hasChanges = true
}

// SetSyncStatus delegates to the parent's SetSyncStatus method
func (sc *syncStatusCollector) SetSyncStatus(phase mcpv1alpha1.SyncPhase, message string, attemptCount int,
lastSyncTime *metav1.Time, lastSyncHash string, serverCount int) {
sc.parent.SetSyncStatus(phase, message, attemptCount, lastSyncTime, lastSyncHash, serverCount)
}

// APIStatusCollector implementation

// SetAPIStatus delegates to the parent's SetAPIStatus method
func (ac *apiStatusCollector) SetAPIStatus(phase mcpv1alpha1.APIPhase, message string, endpoint string) {
ac.parent.SetAPIStatus(phase, message, endpoint)
}

// SetAPIReadyCondition delegates to the parent's SetAPIReadyCondition method
func (ac *apiStatusCollector) SetAPIReadyCondition(reason, message string, status metav1.ConditionStatus) {
ac.parent.SetAPIReadyCondition(reason, message, status)
}
Loading
Loading