Skip to content

Commit

Permalink
feat: add tracking to resource provider (#145)
Browse files Browse the repository at this point in the history
* chore: add tracking mechanisms for nodes and uptime

* chore: add extra parameters when connecting to solver websocket

* chore: add connect/disconnect callbacks on websocket connection

When the disconnect event happens, use the existing mechanism to update
the correct number of resource providers nodes connected to the solver.
Add tracking when a resource provider connects/disconnects and tracking
for the node info.
  • Loading branch information
AquiGorka authored Jun 6, 2024
1 parent 0a1d212 commit cbc42d8
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 21 deletions.
6 changes: 4 additions & 2 deletions pkg/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type ServerOptions struct {
}

type ClientOptions struct {
URL string
PrivateKey string
URL string
PrivateKey string
PublicAddress string
Type string
}
17 changes: 17 additions & 0 deletions pkg/http/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@ type ConnectionWrapper struct {
mu sync.Mutex
}

type WSConnectionParams struct {
ID string
Type string
}

// StartWebSocketServer starts a WebSocket server
func StartWebSocketServer(
r *mux.Router,
path string,
messageChan chan []byte,
ctx context.Context,
connectCB func(params WSConnectionParams),
disconnectCB func(params WSConnectionParams),
) {
var mutex = &sync.Mutex{}

Expand Down Expand Up @@ -88,7 +95,13 @@ func StartWebSocketServer(
log.Error().Msgf("Error upgrading websocket: %s", err.Error())
return
}
params := r.URL.Query()
connParams := WSConnectionParams{
ID: params.Get("ID"),
Type: params.Get("Type"),
}
defer conn.Close()
connectCB(connParams)
addConnection(conn)

log.Debug().
Expand All @@ -98,10 +111,14 @@ func StartWebSocketServer(
messageType, _, err := conn.ReadMessage()
if err != nil {
log.Trace().Msgf("Client disconnected: %s", err.Error())
removeConnection(conn)
disconnectCB(connParams)
break
}
if messageType == websocket.CloseMessage {
log.Trace().Msgf("Received close frame from client.")
removeConnection(conn)
disconnectCB(connParams)
break
}
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/jobcreator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ func NewJobCreatorController(
return nil, err
}

solverClient, err := solver.NewSolverClient(http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
})
solverClient, err := solver.NewSolverClient(
http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
Type: "JobCreator",
PublicAddress: web3SDK.GetAddress().String(),
})
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/mediator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ func NewMediatorController(
return nil, err
}

solverClient, err := solver.NewSolverClient(http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
})
solverClient, err := solver.NewSolverClient(
http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
Type: "Mediator",
PublicAddress: web3SDK.GetAddress().String(),
})
if err != nil {
log.Error().Msgf("error NewSolverClient")
return nil, err
Expand Down
42 changes: 37 additions & 5 deletions pkg/metricsDashboard/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/lilypad-tech/lilypad/pkg/data"
)

var host = os.Getenv("API_HOST")
var endpoint = "metrics-dashboard/logs"
var url = host + endpoint
const jobsEndpoint = "jobs"
const nodeInfoEndpoint = "nodes"
const nodeConnectionEndpoint = "uptimes"

