From 30a01c1390e94d64c681e23819a909972b8f044e Mon Sep 17 00:00:00 2001 From: Duarte Martinho Date: Tue, 11 Nov 2025 14:33:35 +0000 Subject: [PATCH 1/2] Update all rest package and all endpoints Signed-off-by: Duarte Martinho --- pkg/rest/api_brokers.go | 58 ++++ pkg/rest/api_cluster_name.go | 58 ++++ pkg/rest/api_clusters.go | 82 +++++ pkg/rest/api_metadata.go | 36 +++ pkg/rest/api_metrics.go | 172 ++++++++++ pkg/rest/api_status.go | 35 ++ pkg/rest/api_topics.go | 116 +++++++ pkg/rest/server.go | 601 ++++------------------------------- pkg/session/session.go | 31 ++ 9 files changed, 643 insertions(+), 546 deletions(-) create mode 100644 pkg/rest/api_brokers.go create mode 100644 pkg/rest/api_cluster_name.go create mode 100644 pkg/rest/api_clusters.go create mode 100644 pkg/rest/api_metadata.go create mode 100644 pkg/rest/api_metrics.go create mode 100644 pkg/rest/api_status.go create mode 100644 pkg/rest/api_topics.go diff --git a/pkg/rest/api_brokers.go b/pkg/rest/api_brokers.go new file mode 100644 index 0000000..148a45b --- /dev/null +++ b/pkg/rest/api_brokers.go @@ -0,0 +1,58 @@ +package rest + +import ( + "fmt" + "net/http" + + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" +) + +func HandleCreateBroker(w http.ResponseWriter, r *http.Request) { + _ = r // Not used in this stub implementation + response := Response{ + Status: "ok", + Message: "Broker creation is not implemented yet", + } + + SendJSON(w, http.StatusNotImplemented, response) +} + +func HandleGetBrokers(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + client, err := currentSession.GetClient() + if err != nil { + logger.Error("Failed to create Kafka client for brokers operation", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create Kafka client", err) + return + } + if client == nil { + logger.Error("Client creation failed for brokers operation", "cluster", session.GetActiveClusterName()) + SendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) + return + } + + brokers := client.Brokers() + brokerList := make([]map[string]interface{}, 0) + + for _, brokerInfo := range brokers { + connected, err := brokerInfo.Connected() + if err != nil { + connected = false + } + + tlsState, _ := brokerInfo.TLSConnectionState() + + brokerData := map[string]interface{}{ + "id": brokerInfo.ID(), + "addr": brokerInfo.Addr(), + "connected": connected, + "rack": brokerInfo.Rack(), + "state": tlsState, + } + brokerList = append(brokerList, brokerData) + } + + logger.Info("Successfully retrieved brokers", "cluster", session.GetActiveClusterName(), "broker_count", len(brokerList)) + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: brokerList}) +} diff --git a/pkg/rest/api_cluster_name.go b/pkg/rest/api_cluster_name.go new file mode 100644 index 0000000..6ad8a98 --- /dev/null +++ b/pkg/rest/api_cluster_name.go @@ -0,0 +1,58 @@ +package rest + +import ( + "fmt" + "net/http" + + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" +) + +func HandleGetClusterByName(w http.ResponseWriter, r *http.Request, clusterName string) { + err := session.InitAPI() + if err != nil { + logger.Error("Failed to initialize session for cluster by name", "error", err) + SendError(w, "Failed to initialize session", err) + return + } + + cluster := session.GetClusterByName(clusterName) + if cluster == nil { + logger.Warn("Cluster not found", "cluster_name", clusterName) + SendError(w, "Cluster not found", fmt.Errorf("cluster '%s' not found", clusterName)) + return + } + logger.Info("Successfully retrieved cluster details", "cluster_name", clusterName) + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: cluster}) +} + +func HandleDeleteClusterByName(w http.ResponseWriter, r *http.Request, clusterName string) { + err := session.InitAPI() + if err != nil { + logger.Error("Failed to initialize session for cluster deletion", "error", err) + SendError(w, "Failed to initialize session", err) + return + } + + success := session.Logout(clusterName) + if !success { + logger.Warn("Failed to logout from cluster", "cluster_name", clusterName) + SendError(w, "Failed to logout from cluster", fmt.Errorf("failed to logout from cluster '%s'", clusterName)) + return + } + logger.Info("Successfully logged out from cluster", "cluster_name", clusterName) + SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Logged out from cluster '%s' successfully", clusterName)}) +} + +func HandleSelectCluster(w http.ResponseWriter, r *http.Request, clusterName string) { + err := session.InitAPI() + if err != nil { + logger.Error("Failed to initialize session for select cluster", "error", err) + SendError(w, "Failed to initialize session", err) + return + } + + session.SelectCluster(clusterName) + logger.Info("Successfully set active cluster", "cluster_name", clusterName) + SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Active cluster set to '%s'", clusterName)}) +} diff --git a/pkg/rest/api_clusters.go b/pkg/rest/api_clusters.go new file mode 100644 index 0000000..695dc15 --- /dev/null +++ b/pkg/rest/api_clusters.go @@ -0,0 +1,82 @@ +package rest + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/IBM/openkommander/pkg/constants" + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" +) + +func HandleListClusters(w http.ResponseWriter, r *http.Request) { + clusters := session.GetClusterConnections() + activeCluster := session.GetActiveClusterName() + + if len(clusters) == 0 { + // Return an empty slice of maps when there are no clusters + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: []map[string]interface{}{}}) + return + } + + rows := make([]map[string]interface{}, 0, len(clusters)) + + for i, cluster := range clusters { + status := "Disconnected" + if cluster.IsAuthenticated { + status = "Connected" + } + + active := "No" + if cluster.Name == activeCluster { + active = "Yes" + } + + rows = append(rows, map[string]interface{}{ + "id": i + 1, + "name": cluster.Name, + "brokers": cluster.Brokers, + "status": status, + "active": active, + }) + } + + logger.Info("Successfully retrieved clusters", "cluster_count", len(clusters)) + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: rows}) +} + +func HandleLoginCluster(w http.ResponseWriter, r *http.Request) { + var req struct { + Name string `json:"name"` + Broker string `json:"broker"` + Version string `json:"version"` + } + + err := json.NewDecoder(r.Body).Decode(&req) + + if err != nil { + logger.Error("Invalid request body for cluster login", "error", err) + SendError(w, "Invalid request body", err) + return + } + + err = session.InitAPI() + if err != nil { + logger.Error("Failed to initialize session for cluster login", "error", err) + SendError(w, "Failed to initialize session", err) + return + } + + brokers := []string{req.Broker} + + success, message := session.LoginWithParams(brokers, constants.KafkaVersion, req.Name) + if !success { + logger.Error("Failed to login to cluster", "cluster_name", req.Name, "message", message) + SendError(w, "Failed to login to cluster: "+message, nil) + return + } + + logger.Info("Successfully logged in to cluster", "cluster_name", req.Name) + SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Logged in to cluster '%s' successfully", req.Name)}) +} diff --git a/pkg/rest/api_metadata.go b/pkg/rest/api_metadata.go new file mode 100644 index 0000000..8516478 --- /dev/null +++ b/pkg/rest/api_metadata.go @@ -0,0 +1,36 @@ +package rest + +import ( + "fmt" + "net/http" + + "github.com/IBM/openkommander/internal/core/commands" + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" +) + +func HandleClusterMetadata(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + client, err := currentSession.GetClient() + + if err != nil { + logger.Error("Failed to create Kafka client for cluster metadata operation", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create Kafka client", err) + return + } + if client == nil { + logger.Error("Client creation failed for cluster metadata operation", "cluster", session.GetActiveClusterName()) + SendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) + return + } + + metadata, failure := commands.GetClusterMetadata() + if failure != nil { + logger.Error("Failed to get cluster metadata", "cluster", session.GetActiveClusterName(), "error", failure.Err) + SendError(w, "Failed to get cluster metadata", failure.Err) + return + } + + logger.Info("Successfully retrieved cluster metadata", "cluster", session.GetActiveClusterName()) + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: metadata}) +} diff --git a/pkg/rest/api_metrics.go b/pkg/rest/api_metrics.go new file mode 100644 index 0000000..3c6a3d8 --- /dev/null +++ b/pkg/rest/api_metrics.go @@ -0,0 +1,172 @@ +package rest + +import ( + "fmt" + "net/http" + "slices" + "time" + + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" + "github.com/IBM/sarama" +) + +// MessagesLastMinute holds the count of produced and consumed messages in the last minute and last second +type MessagesLastMinute struct { + Topic string `json:"topic"` + ProducedCount int64 `json:"produced_count"` + ConsumedCount int64 `json:"consumed_count"` + ProducedPerSec int64 `json:"produced_per_sec"` + ConsumedPerSec int64 `json:"consumed_per_sec"` +} + +// OffsetHistoryEntry stores offset and timestamp +type OffsetHistoryEntry struct { + Offset int64 + Timestamp time.Time +} + +// In-memory rolling window for offsets (not production safe) +var producedHistory = make(map[string][]OffsetHistoryEntry) +var consumedHistory = make(map[string][]OffsetHistoryEntry) + +// HandleMessagesPerMinute handles requests for message metrics +func HandleMessagesPerMinute(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + admin, err := currentSession.GetAdminClient() + if err != nil { + logger.Error("Failed to create admin client for messages per minute", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create admin client", err) + return + } + if admin == nil { + logger.Error("Admin client creation failed for messages per minute", "cluster", session.GetActiveClusterName()) + SendError(w, "Failed to create admin client", fmt.Errorf("admin client creation failed")) + return + } + + kafkaClient, err := currentSession.GetClient() + if err != nil { + logger.Error("Failed to get Kafka client for messages per minute", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to get Kafka client", err) + return + } + if kafkaClient == nil { + logger.Error("Kafka client is nil for messages per minute", "cluster", session.GetActiveClusterName()) + SendError(w, "Kafka client not available", fmt.Errorf("kafka client is nil")) + return + } + + topics, err := admin.ListTopics() + + if err != nil { + logger.Error("Failed to list topics for messages per minute", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to list topics", err) + return + } + + logger.Debug("Processing message metrics", "cluster", session.GetActiveClusterName(), "topic_count", len(topics)) + + // Calculate produced and consumed message counts in the last minute for each topic + counts := []MessagesLastMinute{} + now := time.Now() + totalAllProduced := int64(0) + totalAllConsumed := int64(0) + totalAllProducedSec := int64(0) + totalAllConsumedSec := int64(0) + + for name := range topics { // Produced: sum latest offsets across all partitions + partitions, err := kafkaClient.Partitions(name) + if err != nil { + logger.Warn("Failed to get partitions for topic", "cluster", session.GetActiveClusterName(), "topic", name, "error", err) + } + var totalProduced int64 = 0 + for _, partition := range partitions { + offset, err := kafkaClient.GetOffset(name, partition, sarama.OffsetNewest) + if err == nil { + totalProduced += offset + } + } + // Consumed: sum committed offsets for all consumer groups + var totalConsumed int64 = 0 + groups, err := admin.ListConsumerGroups() + if err != nil { + logger.Warn("Failed to list consumer groups", "cluster", session.GetActiveClusterName(), "error", err) + } + + for group := range groups { + offsets, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{name: partitions}) + if err == nil && offsets.Blocks != nil { + for _, partition := range partitions { + block := offsets.GetBlock(name, partition) + if block != nil && block.Offset > 0 { + totalConsumed += block.Offset + } + } + } + } // Store offset history for rolling window + producedHistory[name] = append(producedHistory[name], OffsetHistoryEntry{Offset: totalProduced, Timestamp: now}) + consumedHistory[name] = append(consumedHistory[name], OffsetHistoryEntry{Offset: totalConsumed, Timestamp: now}) + + // Remove entries older than 1 minute using slices.DeleteFunc + pruneMinute := now.Add(-1 * time.Minute) + producedHistory[name] = slices.DeleteFunc(producedHistory[name], func(entry OffsetHistoryEntry) bool { + return entry.Timestamp.Before(pruneMinute) + }) + consumedHistory[name] = slices.DeleteFunc(consumedHistory[name], func(entry OffsetHistoryEntry) bool { + return entry.Timestamp.Before(pruneMinute) + }) + + // Create filtered copies for second-based calculations + pruneSecond := now.Add(-1 * time.Second) + producedHistorySec := slices.DeleteFunc(slices.Clone(producedHistory[name]), func(entry OffsetHistoryEntry) bool { + return entry.Timestamp.Before(pruneSecond) + }) + consumedHistorySec := slices.DeleteFunc(slices.Clone(consumedHistory[name]), func(entry OffsetHistoryEntry) bool { + return entry.Timestamp.Before(pruneSecond) + }) + + // Calculate produced/consumed in last minute + producedCount := int64(0) + consumedCount := int64(0) + if len(producedHistory[name]) > 1 { + producedCount = producedHistory[name][len(producedHistory[name])-1].Offset - producedHistory[name][0].Offset + } + if len(consumedHistory[name]) > 1 { + consumedCount = consumedHistory[name][len(consumedHistory[name])-1].Offset - consumedHistory[name][0].Offset + } + + // Calculate produced/consumed in last second + producedPerSec := int64(0) + consumedPerSec := int64(0) + if len(producedHistorySec) > 1 { + producedPerSec = producedHistorySec[len(producedHistorySec)-1].Offset - producedHistorySec[0].Offset + } + if len(consumedHistorySec) > 1 { + consumedPerSec = consumedHistorySec[len(consumedHistorySec)-1].Offset - consumedHistorySec[0].Offset + } + + totalAllProduced += producedCount + totalAllConsumed += consumedCount + totalAllProducedSec += producedPerSec + totalAllConsumedSec += consumedPerSec + + counts = append(counts, MessagesLastMinute{ + Topic: name, + ProducedCount: producedCount, + ConsumedCount: consumedCount, + ProducedPerSec: producedPerSec, + ConsumedPerSec: consumedPerSec, + }) + } + + counts = append(counts, MessagesLastMinute{ + Topic: "total", + ProducedCount: totalAllProduced, + ConsumedCount: totalAllConsumed, + ProducedPerSec: totalAllProducedSec, + ConsumedPerSec: totalAllConsumedSec, + }) + logger.Info("Successfully calculated message metrics", "cluster", session.GetActiveClusterName(), "total_topics", len(counts)-1, "total_produced", totalAllProduced, "total_consumed", totalAllConsumed) + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: counts}) +} diff --git a/pkg/rest/api_status.go b/pkg/rest/api_status.go new file mode 100644 index 0000000..8a18616 --- /dev/null +++ b/pkg/rest/api_status.go @@ -0,0 +1,35 @@ +package rest + +import ( + "net/http" + "time" + + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" +) + +func HandleStatus(w http.ResponseWriter, r *http.Request, startTime time.Time) { + clusters := session.GetClusterConnections() + kafkaStatus := "disconnected" + for _, cluster := range clusters { + if cluster.IsAuthenticated { + kafkaStatus = "connected" + break + } + } + + uptime := time.Since(startTime).Seconds() + logger.Info("Status check completed", "kafka_status", kafkaStatus, "clusters_count", len(clusters), "uptime_seconds", uptime) + + response := Response{ + Status: "ok", + Message: "OpenKommander REST API is running", + Data: map[string]interface{}{ + "status": "running", + "kafka_status": kafkaStatus, + "clusters_count": len(clusters), + "uptime_seconds": uptime, + }, + } + SendJSON(w, http.StatusOK, response) +} diff --git a/pkg/rest/api_topics.go b/pkg/rest/api_topics.go new file mode 100644 index 0000000..bde8469 --- /dev/null +++ b/pkg/rest/api_topics.go @@ -0,0 +1,116 @@ +package rest + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/IBM/openkommander/pkg/logger" + "github.com/IBM/openkommander/pkg/session" + "github.com/IBM/sarama" +) + +func ListTopics(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + admin, err := currentSession.GetAdminClient() + + if err != nil { + logger.Error("Failed to create admin client for listing topics", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create admin client", err) + return + } + + topics, err := admin.ListTopics() + + if err != nil { + logger.Error("Failed to list topics from Kafka", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to list topics", err) + return + } + + logger.Debug("Successfully retrieved topics", "cluster", session.GetActiveClusterName(), "topic_count", len(topics)) + + topicList := make([]map[string]interface{}, 0, len(topics)) + for name, details := range topics { + replicas := int(details.NumPartitions) * int(details.ReplicationFactor) + inSyncReplicas := replicas + topicList = append(topicList, map[string]interface{}{ + "name": name, + "partitions": details.NumPartitions, + "replication_factor": details.ReplicationFactor, + "replicas": replicas, + "in_sync_replicas": inSyncReplicas, + }) + } + + SendJSON(w, http.StatusOK, Response{Status: "ok", Data: topicList}) +} + +func CreateTopic(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + admin, err := currentSession.GetAdminClient() + + if err != nil { + logger.Error("Failed to create admin client for topic creation", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create admin client", err) + return + } + + var req TopicRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + logger.Error("Invalid request body for topic creation", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Invalid request body", err) + return + } + + logger.Info("Topic creation request details", + "cluster", session.GetActiveClusterName(), + "topic_name", req.Name, + "partitions", req.Partitions, + "replication_factor", req.ReplicationFactor) + + err = admin.CreateTopic(req.Name, &sarama.TopicDetail{ + NumPartitions: req.Partitions, + ReplicationFactor: req.ReplicationFactor, + }, false) + + if err != nil { + logger.Error("Failed to create topic in Kafka", "cluster", session.GetActiveClusterName(), "topic_name", req.Name, "error", err) + SendError(w, "Failed to create topic", err) + return + } + + logger.Info("Topic created successfully", "cluster", session.GetActiveClusterName(), "topic_name", req.Name, "partitions", req.Partitions, "replication_factor", req.ReplicationFactor) + SendJSON(w, http.StatusCreated, Response{Status: "ok", Message: fmt.Sprintf("Topic '%s' created successfully", req.Name)}) +} + +func DeleteTopic(w http.ResponseWriter, r *http.Request) { + currentSession := session.GetCurrentSession() + admin, err := currentSession.GetAdminClient() + + if err != nil { + logger.Error("Failed to create admin client for topic deletion", "cluster", session.GetActiveClusterName(), "error", err) + SendError(w, "Failed to create admin client", err) + return + } + + topicName := r.PathValue("name") + + if topicName == "" { + logger.Warn("Topic name is required for deletion", "cluster", session.GetActiveClusterName()) + SendError(w, "Topic name is required", nil) + return + } + + logger.Info("Topic deletion request details", "cluster", session.GetActiveClusterName(), "topic_name", topicName) + + err = admin.DeleteTopic(topicName) + if err != nil { + logger.Error("Failed to delete topic from Kafka", "cluster", session.GetActiveClusterName(), "topic_name", topicName, "error", err) + SendError(w, "Failed to delete topic", err) + return + } + + logger.Info("Topic deleted successfully", "cluster", session.GetActiveClusterName(), "topic_name", topicName) + SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Topic '%s' deleted successfully", topicName)}) +} diff --git a/pkg/rest/server.go b/pkg/rest/server.go index 7bc30dd..7b94e40 100644 --- a/pkg/rest/server.go +++ b/pkg/rest/server.go @@ -2,17 +2,13 @@ package rest import ( "context" - "encoding/json" - "fmt" "net/http" "os" "os/signal" - "slices" "strings" "syscall" "time" - "github.com/IBM/openkommander/internal/core/commands" "github.com/IBM/openkommander/pkg/constants" "github.com/IBM/openkommander/pkg/logger" "github.com/IBM/sarama" @@ -30,18 +26,6 @@ type Server struct { startTime time.Time } -type Response struct { - Status string `json:"status"` - Message string `json:"message,omitempty"` - Data interface{} `json:"data,omitempty"` -} - -type TopicRequest struct { - Name string `json:"name"` - Partitions int32 `json:"partitions"` - ReplicationFactor int16 `json:"replication_factor"` -} - func LoggingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() @@ -136,26 +120,29 @@ func NewServer(port string) (*Server, error) { router := http.NewServeMux() + // Status endpoint for API supports GET only + router.HandleFunc("/api/v1/status", wrapWithLogging(s.handleStatus)) + // Topics endpoint supports GET, POST, DELETE - router.HandleFunc("/api/v1/{broker}/topics", wrapWithLogging(s.handleTopics)) + router.HandleFunc("/api/v1/topics", wrapWithLogging(s.handleTopics)) + router.HandleFunc("/api/v1/topics/{name}", wrapWithLogging(s.handleTopics)) // Brokers endpoint supports GET, POST - router.HandleFunc("/api/v1/{broker}/brokers", wrapWithLogging(s.handleBrokers)) + router.HandleFunc("/api/v1/brokers", wrapWithLogging(s.handleBrokers)) // Metrics/messages/minute endpoint supports GET only - router.HandleFunc("/api/v1/{broker}/metrics/messages/minute", wrapWithLogging(s.handleMessagesPerMinute)) - - // Status endpoint supports GET only - router.HandleFunc("/api/v1/{broker}/status", wrapWithLogging(s.handleStatus)) + router.HandleFunc("/api/v1/metrics/messages/minute", wrapWithLogging(s.handleMessagesPerMinute)) - // Health endpoint supports GET only - router.HandleFunc("/api/v1/{broker}/health", wrapWithLogging(s.handleHealth)) + // // Health endpoint supports GET only + // router.HandleFunc("/api/v1/{broker}/health", wrapWithLogging(s.handleHealth)) // Clusters endpoint supports GET only router.HandleFunc("/api/v1/clusters", wrapWithLogging(s.handleClusters)) + router.HandleFunc("/api/v1/cluster/{name}", wrapWithLogging(s.handleClusterByName)) + // Cluster metadata endpoint supports GET only - router.HandleFunc("/api/v1/clusters/{clusterId}/metadata", wrapWithLogging(s.handleClusterMetadata)) + router.HandleFunc("/api/v1/cluster/metadata", wrapWithLogging(s.handleClusterMetadata)) frontendDir := constants.OpenKommanderFolder + "/frontend" fileServer := http.FileServer(http.Dir(frontendDir)) @@ -251,577 +238,99 @@ func StartRESTServer(port string) { } } -func createNewClient(w http.ResponseWriter, r *http.Request, s *Server) (status bool, err error) { - broker := r.PathValue("broker") - - logger.Kafka("Creating new Kafka client", broker, "connect", "client_addr", r.RemoteAddr) - - if broker == "" { - logger.Warn("Broker not specified in request", "url", r.URL.String()) - return false, fmt.Errorf("broker not specified") - } - config := sarama.NewConfig() - config.Version = constants.SaramaKafkaVersion - client, err := sarama.NewClient([]string{broker}, config) - if err != nil { - logger.Error("Failed to create Kafka client", "broker", broker, "error", err) - return false, fmt.Errorf("failed to create Kafka client: %w", err) - } - - s.kafkaClient = client - logger.Kafka("Successfully created Kafka client", broker, "connect") - return true, nil -} - func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) - return - } - - broker := r.PathValue("broker") - - status, err := createNewClient(w, r, s) - if err != nil { - logger.Error("Failed to create Kafka client for status check", "broker", broker, "error", err) - sendError(w, "Failed to create Kafka client", nil) - return - } - if !status { - logger.Error("Client creation failed for status check", "broker", broker) - sendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) - return - } - - if s.kafkaClient == nil { - sendError(w, "Kafka client not initialized", fmt.Errorf("kafka client is nil")) + if !enforceMethod(w, r, []string{http.MethodGet}) { return } - brokers := s.kafkaClient.Brokers() - kafkaStatus := "disconnected" - if len(brokers) > 0 { - kafkaStatus = "connected" - } - uptime := time.Since(s.startTime).Seconds() - logger.Info("Status check completed", "broker", broker, "kafka_status", kafkaStatus, "brokers_count", len(brokers), "uptime_seconds", uptime) - - response := Response{ - Status: "ok", - Message: "OpenKommander REST API is running", - Data: map[string]interface{}{ - "kafka_status": kafkaStatus, - "brokers_count": len(brokers), - "uptime_seconds": uptime, - }, + switch r.Method { + case http.MethodGet: + HandleStatus(w, r, s.startTime) } - sendJSON(w, http.StatusOK, response) } func (s *Server) handleTopics(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - - status, err := createNewClient(w, r, s) - if err != nil { - logger.Error("Failed to create Kafka client for topics operation", "broker", broker, "method", r.Method, "error", err) - sendError(w, "Failed to create Kafka client", err) - return - } - if !status { - logger.Error("Client creation failed for topics operation", "broker", broker, "method", r.Method) - sendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) - return - } - - if s.kafkaClient == nil { - logger.Error("Kafka client not initialized for topics operation", "broker", broker) - sendError(w, "Kafka client not initialized", fmt.Errorf("kafka client is nil")) + if !enforceMethod(w, r, []string{http.MethodGet, http.MethodPost, http.MethodDelete}) { return } switch r.Method { case http.MethodGet: - s.listTopics(w, r) + ListTopics(w, r) case http.MethodPost: - s.createTopic(w, r) + CreateTopic(w, r) case http.MethodDelete: - s.deleteTopic(w, r) - default: - logger.Warn("Method not allowed for topics endpoint", "method", r.Method, "broker", broker) - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) + DeleteTopic(w, r) } } func (s *Server) handleBrokers(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - switch r.Method { - case http.MethodPost: - s.createBroker(w, r) - case http.MethodGet: - s.getBrokers(w, r) - default: - logger.Warn("Method not allowed for brokers endpoint", "method", r.Method, "broker", broker) - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) - } -} - -func (s *Server) createBroker(w http.ResponseWriter, r *http.Request) { - _ = r // Not used in this stub implementation - response := Response{ - Status: "ok", - Message: "Broker creation is not implemented yet", - } - - sendJSON(w, http.StatusNotImplemented, response) -} - -func (s *Server) getBrokers(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - - status, err := createNewClient(w, r, s) - if err != nil { - logger.Error("Failed to create Kafka client for brokers operation", "broker", broker, "error", err) - sendError(w, "Failed to create Kafka client", err) - return - } - if !status { - logger.Error("Client creation failed for brokers operation", "broker", broker) - sendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) - return - } - - if s.kafkaClient == nil { - logger.Error("Kafka client not initialized for brokers operation", "broker", broker) - sendError(w, "Kafka client not initialized", fmt.Errorf("kafka client is nil")) - return - } - - brokers := s.kafkaClient.Brokers() - brokerList := make([]map[string]interface{}, 0) - - for _, brokerInfo := range brokers { - connected, err := brokerInfo.Connected() - if err != nil { - connected = false - } - - tlsState, _ := brokerInfo.TLSConnectionState() - - brokerData := map[string]interface{}{ - "id": brokerInfo.ID(), - "addr": brokerInfo.Addr(), - "connected": connected, - "rack": brokerInfo.Rack(), - "state": tlsState, - } - brokerList = append(brokerList, brokerData) - } - - logger.Info("Successfully retrieved brokers", "broker", broker, "broker_count", len(brokerList)) - sendJSON(w, http.StatusOK, Response{Status: "ok", Data: brokerList}) -} - -func (s *Server) listTopics(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - admin, err := sarama.NewClusterAdminFromClient(s.kafkaClient) - - if err != nil { - logger.Error("Failed to create admin client for listing topics", "broker", broker, "error", err) - sendError(w, "Failed to create admin client", err) - return - } - defer func() { - if closeErr := admin.Close(); closeErr != nil { - logger.Warn("Failed to close admin client", "error", closeErr) - } - }() - - topics, err := admin.ListTopics() - - if err != nil { - logger.Error("Failed to list topics from Kafka", "broker", broker, "error", err) - sendError(w, "Failed to list topics", err) - return - } - - logger.Debug("Successfully retrieved topics", "broker", broker, "topic_count", len(topics)) - - topicList := make([]map[string]interface{}, 0, len(topics)) - for name, details := range topics { - replicas := int(details.NumPartitions) * int(details.ReplicationFactor) - inSyncReplicas := replicas - topicList = append(topicList, map[string]interface{}{ - "name": name, - "partitions": details.NumPartitions, - "replication_factor": details.ReplicationFactor, - "replicas": replicas, - "in_sync_replicas": inSyncReplicas, - }) - } - - sendJSON(w, http.StatusOK, Response{Status: "ok", Data: topicList}) -} - -func (s *Server) createTopic(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - - var req TopicRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - logger.Error("Invalid request body for topic creation", "broker", broker, "error", err) - sendError(w, "Invalid request body", err) - return - } - - logger.Info("Topic creation request details", - "broker", broker, - "topic_name", req.Name, - "partitions", req.Partitions, - "replication_factor", req.ReplicationFactor) - admin, err := sarama.NewClusterAdminFromClient(s.kafkaClient) - if err != nil { - logger.Error("Failed to create admin client for topic creation", "broker", broker, "topic_name", req.Name, "error", err) - sendError(w, "Failed to create admin client", err) - return - } - defer func() { - if closeErr := admin.Close(); closeErr != nil { - logger.Warn("Failed to close admin client", "error", closeErr) - } - }() - err = admin.CreateTopic(req.Name, &sarama.TopicDetail{ - NumPartitions: req.Partitions, - ReplicationFactor: req.ReplicationFactor, - }, false) - if err != nil { - logger.Error("Failed to create topic in Kafka", "broker", broker, "topic_name", req.Name, "error", err) - sendError(w, "Failed to create topic", err) - return - } - - logger.Info("Topic created successfully", "broker", broker, "topic_name", req.Name, "partitions", req.Partitions, "replication_factor", req.ReplicationFactor) - sendJSON(w, http.StatusCreated, Response{Status: "ok", Message: fmt.Sprintf("Topic '%s' created successfully", req.Name)}) -} - -func (s *Server) deleteTopic(w http.ResponseWriter, r *http.Request) { - broker := r.PathValue("broker") - - var req TopicRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - logger.Error("Invalid request body for topic deletion", "broker", broker, "error", err) - sendError(w, "Invalid request body", err) - return - } - topicName := req.Name - if topicName == "" { - logger.Warn("Topic name is required for deletion", "broker", broker) - sendError(w, "Topic name is required", nil) + if !enforceMethod(w, r, []string{http.MethodGet, http.MethodPost}) { return } - logger.Info("Topic deletion request details", "broker", broker, "topic_name", topicName) - admin, err := sarama.NewClusterAdminFromClient(s.kafkaClient) - if err != nil { - logger.Error("Failed to create admin client for topic deletion", "broker", broker, "topic_name", topicName, "error", err) - sendError(w, "Failed to create admin client", err) - return - } - defer func() { - if closeErr := admin.Close(); closeErr != nil { - logger.Warn("Failed to close admin client", "error", closeErr) - } - }() - - err = admin.DeleteTopic(topicName) - if err != nil { - logger.Error("Failed to delete topic from Kafka", "broker", broker, "topic_name", topicName, "error", err) - sendError(w, "Failed to delete topic", err) - return - } - - logger.Info("Topic deleted successfully", "broker", broker, "topic_name", topicName) - sendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Topic '%s' deleted successfully", topicName)}) -} - -func sendJSON(w http.ResponseWriter, status int, payload interface{}) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - if err := json.NewEncoder(w).Encode(payload); err != nil { - logger.Error("Failed to encode JSON response", "error", err) + switch r.Method { + case http.MethodGet: + HandleGetBrokers(w, r) + case http.MethodPost: + HandleCreateBroker(w, r) } } -func sendError(w http.ResponseWriter, message string, err error) { - logger.Error(message, "error", err) - sendJSON(w, http.StatusInternalServerError, Response{ - Status: "error", - Message: fmt.Sprintf("%s: %v", message, err), - }) -} - -// MessagesLastMinute holds the count of produced and consumed messages in the last minute and last second -type MessagesLastMinute struct { - Topic string `json:"topic"` - ProducedCount int64 `json:"produced_count"` - ConsumedCount int64 `json:"consumed_count"` - ProducedPerSec int64 `json:"produced_per_sec"` - ConsumedPerSec int64 `json:"consumed_per_sec"` -} - -// OffsetHistoryEntry stores offset and timestamp -type OffsetHistoryEntry struct { - Offset int64 - Timestamp time.Time -} - -// In-memory rolling window for offsets (not production safe) -var producedHistory = make(map[string][]OffsetHistoryEntry) -var consumedHistory = make(map[string][]OffsetHistoryEntry) - -// In-memory store for last offsets and timestamps (for demo purposes, not production safe) - -// Handler for messages per minute func (s *Server) handleMessagesPerMinute(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) + if !enforceMethod(w, r, []string{http.MethodGet}) { return } - broker := r.PathValue("broker") - - status, err := createNewClient(w, r, s) - if err != nil { - logger.Error("Failed to create Kafka client for messages per minute", "broker", broker, "error", err) - sendError(w, "Failed to create Kafka client", err) - return - } - if !status { - logger.Error("Client creation failed for messages per minute", "broker", broker) - sendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) - return - } - - if s.kafkaClient == nil { - logger.Error("Kafka client not initialized for messages per minute", "broker", broker) - sendError(w, "Kafka client not initialized", fmt.Errorf("kafka client is nil")) - return - } - - admin, err := sarama.NewClusterAdminFromClient(s.kafkaClient) - if err != nil { - logger.Error("Failed to create admin client for messages per minute", "broker", broker, "error", err) - sendError(w, "Failed to create admin client", nil) - return + switch r.Method { + case http.MethodGet: + HandleMessagesPerMinute(w, r) } +} - defer func() { - if closeErr := admin.Close(); closeErr != nil { - logger.Warn("Failed to close admin client", "error", closeErr) - } - }() - - topics, err := admin.ListTopics() - - if err != nil { - logger.Error("Failed to list topics for messages per minute", "broker", broker, "error", err) - sendError(w, "Failed to list topics", nil) +func (s *Server) handleClusters(w http.ResponseWriter, r *http.Request) { + if !enforceMethod(w, r, []string{http.MethodGet, http.MethodPost}) { return } - logger.Debug("Processing message metrics", "broker", broker, "topic_count", len(topics)) - - // Calculate produced and consumed message counts in the last minute for each topic - counts := []MessagesLastMinute{} - now := time.Now() - totalAllProduced := int64(0) - totalAllConsumed := int64(0) - totalAllProducedSec := int64(0) - totalAllConsumedSec := int64(0) - - for name := range topics { // Produced: sum latest offsets across all partitions - partitions, err := s.kafkaClient.Partitions(name) - if err != nil { - logger.Warn("Failed to get partitions for topic", "broker", broker, "topic", name, "error", err) - } - var totalProduced int64 = 0 - for _, partition := range partitions { - offset, err := s.kafkaClient.GetOffset(name, partition, sarama.OffsetNewest) - if err == nil { - totalProduced += offset - } - } - // Consumed: sum committed offsets for all consumer groups - var totalConsumed int64 = 0 - groups, err := admin.ListConsumerGroups() - if err != nil { - logger.Warn("Failed to list consumer groups", "broker", broker, "error", err) - } - - for group := range groups { - offsets, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{name: partitions}) - if err == nil && offsets.Blocks != nil { - for _, partition := range partitions { - block := offsets.GetBlock(name, partition) - if block != nil && block.Offset > 0 { - totalConsumed += block.Offset - } - } - } - } // Store offset history for rolling window - producedHistory[name] = append(producedHistory[name], OffsetHistoryEntry{Offset: totalProduced, Timestamp: now}) - consumedHistory[name] = append(consumedHistory[name], OffsetHistoryEntry{Offset: totalConsumed, Timestamp: now}) - - // Remove entries older than 1 minute using slices.DeleteFunc - pruneMinute := now.Add(-1 * time.Minute) - producedHistory[name] = slices.DeleteFunc(producedHistory[name], func(entry OffsetHistoryEntry) bool { - return entry.Timestamp.Before(pruneMinute) - }) - consumedHistory[name] = slices.DeleteFunc(consumedHistory[name], func(entry OffsetHistoryEntry) bool { - return entry.Timestamp.Before(pruneMinute) - }) - - // Create filtered copies for second-based calculations - pruneSecond := now.Add(-1 * time.Second) - producedHistorySec := slices.DeleteFunc(slices.Clone(producedHistory[name]), func(entry OffsetHistoryEntry) bool { - return entry.Timestamp.Before(pruneSecond) - }) - consumedHistorySec := slices.DeleteFunc(slices.Clone(consumedHistory[name]), func(entry OffsetHistoryEntry) bool { - return entry.Timestamp.Before(pruneSecond) - }) - - // Calculate produced/consumed in last minute - producedCount := int64(0) - consumedCount := int64(0) - if len(producedHistory[name]) > 1 { - producedCount = producedHistory[name][len(producedHistory[name])-1].Offset - producedHistory[name][0].Offset - } - if len(consumedHistory[name]) > 1 { - consumedCount = consumedHistory[name][len(consumedHistory[name])-1].Offset - consumedHistory[name][0].Offset - } - - // Calculate produced/consumed in last second - producedPerSec := int64(0) - consumedPerSec := int64(0) - if len(producedHistorySec) > 1 { - producedPerSec = producedHistorySec[len(producedHistorySec)-1].Offset - producedHistorySec[0].Offset - } - if len(consumedHistorySec) > 1 { - consumedPerSec = consumedHistorySec[len(consumedHistorySec)-1].Offset - consumedHistorySec[0].Offset - } - - totalAllProduced += producedCount - totalAllConsumed += consumedCount - totalAllProducedSec += producedPerSec - totalAllConsumedSec += consumedPerSec - - counts = append(counts, MessagesLastMinute{ - Topic: name, - ProducedCount: producedCount, - ConsumedCount: consumedCount, - ProducedPerSec: producedPerSec, - ConsumedPerSec: consumedPerSec, - }) + switch r.Method { + case http.MethodGet: + HandleListClusters(w, r) + case http.MethodPost: + HandleLoginCluster(w, r) } - - counts = append(counts, MessagesLastMinute{ - Topic: "total", - ProducedCount: totalAllProduced, - ConsumedCount: totalAllConsumed, - ProducedPerSec: totalAllProducedSec, - ConsumedPerSec: totalAllConsumedSec, - }) - logger.Info("Successfully calculated message metrics", "broker", broker, "total_topics", len(counts)-1, "total_produced", totalAllProduced, "total_consumed", totalAllConsumed) - sendJSON(w, http.StatusOK, Response{Status: "ok", Data: counts}) } -func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) +func (s *Server) handleClusterByName(w http.ResponseWriter, r *http.Request) { + if !enforceMethod(w, r, []string{http.MethodGet, http.MethodPost, http.MethodDelete}) { return } - response := Response{ - Status: "ok", - Message: "Health check successful", - } - sendJSON(w, http.StatusOK, response) -} + clusterName := r.PathValue("name") -// Handler for clusters endpoint -func (s *Server) handleClusters(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) + if clusterName == "" { + logger.Warn("Cluster name is required in path", "path", r.URL.Path) + SendError(w, "Cluster name is required in path", nil) return } - // Use the command from internal/core/commands - clusters, failure := commands.ListClusters() - if failure != nil { - logger.Error("Failed to list clusters", "error", failure.Err) - sendError(w, "Failed to list clusters", failure.Err) - return + switch r.Method { + case http.MethodGet: + HandleGetClusterByName(w, r, clusterName) + case http.MethodPost: + HandleSelectCluster(w, r, clusterName) + case http.MethodDelete: + HandleDeleteClusterByName(w, r, clusterName) } - - logger.Info("Successfully retrieved clusters", "cluster_count", len(clusters)) - sendJSON(w, http.StatusOK, Response{Status: "ok", Data: clusters}) } -// Handler for cluster metadata endpoint func (s *Server) handleClusterMetadata(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - w.WriteHeader(http.StatusMethodNotAllowed) - sendJSON(w, http.StatusMethodNotAllowed, Response{ - Status: "error", - Message: fmt.Sprintf("Method %s not allowed", r.Method), - }) + if !enforceMethod(w, r, []string{http.MethodGet}) { return } - clusterId := r.PathValue("clusterId") - - status, err := createNewClient(w, r, s) - if err != nil { - logger.Error("Failed to create Kafka client for cluster metadata operation", "clusterId", clusterId, "error", err) - sendError(w, "Failed to create Kafka client", err) - return - } - if !status { - logger.Error("Client creation failed for cluster metadata operation", "clusterId", clusterId) - sendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed")) - return - } - - // Use the command from internal/core/commands - metadata, failure := commands.GetClusterMetadata() - if failure != nil { - logger.Error("Failed to get cluster metadata", "clusterId", clusterId, "error", failure.Err) - sendError(w, "Failed to get cluster metadata", failure.Err) - return + switch r.Method { + case http.MethodGet: + HandleClusterMetadata(w, r) } - - logger.Info("Successfully retrieved cluster metadata", "clusterId", clusterId) - sendJSON(w, http.StatusOK, Response{Status: "ok", Data: metadata}) } diff --git a/pkg/session/session.go b/pkg/session/session.go index 911d54a..05c2b83 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -105,6 +105,37 @@ func (s *session) Connect(ctx context.Context) (sarama.Client, error) { return client, nil } +func (s *session) ConnectAdmin(ctx context.Context) (sarama.ClusterAdmin, error) { + if s.adminClient != nil { + return s.adminClient, nil + } + + activeCluster := s.getActiveCluster() + if activeCluster == nil { + return nil, fmt.Errorf("no active cluster selected") + } + + version, err := sarama.ParseKafkaVersion(activeCluster.Version) + if err != nil { + return nil, fmt.Errorf("invalid kafka version: %w", err) + } + client, err := cluster.NewCluster(activeCluster.Brokers, version).Connect(ctx) + if err != nil { + return nil, fmt.Errorf("error connecting to cluster: %w", err) + } + adminClient, err := cluster.NewCluster(activeCluster.Brokers, version).ConnectAdmin(ctx) + if err != nil { + return nil, fmt.Errorf("error connecting to cluster as admin: %w", err) + } + s.client = client + s.adminClient = adminClient + index := s.getActiveClusterIndex() + if index >= 0 { + s.clusters[index].IsAuthenticated = true + } + return adminClient, nil +} + func (s *session) Disconnect() { if s.client != nil { if err := s.client.Close(); err != nil { From 81e7d93f4e49053e58389e606f3ed79021067460 Mon Sep 17 00:00:00 2001 From: Duarte Martinho Date: Tue, 11 Nov 2025 14:36:06 +0000 Subject: [PATCH 2/2] Added utils Signed-off-by: Duarte Martinho --- pkg/rest/utils.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 pkg/rest/utils.go diff --git a/pkg/rest/utils.go b/pkg/rest/utils.go new file mode 100644 index 0000000..8d40b93 --- /dev/null +++ b/pkg/rest/utils.go @@ -0,0 +1,37 @@ +package rest + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/IBM/openkommander/pkg/logger" +) + +type Response struct { + Status string `json:"status"` + Message string `json:"message,omitempty"` + Data interface{} `json:"data,omitempty"` +} + +type TopicRequest struct { + Name string `json:"name"` + Partitions int32 `json:"partitions"` + ReplicationFactor int16 `json:"replication_factor"` +} + +func SendJSON(w http.ResponseWriter, status int, payload interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + if err := json.NewEncoder(w).Encode(payload); err != nil { + logger.Error("Failed to encode JSON response", "error", err) + } +} + +func SendError(w http.ResponseWriter, message string, err error) { + logger.Error(message, "error", err) + SendJSON(w, http.StatusInternalServerError, Response{ + Status: "error", + Message: fmt.Sprintf("%s: %v", message, err), + }) +}