diff --git a/README.md b/README.md
index a8a626d..ae9e8b3 100755
--- a/README.md
+++ b/README.md
@@ -73,12 +73,12 @@ view when Alice sends a message to Carol instead.
 ### Parameters (_also defined in [/config/config.yml](config/config.yml)_)
+- **$N$**: The minimum number of clients that must register before a run starts.
+- **$n$**: The minimum number of relays that must register before a run starts.
 - **$x$**: Server load (i.e. the expected number of onions each relay processes per round).
 - **$\ell_1$**: The number of mixers in each routing path.
 - **$\ell_2$**: The number of gatekeepers in each routing path.
 - **$L$**: The number of rounds (also the length of the routing path, equal to $`\ell_1 + \ell_2 + 1`$).
-- **$R$**: The number of participating relays.
-- **$N$**: The number of participating clients.
 - **$d$**: The number of non-null key-blocks in $S_1$ (thus $d$ is the threshold for the number of bruises before an onion is discarded by a gatekeeper).
 - **$\tau$**: ($\tau \lt (1 − \gamma)(1 − \chi)$) The fraction of expected checkpoint onions needed for a relay to progress its local clock.
 - **$\epsilon$**: The privacy loss in the worst case scenario.
@@ -87,6 +87,8 @@ view when Alice sends a message to Carol instead.
 - **$\theta$**: The maximum fraction of bruisable layers that can be bruised before the innermost tulip bulb becomes unrecoverable. Note that $d = \theta \cdot \ell_1$
 - **$\chi$**: The fraction of $N$ relays that can be corrupted and controlled by the adversary (the subset is chosen prior to execution). Note that $\chi \lt \theta - 0.5$ and $\chi \lt \frac{d}{\ell_1} - 0.5$
