From 79391b7cb213d28af1d7ccc18d6a424d1e62f919 Mon Sep 17 00:00:00 2001 From: Nevio Date: Mon, 16 Sep 2024 09:36:01 +0200 Subject: [PATCH] Dat it for today --- README.md | 25 +++++++++++++++++ benchmark/dummy.go | 57 +++++++++++++++++++++++++------------- benchmark/manager.go | 14 ++++++---- benchmark/pool.go | 3 +- benchmark/quic.go | 20 +++++++++---- benchmark/report.go | 7 +++-- benchmark/type.go | 4 +-- cmd/benchmark.go | 15 ++++++---- config/config.go | 1 + config/logger.go | 7 +++++ fdb.go | 17 +++++++++++- go.mod | 1 + go.sum | 2 ++ transports/dummy/server.go | 11 ++++---- transports/quic/server.go | 8 ++++-- 15 files changed, 141 insertions(+), 51 deletions(-) create mode 100644 config/logger.go diff --git a/README.md b/README.md index 733f8f0..4faf2ab 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,31 @@ make build && ./build/fdb benchmark --suite-type quic --clients=1 --messages=100 ## Benchmarks +There is a dummy transport, starts the (gnet) UDP and does pretty much nothing. We're going to +use that one as a baseline for any other benchmark. + +``` +make build && ./build/fdb benchmark --suite-type dummy +Starting benchmark... +Dummy server started successfully +2024/09/16 09:26:54 UDS Server started on udp://127.0.0.1:4434 +2024/09/16 09:26:54 Dummy Server is listening on 127.0.0.1:4434 + +--- Benchmark Report --- +Total Clients: 10 +Total Messages: 1000 +Success Messages: 1000 +Failed Messages: 0 +Total Duration: 199.805µs +Average Latency: 28ns +Throughput: 6,702,126 m/s +Memory Used: 0 bytes + +Dummy server stopped successfully +``` + +With 10 clients, throughput should be ~ *6,702,1269* m/s + ### QUIC ``` diff --git a/benchmark/dummy.go b/benchmark/dummy.go index 08a6876..15f03a5 100644 --- a/benchmark/dummy.go +++ b/benchmark/dummy.go @@ -3,17 +3,17 @@ package benchmark import ( "context" "fmt" - "github.com/quic-go/quic-go" + "github.com/pkg/errors" "github.com/unpackdev/fdb" transport_dummy "github.com/unpackdev/fdb/transports/dummy" "github.com/unpackdev/fdb/types" + "net" ) type DummySuite struct { fdbInstance *fdb.FDB server *transport_dummy.Server - client quic.Connection - stream quic.Stream + client *net.UDPConn } func NewDummySuite(fdbInstance *fdb.FDB) *DummySuite { @@ -23,7 +23,7 @@ func NewDummySuite(fdbInstance *fdb.FDB) *DummySuite { } // Start starts the QUIC server for benchmarking. -func (qs *DummySuite) Start() error { +func (qs *DummySuite) Start(ctx context.Context) error { dummyTransport, err := qs.fdbInstance.GetTransportByType(types.DummyTransportType) if err != nil { return fmt.Errorf("failed to retrieve QUIC transport: %w", err) @@ -45,35 +45,54 @@ func (qs *DummySuite) Start() error { rHandler := transport_dummy.NewDummyReadHandler(db) dummyServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) - if err := dummyServer.Start(); err != nil { - return fmt.Errorf("failed to start Dummy server: %w", err) - } + go (func() { + if err := dummyServer.Start(); err != nil { + //return fmt.Errorf("failed to start Dummy server: %w", err) + } + })() + + <-dummyServer.WaitStarted() qs.server = dummyServer - fmt.Println("QUIC server started successfully") + fmt.Println("Dummy server started successfully") return nil } // Stop stops the QUIC server and closes the client connection and stream. -func (qs *DummySuite) Stop() { - if qs.stream != nil { - qs.stream.Close() - } +func (qs *DummySuite) Stop(ctx context.Context) error { if qs.client != nil { - qs.client.CloseWithError(0, "closing connection") + if err := qs.client.Close(); err != nil { + return err + } } if qs.server != nil { - qs.server.Stop() - fmt.Println("QUIC server stopped successfully") + if err := qs.server.Stop(); err != nil { + return err + } } + fmt.Println("Dummy server stopped successfully") + return nil } -// SetupClient sets up a QUIC client and stream only once. func (qs *DummySuite) SetupClient(ctx context.Context) error { - if qs.client != nil && qs.stream != nil { + if qs.client != nil { return nil // Already setup, reuse client and stream } + // Resolve the server address + serverAddr, err := net.ResolveUDPAddr("udp", qs.server.Addr()) + if err != nil { + return errors.Wrap(err, "failed to resolve server address") + } + + // Create the UDP client + client, err := net.DialUDP("udp", nil, serverAddr) + if err != nil { + return errors.Wrap(err, "failed to connect to server") + } + + qs.client = client + /* serverAddr := qs.server.Addr() clientTLSConfig := &tls.Config{ @@ -101,8 +120,8 @@ func (qs *DummySuite) SetupClient(ctx context.Context) error { // Run sends a single message through a QUIC stream sequentially. func (qs *DummySuite) Run(ctx context.Context) error { // Check if stream is initialized - if qs.stream == nil { - return fmt.Errorf("stream is not initialized") + if qs.client == nil { + return fmt.Errorf("client is not initialized") } /* // Send the write message diff --git a/benchmark/manager.go b/benchmark/manager.go index f7c1874..88de081 100644 --- a/benchmark/manager.go +++ b/benchmark/manager.go @@ -34,20 +34,22 @@ func (sm *SuiteManager) RegisterSuite(suiteType SuiteType, suite TransportSuite) } // Start starts the suite for the specified SuiteType. -func (sm *SuiteManager) Start(suiteType SuiteType) error { +func (sm *SuiteManager) Start(ctx context.Context, suiteType SuiteType) error { suite, exists := sm.Suites[suiteType] if !exists { return fmt.Errorf("suite type %s not found", suiteType) } - return suite.Start() + return suite.Start(ctx) } // Stop stops the suite for the specified SuiteType. -func (sm *SuiteManager) Stop(suiteType SuiteType) { - suite, exists := sm.Suites[suiteType] - if exists { - suite.Stop() +func (sm *SuiteManager) Stop(ctx context.Context, suiteType SuiteType) error { + if suite, exists := sm.Suites[suiteType]; exists { + if err := suite.Stop(ctx); err != nil { + return err + } } + return nil } // Run executes the benchmarking logic for the specified SuiteType. diff --git a/benchmark/pool.go b/benchmark/pool.go index 56eab33..5fefd98 100644 --- a/benchmark/pool.go +++ b/benchmark/pool.go @@ -47,8 +47,7 @@ func (p *ClientPool) Start(ctx context.Context, suite TransportSuite) error { func (p *ClientPool) runClient(ctx context.Context, clientID int, suite TransportSuite) { defer p.poolWg.Done() - err := suite.SetupClient(ctx) - if err != nil { + if err := suite.SetupClient(ctx); err != nil { fmt.Printf("Client %d setup failed: %v\n", clientID, err) return } diff --git a/benchmark/quic.go b/benchmark/quic.go index 6cb1855..866c0dd 100644 --- a/benchmark/quic.go +++ b/benchmark/quic.go @@ -36,7 +36,7 @@ func NewQuicSuite(fdbInstance *fdb.FDB) *QuicSuite { } // Start starts the QUIC server for benchmarking. -func (qs *QuicSuite) Start() error { +func (qs *QuicSuite) Start(ctx context.Context) error { quicTransport, err := qs.fdbInstance.GetTransportByType(types.QUICTransportType) if err != nil { return fmt.Errorf("failed to retrieve QUIC transport: %w", err) @@ -68,17 +68,25 @@ func (qs *QuicSuite) Start() error { } // Stop stops the QUIC server and closes the client connection and stream. -func (qs *QuicSuite) Stop() { +func (qs *QuicSuite) Stop(ctx context.Context) error { if qs.stream != nil { - qs.stream.Close() + if err := qs.stream.Close(); err != nil { + return err + } } if qs.client != nil { - qs.client.CloseWithError(0, "closing connection") + if err := qs.client.CloseWithError(0, "closing connection"); err != nil { + return err + } } if qs.quicServer != nil { - qs.quicServer.Stop() - fmt.Println("QUIC server stopped successfully") + if err := qs.quicServer.Stop(); err != nil { + return err + } } + + fmt.Println("QUIC server stopped successfully") + return nil } // SetupClient sets up a QUIC client and stream only once. diff --git a/benchmark/report.go b/benchmark/report.go index be8cd17..b03d134 100644 --- a/benchmark/report.go +++ b/benchmark/report.go @@ -3,6 +3,7 @@ package benchmark import ( "encoding/json" "fmt" + "github.com/dustin/go-humanize" "os" "time" ) @@ -20,8 +21,8 @@ type Report struct { LatencyHistogram []time.Duration `json:"latency_histogram"` } -// NewBenchmarkReport creates a new benchmark report. -func NewBenchmarkReport() *Report { +// NewReport creates a new benchmark report. +func NewReport() *Report { return &Report{ LatencyHistogram: make([]time.Duration, 0), } @@ -36,7 +37,7 @@ func (r *Report) PrintReport() { fmt.Printf("Failed Messages: %d\n", r.FailedMessages) fmt.Printf("Total Duration: %s\n", r.TotalDuration) fmt.Printf("Average Latency: %s\n", r.AvgLatency) - fmt.Printf("Throughput: %.2f messages/second\n", r.Throughput) + fmt.Printf("Throughput: %s messages/second\n", humanize.Comma(int64(r.Throughput))) fmt.Printf("Memory Used: %d bytes\n", r.MemoryUsed) } diff --git a/benchmark/type.go b/benchmark/type.go index 8620399..48eeb63 100644 --- a/benchmark/type.go +++ b/benchmark/type.go @@ -19,8 +19,8 @@ var ErrInvalidSuiteType = errors.New("invalid suite type") // TransportSuite defines a common interface that all transport-specific suites must implement. type TransportSuite interface { - Start() error - Stop() + Start(ctx context.Context) error + Stop(ctx context.Context) error SetupClient(ctx context.Context) error Run(ctx context.Context) error } diff --git a/cmd/benchmark.go b/cmd/benchmark.go index 64f1860..f01508a 100644 --- a/cmd/benchmark.go +++ b/cmd/benchmark.go @@ -41,6 +41,11 @@ func BenchmarkCommand() *cli.Command { // Configure transports (for now just QUIC) cnf := config.Config{ + Logger: config.Logger{ + Enabled: true, + Environment: "development", + Level: "info", + }, MdbxNodes: []config.MdbxNode{ { Name: "benchmark", @@ -68,7 +73,7 @@ func BenchmarkCommand() *cli.Command { Config: config.DummyTransport{ Enabled: true, IPv4: "127.0.0.1", - Port: 4433, + Port: 4434, TLS: config.TLS{ Key: "./data/certs/key.pem", Cert: "./data/certs/cert.pem", @@ -94,13 +99,13 @@ func BenchmarkCommand() *cli.Command { messagesPerClient := c.Int("messages") // Start the suite - if err := suiteManager.Start(suiteType); err != nil { + if err := suiteManager.Start(c.Context, suiteType); err != nil { return fmt.Errorf("failed to start suite: %w", err) } - defer suiteManager.Stop(suiteType) - // Create benchmark report - report := benchmark.NewBenchmarkReport() + defer suiteManager.Stop(c.Context, suiteType) + + report := benchmark.NewReport() // Create client pool and start the benchmarking process clientPool := benchmark.NewClientPool(totalClients, messagesPerClient, report) diff --git a/config/config.go b/config/config.go index 9344db3..fc77b87 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import "github.com/unpackdev/fdb/types" type Config struct { + Logger Logger `yaml:"logger"` Transports []Transport `yaml:"transports"` MdbxNodes MdbxNodes `yaml:"nodes"` } diff --git a/config/logger.go b/config/logger.go new file mode 100644 index 0000000..d4cd4d0 --- /dev/null +++ b/config/logger.go @@ -0,0 +1,7 @@ +package config + +type Logger struct { + Enabled bool `yaml:"enabled"` + Environment string `yaml:"environment"` + Level string `yaml:"level"` +} diff --git a/fdb.go b/fdb.go index 5749132..4d62bd7 100644 --- a/fdb.go +++ b/fdb.go @@ -2,13 +2,16 @@ package fdb import ( "context" + "fmt" "github.com/pkg/errors" "github.com/unpackdev/fdb/config" "github.com/unpackdev/fdb/db" + "github.com/unpackdev/fdb/logger" "github.com/unpackdev/fdb/transports" transport_dummy "github.com/unpackdev/fdb/transports/dummy" transport_quic "github.com/unpackdev/fdb/transports/quic" "github.com/unpackdev/fdb/types" + "go.uber.org/zap" ) type FDB struct { @@ -23,6 +26,18 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) { return nil, errors.Wrap(err, "failure to validate (f)db configuration") } + // Sets the global logger. + // I hate to pass by reference logger everywhere... + // In case you wish to use your own zap logger you can disable logger here, + // implement your own and set the globals on your end. + if cnf.Logger.Enabled { + zLog, zlErr := logger.GetLogger(cnf.Logger.Environment, cnf.Logger.Level) + if zlErr != nil { + return nil, errors.Wrap(zlErr, "failure to construct new logger") + } + zap.ReplaceGlobals(zLog) + } + // Create a new transport manager transportManager := transports.NewManager() @@ -66,7 +81,7 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) { return nil, errors.Wrap(err, "failed to register UDS transport") }*/ default: - return nil, errors.New("unknown transport type") + return nil, fmt.Errorf("unknown transport type provided: %v", t.GetTransportType()) } } diff --git a/go.mod b/go.mod index 92dca49..a4018e8 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( require ( github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-task/slim-sprig/v3 v3.0.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect github.com/ianlancetaylor/cgosymbolizer v0.0.0-20240503222823-736c933a666d // indirect diff --git a/go.sum b/go.sum index 34c6a7b..13e543a 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46t github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/erigontech/mdbx-go v0.38.4 h1:S9T7mTe9KPcFe4dOoOtVdI6gPzht9y7wMnYfUBgrQLo= github.com/erigontech/mdbx-go v0.38.4/go.mod h1:IcOLQDPw3VM/asP6T5JVPPN4FHHgJtY16XfYjzWKVNI= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= diff --git a/transports/dummy/server.go b/transports/dummy/server.go index cfa3252..7a6f48a 100644 --- a/transports/dummy/server.go +++ b/transports/dummy/server.go @@ -17,7 +17,6 @@ type Server struct { ctx context.Context cnf config.DummyTransport handlerRegistry map[types.HandlerType]DummyHandler - addr string stopChan chan struct{} started chan struct{} } @@ -42,8 +41,8 @@ func (s *Server) Addr() string { // Start starts the UDS server func (s *Server) Start() error { s.stopChan = make(chan struct{}) - s.started = make(chan struct{}) // Initialize the started channel - listenAddr := "unix://" + s.addr + s.started = make(chan struct{}, 1) // Initialize the started channel + listenAddr := "udp://" + s.cnf.Addr() log.Printf("UDS Server started on %s", listenAddr) return gnet.Serve( @@ -67,18 +66,20 @@ func (s *Server) Tick() (delay time.Duration, action gnet.Action) { } // Stop stops the UDS server -func (s *Server) Stop() { +func (s *Server) Stop() error { close(s.stopChan) + return nil } func (s *Server) WaitStarted() <-chan struct{} { + defer close(s.started) return s.started } // OnInitComplete is called when the server starts func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action) { log.Printf("Dummy Server is listening on %s", server.Addr.String()) - close(s.started) // Signal that the server has started + s.started <- struct{}{} // Signal that the server has started return } diff --git a/transports/quic/server.go b/transports/quic/server.go index 65b30ed..8bcc4af 100644 --- a/transports/quic/server.go +++ b/transports/quic/server.go @@ -202,10 +202,14 @@ func (s *Server) handleStream(conn quic.Connection, stream quic.Stream) { } // Stop stops the QUIC server -func (s *Server) Stop() { +func (s *Server) Stop() error { close(s.stopChan) - s.listener.Close() + if err := s.listener.Close(); err != nil { + return err + } + s.wg.Wait() + return nil } // WaitStarted returns a channel that is closed when the server has started