Skip to content

Commit

Permalink
Dat it for today
Browse files Browse the repository at this point in the history
  • Loading branch information
0x19 committed Sep 16, 2024
1 parent 44367bd commit 79391b7
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 51 deletions.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ make build && ./build/fdb benchmark --suite-type quic --clients=1 --messages=100

## Benchmarks

There is a dummy transport, starts the (gnet) UDP and does pretty much nothing. We're going to
use that one as a baseline for any other benchmark.

```
make build && ./build/fdb benchmark --suite-type dummy
Starting benchmark...
Dummy server started successfully
2024/09/16 09:26:54 UDS Server started on udp://127.0.0.1:4434
2024/09/16 09:26:54 Dummy Server is listening on 127.0.0.1:4434
--- Benchmark Report ---
Total Clients: 10
Total Messages: 1000
Success Messages: 1000
Failed Messages: 0
Total Duration: 199.805µs
Average Latency: 28ns
Throughput: 6,702,126 m/s
Memory Used: 0 bytes
Dummy server stopped successfully
```

With 10 clients, throughput should be ~ *6,702,1269* m/s

### QUIC

```
Expand Down
57 changes: 38 additions & 19 deletions benchmark/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package benchmark
import (
"context"
"fmt"
"github.com/quic-go/quic-go"
"github.com/pkg/errors"
"github.com/unpackdev/fdb"
transport_dummy "github.com/unpackdev/fdb/transports/dummy"
"github.com/unpackdev/fdb/types"
"net"
)

type DummySuite struct {
fdbInstance *fdb.FDB
server *transport_dummy.Server
client quic.Connection
stream quic.Stream
client *net.UDPConn
}

func NewDummySuite(fdbInstance *fdb.FDB) *DummySuite {
Expand All @@ -23,7 +23,7 @@ func NewDummySuite(fdbInstance *fdb.FDB) *DummySuite {
}

// Start starts the QUIC server for benchmarking.
func (qs *DummySuite) Start() error {
func (qs *DummySuite) Start(ctx context.Context) error {
dummyTransport, err := qs.fdbInstance.GetTransportByType(types.DummyTransportType)
if err != nil {
return fmt.Errorf("failed to retrieve QUIC transport: %w", err)
Expand All @@ -45,35 +45,54 @@ func (qs *DummySuite) Start() error {
rHandler := transport_dummy.NewDummyReadHandler(db)
dummyServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage)

if err := dummyServer.Start(); err != nil {
return fmt.Errorf("failed to start Dummy server: %w", err)
}
go (func() {
if err := dummyServer.Start(); err != nil {
//return fmt.Errorf("failed to start Dummy server: %w", err)
}
})()

<-dummyServer.WaitStarted()

qs.server = dummyServer
fmt.Println("QUIC server started successfully")
fmt.Println("Dummy server started successfully")
return nil
}

// Stop stops the QUIC server and closes the client connection and stream.
func (qs *DummySuite) Stop() {
if qs.stream != nil {
qs.stream.Close()
}
func (qs *DummySuite) Stop(ctx context.Context) error {
if qs.client != nil {
qs.client.CloseWithError(0, "closing connection")
if err := qs.client.Close(); err != nil {
return err
}
}
if qs.server != nil {
qs.server.Stop()
fmt.Println("QUIC server stopped successfully")
if err := qs.server.Stop(); err != nil {
return err
}
}
fmt.Println("Dummy server stopped successfully")
return nil
}

// SetupClient sets up a QUIC client and stream only once.
func (qs *DummySuite) SetupClient(ctx context.Context) error {
if qs.client != nil && qs.stream != nil {
if qs.client != nil {
return nil // Already setup, reuse client and stream
}

// Resolve the server address
serverAddr, err := net.ResolveUDPAddr("udp", qs.server.Addr())
if err != nil {
return errors.Wrap(err, "failed to resolve server address")
}

// Create the UDP client
client, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
return errors.Wrap(err, "failed to connect to server")
}

qs.client = client

/* serverAddr := qs.server.Addr()
clientTLSConfig := &tls.Config{
Expand Down Expand Up @@ -101,8 +120,8 @@ func (qs *DummySuite) SetupClient(ctx context.Context) error {
// Run sends a single message through a QUIC stream sequentially.
func (qs *DummySuite) Run(ctx context.Context) error {
// Check if stream is initialized
if qs.stream == nil {
return fmt.Errorf("stream is not initialized")
if qs.client == nil {
return fmt.Errorf("client is not initialized")
}

/* // Send the write message
Expand Down
14 changes: 8 additions & 6 deletions benchmark/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,22 @@ func (sm *SuiteManager) RegisterSuite(suiteType SuiteType, suite TransportSuite)
}

// Start starts the suite for the specified SuiteType.
func (sm *SuiteManager) Start(suiteType SuiteType) error {
func (sm *SuiteManager) Start(ctx context.Context, suiteType SuiteType) error {
suite, exists := sm.Suites[suiteType]
if !exists {
return fmt.Errorf("suite type %s not found", suiteType)
}
return suite.Start()
return suite.Start(ctx)
}

// Stop stops the suite for the specified SuiteType.
func (sm *SuiteManager) Stop(suiteType SuiteType) {
suite, exists := sm.Suites[suiteType]
if exists {
suite.Stop()
func (sm *SuiteManager) Stop(ctx context.Context, suiteType SuiteType) error {
if suite, exists := sm.Suites[suiteType]; exists {
if err := suite.Stop(ctx); err != nil {
return err
}
}
return nil
}

// Run executes the benchmarking logic for the specified SuiteType.
Expand Down
3 changes: 1 addition & 2 deletions benchmark/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func (p *ClientPool) Start(ctx context.Context, suite TransportSuite) error {
func (p *ClientPool) runClient(ctx context.Context, clientID int, suite TransportSuite) {
defer p.poolWg.Done()

err := suite.SetupClient(ctx)
if err != nil {
if err := suite.SetupClient(ctx); err != nil {
fmt.Printf("Client %d setup failed: %v\n", clientID, err)
return
}
Expand Down
20 changes: 14 additions & 6 deletions benchmark/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewQuicSuite(fdbInstance *fdb.FDB) *QuicSuite {
}

// Start starts the QUIC server for benchmarking.
func (qs *QuicSuite) Start() error {
func (qs *QuicSuite) Start(ctx context.Context) error {
quicTransport, err := qs.fdbInstance.GetTransportByType(types.QUICTransportType)
if err != nil {
return fmt.Errorf("failed to retrieve QUIC transport: %w", err)
Expand Down Expand Up @@ -68,17 +68,25 @@ func (qs *QuicSuite) Start() error {
}

// Stop stops the QUIC server and closes the client connection and stream.
func (qs *QuicSuite) Stop() {
func (qs *QuicSuite) Stop(ctx context.Context) error {
if qs.stream != nil {
qs.stream.Close()
if err := qs.stream.Close(); err != nil {
return err
}
}
if qs.client != nil {
qs.client.CloseWithError(0, "closing connection")
if err := qs.client.CloseWithError(0, "closing connection"); err != nil {
return err
}
}
if qs.quicServer != nil {
qs.quicServer.Stop()
fmt.Println("QUIC server stopped successfully")
if err := qs.quicServer.Stop(); err != nil {
return err
}
}

fmt.Println("QUIC server stopped successfully")
return nil
}

// SetupClient sets up a QUIC client and stream only once.
Expand Down
7 changes: 4 additions & 3 deletions benchmark/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package benchmark
import (
"encoding/json"
"fmt"
"github.com/dustin/go-humanize"
"os"
"time"
)
Expand All @@ -20,8 +21,8 @@ type Report struct {
LatencyHistogram []time.Duration `json:"latency_histogram"`
}

// NewBenchmarkReport creates a new benchmark report.
func NewBenchmarkReport() *Report {
// NewReport creates a new benchmark report.
func NewReport() *Report {
return &Report{
LatencyHistogram: make([]time.Duration, 0),
}
Expand All @@ -36,7 +37,7 @@ func (r *Report) PrintReport() {
fmt.Printf("Failed Messages: %d\n", r.FailedMessages)
fmt.Printf("Total Duration: %s\n", r.TotalDuration)
fmt.Printf("Average Latency: %s\n", r.AvgLatency)
fmt.Printf("Throughput: %.2f messages/second\n", r.Throughput)
fmt.Printf("Throughput: %s messages/second\n", humanize.Comma(int64(r.Throughput)))
fmt.Printf("Memory Used: %d bytes\n", r.MemoryUsed)
}

Expand Down
4 changes: 2 additions & 2 deletions benchmark/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var ErrInvalidSuiteType = errors.New("invalid suite type")

// TransportSuite defines a common interface that all transport-specific suites must implement.
type TransportSuite interface {
Start() error
Stop()
Start(ctx context.Context) error
Stop(ctx context.Context) error
SetupClient(ctx context.Context) error
Run(ctx context.Context) error
}
15 changes: 10 additions & 5 deletions cmd/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func BenchmarkCommand() *cli.Command {

// Configure transports (for now just QUIC)
cnf := config.Config{
Logger: config.Logger{
Enabled: true,
Environment: "development",
Level: "info",
},
MdbxNodes: []config.MdbxNode{
{
Name: "benchmark",
Expand Down Expand Up @@ -68,7 +73,7 @@ func BenchmarkCommand() *cli.Command {
Config: config.DummyTransport{
Enabled: true,
IPv4: "127.0.0.1",
Port: 4433,
Port: 4434,
TLS: config.TLS{
Key: "./data/certs/key.pem",
Cert: "./data/certs/cert.pem",
Expand All @@ -94,13 +99,13 @@ func BenchmarkCommand() *cli.Command {
messagesPerClient := c.Int("messages")

// Start the suite
if err := suiteManager.Start(suiteType); err != nil {
if err := suiteManager.Start(c.Context, suiteType); err != nil {
return fmt.Errorf("failed to start suite: %w", err)
}
defer suiteManager.Stop(suiteType)

// Create benchmark report
report := benchmark.NewBenchmarkReport()
defer suiteManager.Stop(c.Context, suiteType)

report := benchmark.NewReport()

// Create client pool and start the benchmarking process
clientPool := benchmark.NewClientPool(totalClients, messagesPerClient, report)
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import "github.com/unpackdev/fdb/types"

type Config struct {
Logger Logger `yaml:"logger"`
Transports []Transport `yaml:"transports"`
MdbxNodes MdbxNodes `yaml:"nodes"`
}
Expand Down
7 changes: 7 additions & 0 deletions config/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package config

type Logger struct {
Enabled bool `yaml:"enabled"`
Environment string `yaml:"environment"`
Level string `yaml:"level"`
}
17 changes: 16 additions & 1 deletion fdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package fdb

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/unpackdev/fdb/config"
"github.com/unpackdev/fdb/db"
"github.com/unpackdev/fdb/logger"
"github.com/unpackdev/fdb/transports"
transport_dummy "github.com/unpackdev/fdb/transports/dummy"
transport_quic "github.com/unpackdev/fdb/transports/quic"
"github.com/unpackdev/fdb/types"
"go.uber.org/zap"
)

type FDB struct {
Expand All @@ -23,6 +26,18 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) {
return nil, errors.Wrap(err, "failure to validate (f)db configuration")
}

// Sets the global logger.
// I hate to pass by reference logger everywhere...
// In case you wish to use your own zap logger you can disable logger here,
// implement your own and set the globals on your end.
if cnf.Logger.Enabled {
zLog, zlErr := logger.GetLogger(cnf.Logger.Environment, cnf.Logger.Level)
if zlErr != nil {
return nil, errors.Wrap(zlErr, "failure to construct new logger")
}
zap.ReplaceGlobals(zLog)
}

// Create a new transport manager
transportManager := transports.NewManager()

Expand Down Expand Up @@ -66,7 +81,7 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) {
return nil, errors.Wrap(err, "failed to register UDS transport")
}*/
default:
return nil, errors.New("unknown transport type")
return nil, fmt.Errorf("unknown transport type provided: %v", t.GetTransportType())
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (

require (
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/erigontech/mdbx-go v0.38.4 h1:S9T7mTe9KPcFe4dOoOtVdI6gPzht9y7wMnYfUBgrQLo=
github.com/erigontech/mdbx-go v0.38.4/go.mod h1:IcOLQDPw3VM/asP6T5JVPPN4FHHgJtY16XfYjzWKVNI=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
Expand Down
Loading

0 comments on commit 79391b7

Please sign in to comment.