+- **`BulletinBoardUrl`**: The IP address and port of the bulletin board.
+- **`MetricsPort`**: The port on which all aggregated metrics are served (on the Bulletin Board's IP address).
 ### No Global Clock:
@@ -522,7 +524,7 @@ go test -v ./...
 Usage
 -----
-All configurations are set in the [`config/config.yaml`](config/config/yaml) file.
+All configurations are initialized in the [`config/config.yml`](config/config.yml) file.
 ### Running the Bulletin Board
 ```bash
 go run cmd/bulletin-board/main.go -logLevel=
 ```
@@ -535,60 +537,41 @@
 ### Running a Relay
 ```bash
-go run cmd/relay/main.go -id= -logLevel=
+go run cmd/relay/main.go -id= -host= -port= -promPort= -logLevel=
 ```
 - Options:
   - ``: The unique identifier for the relay.
+  - ``: (optional) The public host IP for the relay. If not given, the public IP will be retrieved automatically.
+  - ``: The port number for the relay.
+  - ``: The port number for scraping the relay's Prometheus metrics.
   - ``: (optional) The logging level (e.g., "info", "debug", "warn", "error").
 ### Running a Client
 ```bash
-go run cmd/client/main.go -id= -logLevel=
+go run cmd/client/main.go -id= -host= -port= -promPort= -logLevel=
 ```
 - Options:
   - ``: The unique identifier for the client.
+  - ``: (optional) The public host IP for the client. If not given, the public IP will be retrieved automatically.
+  - ``: The port number for the client.
+  - ``: The port number for scraping the client's Prometheus metrics.
   - ``: (optional) The logging level (e.g., "info", "debug", "warn", "error").
-### Running the Metric Collector (Scrapes Prometheus endpoints for Clients and relays)
-```bash
-go run cmd/metrics/main.go -logLevel=
-```
-- Options:
-  - ``: (optional) The logging level (e.g., "info", "debug", "warn", "error").
-
-### Running the Browser-Based Visualization Server
-
-```bash
-go run cmd/visualizer/visualizer.go -port
-```
-- Options:
-  - ``: The port number for the visualization server. Access at `http://localhost:`.
- ## Endpoints ### Bulletin Board - **Register Client**: `POST /registerClient` - **Register Relay**: `POST /registerRelay` +- **Updating config with new parameters (for next run)**: `POST /updateConfig` ### Relay & Client - **Receive Onion**: `POST /receive` - **Get Status**: `GET /status` - **Start Run**: `POST /start` -- **Prometheus Metrics**: `GET /metrics` - Note that this is served on a different port which is specified in the [config.yml](/config/config.yml) file - -When implementing the onion routing protocol, it helps to run the visualization server -in real time to view the messages and onions processes by each client and relay. For a small number of clients/relays, this makes -debugging the protocol easier. - -Obviously, it is not recommended to run the visualization program once we deploy the simulation in a distributed environment (with potentially hundreds of relays and rounds). - -![](img/vis.png) - -![](img/demo.gif) - +- **Prometheus Metrics**: `GET /metrics` - Note that this is served on a different port --- diff --git a/cmd/bulletin-board/main.go b/cmd/bulletin-board/main.go index 0cfa026..9c77998 100755 --- a/cmd/bulletin-board/main.go +++ b/cmd/bulletin-board/main.go @@ -39,22 +39,15 @@ func main() { } // Initialize global configurations by loading them from config/config.yml - if err, path := config.InitGlobal(); err != nil { + if err, _ := config.InitGlobal(); err != nil { slog.Error("failed to init config", err) os.Exit(1) - } else if err = config.InitPrometheusConfig(path); err != nil { - slog.Error("failed to init prometheus config", err) - os.Exit(1) } - cfg := config.GlobalConfig - - host := cfg.BulletinBoard.Host - port := cfg.BulletinBoard.Port // Construct the full URL for the Bulletin Board - url := fmt.Sprintf("https://%s:%d", host, port) + url := fmt.Sprintf("https://%s:%d", config.GetBulletinBoardHost(), config.GetBulletinBoardPort()) - slog.Info("⚡ init Bulletin board") + slog.Info("⚡ init Bulletin board", "url", url) // Create a new instance of the Bulletin Board with the current configuration. bulletinBoard := bulletin_board.NewBulletinBoard() @@ -76,8 +69,9 @@ func main() { // Set up HTTP handlers http.HandleFunc("/registerRelay", bulletinBoard.HandleRegisterRelay) http.HandleFunc("/registerClient", bulletinBoard.HandleRegisterClient) - http.HandleFunc("/registerIntentToSend", bulletinBoard.HandleRegisterIntentToSend) + //http.HandleFunc("/registerIntentToSend", bulletinBoard.HandleRegisterIntentToSend) http.HandleFunc("/updateRelay", bulletinBoard.HandleUpdateRelayInfo) + http.HandleFunc("/updateConfig", bulletinBoard.HandleUpdateConfig) http.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) { slog.Info("Shutdown signal received") quit <- os.Signal(syscall.SIGTERM) // signal shutdown @@ -89,7 +83,7 @@ func main() { // Start the HTTP server go func() { - if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil { + if err := http.ListenAndServe(fmt.Sprintf(":%d", config.GetBulletinBoardPort()), nil); err != nil { if errors.Is(err, http.ErrServerClosed) { // Check if the server was closed intentionally (normal shutdown). 
slog.Info("HTTP server closed") } else { @@ -104,8 +98,10 @@ func main() { select { case v := <-quit: // OS signal is received config.GlobalCancel() + bulletinBoard.Shutdown() slog.Info("", "signal.Notify", v) case done := <-config.GlobalCtx.Done(): // global context is canceled slog.Info("", "ctx.Done", done) + bulletinBoard.Shutdown() } } diff --git a/cmd/client/main.go b/cmd/client/main.go index 4b60b60..a952c0c 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -8,6 +8,7 @@ import ( "github.com/HannahMarsh/pi_t-experiment/config" "github.com/HannahMarsh/pi_t-experiment/internal/metrics" "github.com/HannahMarsh/pi_t-experiment/internal/model/client" + "github.com/HannahMarsh/pi_t-experiment/pkg/utils" "go.uber.org/automaxprocs/maxprocs" "log/slog" "net/http" @@ -21,8 +22,11 @@ import ( func main() { // Define command-line flags - id := flag.Int("id", -1, "ID of the newClient (required)") - logLevel := flag.String("log-level", "debug", "Log level") + id_ := flag.Int("id", -1, "ID of the newClient (required)") + ip_ := flag.String("host", "x", "IP address of the client") + port_ := flag.Int("port", 8080, "Port of the client") + promPort_ := flag.Int("promPort", 8200, "Port of the client's Prometheus metrics") + logLevel_ := flag.String("log-level", "debug", "Log level") flag.Usage = func() { if _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil { @@ -33,15 +37,30 @@ func main() { flag.Parse() + id := *id_ + ip := *ip_ + port := *port_ + promPort := *promPort_ + logLevel := *logLevel_ + + pl.SetUpLogrusAndSlog(logLevel) + // Check if the required flag is provided - if *id == -1 { + if id == -1 { _, _ = fmt.Fprintf(os.Stderr, "Error: the -id flag is required\n") flag.Usage() os.Exit(2) } - // Set up logrus with the specified log level. - pl.SetUpLogrusAndSlog(*logLevel) + if ip == "x" { + IP, err := utils.GetPublicIP() + if err != nil { + slog.Error("failed to get public IP", err) + os.Exit(1) + } + slog.Info("", "IP", IP.IP, "Hostname", IP.HostName, "City", IP.City, "Region", IP.Region, "Country", IP.Country, "Location", IP.Location, "Org", IP.Org, "Postal", IP.Postal, "Timezone", IP.Timezone, "ReadMe", IP.ReadMe) + ip = IP.IP + } // Automatically adjust the GOMAXPROCS setting based on the number of available CPU cores. if _, err := maxprocs.Set(); err != nil { @@ -55,31 +74,12 @@ func main() { os.Exit(1) } - cfg := config.GlobalConfig - - // Find the client configuration that matches the provided ID. - var clientConfig *config.Client - for _, c := range cfg.Clients { - if c.ID == *id { - clientConfig = &c - break - } - } - - if clientConfig == nil { - pl.LogNewError("Invalid id. Failed to get newClient config for id=%d", *id) - os.Exit(1) - } - - slog.Info("⚡ init newClient", "id", *id) - - // Construct the full URL for the Bulletin Board - bulletinBoardAddress := fmt.Sprintf("http://%s:%d", cfg.BulletinBoard.Host, cfg.BulletinBoard.Port) + slog.Info("⚡ init newClient", "id", id) var newClient *client.Client // Attempt to create a new client instance, retrying every 5 seconds upon failure (in case the bulletin board isn't ready yet). for { - if n, err := client.NewClient(clientConfig.ID, clientConfig.Host, clientConfig.Port, bulletinBoardAddress); err != nil { + if n, err := client.NewClient(id, ip, port, promPort, config.GetBulletinBoardAddress()); err != nil { slog.Error("failed to create new c. Trying again in 5 seconds. 
", err) time.Sleep(5 * time.Second) continue @@ -109,22 +109,19 @@ func main() { }) // Serve Prometheus metrics in a separate goroutine. - shutdownMetrics := metrics.ServeMetrics(clientConfig.PrometheusPort, metrics.MSG_SENT, metrics.MSG_RECEIVED, metrics.ONION_SIZE) + shutdownMetrics := metrics.ServeMetrics(promPort, metrics.MSG_SENT, metrics.MSG_RECEIVED, metrics.ONION_SIZE) // Start the HTTP server go func() { // Construct the address for the HTTP server based on the client's port. - address := fmt.Sprintf(":%d", clientConfig.Port) + address := fmt.Sprintf(":%d", port) // Attempt to start the HTTP server. if err2 := http.ListenAndServe(address, nil); err2 != nil && !errors.Is(err2, http.ErrServerClosed) { slog.Error("failed to start HTTP server", err2) } }() - slog.Info("🌏 start newClient...", "address", fmt.Sprintf("http://%s:%d", clientConfig.Host, clientConfig.Port)) - - // Start generating messages in a separate goroutine. - go newClient.StartGeneratingMessages() + slog.Info("🌏 start newClient...", "address", fmt.Sprintf("http://%s:%d", ip, port)) // Wait for either an OS signal to quit or the global context to be canceled select { diff --git a/cmd/metrics/main.go b/cmd/metrics/main.go deleted file mode 100644 index 59e6b16..0000000 --- a/cmd/metrics/main.go +++ /dev/null @@ -1,122 +0,0 @@ -package main - -import ( - "flag" - "fmt" - pl "github.com/HannahMarsh/PrettyLogger" - "github.com/HannahMarsh/pi_t-experiment/config" - "go.uber.org/automaxprocs/maxprocs" - "io/ioutil" - "log/slog" - "net/http" - "os" - "os/exec" -) - -func main() { - // Define command-line flags - logLevel := flag.String("log-level", "debug", "Log level") - - flag.Usage = func() { - if _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil { - slog.Error("Usage of %s:\n", err, os.Args[0]) - } - flag.PrintDefaults() - } - - flag.Parse() - - pl.SetUpLogrusAndSlog(*logLevel) - - // Automatically adjust the GOMAXPROCS setting based on the number of available CPU cores. 
- if _, err := maxprocs.Set(); err != nil { - slog.Error("failed set max procs", err) - os.Exit(1) - } - - if err, path := config.InitGlobal(); err != nil { - slog.Error("failed to init config", err) - os.Exit(1) - } else if err = config.InitPrometheusConfig(path); err != nil { - slog.Error("failed to init prometheus config", err) - os.Exit(1) - } else { - // Command to start Prometheus - cmd := exec.Command(config.GlobalConfig.PrometheusPath, "--config.file", path) - - // Set the environment variables, if needed - cmd.Env = os.Environ() - - // Set the command's standard output and error to the current process's output and error - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - // Start the Prometheus process - err := cmd.Start() - if err != nil { - slog.Error("failed to start Prometheus", err) - os.Exit(1) - } - } - // - //relayPromAddresses := utils.Map(config.GlobalConfig.Relays, func(n config.Relay) string { - // return fmt.Sprintf("http://%s:%d/metrics", n.Host, n.PrometheusPort) - //}) - // - //clientPromAddresses := utils.Map(config.GlobalConfig.Clients, func(c config.Client) string { - // return fmt.Sprintf("http://%s:%d/metrics", c.Host, c.PrometheusPort) - //}) - // - //slog.Info("⚡ init visualizer", "relayPromAddresses", relayPromAddresses, "clientPromAddresses", clientPromAddresses) - // - //scrapeInterval := time.Duration(config.GlobalConfig.ScrapeInterval) * time.Millisecond - // - //// Start the metric collector - //for { - // nextScrape := time.Now().Add(scrapeInterval) - // var wg sync.WaitGroup - // wg.Add(len(relayPromAddresses) + len(clientPromAddresses)) - // - // for _, address := range relayPromAddresses { - // go func(address string) { - // defer wg.Done() - // scrapeMetrics(address) - // }(address) - // } - // - // for _, address := range clientPromAddresses { - // go func(address string) { - // defer wg.Done() - // scrapeMetrics(address) - // }(address) - // } - // - // wg.Wait() - // if time.Until(nextScrape) > 0 { - // time.Sleep(time.Until(nextScrape)) - // } - //} - -} - -func scrapeMetrics(address string) { - resp, err := http.Get(address) - if err != nil { - slog.Error("failed to scrape visualizer", err) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - pl.LogNewError("%s: unexpected status code %d", address, resp.StatusCode) - return - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - slog.Error("failed to read response body", err) - return - } - - slog.Debug("scraped visualizer", "address", address, "response", string(body)) -} diff --git a/cmd/relay/main.go b/cmd/relay/main.go index 598405d..094b41e 100755 --- a/cmd/relay/main.go +++ b/cmd/relay/main.go @@ -8,6 +8,7 @@ import ( "github.com/HannahMarsh/pi_t-experiment/config" "github.com/HannahMarsh/pi_t-experiment/internal/metrics" "github.com/HannahMarsh/pi_t-experiment/internal/model/relay" + "github.com/HannahMarsh/pi_t-experiment/pkg/utils" "go.uber.org/automaxprocs/maxprocs" "log/slog" "net/http" @@ -21,8 +22,11 @@ import ( func main() { // Define command-line flags - id := flag.Int("id", -1, "ID of the new relay (required)") - logLevel := flag.String("log-level", "debug", "Log level") + id_ := flag.Int("id", -1, "ID of the newClient (required)") + ip_ := flag.String("host", "x", "IP address of the relay") + port_ := flag.Int("port", 8080, "Port of the client") + promPort_ := flag.Int("promPort", 8200, "Port of the relay's Prometheus metrics") + logLevel_ := flag.String("log-level", "debug", "Log level") flag.Usage = func() { if _, err := 
fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil { @@ -33,15 +37,30 @@ func main() { flag.Parse() + id := *id_ + ip := *ip_ + port := *port_ + promPort := *promPort_ + logLevel := *logLevel_ + + pl.SetUpLogrusAndSlog(logLevel) + // Check if the required flag is provided - if *id == -1 { + if id == -1 { _, _ = fmt.Fprintf(os.Stderr, "Error: the -id flag is required\n") flag.Usage() os.Exit(2) } - // Set up logrus with the specified log level. - pl.SetUpLogrusAndSlog(*logLevel) + if ip == "x" { + IP, err := utils.GetPublicIP() + if err != nil { + slog.Error("failed to get public IP", err) + os.Exit(1) + } + slog.Info("", "IP", IP.IP, "Hostname", IP.HostName, "City", IP.City, "Region", IP.Region, "Country", IP.Country, "Location", IP.Location, "Org", IP.Org, "Postal", IP.Postal, "Timezone", IP.Timezone, "ReadMe", IP.ReadMe) + ip = IP.IP + } // Automatically adjust the GOMAXPROCS setting based on the number of available CPU cores. if _, err := maxprocs.Set(); err != nil { @@ -55,31 +74,12 @@ func main() { os.Exit(1) } - cfg := config.GlobalConfig - - // Find the relay configuration that matches the provided ID. - var relayConfig *config.Relay - for _, r := range cfg.Relays { - if r.ID == *id { - relayConfig = &r - break - } - } - - if relayConfig == nil { - slog.Error("invalid id", errors.New(fmt.Sprintf("failed to get newRelay config for id=%d", *id))) - os.Exit(1) - } - - slog.Info("⚡ init newRelay", "id", *id) - - // Construct the full URL for the Bulletin Board - bulletinBoardAddress := fmt.Sprintf("http://%s:%d", cfg.BulletinBoard.Host, cfg.BulletinBoard.Port) + slog.Info("⚡ init newRelay", "id", id) var newRelay *relay.Relay // Attempt to create a new relay instance, retrying every 5 seconds upon failure (in case the bulletin board isn't ready yet). for { - if n, err := relay.NewRelay(relayConfig.ID, relayConfig.Host, relayConfig.Port, bulletinBoardAddress); err != nil { + if n, err := relay.NewRelay(id, ip, port, promPort, config.GetBulletinBoardAddress()); err != nil { slog.Error("failed to create newRelay. Trying again in 5 seconds. ", err) time.Sleep(5 * time.Second) continue @@ -109,11 +109,11 @@ func main() { }) // Serve Prometheus metrics in a separate goroutine. 
- shutdownMetrics := metrics.ServeMetrics(relayConfig.PrometheusPort, metrics.PROCESSING_TIME, metrics.ONION_COUNT, metrics.ONION_SIZE) + shutdownMetrics := metrics.ServeMetrics(promPort, metrics.PROCESSING_TIME, metrics.ONION_COUNT, metrics.ONION_SIZE) // Start the HTTP server go func() { - if err := http.ListenAndServe(fmt.Sprintf(":%d", relayConfig.Port), nil); err != nil { + if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil { if errors.Is(err, http.ErrServerClosed) { slog.Info("HTTP server closed") } else { @@ -122,7 +122,7 @@ func main() { } }() - slog.Info("🌏 start newRelay...", "address", fmt.Sprintf("http://%s:%d", relayConfig.Host, relayConfig.Port)) + slog.Info("🌏 start newRelay...", "address", fmt.Sprintf("http://%s:%d", ip, port)) // Wait for either an OS signal to quit or the global context to be canceled select { diff --git a/cmd/visualizer/main.go b/cmd/visualizer/main.go index 5066d8f..c982c53 100644 --- a/cmd/visualizer/main.go +++ b/cmd/visualizer/main.go @@ -1,238 +1,228 @@ package main import ( - "encoding/json" - "errors" - "flag" - "fmt" - pl "github.com/HannahMarsh/PrettyLogger" - "github.com/HannahMarsh/pi_t-experiment/config" - "github.com/HannahMarsh/pi_t-experiment/internal/api/structs" _ "github.com/lib/pq" - "go.uber.org/automaxprocs/maxprocs" - "log/slog" - "net/http" - "os" - "os/signal" - "sync" - "syscall" - "time" ) func main() { - logLevel := flag.String("log-level", "debug", "Log level") - flag.Usage = func() { - if _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil { - slog.Error("Usage of %s:\n", err, os.Args[0]) - } - flag.PrintDefaults() - } - - flag.Parse() - - pl.SetUpLogrusAndSlog(*logLevel) - - // set GOMAXPROCS - if _, err := maxprocs.Set(); err != nil { - slog.Error("failed set max procs", err) - os.Exit(1) - } - - if err, _ := config.InitGlobal(); err != nil { - slog.Error("failed to init config", err) - os.Exit(1) - } - - slog.Info("⚡ init visualizer", "address", "http://localhost:8200") - - time.Sleep(1 * time.Second) - http.HandleFunc("/data", serveData) - http.Handle("/", http.FileServer(http.Dir("./static"))) - //http.Handle("/client", http.FileServer(http.Dir("./static/client"))) - //http.Handle("/nodes", http.FileServer(http.Dir("./static/nodes"))) - //http.Handle("/nodes/rounds", http.FileServer(http.Dir("./static/nodes/rounds"))) - - go func() { - if err := http.ListenAndServe(fmt.Sprintf(":%d", 8200), nil); err != nil { - if errors.Is(err, http.ErrServerClosed) { - slog.Info("HTTP server closed") - } else { - slog.Error("failed to start HTTP server", err) - } - } - }() - - slog.Info("🌏 start visualizer...", "address", "http://localhost:8200") - - quit := make(chan os.Signal, 1) - signal.Notify(quit, os.Interrupt, syscall.SIGTERM) - - select { - case v := <-quit: - config.GlobalCancel() - slog.Info("", "signal.Notify", v) - case done := <-config.GlobalCtx.Done(): - slog.Info("", "ctx.Done", done) - } - -} - -type Data struct { - Clients map[string]structs.ClientStatus - Messages []Message - Nodes map[string]structs.NodeStatus - mu sync.RWMutex } -type Message struct { - From string - To string - RoutingPath []structs.PublicNodeApi - Msg string - TimeSent string - TimeReceived string - Hash string -} - -var ( - data Data = Data{ - Clients: make(map[string]structs.ClientStatus), - Messages: make([]Message, 0), - Nodes: make(map[string]structs.NodeStatus), - } -) - -func serveData(w http.ResponseWriter, r *http.Request) { - // Set the response header to application/json - 
w.Header().Set("Content-Type", "application/json") - - data.mu.Lock() - defer data.mu.Unlock() - - for _, client := range config.GlobalConfig.Clients { - addr := fmt.Sprintf("http://%s:%d/status", client.Host, client.Port) - resp, err := http.Get(addr) - if err != nil { - slog.Error("failed to get client status", err) - } else { - defer resp.Body.Close() - var status structs.ClientStatus - if err = json.NewDecoder(resp.Body).Decode(&status); err != nil { - slog.Error("failed to decode client status", err) - } else { - data.Clients[addr] = status - } - } - } - - for _, relay := range config.GlobalConfig.Relays { - addr := fmt.Sprintf("http://%s:%d/status", relay.Host, relay.Port) - resp, err := http.Get(addr) - if err != nil { - slog.Error("failed to get client status", err) - } else { - defer resp.Body.Close() - var status structs.NodeStatus - if err = json.NewDecoder(resp.Body).Decode(&status); err != nil { - slog.Error("failed to decode client status", err) - } else { - data.Nodes[addr] = status - } - } - } - - m := make(map[string]Message) - - for _, client := range data.Clients { - for _, sent := range client.MessagesSent { - mstr := sent.Message.Hash - if _, present := m[mstr]; present { - if m[mstr].From != sent.Message.From { - pl.LogNewError("from (%s) and sent.from (%s) do not match", m[mstr].From, sent.Message.From) - } - if m[mstr].To != sent.Message.To { - pl.LogNewError("to (%s) and sent.to (%s) do not match", m[mstr].To, sent.Message.To) - } - if m[mstr].Msg != sent.Message.Msg { - pl.LogNewError("msg (%s) and sent.msg (%s) do not match", m[mstr].Msg, sent.Message.Msg) - } - if m[mstr].Hash != sent.Message.Hash { - pl.LogNewError("hash (%s) and sent.hash (%s) do not match", m[mstr].Hash, sent.Message.Hash) - } - msg := Message{ - From: sent.Message.From, - To: sent.Message.To, - RoutingPath: sent.RoutingPath, - Msg: sent.Message.Msg, - TimeSent: sent.TimeSent.Format("2006-01-02 15:04:05"), - TimeReceived: m[mstr].TimeReceived, - Hash: sent.Message.Hash, - } - m[mstr] = msg - } else { - m[mstr] = Message{ - From: sent.Message.From, - To: sent.Message.To, - RoutingPath: sent.RoutingPath, - Msg: sent.Message.Msg, - TimeSent: sent.TimeSent.Format("2006-01-02 15:04:05"), - TimeReceived: "not received", - Hash: sent.Message.Hash, - } - } - } - for _, received := range client.MessagesReceived { - mstr := received.Message.Hash - if _, present := m[mstr]; present { - if m[mstr].From != received.Message.From { - pl.LogNewError("from (%s) and received.from (%s) do not match", m[mstr].From, received.Message.From) - } - if m[mstr].To != received.Message.To { - pl.LogNewError("to (%s) and received.to (%s) do not match", m[mstr].To, received.Message.To) - } - if m[mstr].Msg != received.Message.Msg { - pl.LogNewError("msg (%s) and received.msg (%s) do not match", m[mstr].Msg, received.Message.Msg) - } - if m[mstr].Hash != received.Message.Hash { - pl.LogNewError("hash (%s) and received.hash (%s) do not match", m[mstr].Hash, received.Message.Hash) - } - msg := Message{ - From: received.Message.From, - To: received.Message.To, - RoutingPath: m[mstr].RoutingPath, - Msg: received.Message.Msg, - TimeSent: m[mstr].TimeSent, - TimeReceived: received.TimeReceived.Format("2006-01-02 15:04:05"), - Hash: received.Message.Hash, - } - m[mstr] = msg - } else { - m[mstr] = Message{ - From: received.Message.From, - To: received.Message.To, - RoutingPath: make([]structs.PublicNodeApi, 0), - Msg: received.Message.Msg, - TimeSent: "not sent", - TimeReceived: received.TimeReceived.Format("2006-01-02 15:04:05"), 
- Hash: received.Message.Hash, - } - } - } - } - - data.Messages = make([]Message, 0) - for _, msg := range m { - data.Messages = append(data.Messages, msg) - } - - // Encode the data as JSON and write to the response - str, err := json.Marshal(data) - if err != nil { - slog.Error("failed to marshal data", err) - } else { - if _, err = w.Write(str); err != nil { - slog.Error("failed to write data", err) - } - } -} +// +//func main() { +// logLevel := flag.String("log-level", "debug", "Log level") +// +// flag.Usage = func() { +// if _, err := fmt.Fprintf(flag.CommandLine.Output(), "Usage of %s:\n", os.Args[0]); err != nil { +// slog.Error("Usage of %s:\n", err, os.Args[0]) +// } +// flag.PrintDefaults() +// } +// +// flag.Parse() +// +// pl.SetUpLogrusAndSlog(*logLevel) +// +// // set GOMAXPROCS +// if _, err := maxprocs.Set(); err != nil { +// slog.Error("failed set max procs", err) +// os.Exit(1) +// } +// +// if err, _ := config.InitGlobal(); err != nil { +// slog.Error("failed to init config", err) +// os.Exit(1) +// } +// +// slog.Info("⚡ init visualizer", "address", "http://localhost:8200") +// +// time.Sleep(1 * time.Second) +// http.HandleFunc("/data", serveData) +// http.Handle("/", http.FileServer(http.Dir("./static"))) +// //http.Handle("/client", http.FileServer(http.Dir("./static/client"))) +// //http.Handle("/nodes", http.FileServer(http.Dir("./static/nodes"))) +// //http.Handle("/nodes/rounds", http.FileServer(http.Dir("./static/nodes/rounds"))) +// +// go func() { +// if err := http.ListenAndServe(fmt.Sprintf(":%d", 8200), nil); err != nil { +// if errors.Is(err, http.ErrServerClosed) { +// slog.Info("HTTP server closed") +// } else { +// slog.Error("failed to start HTTP server", err) +// } +// } +// }() +// +// slog.Info("🌏 start visualizer...", "address", "http://localhost:8200") +// +// quit := make(chan os.Signal, 1) +// signal.Notify(quit, os.Interrupt, syscall.SIGTERM) +// +// select { +// case v := <-quit: +// config.GlobalCancel() +// slog.Info("", "signal.Notify", v) +// case done := <-config.GlobalCtx.Done(): +// slog.Info("", "ctx.Done", done) +// } +// +//} +// +//type Data struct { +// Clients map[string]structs.ClientStatus +// Messages []Message +// Nodes map[string]structs.NodeStatus +// mu sync.RWMutex +//} +// +//type Message struct { +// From string +// To string +// RoutingPath []structs.PublicNodeApi +// Msg string +// TimeSent string +// TimeReceived string +// Hash string +//} +// +//var ( +// data Data = Data{ +// Clients: make(map[string]structs.ClientStatus), +// Messages: make([]Message, 0), +// Nodes: make(map[string]structs.NodeStatus), +// } +//) +// +//func serveData(w http.ResponseWriter, r *http.Request) { +// // Set the response header to application/json +// w.Header().Set("Content-Type", "application/json") +// +// data.mu.Lock() +// defer data.mu.Unlock() +// +// for _, client := range config.GlobalConfig.Clients { +// addr := fmt.Sprintf("http://%s:%d/status", client.Host, client.Port) +// resp, err := http.Get(addr) +// if err != nil { +// slog.Error("failed to get client status", err) +// } else { +// defer resp.Body.Close() +// var status structs.ClientStatus +// if err = json.NewDecoder(resp.Body).Decode(&status); err != nil { +// slog.Error("failed to decode client status", err) +// } else { +// data.Clients[addr] = status +// } +// } +// } +// +// for _, relay := range config.GlobalConfig.Relays { +// addr := fmt.Sprintf("http://%s:%d/status", relay.Host, relay.Port) +// resp, err := http.Get(addr) +// if err != nil { +// 
slog.Error("failed to get client status", err) +// } else { +// defer resp.Body.Close() +// var status structs.NodeStatus +// if err = json.NewDecoder(resp.Body).Decode(&status); err != nil { +// slog.Error("failed to decode client status", err) +// } else { +// data.Nodes[addr] = status +// } +// } +// } +// +// m := make(map[string]Message) +// +// for _, client := range data.Clients { +// for _, sent := range client.MessagesSent { +// mstr := sent.Message.Hash +// if _, present := m[mstr]; present { +// if m[mstr].From != sent.Message.From { +// pl.LogNewError("from (%s) and sent.from (%s) do not match", m[mstr].From, sent.Message.From) +// } +// if m[mstr].To != sent.Message.To { +// pl.LogNewError("to (%s) and sent.to (%s) do not match", m[mstr].To, sent.Message.To) +// } +// if m[mstr].Msg != sent.Message.Msg { +// pl.LogNewError("msg (%s) and sent.msg (%s) do not match", m[mstr].Msg, sent.Message.Msg) +// } +// if m[mstr].Hash != sent.Message.Hash { +// pl.LogNewError("hash (%s) and sent.hash (%s) do not match", m[mstr].Hash, sent.Message.Hash) +// } +// msg := Message{ +// From: sent.Message.From, +// To: sent.Message.To, +// RoutingPath: sent.RoutingPath, +// Msg: sent.Message.Msg, +// TimeSent: sent.TimeSent.Format("2006-01-02 15:04:05"), +// TimeReceived: m[mstr].TimeReceived, +// Hash: sent.Message.Hash, +// } +// m[mstr] = msg +// } else { +// m[mstr] = Message{ +// From: sent.Message.From, +// To: sent.Message.To, +// RoutingPath: sent.RoutingPath, +// Msg: sent.Message.Msg, +// TimeSent: sent.TimeSent.Format("2006-01-02 15:04:05"), +// TimeReceived: "not received", +// Hash: sent.Message.Hash, +// } +// } +// } +// for _, received := range client.MessagesReceived { +// mstr := received.Message.Hash +// if _, present := m[mstr]; present { +// if m[mstr].From != received.Message.From { +// pl.LogNewError("from (%s) and received.from (%s) do not match", m[mstr].From, received.Message.From) +// } +// if m[mstr].To != received.Message.To { +// pl.LogNewError("to (%s) and received.to (%s) do not match", m[mstr].To, received.Message.To) +// } +// if m[mstr].Msg != received.Message.Msg { +// pl.LogNewError("msg (%s) and received.msg (%s) do not match", m[mstr].Msg, received.Message.Msg) +// } +// if m[mstr].Hash != received.Message.Hash { +// pl.LogNewError("hash (%s) and received.hash (%s) do not match", m[mstr].Hash, received.Message.Hash) +// } +// msg := Message{ +// From: received.Message.From, +// To: received.Message.To, +// RoutingPath: m[mstr].RoutingPath, +// Msg: received.Message.Msg, +// TimeSent: m[mstr].TimeSent, +// TimeReceived: received.TimeReceived.Format("2006-01-02 15:04:05"), +// Hash: received.Message.Hash, +// } +// m[mstr] = msg +// } else { +// m[mstr] = Message{ +// From: received.Message.From, +// To: received.Message.To, +// RoutingPath: make([]structs.PublicNodeApi, 0), +// Msg: received.Message.Msg, +// TimeSent: "not sent", +// TimeReceived: received.TimeReceived.Format("2006-01-02 15:04:05"), +// Hash: received.Message.Hash, +// } +// } +// } +// } +// +// data.Messages = make([]Message, 0) +// for _, msg := range m { +// data.Messages = append(data.Messages, msg) +// } +// +// // Encode the data as JSON and write to the response +// str, err := json.Marshal(data) +// if err != nil { +// slog.Error("failed to marshal data", err) +// } else { +// if _, err = w.Write(str); err != nil { +// slog.Error("failed to write data", err) +// } +// } +//} diff --git a/config/config.go b/config/config.go index fbdac81..e8e83d0 100755 --- a/config/config.go +++ 
b/config/config.go @@ -14,65 +14,168 @@ import ( ) type BulletinBoard struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Address string -} - -type Relay struct { - ID int `yaml:"id"` - Host string `yaml:"host"` - Port int `yaml:"port"` - PrometheusPort int `yaml:"prometheus_port"` - Address string -} - -type Client struct { - ID int `yaml:"id"` - Host string `yaml:"host"` - Port int `yaml:"port"` - PrometheusPort int `yaml:"prometheus_port"` - Address string -} - -type Metrics struct { - Host string `yaml:"host"` - Port int `yaml:"port"` - Address string + Host string `yaml:"host"` + Port int `yaml:"port"` + Address string + PromPort int `yaml:"promPort"` } type Config struct { + MinimumClients int `yaml:"N"` + MinimumRelays int `yaml:"n"` + BulletinBoard BulletinBoard `yaml:"bulletin_board"` + Vis bool `yaml:"vis"` + PrometheusPath string `yaml:"prometheusPath"` + ScrapeInterval int `yaml:"scrapeInterval"` ServerLoad int `yaml:"x"` D int `yaml:"d"` Delta float64 `yaml:"delta"` L1 int `yaml:"l1"` L2 int `yaml:"l2"` Chi float64 `yaml:"chi"` - BulletinBoard BulletinBoard `yaml:"bulletin_board"` - Relays []Relay `yaml:"relays"` - Metrics Metrics `yaml:"visualizer"` - Clients []Client `yaml:"clients"` - Vis bool `yaml:"vis"` - ScrapeInterval int `yaml:"scrapeInterval"` DropAllOnionsFromClient int `yaml:"dropAllOnionsFromClient"` - PrometheusPath string `yaml:"prometheusPath"` } -var GlobalConfig *Config +func GetVis() bool { + mu.RLock() + defer mu.RUnlock() + return globalConfig.Vis +} + +func GetPrometheusPath() string { + mu.RLock() + defer mu.RUnlock() + return globalConfig.PrometheusPath +} + +func GetBulletinBoardAddress() string { + mu.RLock() + defer mu.RUnlock() + return globalConfig.BulletinBoard.Address +} + +func GetMinimumClients() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.MinimumClients +} + +func GetMinimumRelays() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.MinimumRelays +} + +func GetBulletinBoardHost() string { + mu.RLock() + defer mu.RUnlock() + return globalConfig.BulletinBoard.Host +} + +func GetBulletinBoardPort() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.BulletinBoard.Port +} + +func GetMetricsPort() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.BulletinBoard.PromPort +} + +func GetMetricsUrl() string { + mu.RLock() + defer mu.RUnlock() + return fmt.Sprintf("http://%s:%d", globalConfig.BulletinBoard.Host, globalConfig.BulletinBoard.PromPort) +} + +func GetBulletinBoardUrl() string { + mu.RLock() + if globalConfig.BulletinBoard.Address != "" { + defer mu.RUnlock() + return globalConfig.BulletinBoard.Address + } + mu.RUnlock() + mu.Lock() + defer mu.Unlock() + if globalConfig.BulletinBoard.Address == "" { + globalConfig.BulletinBoard.Address = fmt.Sprintf("http://%s:%d", globalConfig.BulletinBoard.Host, globalConfig.BulletinBoard.Port) + } + return globalConfig.BulletinBoard.Address +} + +func GetServerLoad() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.ServerLoad +} + +func GetD() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.D +} + +func GetDelta() float64 { + mu.RLock() + defer mu.RUnlock() + return globalConfig.Delta +} + +func GetConfig() Config { + mu.RLock() + defer mu.RUnlock() + return *globalConfig +} + +func GetL1() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.L1 +} + +func GetL2() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.L2 +} + +func GetChi() float64 { + mu.RLock() + defer mu.RUnlock() + return 
globalConfig.Chi +} + +func GetScrapeInterval() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.ScrapeInterval +} + +func GetDropAllOnionsFromClient() int { + mu.RLock() + defer mu.RUnlock() + return globalConfig.DropAllOnionsFromClient +} + +var globalConfig *Config var GlobalCtx context.Context var GlobalCancel context.CancelFunc -var Names sync.Map +var mu sync.RWMutex func InitGlobal() (error, string) { + mu.Lock() + defer mu.Unlock() GlobalCtx, GlobalCancel = context.WithCancel(context.Background()) - GlobalConfig = &Config{} + globalConfig = &Config{} path := "" if dir, err := os.Getwd(); err != nil { return PrettyLogger.WrapError(err, "config.NewConfig(): global config error"), "" - } else if err2 := cleanenv.ReadConfig(dir+"/config/config.yml", GlobalConfig); err2 != nil { + } else if err2 := cleanenv.ReadConfig(dir+"/config/config.yml", globalConfig); err2 != nil { // Get the absolute path of the current file _, currentFile, _, ok := runtime.Caller(0) @@ -81,78 +184,72 @@ func InitGlobal() (error, string) { } currentDir := filepath.Dir(currentFile) configFilePath := filepath.Join(currentDir, "/config.yml") - if err3 := cleanenv.ReadConfig(configFilePath, GlobalConfig); err3 != nil { + if err3 := cleanenv.ReadConfig(configFilePath, globalConfig); err3 != nil { return PrettyLogger.WrapError(err3, "config.NewConfig(): global config error"), "" } else { path = configFilePath } } else { path = dir + "/config/config.yml" - if err3 := cleanenv.ReadEnv(GlobalConfig); err3 != nil { + if err3 := cleanenv.ReadEnv(globalConfig); err3 != nil { return PrettyLogger.WrapError(err3, "config.NewConfig(): global config error"), "" } } path = strings.ReplaceAll(path, "config.yml", "prometheus.yml") - // Update relay addresses - for i := range GlobalConfig.Relays { - GlobalConfig.Relays[i].Address = fmt.Sprintf("http://%s:%d", GlobalConfig.Relays[i].Host, GlobalConfig.Relays[i].Port) - } - // Update client addresses - for i := range GlobalConfig.Clients { - GlobalConfig.Clients[i].Address = fmt.Sprintf("http://%s:%d", GlobalConfig.Clients[i].Host, GlobalConfig.Clients[i].Port) - } + globalConfig.BulletinBoard.Address = fmt.Sprintf("http://%s:%d", globalConfig.BulletinBoard.Host, globalConfig.BulletinBoard.Port) - GlobalConfig.BulletinBoard.Address = fmt.Sprintf("http://%s:%d", GlobalConfig.BulletinBoard.Host, GlobalConfig.BulletinBoard.Port) - GlobalConfig.Metrics.Address = fmt.Sprintf("http://%s:%d", GlobalConfig.Metrics.Host, GlobalConfig.Metrics.Port) return nil, path } -func HostPortToName(host string, port int) string { - return AddressToName(fmt.Sprintf("http://%s:%d", host, port)) -} - -var PurpleColor = "\033[35m" -var OrangeColor = "\033[33m" -var ResetColor = "\033[0m" - -func AddressToName(address string) string { - if name, ok := Names.Load(address); ok { - return name.(string) +func UpdateConfig(cfg Config) { + mu.Lock() + defer mu.Unlock() + if globalConfig == nil { + globalConfig = &cfg } - if strings.Count(address, "/") > 2 { - spl := strings.Split(address, "/") - address = spl[0] + "//" + spl[1] + if cfg.BulletinBoard.Host != "" { + globalConfig.BulletinBoard.Host = cfg.BulletinBoard.Host } - if name, ok := Names.Load(address); ok { - return name.(string) + if cfg.BulletinBoard.Port != 0 { + globalConfig.BulletinBoard.Port = cfg.BulletinBoard.Port } - for _, relay := range GlobalConfig.Relays { - if address == relay.Address { - name := fmt.Sprintf("%sRelay %d%s", PurpleColor, relay.ID, ResetColor) - //name := fmt.Sprintf("Relay %d", relay.ID) - Names.Store(address, name) 
- return name - } + if cfg.PrometheusPath != "" { + globalConfig.PrometheusPath = cfg.PrometheusPath } - for _, client := range GlobalConfig.Clients { - if address == client.Address { - name := fmt.Sprintf("%sClient %d%s", OrangeColor, client.ID, ResetColor) - //name := fmt.Sprintf("Client %d", client.ID) - Names.Store(address, name) - return name - } + if cfg.ScrapeInterval != 0 { + globalConfig.ScrapeInterval = cfg.ScrapeInterval + } + if cfg.BulletinBoard.PromPort != 0 { + globalConfig.BulletinBoard.PromPort = cfg.BulletinBoard.PromPort + } + if cfg.ServerLoad != 0 { + globalConfig.ServerLoad = cfg.ServerLoad + } + if cfg.D != 0 { + globalConfig.D = cfg.D + } + if cfg.Delta != 0 { + globalConfig.Delta = cfg.Delta + } + if cfg.L1 != 0 { + globalConfig.L1 = cfg.L1 + } + if cfg.L2 != 0 { + globalConfig.L2 = cfg.L2 + } + if cfg.Chi != 0 { + globalConfig.Chi = cfg.Chi + } + if cfg.DropAllOnionsFromClient != 0 { + globalConfig.DropAllOnionsFromClient = cfg.DropAllOnionsFromClient } - if address == GlobalConfig.BulletinBoard.Address { - name := "Bulletin Board" - Names.Store(address, name) - return name + if cfg.MinimumClients != 0 { + globalConfig.MinimumClients = cfg.MinimumClients } - if address == GlobalConfig.Metrics.Address { - name := "Metrics" - Names.Store(address, name) - return name + if cfg.MinimumRelays != 0 { + globalConfig.MinimumRelays = cfg.MinimumRelays } - return address + globalConfig.BulletinBoard.Address = fmt.Sprintf("http://%s:%d", globalConfig.BulletinBoard.Host, globalConfig.BulletinBoard.Port) } diff --git a/config/config.yml b/config/config.yml index 2f7a7e9..11349c1 100755 --- a/config/config.yml +++ b/config/config.yml @@ -1,3 +1,5 @@ +N: 6 # minimum number of clients before running protocol +n: 6 # minimum number of relays before running protocol l1: 3 # Number of planned mixers in a routing path l2: 2 # Number of planned gatekeepers in a routing path x: 25 # Server load (x = Ω(polylog λ)) i.e. the expected number of onions per intermediary hop @@ -5,65 +7,11 @@ tau: 0.8 # (τ < (1 − γ)(1 − X)) Fraction of checkpoints needed to pro d: 2 # Threshold for number of bruises before an onion is discarded by a gatekeeper delta: 1e-5 # The probability of differential privacy violation due to the adversary's actions. 
chi: 1.0 # Fraction of corrupted relays (which perform no mixing) -vis: true # Visualize the network -scrapeInterval: 1000 # Prometheus scrape interval in milliseconds dropAllOnionsFromClient: 1 # Client ID to drop all onions from +vis: true # Visualize the network +scrapeInterval: 15 # Prometheus scrape interval in seconds prometheusPath: '/opt/homebrew/bin/prometheus' - - -metrics: - host: 'localhost' - port: 8200 bulletin_board: host: 'localhost' port: 8080 -clients: - - id: 1 - host: 'localhost' - port: 8101 - prometheus_port: 9101 - - id: 2 - host: 'localhost' - port: 8102 - prometheus_port: 9102 - - id: 3 - host: 'localhost' - port: 8103 - prometheus_port: 9103 - - id: 4 - host: 'localhost' - port: 8104 - prometheus_port: 9104 - - id: 5 - host: 'localhost' - port: 8105 - prometheus_port: 9105 - - id: 6 - host: 'localhost' - port: 8106 - prometheus_port: 9106 -relays: - - id: 1 - host: 'localhost' - port: 8081 - prometheus_port: 9201 - - id: 2 - host: 'localhost' - port: 8082 - prometheus_port: 9202 - - id: 3 - host: 'localhost' - port: 8083 - prometheus_port: 9203 - - id: 4 - host: 'localhost' - port: 8084 - prometheus_port: 9204 - - id: 5 - host: 'localhost' - port: 8085 - prometheus_port: 9205 - - id: 6 - host: 'localhost' - port: 8086 - prometheus_port: 9206 + promPort: 8200 # where aggregated prometheus metrics are served from \ No newline at end of file diff --git a/config/prometheus.go b/config/prometheus.go deleted file mode 100644 index 1ace870..0000000 --- a/config/prometheus.go +++ /dev/null @@ -1,99 +0,0 @@ -package config - -import ( - "fmt" - pl "github.com/HannahMarsh/PrettyLogger" - "gopkg.in/yaml.v3" - "log/slog" - "os" -) - -type Global struct { - ScrapeInterval string `yaml:"scrape_interval"` - ExternalLabels ExternalLabels `yaml:"external_labels"` -} - -type ExternalLabels struct { - Monitor string `yaml:"monitor"` -} - -type ScrapeConfig struct { - JobName string `yaml:"job_name"` - ScrapeInterval string `yaml:"scrape_interval"` - StaticConfigs []StaticConfig `yaml:"static_configs"` -} - -type StaticConfig struct { - Targets []string `yaml:"targets"` -} - -type PromConfig struct { - Global Global `yaml:"global"` - ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs"` -} - -func InitPrometheusConfig(path string) error { - - promCfg := PromConfig{ - Global: Global{ - ScrapeInterval: "15s", - ExternalLabels: ExternalLabels{ - Monitor: "pi_t", - }, - }, - ScrapeConfigs: []ScrapeConfig{}, - } - - for _, client := range GlobalConfig.Clients { - promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, ScrapeConfig{ - JobName: fmt.Sprintf("client-%d", client.ID), - ScrapeInterval: "5s", - StaticConfigs: []StaticConfig{ - { - Targets: []string{fmt.Sprintf("%s:%d", client.Host, client.PrometheusPort)}, - }, - }, - }) - } - - for _, relay := range GlobalConfig.Relays { - promCfg.ScrapeConfigs = append(promCfg.ScrapeConfigs, ScrapeConfig{ - JobName: fmt.Sprintf("relay-%d", relay.ID), - ScrapeInterval: "5s", - StaticConfigs: []StaticConfig{ - { - Targets: []string{fmt.Sprintf("%s:%d", relay.Host, relay.PrometheusPort)}, - }, - }, - }) - } - - // Marshal the struct into YAML format - data, err := yaml.Marshal(&promCfg) - if err != nil { - return pl.WrapError(err, "failed to marshal prometheus config") - } - - // Open the file for writing (creates the file if it doesn't exist) - file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return pl.WrapError(err, "failed to open file for writing") - } - defer file.Close() - - // Write the YAML data to 
the file - _, err = file.Write(data) - if err != nil { - return pl.WrapError(err, "failed to write prometheus config to file") - } - - // Ensure the data is flushed to disk immediately - err = file.Sync() - if err != nil { - return pl.WrapError(err, "failed to flush prometheus config to disk") - } - - slog.Info("prometheus config written to file", "path", path) - - return nil -} diff --git a/go.mod b/go.mod index ca3c9ea..6059954 100755 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.21 //toolchain go1.21.2 require ( - github.com/HannahMarsh/PrettyLogger v1.0.2 + github.com/HannahMarsh/PrettyLogger v1.0.3 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.12.0 github.com/ilyakaznacheev/cleanenv v1.3.0 diff --git a/go.sum b/go.sum index 00c76cd..46e7d94 100755 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/HannahMarsh/PrettyLogger v1.0.2 h1:zyJJptCBA5SY75gWxWmyb5MCOoP3A1cOOE8cgo5SEeg= github.com/HannahMarsh/PrettyLogger v1.0.2/go.mod h1:LQcS0IwGxo6MoU8gitIcn+zay529D+GLct/fg4wYLp0= +github.com/HannahMarsh/PrettyLogger v1.0.3 h1:R1FdHoRWfJUdMEwvCopZPuZHmbpslt7Cx32EUZK/8BM= +github.com/HannahMarsh/PrettyLogger v1.0.3/go.mod h1:LQcS0IwGxo6MoU8gitIcn+zay529D+GLct/fg4wYLp0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/internal/api/api_functions/functions.go b/internal/api/api_functions/functions.go index 64e904c..d2816d7 100644 --- a/internal/api/api_functions/functions.go +++ b/internal/api/api_functions/functions.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" pl "github.com/HannahMarsh/PrettyLogger" - "github.com/HannahMarsh/pi_t-experiment/config" "github.com/HannahMarsh/pi_t-experiment/internal/api/structs" "github.com/HannahMarsh/pi_t-experiment/internal/metrics" "github.com/HannahMarsh/pi_t-experiment/internal/pi_t/onion_model" @@ -20,7 +19,7 @@ import ( // sendOnion sends an onion to the specified address with compression and timeout func SendOnion(to, from string, o onion_model.Onion, layer int) error { - slog.Debug("Sending onion...", "from", config.AddressToName(from), "to", config.AddressToName(to)) + slog.Debug("Sending onion...", "from", from, "to", to) url := fmt.Sprintf("%s/receive", to) data, err := json.Marshal(o) @@ -63,7 +62,7 @@ func SendOnion(to, from string, o onion_model.Onion, layer int) error { resp, err := client.Do(req) if err != nil { - return pl.WrapError(err, "%s: failed to send POST request with onion to %s", pl.GetFuncName(), config.AddressToName(to)) + return pl.WrapError(err, "%s: failed to send POST request with onion to %s", pl.GetFuncName(), (to)) } defer func(Body io.ReadCloser) { @@ -76,7 +75,7 @@ func SendOnion(to, from string, o onion_model.Onion, layer int) error { return pl.NewError("%s: failed to send to first relay(url=%s), status code: %d, status: %s", pl.GetFuncName(), url, resp.StatusCode, resp.Status) } - slog.Debug("✅ Successfully sent onion. ", "from", config.AddressToName(from), "to", config.AddressToName(to)) + slog.Debug("✅ Successfully sent onion. 
", "from", (from), "to", (to)) return nil } diff --git a/internal/api/api_functions/functions_test.go b/internal/api/api_functions/functions_test.go index 46a62b8..7840e66 100644 --- a/internal/api/api_functions/functions_test.go +++ b/internal/api/api_functions/functions_test.go @@ -79,7 +79,7 @@ func TestReceiveOnionMultipleLayers(t *testing.T) { nodes[i] = node{privateKeyPEM, publicKeyPEM, fmt.Sprintf("http://localhost:%d", port), port} } - slog.Info(strings.Join(utils.Map(nodes, func(n node) string { return config.AddressToName(n.address) }), " -> ")) + slog.Info(strings.Join(utils.Map(nodes, func(n node) string { return n.address }), " -> ")) secretMessage := "secret message" diff --git a/internal/api/structs/clientStatus.go b/internal/api/structs/clientStatus.go index e119b5c..ae719bb 100644 --- a/internal/api/structs/clientStatus.go +++ b/internal/api/structs/clientStatus.go @@ -27,21 +27,24 @@ type Received struct { TimeReceived time.Time } -func NewClientStatus(id int, address, publicKey string) *ClientStatus { +func NewClientStatus(id, port, promPort int, address, host, publicKey string) *ClientStatus { return &ClientStatus{ MessagesSent: make([]Sent, 0), MessagesReceived: make([]Received, 0), Client: PublicNodeApi{ - ID: id, - Address: address, - PublicKey: publicKey, - Time: time.Now(), + ID: id, + Address: address, + PublicKey: publicKey, + Host: host, + Port: port, + PrometheusPort: promPort, + Time: time.Now(), }, } } func (cs *ClientStatus) AddSent(clientReceiver PublicNodeApi, routingPath []PublicNodeApi, message Message) { - if config.GlobalConfig.Vis { + if config.GetVis() { cs.mu.Lock() defer cs.mu.Unlock() cs.MessagesSent = append(cs.MessagesSent, Sent{ @@ -56,7 +59,7 @@ func (cs *ClientStatus) AddSent(clientReceiver PublicNodeApi, routingPath []Publ } func (cs *ClientStatus) AddReceived(message Message) { - if config.GlobalConfig.Vis { + if config.GetVis() { cs.mu.Lock() defer cs.mu.Unlock() cs.MessagesReceived = append(cs.MessagesReceived, Received{ @@ -68,7 +71,7 @@ func (cs *ClientStatus) AddReceived(message Message) { } func (cs *ClientStatus) GetStatus() string { - if config.GlobalConfig.Vis { + if config.GetVis() { cs.mu.RLock() defer cs.mu.RUnlock() if str, err := json.Marshal(cs); err != nil { diff --git a/internal/api/structs/message.go b/internal/api/structs/message.go index 720c17d..2efef54 100644 --- a/internal/api/structs/message.go +++ b/internal/api/structs/message.go @@ -5,10 +5,10 @@ import ( ) type Message struct { - From string `json:"from"` - To string `json:"to"` + From string `json:"f"` + To string `json:"t"` Msg string `json:"msg"` - Hash string `json:"hash"` + Hash string `json:"h"` } func NewMessage(from, to, msg string) Message { diff --git a/internal/api/structs/nodeApi.go b/internal/api/structs/nodeApi.go index 44b92d5..b3edc29 100644 --- a/internal/api/structs/nodeApi.go +++ b/internal/api/structs/nodeApi.go @@ -5,14 +5,17 @@ import ( ) type PublicNodeApi struct { - ID int - Address string - PublicKey string - Time time.Time + ID int `json:"i"` + Address string `json:"a"` + Host string `json:"h"` + Port int `json:"po"` + PrometheusPort int `json:"pp"` + PublicKey string `json:"pk"` + Time time.Time `json:"t"` } type IntentToSend struct { - From PublicNodeApi - To []PublicNodeApi - Time time.Time + From PublicNodeApi `json:"f"` + To []PublicNodeApi `json:"t"` + Time time.Time `json:"ti"` } diff --git a/internal/api/structs/nodeStatus.go b/internal/api/structs/nodeStatus.go index 1b3d2be..7d2dd97 100644 --- 
a/internal/api/structs/nodeStatus.go +++ b/internal/api/structs/nodeStatus.go @@ -28,14 +28,17 @@ type OnionStatus struct { NonceVerification bool } -func NewNodeStatus(id int, address, publicKey string) *NodeStatus { +func NewNodeStatus(id, port, promPort int, address, host, publicKey string) *NodeStatus { return &NodeStatus{ Received: make([]OnionStatus, 0), Node: PublicNodeApi{ - ID: id, - Address: address, - PublicKey: publicKey, - Time: time.Now(), + ID: id, + Address: address, + PublicKey: publicKey, + Host: host, + Port: port, + PrometheusPort: promPort, + Time: time.Now(), }, CheckpointOnionsReceived: make(map[int]int), ExpectedCheckpoints: make(map[int]int), @@ -44,7 +47,7 @@ func NewNodeStatus(id int, address, publicKey string) *NodeStatus { } func (ns *NodeStatus) AddCheckpointOnion(layer int) { - if config.GlobalConfig.Vis { + if config.GetVis() { ns.mu.Lock() defer ns.mu.Unlock() ns.CheckpointOnionsReceived[layer]++ @@ -52,7 +55,7 @@ func (ns *NodeStatus) AddCheckpointOnion(layer int) { } func (ns *NodeStatus) AddExpectedCheckpoint(layer int) { - if config.GlobalConfig.Vis { + if config.GetVis() { ns.mu.Lock() defer ns.mu.Unlock() ns.ExpectedCheckpoints[layer]++ @@ -60,7 +63,7 @@ func (ns *NodeStatus) AddExpectedCheckpoint(layer int) { } func (ns *NodeStatus) AddOnion(lastHop, thisAddress, nextHop string, layer int, isCheckPointOnion bool, wasBruised bool) { - if config.GlobalConfig.Vis { + if config.GetVis() { ns.mu.Lock() defer ns.mu.Unlock() ns.Received = append(ns.Received, OnionStatus{ @@ -77,7 +80,7 @@ func (ns *NodeStatus) AddOnion(lastHop, thisAddress, nextHop string, layer int, } func (ns *NodeStatus) GetStatus() string { - if config.GlobalConfig.Vis { + if config.GetVis() { ns.mu.RLock() defer ns.mu.RUnlock() if str, err := json.Marshal(ns); err != nil { diff --git a/internal/api/structs/onionApi.go b/internal/api/structs/onionApi.go index bb7ce12..95e1705 100644 --- a/internal/api/structs/onionApi.go +++ b/internal/api/structs/onionApi.go @@ -1,7 +1,7 @@ package structs type OnionApi struct { - To string - From string - Onion string + To string `json:"t"` + From string `json:"f"` + Onion string `json:"o"` } diff --git a/internal/api/structs/startRun.go b/internal/api/structs/startRun.go index 8b680f1..126a55b 100644 --- a/internal/api/structs/startRun.go +++ b/internal/api/structs/startRun.go @@ -1,21 +1,25 @@ package structs +import "github.com/HannahMarsh/pi_t-experiment/config" + type ClientStartRunApi struct { - Relays []PublicNodeApi - Clients []PublicNodeApi - CheckpointOnions []CheckpointOnion + Relays []PublicNodeApi `json:"r"` + Clients []PublicNodeApi `json:"c"` + CheckpointOnions []CheckpointOnion `json:"co"` + Config config.Config `json:"cfg"` } type RelayStartRunApi struct { - Checkpoints []Checkpoint + Checkpoints []Checkpoint `json:"cp"` + Config config.Config `json:"cfg"` } type CheckpointOnion struct { - Path []Checkpoint + Path []Checkpoint `json:"p"` } type Checkpoint struct { - Receiver PublicNodeApi - Nonce string - Layer int + Receiver PublicNodeApi `json:"r"` + Nonce string `json:"n"` + Layer int `json:"l"` } diff --git a/internal/model/bulletin_board/bulletin_board.go b/internal/model/bulletin_board/bulletin_board.go index 0aab3df..f83b68f 100644 --- a/internal/model/bulletin_board/bulletin_board.go +++ b/internal/model/bulletin_board/bulletin_board.go @@ -7,6 +7,7 @@ import ( pl "github.com/HannahMarsh/PrettyLogger" "github.com/HannahMarsh/pi_t-experiment/config" "github.com/HannahMarsh/pi_t-experiment/internal/api/structs" + 
"github.com/HannahMarsh/pi_t-experiment/internal/model/bulletin_board/metrics" "net/http" "sync" "time" @@ -54,30 +55,41 @@ func (bb *BulletinBoard) RegisterClient(client structs.PublicNodeApi) { defer bb.mu.Unlock() // If the client is not already present in the Clients map, create a new ClientView for it. - if _, present := bb.Clients[client.ID]; !present { - bb.Clients[client.ID] = NewClientView(client, time.Second*10) - } + //if _, present := bb.Clients[client.ID]; !present { + bb.Clients[client.ID] = NewClientView(client, time.Second*10) + //} return } // RegisterIntentToSend records a client's intent to send a message, updating the active client list accordingly. -func (bb *BulletinBoard) RegisterIntentToSend(its structs.IntentToSend) error { +//func (bb *BulletinBoard) RegisterIntentToSend(its structs.IntentToSend) error { +// bb.mu.Lock() +// defer bb.mu.Unlock() +// +// // Ensure the sender is registered in the Clients map. +// if _, present := bb.Clients[its.From.ID]; !present { +// bb.Clients[its.From.ID] = NewClientView(its.From, time.Second*10) +// } else { +// // Register any additional client in the 'To' field of the IntentToSend. +// for _, client := range its.To { +// if _, present = bb.Clients[client.ID]; !present { +// bb.Clients[client.ID] = NewClientView(client, time.Second*10) +// } +// } +// } +// // Update the sender's ClientView with the intent to send data. +// bb.Clients[its.From.ID].UpdateClient(its) +// return nil +//} + +func (bb *BulletinBoard) Shutdown() error { bb.mu.Lock() defer bb.mu.Unlock() - // Ensure the sender is registered in the Clients map. - if _, present := bb.Clients[its.From.ID]; !present { - bb.Clients[its.From.ID] = NewClientView(its.From, time.Second*10) - } else { - // Register any additional client in the 'To' field of the IntentToSend. - for _, client := range its.To { - if _, present = bb.Clients[client.ID]; !present { - bb.Clients[client.ID] = NewClientView(client, time.Second*10) - } - } + if err := metrics.StopPrometheus(); err != nil { + return pl.WrapError(err, "error stopping Prometheus") } - // Update the sender's ClientView with the intent to send data. - bb.Clients[its.From.ID].UpdateClient(its) + return nil } @@ -109,10 +121,13 @@ func (bb *BulletinBoard) signalNodesToStart() error { return node.IsActive() && node.Address != "" }), func(_ int, nv *RelayView) structs.PublicNodeApi { return structs.PublicNodeApi{ - ID: nv.ID, - Address: nv.Address, - PublicKey: nv.PublicKey, - Time: nv.LastHeartbeat, + ID: nv.ID, + Address: nv.Address, + PublicKey: nv.PublicKey, + Host: nv.Host, + Port: nv.Port, + PrometheusPort: nv.PromPort, + Time: nv.LastHeartbeat, } }) @@ -121,15 +136,20 @@ func (bb *BulletinBoard) signalNodesToStart() error { return cl.IsActive() && cl.Address != "" }), func(_ int, cv *ClientView) structs.PublicNodeApi { return structs.PublicNodeApi{ - ID: cv.ID, - Address: cv.Address, - PublicKey: cv.PublicKey, + ID: cv.ID, + Address: cv.Address, + PublicKey: cv.PublicKey, + Host: cv.Host, + Port: cv.Port, + PrometheusPort: cv.PromPort, } }) // Generate checkpoint onions for the run based on the desired server load from the global configuration checkpoints := GetCheckpoints(activeNodes, activeClients) + cfg := config.GetConfig() + // Prepare start signals for each client, including checkpoints. 
clientStartSignals := make(map[structs.PublicNodeApi]structs.ClientStartRunApi) for _, client := range activeClients { @@ -137,6 +157,7 @@ func (bb *BulletinBoard) signalNodesToStart() error { Clients: activeClients, Relays: activeNodes, CheckpointOnions: checkpoints[client.ID], + Config: cfg, } clientStartSignals[client] = csr } @@ -155,6 +176,7 @@ func (bb *BulletinBoard) signalNodesToStart() error { for _, node := range activeNodes { nodeStartSignals[node] = structs.RelayStartRunApi{ Checkpoints: allCheckpoints[node.ID], + Config: cfg, } } @@ -162,6 +184,11 @@ func (bb *BulletinBoard) signalNodesToStart() error { var wg sync.WaitGroup wg.Add(len(activeNodes) + len(activeClients)) + if err := metrics.RestartPrometheus(activeNodes, activeClients); err != nil { + slog.Error("Error restarting Prometheus", err) + return pl.WrapError(err, "error restarting Prometheus") + } + // Signal all active client to start the run. for client, csr := range clientStartSignals { go func(client structs.PublicNodeApi, csr structs.ClientStartRunApi) { @@ -218,19 +245,19 @@ func (bb *BulletinBoard) allNodesReady() bool { }) // If the number of active relays is less than required, log and return false. - if activeNodes < len(config.GlobalConfig.Relays) { - slog.Info("Not all nodes are registered") + if activeNodes < config.GetMinimumRelays() { + slog.Info("Not all nodes are registered.", "registered", activeNodes, "min required", config.GetMinimumRelays()) return false } // Count the number of client that have registered intent to send messages. registeredClients := utils.CountAny(utils.GetValues(bb.Clients), func(client *ClientView) bool { - return client.MessageQueue != nil && len(client.MessageQueue) > 0 + return true // client.MessageQueue != nil && len(client.MessageQueue) > 0 }) // If the number of registered client is less than required, log and return false. - if registeredClients < len(config.GlobalConfig.Clients) { - slog.Info("Not all client are registered") + if registeredClients < config.GetMinimumClients() { + slog.Info("Not all client are registered.", "registered", registeredClients, "min required", config.GetMinimumClients()) return false } diff --git a/internal/model/bulletin_board/bulletin_board_handler.go b/internal/model/bulletin_board/bulletin_board_handler.go index 4375220..d66c706 100644 --- a/internal/model/bulletin_board/bulletin_board_handler.go +++ b/internal/model/bulletin_board/bulletin_board_handler.go @@ -2,6 +2,7 @@ package bulletin_board import ( "encoding/json" + "github.com/HannahMarsh/pi_t-experiment/config" "github.com/HannahMarsh/pi_t-experiment/internal/api/structs" "log/slog" "net/http" @@ -45,45 +46,61 @@ func (bb *BulletinBoard) HandleRegisterClient(w http.ResponseWriter, r *http.Req slog.Info("Registering client with", "id", client.ID) // Register the client with the bulletin board. - bb.RegisterClient(client) + go bb.RegisterClient(client) w.WriteHeader(http.StatusCreated) } // HandleRegisterIntentToSend processes HTTP requests for registering a client's intent to send a message. -func (bb *BulletinBoard) HandleRegisterIntentToSend(w http.ResponseWriter, r *http.Request) { - var its structs.IntentToSend +//func (bb *BulletinBoard) HandleRegisterIntentToSend(w http.ResponseWriter, r *http.Request) { +// var its structs.IntentToSend +// +// // Decode the JSON request body into the intent-to-send struct. 
+// if err := json.NewDecoder(r.Body).Decode(&its); err != nil { +// slog.Error("Error decoding intent-to-send registration request", err) +// http.Error(w, err.Error(), http.StatusBadRequest) +// return +// } +// +// // Register the intent to send with the bulletin board. +// if err := bb.RegisterIntentToSend(its); err != nil { +// slog.Error("Error registering intent-to-send request", err) +// http.Error(w, err.Error(), http.StatusInternalServerError) +// return +// } +// +// w.WriteHeader(http.StatusOK) +//} - // Decode the JSON request body into the intent-to-send struct. - if err := json.NewDecoder(r.Body).Decode(&its); err != nil { - slog.Error("Error decoding intent-to-send registration request", err) +// HandleUpdateRelayInfo processes HTTP requests for updating relay node information. +func (bb *BulletinBoard) HandleUpdateRelayInfo(w http.ResponseWriter, r *http.Request) { + var nodeInfo structs.PublicNodeApi + + // Decode the JSON request body into the nodeInfo struct. + if err := json.NewDecoder(r.Body).Decode(&nodeInfo); err != nil { + slog.Error("Error decoding relay info update request", err) http.Error(w, err.Error(), http.StatusBadRequest) return } - // Register the intent to send with the bulletin board. - if err := bb.RegisterIntentToSend(its); err != nil { - slog.Error("Error registering intent-to-send request", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + // Update the node information in the bulletin board. + bb.UpdateRelay(nodeInfo) w.WriteHeader(http.StatusOK) } -// HandleUpdateRelayInfo processes HTTP requests for updating relay node information. -func (bb *BulletinBoard) HandleUpdateRelayInfo(w http.ResponseWriter, r *http.Request) { - var nodeInfo structs.PublicNodeApi +// HandleUpdateConfig processes HTTP requests for updating the config +func (bb *BulletinBoard) HandleUpdateConfig(w http.ResponseWriter, r *http.Request) { + var cfg config.Config // Decode the JSON request body into the nodeInfo struct. - if err := json.NewDecoder(r.Body).Decode(&nodeInfo); err != nil { - slog.Error("Error decoding relay info update request", err) + if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + slog.Error("Error decoding config update request", err) http.Error(w, err.Error(), http.StatusBadRequest) return } - // Update the node information in the bulletin board. - bb.UpdateRelay(nodeInfo) + go config.UpdateConfig(cfg) w.WriteHeader(http.StatusOK) } diff --git a/internal/model/bulletin_board/calculateCheckpoints.go b/internal/model/bulletin_board/calculateCheckpoints.go index 3d1c24e..c011aa0 100644 --- a/internal/model/bulletin_board/calculateCheckpoints.go +++ b/internal/model/bulletin_board/calculateCheckpoints.go @@ -16,7 +16,7 @@ func GetCheckpoints(relays, clients []structs.PublicNodeApi) map[int][]structs.C numRelays := len(relays) // Calculate the expected number of checkpoint onions each client should send based on the server load. - expectedToSend := int((float64(numRelays)*float64(config.GlobalConfig.ServerLoad))/float64(numClients)) - 1 + expectedToSend := int((float64(numRelays)*float64(config.GetServerLoad()))/float64(numClients)) - 1 for _, client := range clients { checkpoints[client.ID] = make([]structs.CheckpointOnion, 0) @@ -25,7 +25,7 @@ func GetCheckpoints(relays, clients []structs.PublicNodeApi) map[int][]structs.C path := make([]structs.Checkpoint, 0) // Generate the relay path for the checkpoint onion, which includes L1 mixers and L2 gatekeepers. 
- for j := 0; j < config.GlobalConfig.L1+config.GlobalConfig.L2; j++ { + for j := 0; j < config.GetL1()+config.GetL2(); j++ { path = append(path, structs.Checkpoint{ Receiver: utils.RandomElement(relays), // Randomly select a node as the receiver for this layer. Nonce: uuid.New().String(), // Generate a new UUID to use as the nonce for this layer. diff --git a/internal/model/bulletin_board/ClientView.go b/internal/model/bulletin_board/clientView.go similarity index 85% rename from internal/model/bulletin_board/ClientView.go rename to internal/model/bulletin_board/clientView.go index 1e93262..ca9f988 100644 --- a/internal/model/bulletin_board/ClientView.go +++ b/internal/model/bulletin_board/clientView.go @@ -9,6 +9,9 @@ type ClientView struct { ID int Address string PublicKey string + Host string + Port int + PromPort int MessageQueue []structs.PublicNodeApi LastHeartbeat time.Time MaxTimeBetweenHeartbeats time.Duration @@ -19,6 +22,9 @@ func NewClientView(c structs.PublicNodeApi, maxTimeBetweenHeartbeats time.Durati ID: c.ID, Address: c.Address, PublicKey: c.PublicKey, + Host: c.Host, + Port: c.Port, + PromPort: c.PrometheusPort, MessageQueue: make([]structs.PublicNodeApi, 0), LastHeartbeat: time.Now(), MaxTimeBetweenHeartbeats: maxTimeBetweenHeartbeats, diff --git a/internal/model/bulletin_board/metrics/prometheus.go b/internal/model/bulletin_board/metrics/prometheus.go new file mode 100644 index 0000000..7542aa5 --- /dev/null +++ b/internal/model/bulletin_board/metrics/prometheus.go @@ -0,0 +1,200 @@ +package metrics + +import ( + "fmt" + pl "github.com/HannahMarsh/PrettyLogger" + "github.com/HannahMarsh/pi_t-experiment/config" + "github.com/HannahMarsh/pi_t-experiment/internal/api/structs" + "github.com/ilyakaznacheev/cleanenv" + "gopkg.in/yaml.v3" + "log/slog" + "os" + "os/exec" + "path/filepath" + "runtime" + "sync" +) + +type Global struct { + ScrapeInterval string `yaml:"scrape_interval"` + ExternalLabels ExternalLabels `yaml:"external_labels"` +} + +type ExternalLabels struct { + Monitor string `yaml:"monitor"` +} + +type ScrapeConfig struct { + JobName string `yaml:"job_name"` + ScrapeInterval string `yaml:"scrape_interval"` + StaticConfigs []StaticConfig `yaml:"static_configs"` +} + +type StaticConfig struct { + Targets []string `yaml:"targets"` +} + +type PromConfig struct { + Global Global `yaml:"global"` + ScrapeConfigs []ScrapeConfig `yaml:"scrape_configs"` +} + +var PID int +var mu = &sync.Mutex{} + +func RestartPrometheus(relays, clients []structs.PublicNodeApi) error { + + path := "" + + promCfg := &PromConfig{} + + if dir, err := os.Getwd(); err != nil { + return pl.WrapError(err, "config.NewConfig(): global config error") + } else if err2 := cleanenv.ReadConfig(dir+"/internal/model/bulletin_board/metrics/prometheus.yml", promCfg); err2 != nil { + + // Get the absolute path of the current file + _, currentFile, _, ok := runtime.Caller(0) + if !ok { + return pl.NewError("Failed to get current file path") + } + currentDir := filepath.Dir(currentFile) + configFilePath := filepath.Join(currentDir, "/prometheus.yml") + if err3 := cleanenv.ReadConfig(configFilePath, promCfg); err3 != nil { + return pl.WrapError(err3, "InitPrometheusConfig(): global config error") + } else { + path = configFilePath + } + } else { + path = dir + "/internal/model/bulletin_board/metrics/prometheus.yml" + if err3 := cleanenv.ReadEnv(promCfg); err3 != nil { + return pl.WrapError(err3, "InitPrometheusConfig(): global config error") + } + } + + promCfg_ := PromConfig{ + Global: Global{ + 
ScrapeInterval: fmt.Sprintf("%ds", config.GetScrapeInterval()), + ExternalLabels: ExternalLabels{ + Monitor: "pi_t", + }, + }, + ScrapeConfigs: []ScrapeConfig{}, + } + + for _, client := range clients { + promCfg_.ScrapeConfigs = append(promCfg_.ScrapeConfigs, ScrapeConfig{ + JobName: fmt.Sprintf("client-%d", client.ID), + ScrapeInterval: "5s", + StaticConfigs: []StaticConfig{ + { + Targets: []string{fmt.Sprintf("%s:%d", client.Host, client.PrometheusPort)}, + }, + }, + }) + } + + for _, relay := range relays { + promCfg_.ScrapeConfigs = append(promCfg_.ScrapeConfigs, ScrapeConfig{ + JobName: fmt.Sprintf("relay-%d", relay.ID), + ScrapeInterval: "5s", + StaticConfigs: []StaticConfig{ + { + Targets: []string{fmt.Sprintf("%s:%d", relay.Host, relay.PrometheusPort)}, + }, + }, + }) + } + + // Marshal the struct into YAML format + data, err := yaml.Marshal(&promCfg_) + if err != nil { + return pl.WrapError(err, "failed to marshal prometheus config") + } + + // Open the file for writing (creates the file if it doesn't exist) + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return pl.WrapError(err, "failed to open file for writing") + } + defer file.Close() + + // Write the YAML data to the file + _, err = file.Write(data) + if err != nil { + return pl.WrapError(err, "failed to write prometheus config to file") + } + + // Ensure the data is flushed to disk immediately + err = file.Sync() + if err != nil { + return pl.WrapError(err, "failed to flush prometheus config to disk") + } + + slog.Info("prometheus config written to file", "path", path) + + // Stop Prometheus + + if err := StopPrometheus(); err != nil { + return pl.WrapError(err, "failed to stop Prometheus") + } + + // Start Prometheus + if err := StartPrometheus(path); err != nil { + return pl.WrapError(err, "failed to start Prometheus") + } + + slog.Info("Prometheus restarted successfully", "pid", PID) + + return nil +} + +func StartPrometheus(path string) error { + + mu.Lock() + defer mu.Unlock() + // Start Prometheus + + // Command to start Prometheus + cmd := exec.Command(config.GetPrometheusPath(), "--config.file", path) + + slog.Info("Starting prometheus with config", "path", path) + + // Set the environment variables, if needed + cmd.Env = os.Environ() + + // Set the command's standard output and error to the current process's output and error + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Start the Prometheus process + err := cmd.Start() + if err != nil { + slog.Error("failed to start Prometheus", err) + os.Exit(1) + } + + PID = cmd.Process.Pid + return nil +} + +func StopPrometheus() error { + // Stop Prometheus + + mu.Lock() + defer mu.Unlock() + if PID != 0 { + cmdStop := exec.Command("kill", fmt.Sprintf("%d", PID)) + err := cmdStop.Run() + if err != nil { + slog.Error("failed to stop Prometheus", err) + return pl.WrapError(err, "failed to stop Prometheus") + } else { + slog.Info("successfully stopped Prometheus") + PID = 0 + } + } else { + slog.Info("No running Prometheus instance found, skipping stop") + } + + return nil +} diff --git a/config/prometheus.yml b/internal/model/bulletin_board/metrics/prometheus.yml similarity index 100% rename from config/prometheus.yml rename to internal/model/bulletin_board/metrics/prometheus.yml index 974e31f..e9d36a5 100644 --- a/config/prometheus.yml +++ b/internal/model/bulletin_board/metrics/prometheus.yml @@ -3,16 +3,6 @@ global: external_labels: monitor: pi_t scrape_configs: - - job_name: client-1 - scrape_interval: 5s -
static_configs: - - targets: - - localhost:9101 - - job_name: client-2 - scrape_interval: 5s - static_configs: - - targets: - - localhost:9102 - job_name: client-3 scrape_interval: 5s static_configs: @@ -33,21 +23,16 @@ scrape_configs: static_configs: - targets: - localhost:9106 - - job_name: relay-1 - scrape_interval: 5s - static_configs: - - targets: - - localhost:9201 - - job_name: relay-2 + - job_name: client-1 scrape_interval: 5s static_configs: - targets: - - localhost:9202 - - job_name: relay-3 + - localhost:9101 + - job_name: client-2 scrape_interval: 5s static_configs: - targets: - - localhost:9203 + - localhost:9102 - job_name: relay-4 scrape_interval: 5s static_configs: @@ -63,3 +48,18 @@ scrape_configs: static_configs: - targets: - localhost:9206 + - job_name: relay-1 + scrape_interval: 5s + static_configs: + - targets: + - localhost:9201 + - job_name: relay-2 + scrape_interval: 5s + static_configs: + - targets: + - localhost:9202 + - job_name: relay-3 + scrape_interval: 5s + static_configs: + - targets: + - localhost:9203 diff --git a/internal/model/bulletin_board/RelayView.go b/internal/model/bulletin_board/relayView.go similarity index 82% rename from internal/model/bulletin_board/RelayView.go rename to internal/model/bulletin_board/relayView.go index 8a94763..65cda8e 100644 --- a/internal/model/bulletin_board/RelayView.go +++ b/internal/model/bulletin_board/relayView.go @@ -10,6 +10,9 @@ type RelayView struct { ID int Address string PublicKey string + Host string + Port int + PromPort int mu sync.RWMutex LastHeartbeat time.Time MaxTimeBetweenHeartbeats time.Duration @@ -20,6 +23,9 @@ func NewNodeView(n structs.PublicNodeApi, maxTimeBetweenHeartbeats time.Duration ID: n.ID, Address: n.Address, PublicKey: n.PublicKey, + Host: n.Host, + Port: n.Port, + PromPort: n.PrometheusPort, LastHeartbeat: n.Time, MaxTimeBetweenHeartbeats: maxTimeBetweenHeartbeats, } diff --git a/internal/model/client/client.go b/internal/model/client/client.go index 65bc940..56e574e 100644 --- a/internal/model/client/client.go +++ b/internal/model/client/client.go @@ -28,6 +28,7 @@ type Client struct { Address string // Full address of the client in the form http://host:port. PrivateKey string // Client's long term private key for decryption. PublicKey string // Client's long term public key for encryption. + PrometheusPort int // Port number for Prometheus metrics. ActiveRelays []structs.PublicNodeApi // List of active relays known to the client. OtherClients []structs.PublicNodeApi // List of other client known to the client. Messages []structs.Message // Messages to be sent by the client. @@ -38,7 +39,7 @@ type Client struct { } // NewClient creates a new client instance with a unique ID, host, and port. -func NewClient(id int, host string, port int, bulletinBoardUrl string) (*Client, error) { +func NewClient(id int, host string, port int, promPort int, bulletinBoardUrl string) (*Client, error) { // Generate a key pair (private and public) for the client. 
if privateKey, publicKey, err := keys.KeyGen(); err != nil { return nil, pl.WrapError(err, "relay.NewClient(): failed to generate key pair") @@ -50,11 +51,12 @@ func NewClient(id int, host string, port int, bulletinBoardUrl string) (*Client, Address: fmt.Sprintf("http://%s:%d", host, port), PublicKey: publicKey, PrivateKey: privateKey, + PrometheusPort: promPort, ActiveRelays: make([]structs.PublicNodeApi, 0), BulletinBoardUrl: bulletinBoardUrl, Messages: make([]structs.Message, 0), OtherClients: make([]structs.PublicNodeApi, 0), - status: structs.NewClientStatus(id, fmt.Sprintf("http://%s:%d", host, port), publicKey), + status: structs.NewClientStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey), } c.wg.Add(1) @@ -74,9 +76,12 @@ func (c *Client) RegisterWithBulletinBoard() error { // Marshal the client's public information into JSON. if data, err := json.Marshal(structs.PublicNodeApi{ - ID: c.ID, - Address: c.Address, - PublicKey: c.PublicKey, + ID: c.ID, + Address: c.Address, + PublicKey: c.PublicKey, + PrometheusPort: c.PrometheusPort, + Host: c.Host, + Port: c.Port, }); err != nil { return pl.WrapError(err, "%s: failed to marshal Client info", pl.GetFuncName()) } else { @@ -103,8 +108,8 @@ func (c *Client) RegisterWithBulletinBoard() error { } // getRecipient determines the recipient client for sending a message based on the client's ID. -func (c *Client) getRecipient() (string, int) { - numClients := len(config.GlobalConfig.Clients) +func (c *Client) getRecipient(clients []structs.PublicNodeApi) (string, int) { + numClients := len(clients) // Generate a reversed array of client IDs. recipients := utils.Reverse(utils.NewIntArray(1, numClients+1)) @@ -116,19 +121,19 @@ func (c *Client) getRecipient() (string, int) { // Determine the recipient ID based on the client's ID. sendTo := recipients[c.ID-1] - recipient := *(utils.Find(config.GlobalConfig.Clients, func(client config.Client) bool { + recipient := *(utils.Find(clients, func(client structs.PublicNodeApi) bool { return client.ID == sendTo })) return fmt.Sprintf("http://%s:%d", recipient.Host, recipient.Port), recipient.ID // Return the recipient's address and ID. } // StartGeneratingMessages generates a single message to be sent to another client. -func (c *Client) StartGeneratingMessages() { +func (c *Client) generateMessages(start structs.ClientStartRunApi) { defer c.wg.Done() // Mark this operation as done in the WaitGroup when finished. slog.Info("Client starting to generate messages", "id", c.ID) // Get the recipient's address and ID. - recipientAddress, recipientId := c.getRecipient() + recipientAddress, _ := c.getRecipient(start.Clients) // Create a new message to send to the recipient. messages := []structs.Message{ @@ -136,21 +141,21 @@ func (c *Client) StartGeneratingMessages() { } // Register the intent to send the message with the bulletin board. - if err := c.RegisterIntentToSend(messages); err != nil { - slog.Error(pl.GetFuncName()+": Error registering intent to send", err) - } else { - slog.Info(fmt.Sprintf("Client %d sending to client %d", c.ID, recipientId)) - c.mu.Lock() - defer c.mu.Unlock() - c.Messages = messages // Store the messages to be sent. - } + //if err := c.RegisterIntentToSend(messages); err != nil { + // slog.Error(pl.GetFuncName()+": Error registering intent to send", err) + //} else { + // slog.Info(fmt.Sprintf("Client %d sending to client %d", c.ID, recipientId)) + c.mu.Lock() + defer c.mu.Unlock() + c.Messages = messages // Store the messages to be sent. 
+ //} } // DetermineRoutingPath determines a random routing path of mixers and gatekeepers. func DetermineRoutingPath(participants []structs.PublicNodeApi) ([]structs.PublicNodeApi, []structs.PublicNodeApi, error) { // initialize slices for mixers and gatekeepers in the path. - mixers := make([]structs.PublicNodeApi, config.GlobalConfig.L1) - gatekeepers := make([]structs.PublicNodeApi, config.GlobalConfig.L2) + mixers := make([]structs.PublicNodeApi, config.GetL1()) + gatekeepers := make([]structs.PublicNodeApi, config.GetL2()) for i := range mixers { mixers[i] = utils.RandomElement(participants) // Randomly select a mixer for each layer. @@ -247,7 +252,7 @@ func (c *Client) processMessage(msg structs.Message, destination structs.PublicN } // Form the onion using the client's private key and the determined routing path. - o, err := pi_t.FORMONION(string(msgBytes), mixersAddr, gatekeepersAddr, destination.Address, publicKeys, metadata, config.GlobalConfig.D) + o, err := pi_t.FORMONION(string(msgBytes), mixersAddr, gatekeepersAddr, destination.Address, publicKeys, metadata, config.GetD()) if err != nil { return nil, pl.WrapError(err, "failed to create onion") } @@ -297,15 +302,15 @@ func (c *Client) processCheckpoint(checkpointOnion structs.CheckpointOnion, clie metadata = append(metadata, onion_model.Metadata{}) // Extract the addresses of mixers and gatekeepers for the routing path. - mixersAddr := utils.Map(path[:config.GlobalConfig.L1], func(mixer structs.PublicNodeApi) string { + mixersAddr := utils.Map(path[:config.GetL1()], func(mixer structs.PublicNodeApi) string { return mixer.Address }) - gatekeepersAddr := utils.Map(path[config.GlobalConfig.L1:], func(gatekeeper structs.PublicNodeApi) string { + gatekeepersAddr := utils.Map(path[config.GetL1():], func(gatekeeper structs.PublicNodeApi) string { return gatekeeper.Address }) // Form the checkpoint onion using the client's private key and the determined routing path. - o, err := pi_t.FORMONION(string(dummyPayload), mixersAddr, gatekeepersAddr, clientReceiver.Address, checkpointPublicKeys, metadata, config.GlobalConfig.D) + o, err := pi_t.FORMONION(string(dummyPayload), mixersAddr, gatekeepersAddr, clientReceiver.Address, checkpointPublicKeys, metadata, config.GetD()) if err != nil { return nil, pl.WrapError(err, "failed to create checkpoint onion") } @@ -328,6 +333,10 @@ func (c *Client) startRun(start structs.ClientStartRunApi) error { c.mu.Lock() // Lock the mutex to ensure exclusive access to the client's state during the run. defer c.mu.Unlock() // Unlock the mutex when the function returns. + config.UpdateConfig(start.Config) // Update the global configuration based on the start signal. + + c.generateMessages(start) + // Ensure that there are relays and client participating in the run. if len(start.Relays) == 0 { return pl.NewError("%s: no participating relays", pl.GetFuncName()) @@ -384,7 +393,7 @@ func (c *Client) Receive(oApi structs.OnionApi) error { if err2 := json.Unmarshal([]byte(peeled.Content), &msg); err2 != nil { return pl.WrapError(err2, "relay.Receive(): failed to unmarshal message") } - slog.Info("Client received onion", "layer", layer, "from", config.AddressToName(msg.From), "message", msg.Msg) + slog.Info("Client received onion", "layer", layer, "from", msg.From, "message", msg.Msg) // Record the received message in the client's status. 
c.status.AddReceived(msg) @@ -399,48 +408,51 @@ func (c *Client) GetStatus() string { } // RegisterIntentToSend registers the client's intent to send messages with the bulletin board. -func (c *Client) RegisterIntentToSend(messages []structs.Message) error { - // Convert the list of messages into a list of public node APIs for the recipients. - to := utils.Map(messages, func(m structs.Message) structs.PublicNodeApi { - if f := utils.Find(c.OtherClients, func(c structs.PublicNodeApi) bool { - return c.Address == m.To - }); f != nil { - return *f - } else { - return structs.PublicNodeApi{} - } - }) - - // Marshal the intent-to-send data into JSON. - if data, err := json.Marshal(structs.IntentToSend{ - From: structs.PublicNodeApi{ - ID: c.ID, - Address: c.Address, - PublicKey: c.PublicKey, - Time: time.Now(), - }, - To: to, - }); err != nil { - return pl.WrapError(err, "%s: failed to marshal Client info", pl.GetFuncName()) - } else { - // Send a POST request to the bulletin board to register the intent to send messages. - url := c.BulletinBoardUrl + "/registerIntentToSend" - if resp, err2 := http.Post(url, "application/json", bytes.NewBuffer(data)); err2 != nil { - return pl.WrapError(err2, "%s: failed to send POST request to bulletin board", pl.GetFuncName()) - } else { - defer func(Body io.ReadCloser) { - // Ensure the response body is closed to avoid resource leaks. - if err3 := Body.Close(); err3 != nil { - fmt.Printf("Client.UpdateBulletinBoard(): error closing response body: %v\n", err2) - } - }(resp.Body) - // Check if the intent to send was registered successfully based on the HTTP status code. - if resp.StatusCode != http.StatusOK { - return pl.NewError("%s failed to register intent to send, status code: %d, %s", pl.GetFuncName(), resp.StatusCode, resp.Status) - } else { - c.Messages = messages // Store the messages to be sent. - } - return nil - } - } -} +//func (c *Client) RegisterIntentToSend(messages []structs.Message) error { +// // Convert the list of messages into a list of public node APIs for the recipients. +// to := utils.Map(messages, func(m structs.Message) structs.PublicNodeApi { +// if f := utils.Find(c.OtherClients, func(c structs.PublicNodeApi) bool { +// return c.Address == m.To +// }); f != nil { +// return *f +// } else { +// return structs.PublicNodeApi{} +// } +// }) +// +// // Marshal the intent-to-send data into JSON. +// if data, err := json.Marshal(structs.IntentToSend{ +// From: structs.PublicNodeApi{ +// ID: c.ID, +// Address: c.Address, +// PublicKey: c.PublicKey, +// Host: c.Host, +// Port: c.Port, +// PrometheusPort: c.PrometheusPort, +// Time: time.Now(), +// }, +// To: to, +// }); err != nil { +// return pl.WrapError(err, "%s: failed to marshal Client info", pl.GetFuncName()) +// } else { +// // Send a POST request to the bulletin board to register the intent to send messages. +// url := c.BulletinBoardUrl + "/registerIntentToSend" +// if resp, err2 := http.Post(url, "application/json", bytes.NewBuffer(data)); err2 != nil { +// return pl.WrapError(err2, "%s: failed to send POST request to bulletin board", pl.GetFuncName()) +// } else { +// defer func(Body io.ReadCloser) { +// // Ensure the response body is closed to avoid resource leaks. +// if err3 := Body.Close(); err3 != nil { +// fmt.Printf("Client.UpdateBulletinBoard(): error closing response body: %v\n", err2) +// } +// }(resp.Body) +// // Check if the intent to send was registered successfully based on the HTTP status code. 
+// if resp.StatusCode != http.StatusOK { +// return pl.NewError("%s failed to register intent to send, status code: %d, %s", pl.GetFuncName(), resp.StatusCode, resp.Status) +// } else { +// c.Messages = messages // Store the messages to be sent. +// } +// return nil +// } +// } +//} diff --git a/internal/model/relay/relay.go b/internal/model/relay/relay.go index 140bdaf..57aa219 100644 --- a/internal/model/relay/relay.go +++ b/internal/model/relay/relay.go @@ -14,7 +14,6 @@ import ( "github.com/HannahMarsh/pi_t-experiment/pkg/utils" "io" "net/http" - "strings" "sync" "time" @@ -31,6 +30,7 @@ type Relay struct { Address string // Full address of the relay in the form http://host:port. PrivateKey string // Relay's private key for decryption. PublicKey string // Relay's public key for encryption. + PrometheusPort int // Port number for Prometheus metrics. BulletinBoardUrl string // URL of the bulletin board for relay registration and communication. lastUpdate time.Time // Timestamp of the last update sent to the bulletin board. status *structs.NodeStatus // Relay status, including received onions and checkpoints. @@ -43,36 +43,17 @@ type Relay struct { } // NewRelay creates a new relay instance with a unique ID, host, and port. -func NewRelay(id int, host string, port int, bulletinBoardUrl string) (*Relay, error) { +func NewRelay(id int, host string, port int, promPort int, bulletinBoardUrl string) (*Relay, error) { // Generate a key pair (private and public) for the relay. if privateKey, publicKey, err := keys.KeyGen(); err != nil { return nil, pl.WrapError(err, "relay.NewClient(): failed to generate key pair") } else { // Initialize a list of expected nonces for each layer. - expectedCheckpoints := make([]map[string]bool, config.GlobalConfig.L1+config.GlobalConfig.L2+1) + expectedCheckpoints := make([]map[string]bool, config.GetL1()+config.GetL2()+1) for i := range expectedCheckpoints { expectedCheckpoints[i] = make(map[string]bool) } - // Determine if the relay is corrupted based on the configuration's corruption rate (Chi). 
- numCorrupted := int(config.GlobalConfig.Chi * float64(len(config.GlobalConfig.Relays))) - corruptedNodes := utils.PseudoRandomSubset(config.GlobalConfig.Relays, numCorrupted, 42) - slog.Debug("", "corrupted nodes", strings.Join(utils.Map(corruptedNodes, func(node config.Relay) string { return fmt.Sprintf("%d", node.ID) }), ", ")) - isCorrupted := utils.Contains(corruptedNodes, func(node config.Relay) bool { - return node.ID == id - }) - - addressToDropFrom := "" - - // If the relay is corrupted, set the address to drop all onions from (specified in the configuration) - if isCorrupted { - if c := utils.Find(config.GlobalConfig.Clients, func(client config.Client) bool { - return client.ID == config.GlobalConfig.DropAllOnionsFromClient - }); c != nil { - addressToDropFrom = c.Address - } - } - n := &Relay{ ID: id, Host: host, @@ -80,12 +61,11 @@ func NewRelay(id int, host string, port int, bulletinBoardUrl string) (*Relay, e Port: port, PublicKey: publicKey, PrivateKey: privateKey, + PrometheusPort: promPort, BulletinBoardUrl: bulletinBoardUrl, - status: structs.NewNodeStatus(id, fmt.Sprintf("http://%s:%d", host, port), publicKey), + status: structs.NewNodeStatus(id, port, promPort, fmt.Sprintf("http://%s:%d", host, port), host, publicKey), checkpointsReceived: &cm.ConcurrentMap[int, int]{}, expectedNonces: expectedCheckpoints, - isCorrupted: isCorrupted, - addressToDropFrom: addressToDropFrom, } n.wg.Add(1) @@ -109,10 +89,13 @@ func (n *Relay) GetStatus() string { // getPublicNodeInfo returns the relay's public information in the form of a PublicNodeApi struct. func (n *Relay) getPublicNodeInfo() structs.PublicNodeApi { return structs.PublicNodeApi{ - ID: n.ID, - Address: n.Address, - PublicKey: n.PublicKey, - Time: time.Now(), + ID: n.ID, + Address: n.Address, + PublicKey: n.PublicKey, + PrometheusPort: n.PrometheusPort, + Host: n.Host, + Port: n.Port, + Time: time.Now(), } } @@ -136,6 +119,30 @@ func (n *Relay) startRun(start structs.RelayStartRunApi) (didParticipate bool, e defer n.mu.Unlock() defer n.wg.Done() + config.UpdateConfig(start.Config) // Update the global configuration based on the start signal. + + // Determine if the relay is corrupted based on the configuration's corruption rate (Chi). + numRelays := utils.Max(config.GetMinimumRelays(), n.ID) + numCorrupted := int(config.GetChi() * float64(numRelays)) + corruptedNodes := utils.PseudoRandomSubset(utils.NewIntArray(1, numRelays), numCorrupted, 42) + isCorrupted := utils.Contains(corruptedNodes, func(id int) bool { + return id == n.ID + }) + + addressToDropFrom := "" + // + //// If the relay is corrupted, set the address to drop all onions from (specified in the configuration) + //if isCorrupted { + // if c := utils.Find(config.flobalConfig.Clients, func(client config2.Client) bool { + // return client.ID == config.flobalConfig.DropAllOnionsFromClient + // }); c != nil { + // addressToDropFrom = c.Address + // } + //} + + n.isCorrupted = isCorrupted + n.addressToDropFrom = addressToDropFrom + // Iterate over the checkpoints in the start signal to record the expected nonces. for _, c := range start.Checkpoints { n.expectedNonces[c.Layer][c.Nonce] = true @@ -195,7 +202,7 @@ func (n *Relay) Receive(oApi structs.OnionApi) error { peeled.Sepal = peeled.Sepal.RemoveBlock() // If not a checkpoint, remove the block from the onion. 
} - slog.Info("Received onion", "ischeckpoint?", metadata.Nonce != "", "layer", layer, "nextHop", config.AddressToName(nextHop)) + slog.Info("Received onion", "ischeckpoint?", metadata.Nonce != "", "layer", layer, "nextHop", nextHop) n.status.AddOnion(oApi.From, n.Address, nextHop, layer, isCheckpoint, !wasBruised) @@ -257,10 +264,13 @@ func (n *Relay) updateBulletinBoard(endpoint string, expectedStatusCode int) err // Marshal the relay's public information into JSON. if data, err := json.Marshal(structs.PublicNodeApi{ - ID: n.ID, - Address: n.Address, - PublicKey: n.PublicKey, - Time: t, + ID: n.ID, + Address: n.Address, + PublicKey: n.PublicKey, + PrometheusPort: n.PrometheusPort, + Host: n.Host, + Port: n.Port, + Time: t, }); err != nil { return pl.WrapError(err, "relay.UpdateBulletinBoard(): failed to marshal relay info") } else { diff --git a/pkg/utils/publicIp.go b/pkg/utils/publicIp.go new file mode 100644 index 0000000..df8ab7f --- /dev/null +++ b/pkg/utils/publicIp.go @@ -0,0 +1,40 @@ +package utils + +import ( + "encoding/json" + pl "github.com/HannahMarsh/PrettyLogger" + "io" + "log/slog" + "net/http" +) + +type PublicIP struct { + IP string `json:"ip"` + HostName string `json:"hostname"` + City string `json:"city"` + Region string `json:"region"` + Country string `json:"country"` + Location string `json:"loc"` + Org string `json:"org"` + Postal string `json:"postal"` + Timezone string `json:"timezone"` + ReadMe string `json:"readme"` +} + +func GetPublicIP() (*PublicIP, error) { + resp, err := http.Get("https://ipinfo.io/json") + if err != nil { + return nil, err + } + defer func(Body io.ReadCloser) { + if err := Body.Close(); err != nil { + slog.Error("failed to close response body", err) + } + }(resp.Body) + + var ip PublicIP + if err := json.NewDecoder(resp.Body).Decode(&ip); err != nil { + return nil, pl.WrapError(err, "failed to decode public IP") + } + return &ip, nil +}