func TrackEvent(json string) {
var host = os.Getenv("API_HOST") + "metrics-dashboard/"

func TrackEvent(url string, json string) {
data := []byte(json)

client := &http.Client{Timeout: time.Second * 1}
Expand All @@ -31,6 +33,7 @@ func TrackEvent(json string) {
}

func TrackJobOfferUpdate(evOffer data.JobOfferContainer) {
var url = host + jobsEndpoint
var module = evOffer.JobOffer.Module.Name
if module == "" {
module = evOffer.JobOffer.Module.Repo + ":" + evOffer.JobOffer.Module.Hash
Expand All @@ -49,5 +52,34 @@ func TrackJobOfferUpdate(evOffer data.JobOfferContainer) {
byts, _ := json.Marshal(data)
payload := string(byts)

TrackEvent(payload)
TrackEvent(url, payload)
}

func TrackNodeInfo(resourceOffer data.ResourceOffer) {
var url = host + nodeInfoEndpoint

data := map[string]interface{}{
"ID": resourceOffer.ResourceProvider,
"GPU": resourceOffer.Spec.GPU,
"CPU": resourceOffer.Spec.CPU,
"RAM": resourceOffer.Spec.RAM,
"Modules": resourceOffer.Modules,
}
byts, _ := json.Marshal(data)
payload := string(byts)

TrackEvent(url, payload)
}

func TrackNodeConnectionEvent(event string, ID string) {
var url = host + nodeConnectionEndpoint
data := map[string]interface{}{
"ID": ID,
"Event": event,
"Time": time.Now().UnixMilli(),
}
byts, _ := json.Marshal(data)
payload := string(byts)

TrackEvent(url, payload)
}
11 changes: 7 additions & 4 deletions pkg/resourceprovider/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ func NewResourceProviderController(
return nil, err
}

solverClient, err := solver.NewSolverClient(http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
})
solverClient, err := solver.NewSolverClient(
http.ClientOptions{
URL: solverUrl,
PrivateKey: options.Web3.PrivateKey,
Type: "ResourceProvider",
PublicAddress: web3SDK.GetAddress().String(),
})
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/solver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func (client *SolverClient) Start(ctx context.Context, cm *system.CleanupManager
}
}
}()
websocketURL := fmt.Sprintf("%s%s%s%s%s", http.WEBSOCKET_SUB_PATH, "?&Type=", client.options.Type, "&ID=", client.options.PublicAddress)
http.ConnectWebSocket(
http.WebsocketURL(client.options, http.WEBSOCKET_SUB_PATH),
http.WebsocketURL(client.options, websocketURL),
websocketEventChannel,
ctx,
)
Expand Down
30 changes: 30 additions & 0 deletions pkg/solver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/lilypad-tech/lilypad/pkg/data"
"github.com/lilypad-tech/lilypad/pkg/metricsDashboard"
"github.com/lilypad-tech/lilypad/pkg/solver/store"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/lilypad-tech/lilypad/pkg/web3"
Expand All @@ -20,6 +21,7 @@ type SolverEventType string
const (
JobOfferAdded SolverEventType = "JobOfferAdded"
ResourceOfferAdded SolverEventType = "ResourceOfferAdded"
ResourceOfferRemoved SolverEventType = "ResourceOfferRemoved"
DealAdded SolverEventType = "DealAdded"
JobOfferStateUpdated SolverEventType = "JobOfferStateUpdated"
ResourceOfferStateUpdated SolverEventType = "ResourceOfferStateUpdated"
Expand Down Expand Up @@ -321,17 +323,45 @@ func (controller *SolverController) addResourceOffer(resourceOffer data.Resource

controller.log.Info("add resource offer", resourceOffer)

metricsDashboard.TrackNodeInfo(resourceOffer)

ret, err := controller.store.AddResourceOffer(data.GetResourceOfferContainer(resourceOffer))
if err != nil {
return nil, err
}

controller.writeEvent(SolverEvent{
EventType: ResourceOfferAdded,
ResourceOffer: ret,
})
return ret, nil
}

func (controller *SolverController) removeResourceOfferBYResourceProvider(ID string) error {
controller.log.Info("remove resource offer", ID)
resourceOffers, err := controller.store.GetResourceOffers(store.GetResourceOffersQuery{
ResourceProvider: ID,
})
if err != nil {
return err
}

if len(resourceOffers) == 0 {
return nil
}

err = controller.store.RemoveResourceOffer(resourceOffers[0].ID)
if err != nil {
return err
}

controller.writeEvent(SolverEvent{
EventType: ResourceOfferRemoved,
ResourceOffer: nil,
})
return nil
}

func (controller *SolverController) addDeal(deal data.Deal) (*data.DealContainer, error) {
id, err := data.GetDealID(deal)
if err != nil {
Expand Down
19 changes: 18 additions & 1 deletion pkg/solver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"path/filepath"
"time"

"github.com/gorilla/mux"
"github.com/lilypad-tech/lilypad/pkg/data"
"github.com/lilypad-tech/lilypad/pkg/http"
"github.com/lilypad-tech/lilypad/pkg/metricsDashboard"
"github.com/lilypad-tech/lilypad/pkg/solver/store"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/gorilla/mux"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -95,6 +96,8 @@ func (solverServer *solverServer) ListenAndServe(ctx context.Context, cm *system
http.WEBSOCKET_SUB_PATH,
websocketEventChannel,
ctx,
solverServer.connectCB,
solverServer.disconnectCB,
)

srv := &corehttp.Server{
Expand Down Expand Up @@ -131,6 +134,20 @@ func (solverServer *solverServer) ListenAndServe(ctx context.Context, cm *system
return nil
}

// WS connect events
func (solverServer *solverServer) connectCB(connParams http.WSConnectionParams) {
if connParams.Type == "ResourceProvider" {
metricsDashboard.TrackNodeConnectionEvent("Connect", connParams.ID)
}
}

func (solverServer *solverServer) disconnectCB(connParams http.WSConnectionParams) {
if connParams.Type == "ResourceProvider" {
metricsDashboard.TrackNodeConnectionEvent("Disconnect", connParams.ID)
solverServer.controller.removeResourceOfferBYResourceProvider(connParams.ID)
}
}

/*
*
*
Expand Down

0 comments on commit cbc42d8

Please sign in to comment.