Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/rest/api_brokers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rest

import (
"fmt"
"net/http"

"github.com/IBM/openkommander/pkg/logger"
"github.com/IBM/openkommander/pkg/session"
)

func HandleCreateBroker(w http.ResponseWriter, r *http.Request) {
_ = r // Not used in this stub implementation
response := Response{
Status: "ok",
Message: "Broker creation is not implemented yet",
}

SendJSON(w, http.StatusNotImplemented, response)
}

func HandleGetBrokers(w http.ResponseWriter, r *http.Request) {
currentSession := session.GetCurrentSession()
client, err := currentSession.GetClient()
if err != nil {
logger.Error("Failed to create Kafka client for brokers operation", "cluster", session.GetActiveClusterName(), "error", err)
SendError(w, "Failed to create Kafka client", err)
return
}
if client == nil {
logger.Error("Client creation failed for brokers operation", "cluster", session.GetActiveClusterName())
SendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed"))
return
}

brokers := client.Brokers()
brokerList := make([]map[string]interface{}, 0)

for _, brokerInfo := range brokers {
connected, err := brokerInfo.Connected()
if err != nil {
connected = false
}

tlsState, _ := brokerInfo.TLSConnectionState()

brokerData := map[string]interface{}{
"id": brokerInfo.ID(),
"addr": brokerInfo.Addr(),
"connected": connected,
"rack": brokerInfo.Rack(),
"state": tlsState,
}
brokerList = append(brokerList, brokerData)
}

logger.Info("Successfully retrieved brokers", "cluster", session.GetActiveClusterName(), "broker_count", len(brokerList))
SendJSON(w, http.StatusOK, Response{Status: "ok", Data: brokerList})
}
58 changes: 58 additions & 0 deletions pkg/rest/api_cluster_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package rest

import (
"fmt"
"net/http"

"github.com/IBM/openkommander/pkg/logger"
"github.com/IBM/openkommander/pkg/session"
)

func HandleGetClusterByName(w http.ResponseWriter, r *http.Request, clusterName string) {
err := session.InitAPI()
if err != nil {
logger.Error("Failed to initialize session for cluster by name", "error", err)
SendError(w, "Failed to initialize session", err)
return
}

cluster := session.GetClusterByName(clusterName)
if cluster == nil {
logger.Warn("Cluster not found", "cluster_name", clusterName)
SendError(w, "Cluster not found", fmt.Errorf("cluster '%s' not found", clusterName))
return
}
logger.Info("Successfully retrieved cluster details", "cluster_name", clusterName)
SendJSON(w, http.StatusOK, Response{Status: "ok", Data: cluster})
}

func HandleDeleteClusterByName(w http.ResponseWriter, r *http.Request, clusterName string) {
err := session.InitAPI()
if err != nil {
logger.Error("Failed to initialize session for cluster deletion", "error", err)
SendError(w, "Failed to initialize session", err)
return
}

success := session.Logout(clusterName)
if !success {
logger.Warn("Failed to logout from cluster", "cluster_name", clusterName)
SendError(w, "Failed to logout from cluster", fmt.Errorf("failed to logout from cluster '%s'", clusterName))
return
}
logger.Info("Successfully logged out from cluster", "cluster_name", clusterName)
SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Logged out from cluster '%s' successfully", clusterName)})
}

func HandleSelectCluster(w http.ResponseWriter, r *http.Request, clusterName string) {
err := session.InitAPI()
if err != nil {
logger.Error("Failed to initialize session for select cluster", "error", err)
SendError(w, "Failed to initialize session", err)
return
}

session.SelectCluster(clusterName)
logger.Info("Successfully set active cluster", "cluster_name", clusterName)
SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Active cluster set to '%s'", clusterName)})
}
82 changes: 82 additions & 0 deletions pkg/rest/api_clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package rest

import (
"encoding/json"
"fmt"
"net/http"

"github.com/IBM/openkommander/pkg/constants"
"github.com/IBM/openkommander/pkg/logger"
"github.com/IBM/openkommander/pkg/session"
)

func HandleListClusters(w http.ResponseWriter, r *http.Request) {
clusters := session.GetClusterConnections()
activeCluster := session.GetActiveClusterName()

if len(clusters) == 0 {
// Return an empty slice of maps when there are no clusters
SendJSON(w, http.StatusOK, Response{Status: "ok", Data: []map[string]interface{}{}})
return
}

rows := make([]map[string]interface{}, 0, len(clusters))

for i, cluster := range clusters {
status := "Disconnected"
if cluster.IsAuthenticated {
status = "Connected"
}

active := "No"
if cluster.Name == activeCluster {
active = "Yes"
}

rows = append(rows, map[string]interface{}{
"id": i + 1,
"name": cluster.Name,
"brokers": cluster.Brokers,
"status": status,
"active": active,
})
}

logger.Info("Successfully retrieved clusters", "cluster_count", len(clusters))
SendJSON(w, http.StatusOK, Response{Status: "ok", Data: rows})
}

func HandleLoginCluster(w http.ResponseWriter, r *http.Request) {
var req struct {
Name string `json:"name"`
Broker string `json:"broker"`
Version string `json:"version"`
}

err := json.NewDecoder(r.Body).Decode(&req)

if err != nil {
logger.Error("Invalid request body for cluster login", "error", err)
SendError(w, "Invalid request body", err)
return
}

err = session.InitAPI()
if err != nil {
logger.Error("Failed to initialize session for cluster login", "error", err)
SendError(w, "Failed to initialize session", err)
return
}

brokers := []string{req.Broker}

success, message := session.LoginWithParams(brokers, constants.KafkaVersion, req.Name)
if !success {
logger.Error("Failed to login to cluster", "cluster_name", req.Name, "message", message)
SendError(w, "Failed to login to cluster: "+message, nil)
return
}

logger.Info("Successfully logged in to cluster", "cluster_name", req.Name)
SendJSON(w, http.StatusOK, Response{Status: "ok", Message: fmt.Sprintf("Logged in to cluster '%s' successfully", req.Name)})
}
36 changes: 36 additions & 0 deletions pkg/rest/api_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package rest

import (
"fmt"
"net/http"

"github.com/IBM/openkommander/internal/core/commands"
"github.com/IBM/openkommander/pkg/logger"
"github.com/IBM/openkommander/pkg/session"
)

func HandleClusterMetadata(w http.ResponseWriter, r *http.Request) {
currentSession := session.GetCurrentSession()
client, err := currentSession.GetClient()

if err != nil {
logger.Error("Failed to create Kafka client for cluster metadata operation", "cluster", session.GetActiveClusterName(), "error", err)
SendError(w, "Failed to create Kafka client", err)
return
}
if client == nil {
logger.Error("Client creation failed for cluster metadata operation", "cluster", session.GetActiveClusterName())
SendError(w, "Failed to create Kafka client", fmt.Errorf("client creation failed"))
return
}

metadata, failure := commands.GetClusterMetadata()
if failure != nil {
logger.Error("Failed to get cluster metadata", "cluster", session.GetActiveClusterName(), "error", failure.Err)
SendError(w, "Failed to get cluster metadata", failure.Err)
return
}

logger.Info("Successfully retrieved cluster metadata", "cluster", session.GetActiveClusterName())
SendJSON(w, http.StatusOK, Response{Status: "ok", Data: metadata})
}
Loading