Skip to content

Commit

Permalink
Update code to handle new node management and fix some bugs.
Browse files Browse the repository at this point in the history
This commit fixes a few bugs and updates the code to handle new node management. It fixes the logic for determining the number of messages a client should send and updates the logic for handling onions in the client. It also includes some logging to help with debugging. The `StartRuns` function now correctly signals nodes to start and returns an error if an error occurs. The `formOnions` function now correctly determines the routing path for each message and logs the routing path. The `startRun` function now correctly sends onions to the first node and logs the message if an error occurs. The `Receive` function now correctly removes the outermost layer of an onion and logs an error if the onion is invalid. The `HandleStartRun` function now logs the result of starting a run and the `start` function now correctly sends onions to all nodes.
  • Loading branch information
HannahMarsh committed Jun 24, 2024
1 parent c6714a8 commit ba34310
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
}
}

http.HandleFunc("/receiveOnion", newNode.HandleReceiveOnion)
http.HandleFunc("/receive", newNode.HandleReceiveOnion)
http.HandleFunc("/start", newNode.HandleStartRun)

go func() {
Expand Down
1 change: 1 addition & 0 deletions internal/api/startRun.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package api
type StartRunApi struct {
ParticipatingClients []PublicNodeApi
ActiveNodes []PublicNodeApi
NumMessagesPerClient int
}
2 changes: 2 additions & 0 deletions internal/bulletin_board/bulletin_board.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (bb *BulletinBoard) StartRuns() error {
if bb.allNodesReady() {
if err := bb.signalNodesToStart(); err != nil {
return PrettyLogger.WrapError(err, "error signaling nodes to start")
} else {
return nil
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions internal/bulletin_board/bulletin_board_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,14 @@ func (bb *BulletinBoard) signalNodesToStart() error {
}
})

numMessages := utils.Max(utils.MapEntries(bb.Clients, func(_ int, client *ClientView) int {
return len(client.MessageQueue)
})) + 2

vs := api.StartRunApi{
ParticipatingClients: activeClients,
ActiveNodes: activeNodes,
NumMessagesPerClient: numMessages,
}

if data, err := json.Marshal(vs); err != nil {
Expand Down
54 changes: 49 additions & 5 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (c *Client) RegisterWithBulletinBoard() error {

func (c *Client) StartGeneratingMessages(client_addresses []string) {
slog.Info("Client starting to generate messages", "id", c.ID)
var msgNum int = 0
for {
select {
case <-config.GlobalCtx.Done():
Expand All @@ -100,8 +101,9 @@ func (c *Client) StartGeneratingMessages(client_addresses []string) {
messages = append(messages, api.Message{
From: c.Adddress,
To: addr,
Msg: fmt.Sprintf("msg from client(id=%d)", c.ID),
Msg: fmt.Sprintf("Msg#%d from client(id=%d)", msgNum, c.ID),
})
msgNum++
}
}
var wg sync.WaitGroup
Expand All @@ -120,7 +122,7 @@ func (c *Client) StartGeneratingMessages(client_addresses []string) {
}()
wg.Wait()
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

Expand All @@ -146,7 +148,31 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e

onions := make(map[string][]api.OnionApi)

slog.Info("formOnions", "id", c.ID, "num_messages", len(c.Messages), "num_participants", len(start.ParticipatingClients), "num_active_nodes", len(start.ActiveNodes))
nodes := utils.Filter(start.ActiveNodes, func(node api.PublicNodeApi) bool {
return node.Address != c.Adddress && node.Address != ""
})

numMessagesToSend := make(map[string]int)

for _, msg := range c.Messages {
if _, found := numMessagesToSend[msg.To]; !found {
numMessagesToSend[msg.To] = 0
}
numMessagesToSend[msg.To]++
}

for addr, numMessages := range numMessagesToSend {
if numMessages < start.NumMessagesPerClient {
numDummyNeeded := start.NumMessagesPerClient - numMessages
for i := 0; i < numDummyNeeded; i++ {
c.Messages = append(c.Messages, api.Message{
From: c.Adddress,
To: addr,
Msg: "dummy",
})
}
}
}

for _, msg := range c.Messages {
if destination, found := utils.Find(start.ParticipatingClients, api.PublicNodeApi{}, func(client api.PublicNodeApi) bool {
Expand All @@ -157,7 +183,7 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e

if msgString, err := json.Marshal(msg); err != nil {
return nil, pl.WrapError(err, "failed to marshal message")
} else if routingPath, err2 := DetermineRoutingPath(3, start.ActiveNodes); err2 != nil {
} else if routingPath, err2 := DetermineRoutingPath(3, nodes); err2 != nil {
return nil, pl.WrapError(err2, "failed to determine routing path")
} else {
routingPath = append(routingPath, destination)
Expand All @@ -167,6 +193,7 @@ func (c *Client) formOnions(start api.StartRunApi) (map[string][]api.OnionApi, e
addresses := utils.Map(routingPath, func(node api.PublicNodeApi) string {
return node.Address
})
slog.Info("routing path", "path", addresses)
if addr, onion, err3 := pi_t.FormOnion(msgString, publicKeys, addresses); err3 != nil {
return nil, pl.WrapError(err3, "failed to create onion")
} else {
Expand Down Expand Up @@ -212,7 +239,7 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) {
} else {
for addr, onions := range toSend {
for _, onion := range onions {
url := fmt.Sprintf("%s/receiveOnion", addr)
url := fmt.Sprintf("%s/receive", addr)

if data, err2 := json.Marshal(onion); err2 != nil {
slog.Error("failed to marshal msgs", err2)
Expand All @@ -226,11 +253,14 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) {
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return true, pl.NewError("%s: Failed to send to first node(url=%s), status code: %d, status: %s", pl.GetFuncName(), url, resp.StatusCode, resp.Status)
} else {
slog.Info("Client sent onion to first mixer", "mixer_address", addr)
}
}
}
}
}
c.Messages = make([]api.Message, 0)
return true, nil
}

Expand All @@ -246,6 +276,20 @@ func (c *Client) startRun(start api.StartRunApi) (bool, error) {
//}

func (c *Client) Receive(o string) error {
if destination, payload, err := pi_t.PeelOnion(o, c.PrivateKey); err != nil {
return pl.WrapError(err, "node.Receive(): failed to remove layer")
} else {
if destination == "" {
var msg api.Message
if err2 := json.Unmarshal([]byte(payload), &msg); err2 != nil {
return pl.WrapError(err2, "node.Receive(): failed to unmarshal message")
}
slog.Info("Received message", "from", msg.From, "to", msg.To, "msg", msg.Msg)

} else {
return pl.NewError("Received onion", "destination", destination)
}
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/client/clientHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Client) HandleStartRun(w http.ResponseWriter, r *http.Request) {
if didParticipate, err := c.startRun(start); err != nil {
slog.Error("Error starting run", err)
} else {
slog.Info("Run complete", "did_participate", didParticipate)
slog.Info("Done sending onions", "did_participate", didParticipate)
}
}()
w.WriteHeader(http.StatusOK)
Expand Down
10 changes: 10 additions & 0 deletions pkg/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,16 @@ func Sum(values []int) int {
return sum
}

func Max(values []int) int {
m := values[0]
for _, v := range values {
if v > m {
m = v
}
}
return m
}

func GetValues[K comparable, V any](m map[K]V) []V {
values := make([]V, 0, len(m))
for _, v := range m {
Expand Down

0 comments on commit ba34310

Please sign in to comment.