diff --git a/README.md b/README.md index 1b02137..251f5c4 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,10 @@ make build && ./build/fdb benchmark --suite quic --clients 5 --messages 1000 --t There is a dummy transport, starts the (gnet) UDP and does pretty much nothing. We're going to use that one as a baseline for any other benchmark. +### DUMMY + +#### Write Benchmark + ``` make build && ./build/fdb benchmark --suite dummy --clients 50 --messages 1000000 --type write --timeout 120 @@ -145,30 +149,9 @@ Latency Jitter (StdDev): 346.418350µs ``` - ### TCP - -Write without ACK - -``` ---- Benchmark Report --- -Total Clients: 50 -Messages per Client: 1000000 -Total Messages: 50000000 -Success Messages: 50000000 -Failed Messages: 0 -Total Duration: 7.17357253s -Average Latency: 3.358µs -P50 Latency: 2.42µs -P90 Latency: 2.77µs -P99 Latency: 6.5µs -Throughput: 6,970,027 messages/second -Memory Used: 10.20 MB -Latency Jitter (StdDev): 30.940890µs -``` - -With ACK +#### Write Benchmark ``` --- Benchmark Report --- @@ -189,6 +172,8 @@ Latency Jitter (StdDev): 148.417551µs ### QUIC +#### Write Benchmark + ``` make build && ./build/fdb benchmark --suite quic --clients 50 --messages 100000 --type write --timeout 120 @@ -208,6 +193,27 @@ Memory Used: 17260.96 MB Latency Jitter (StdDev): 319.379812µs ``` +### UDP + +#### Write Benchmark + +``` +--- Benchmark Report --- +Total Clients: 50 +Messages per Client: 100000 +Total Messages: 5000000 +Success Messages: 5000000 +Failed Messages: 0 +Total Duration: 16.771189289s +Average Latency: 169.167µs +P50 Latency: 128.563µs +P90 Latency: 307.689µs +P99 Latency: 877.784µs +Throughput: 298,130 messages/second +Memory Used: 678.49 MB +Latency Jitter (StdDev): 173.144187µs +``` + ## For Developers - Main entrypoint to the application can be found at [entrypoint](./entrypoint) diff --git a/benchmark.yaml b/benchmark.yaml index b8cd6e1..ceaafc4 100644 --- a/benchmark.yaml +++ b/benchmark.yaml @@ -10,7 +10,7 @@ mdbx: path: /tmp/ maxReaders: 4096 maxSize: 1024 # Maximum database size (1 TB) - minSize: 1 # Minimum database size (1 MB) + minSize: 1 # Minimum database size (1 GB) growthStep: 4096 # Growth step size (4 KB) filePermissions: 0600 # File permissions for the database @@ -42,6 +42,16 @@ transports: ipv4: 127.0.0.1 port: 5011 tls: + insecure: true + key: ./data/certs/key.pem + cert: ./data/certs/cert.pem + + - type: udp + enabled: true + config: + ipv4: 127.0.0.1 + port: 5022 + dtls: insecure: true key: ./data/certs/key.pem cert: ./data/certs/cert.pem \ No newline at end of file diff --git a/benchmark/manager.go b/benchmark/manager.go index 219fa47..e2c15f5 100644 --- a/benchmark/manager.go +++ b/benchmark/manager.go @@ -24,6 +24,7 @@ func NewSuiteManager(fdb *fdb.FDB) *SuiteManager { manager.RegisterSuite(DummySuiteType, NewDummySuite(fdb, 500)) manager.RegisterSuite(UDSSuiteType, NewUdsSuite(fdb, 500)) manager.RegisterSuite(TCPSuiteType, NewTcpSuite(fdb, 500)) + manager.RegisterSuite(UDPSuiteType, NewUdpSuite(fdb, 500)) return manager } diff --git a/benchmark/type.go b/benchmark/type.go index 6722209..baababb 100644 --- a/benchmark/type.go +++ b/benchmark/type.go @@ -12,6 +12,7 @@ const ( QUICSuite SuiteType = "quic" UDSSuiteType SuiteType = "uds" // Example for future transport suites TCPSuiteType SuiteType = "tcp" + UDPSuiteType SuiteType = "udp" DummySuiteType SuiteType = "dummy" ) diff --git a/benchmark/udp.go b/benchmark/udp.go new file mode 100644 index 0000000..0d68e14 --- /dev/null +++ b/benchmark/udp.go @@ -0,0 +1,231 @@ +package benchmark + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/unpackdev/fdb" + "github.com/unpackdev/fdb/db" + transport_udp "github.com/unpackdev/fdb/transports/udp" + "github.com/unpackdev/fdb/types" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "net" + "sync" + "sync/atomic" + "time" +) + +// UdpSuite represents the benchmarking suite for UDP with buffer reuse and latency sampling. +type UdpSuite struct { + fdb *fdb.FDB + server *transport_udp.Server + pool *sync.Pool // Buffer pool for reuse + latencySampling int // How often to sample latencies (e.g., every 1000th message) +} + +// NewUdpSuite initializes the UdpSuite with buffer reuse and latency sampling settings. +func NewUdpSuite(fdb *fdb.FDB, latencySampling int) *UdpSuite { + return &UdpSuite{ + fdb: fdb, + pool: &sync.Pool{ + New: func() interface{} { + // Dynamic buffer sizing - start with a small buffer + return make([]byte, 64) // Default buffer size is 64 bytes, will grow as needed + }, + }, + latencySampling: latencySampling, + } +} + +// Start starts the UDP server for benchmarking. +func (us *UdpSuite) Start(ctx context.Context) error { + udpTransport, err := us.fdb.GetTransportByType(types.UDPTransportType) + if err != nil { + return fmt.Errorf("failed to retrieve UDP transport: %w", err) + } + + udpServer, ok := udpTransport.(*transport_udp.Server) + if !ok { + return fmt.Errorf("failed to cast transport to UdpServer") + } + + bDb, err := us.fdb.GetDbManager().GetDb("benchmark") + if err != nil { + return fmt.Errorf("failed to retrieve benchmark database: %w", err) + } + + // Create a new BatchWriter with a batch size of 512 and flush interval of 1 second + batchWriter := db.NewBatchWriter(bDb.(*db.Db), 512, 500*time.Millisecond, 15) + + wHandler := transport_udp.NewUDPWriteHandler(bDb, batchWriter) + udpServer.RegisterHandler(types.WriteHandlerType, wHandler.HandleMessage) + + rHandler := transport_udp.NewUDPReadHandler(bDb) + udpServer.RegisterHandler(types.ReadHandlerType, rHandler.HandleMessage) + + if sErr := udpServer.Start(); sErr != nil { + zap.L().Error("failed to start UDP transport", zap.Error(sErr)) + } + + us.server = udpServer + zap.L().Info("UDP transport is ready to accept traffic", zap.String("addr", udpServer.Addr())) + return nil +} + +// Stop stops the UDP server and closes the client connection. +func (us *UdpSuite) Stop(ctx context.Context) error { + if us.server != nil { + if err := us.server.Stop(); err != nil { + return err + } + } + zap.L().Info("UDP transport stopped successfully") + return nil +} + +// AcquireClient creates and returns a new UDP client. +func (us *UdpSuite) AcquireClient() (*net.UDPConn, error) { + // Resolve the server address + serverAddr, err := net.ResolveUDPAddr("udp", us.server.Addr()) + if err != nil { + return nil, errors.Wrap(err, "failed to resolve server address") + } + + // Create the UDP client + client, err := net.DialUDP("udp", nil, serverAddr) + if err != nil { + return nil, errors.Wrap(err, "failed to connect to server") + } + + return client, nil +} + +// RunWriteBenchmark benchmarks writing messages through the UDP server. +func (us *UdpSuite) RunWriteBenchmark(ctx context.Context, numClients int, numMessagesPerClient int, report *Report) error { + return us.runBenchmark(ctx, numClients, numMessagesPerClient, report, true) +} + +// RunReadBenchmark benchmarks reading messages from the UDP server. +func (us *UdpSuite) RunReadBenchmark(ctx context.Context, numClients int, numMessagesPerClient int, report *Report) error { + return us.runBenchmark(ctx, numClients, numMessagesPerClient, report, false) +} + +// runBenchmark sends messages (writes or reads) and gathers benchmark results using goroutines. +func (us *UdpSuite) runBenchmark(ctx context.Context, numClients int, numMessagesPerClient int, report *Report, isWrite bool) error { + startTime := time.Now() + var totalLatency time.Duration + var successMessages int64 + var failedMessages int64 + + // Set the number of clients and messages per client in the report + report.TotalClients = numClients + report.MessagesPerClient = numMessagesPerClient + + g, ctx := errgroup.WithContext(ctx) + + for i := 1; i <= numClients; i++ { + g.Go(func() error { + client, err := us.AcquireClient() + if err != nil { + return err + } + defer client.Close() + + for j := 0; j < numMessagesPerClient; j++ { + select { + case <-ctx.Done(): + zap.L().Info("Context canceled, stopping benchmark execution") + return ctx.Err() + default: + // Retrieve a buffer from the pool + buf := us.pool.Get().([]byte) + + var err error + var latency time.Duration + messageStart := time.Now() + + if isWrite { + // Create and encode the write message (reusing the buffer) + message := createWriteMessage() + encodedMessage, err := message.EncodeWithBuffer(buf) + if err != nil { + // Return the buffer to the pool on error + us.pool.Put(buf) + return fmt.Errorf("failed to encode message: %w", err) + } + + // Write the message to the server + _, err = client.Write(encodedMessage) + if err != nil { + atomic.AddInt64(&failedMessages, 1) + us.pool.Put(buf) + return errors.Wrap(err, "failed to write UDP message") + } + + // Read the response from the server + responseBuf := make([]byte, len(encodedMessage)) // Adjust size as per response + _, err = client.Read(responseBuf) + if err != nil { + atomic.AddInt64(&failedMessages, 1) + us.pool.Put(buf) + return errors.Wrap(err, "failed to read UDP response") + } + + } + + if !isWrite { // For read-only benchmarking + // Read the response from the server + responseBuf := make([]byte, 1024) // Adjust size as per response + _, err = client.Read(responseBuf) + if err != nil { + atomic.AddInt64(&failedMessages, 1) + us.pool.Put(buf) + return errors.Wrap(err, "failed to read UDP response") + } + } + + latency = time.Since(messageStart) + + if err != nil { + atomic.AddInt64(&failedMessages, 1) + us.pool.Put(buf) + return errors.Wrap(err, "failed to write/read UDP message") + } else { + atomic.AddInt64(&successMessages, 1) + totalLatency += latency + + // Sample latencies + if j%us.latencySampling == 0 { + report.LatencyHistogram = append(report.LatencyHistogram, latency) + } + } + + // Return the buffer to the pool for reuse + us.pool.Put(buf) + } + } + return nil + }) + } + + // Wait for all clients to finish + if err := g.Wait(); err != nil { + return err + } + + // Calculate jitter (standard deviation of latencies) + report.Jitter = calculateStdDev(report.LatencyHistogram) + + // Update report after all clients have finished + report.SuccessMessages = int(successMessages) + report.FailedMessages = int(failedMessages) + report.TotalMessages = int(successMessages) + int(failedMessages) + report.TotalDuration = time.Since(startTime) + report.Throughput = float64(successMessages) / report.TotalDuration.Seconds() + + // Finalize the report to calculate average latency and other metrics + report.Finalize() + + return nil +} diff --git a/config/transports.go b/config/transports.go index 7a470de..e99f9d5 100644 --- a/config/transports.go +++ b/config/transports.go @@ -116,7 +116,13 @@ func (t *Transport) UnmarshalYAML(value *yaml.Node) error { } config.Type = types.TCPTransportType t.Config = &config - + case types.UDPTransportType: + var config UdpTransport + if err := aux.Config.Decode(&config); err != nil { + return fmt.Errorf("failed to unmarshal UDP transport config: %w", err) + } + config.Type = types.UDPTransportType + t.Config = &config default: return fmt.Errorf("unsupported transport type: %s", t.Type) } diff --git a/config/udp.go b/config/udp.go new file mode 100644 index 0000000..15b4239 --- /dev/null +++ b/config/udp.go @@ -0,0 +1,177 @@ +package config + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "github.com/unpackdev/fdb/types" + "gopkg.in/yaml.v3" + "os" +) + +// DTLS represents the DTLS configuration used by the UDP transport. +// It is similar to TLS, but designed for datagram-based communication. +type DTLS struct { + // Cert is the path to the certificate file used for DTLS encryption. + Cert string `yaml:"cert" json:"cert" mapstructure:"cert"` + + // Key is the path to the private key file used for DTLS encryption. + Key string `yaml:"key" json:"key" mapstructure:"key"` + + // RootCA is the path to the root CA file used to validate the peer's certificate. + RootCA string `yaml:"root_ca" json:"root_ca" mapstructure:"root_ca"` + + // Insecure determines if the DTLS should skip certificate verification. + Insecure bool `yaml:"insecure" json:"insecure" mapstructure:"insecure"` +} + +// UdpTransport represents the configuration for UDP-based transport, with optional DTLS support. +type UdpTransport struct { + // Type defines the transport type, typically represented as types.UDPTransportType. + Type types.TransportType `yaml:"type" json:"type" mapstructure:"type"` + + // Enabled determines if the UDP transport is enabled. + Enabled bool `yaml:"enabled" json:"enabled" mapstructure:"enabled"` + + // IPv4 is the IPv4 address where the UDP server or client will bind. + IPv4 string `yaml:"ipv4" json:"ipv4" mapstructure:"ipv4"` + + // Port is the port number on which the UDP transport will operate. + Port int `yaml:"port" json:"port" mapstructure:"port"` + + // DTLS holds the DTLS configuration for the UDP transport, if DTLS is required. + DTLS *DTLS `yaml:"dtls" json:"dtls" mapstructure:"dtls"` +} + +// Addr returns the full address (IPv4 and port) as a string for the UDP transport. +// This address is used by the UDP server or client to bind or connect to. +// +// Example usage: +// +// addr := udpTransport.Addr() +// +// Returns: +// +// string: The full IPv4 address and port. +func (t UdpTransport) Addr() string { + return fmt.Sprintf("%s:%d", t.IPv4, t.Port) +} + +// GetTransportType returns the transport type, which is typically UDP for this struct. +// +// Example usage: +// +// transportType := udpTransport.GetTransportType() +// +// Returns: +// +// types.TransportType: The transport type. +func (t UdpTransport) GetTransportType() types.TransportType { + return t.Type +} + +// GetDTLSConfig loads the DTLS configuration if specified. This allows the UDP transport +// to use DTLS for secure communication. +// +// Example usage: +// +// dtlsConfig, err := udpTransport.GetDTLSConfig() +// if err != nil { +// log.Fatalf("Failed to load DTLS config: %v", err) +// } +// +// Returns: +// +// *tls.Config: The DTLS configuration for the UDP transport, or nil if not using DTLS. +// error: Returns an error if DTLS setup fails. +func (t UdpTransport) GetDTLSConfig() (*tls.Config, error) { + if t.DTLS == nil { + return nil, nil // No DTLS configuration provided + } + + // Check if the certificate file exists + if _, err := os.Stat(t.DTLS.Cert); os.IsNotExist(err) { + return nil, fmt.Errorf("certificate file does not exist: %s", t.DTLS.Cert) + } + // Check if the key file exists + if _, err := os.Stat(t.DTLS.Key); os.IsNotExist(err) { + return nil, fmt.Errorf("key file does not exist: %s", t.DTLS.Key) + } + + // Load the certificate and key + cert, err := tls.LoadX509KeyPair(t.DTLS.Cert, t.DTLS.Key) + if err != nil { + return nil, fmt.Errorf("failed to load certificate and key: %w", err) + } + + // Prepare the DTLS configuration + tlsConfig := &tls.Config{ + InsecureSkipVerify: t.DTLS.Insecure, + Certificates: []tls.Certificate{cert}, + } + + // Load the Root CA if specified + if t.DTLS.RootCA != "" { + caCert, err := os.ReadFile(t.DTLS.RootCA) + if err != nil { + return nil, fmt.Errorf("failed to read root CA file: %w", err) + } + + // Append the Root CA to the pool + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(caCert); !ok { + return nil, fmt.Errorf("failed to append root CA certificates") + } + tlsConfig.RootCAs = caCertPool + } + + return tlsConfig, nil +} + +// UnmarshalYAML is a custom YAML unmarshaler for UdpTransport. +// It decodes the YAML configuration into the UdpTransport struct fields, +// mapping the common transport fields like Type, Enabled, IPv4, Port, and DTLS. +// +// Example YAML format: +// +// type: udp +// enabled: true +// ipv4: "127.0.0.1" +// port: 4242 +// dtls: +// insecure: true +// cert: "/path/to/cert.pem" +// key: "/path/to/key.pem" +// rootCa: "/path/to/rootCA.pem" +// +// Parameters: +// +// value (*yaml.Node): The YAML node to be decoded. +// +// Returns: +// +// error: Returns an error if unmarshaling fails; otherwise, nil. +func (t *UdpTransport) UnmarshalYAML(value *yaml.Node) error { + // Create a temporary struct to capture the common fields + aux := struct { + Type types.TransportType `yaml:"type"` + Enabled bool `yaml:"enabled"` + IPv4 string `yaml:"ipv4"` + Port int `yaml:"port"` + DTLS *DTLS `yaml:"dtls"` + }{} + + // Unmarshal the common fields, including the nested DTLS config + if err := value.Decode(&aux); err != nil { + return fmt.Errorf("failed to unmarshal UDP transport fields: %w", err) + } + + // Assign values to the actual struct + t.Type = aux.Type + t.Enabled = aux.Enabled + t.IPv4 = aux.IPv4 + t.Port = aux.Port + t.DTLS = aux.DTLS + + return nil +} diff --git a/fdb.go b/fdb.go index 0d6e720..bc589ac 100644 --- a/fdb.go +++ b/fdb.go @@ -3,6 +3,7 @@ package fdb import ( "context" "fmt" + "github.com/davecgh/go-spew/spew" "github.com/pkg/errors" "github.com/unpackdev/fdb/config" "github.com/unpackdev/fdb/db" @@ -11,6 +12,7 @@ import ( transport_dummy "github.com/unpackdev/fdb/transports/dummy" transport_quic "github.com/unpackdev/fdb/transports/quic" transport_tcp "github.com/unpackdev/fdb/transports/tcp" + transport_udp "github.com/unpackdev/fdb/transports/udp" transport_uds "github.com/unpackdev/fdb/transports/uds" "github.com/unpackdev/fdb/types" "go.uber.org/zap" @@ -90,6 +92,15 @@ func New(ctx context.Context, cnf config.Config) (*FDB, error) { if err := transportManager.RegisterTransport(types.TCPTransportType, tcpServer); err != nil { return nil, errors.Wrap(err, "failed to register TCP transport") } + case *config.UdpTransport: + spew.Dump(cnf) + udpServer, err := transport_udp.NewServer(ctx, *t) + if err != nil { + return nil, errors.Wrap(err, "failed to create UDP server") + } + if err := transportManager.RegisterTransport(types.UDPTransportType, udpServer); err != nil { + return nil, errors.Wrap(err, "failed to register UDP transport") + } default: return nil, fmt.Errorf("unknown transport type provided: %v", t.GetTransportType()) } diff --git a/transports/tcp/handler_read.go b/transports/tcp/handler_read.go index 3732063..3560a4d 100644 --- a/transports/tcp/handler_read.go +++ b/transports/tcp/handler_read.go @@ -27,21 +27,21 @@ func (rh *TCPReadHandler) HandleMessage(c gnet.Conn, frame []byte) { } // Extract the key (32 bytes starting from the second byte) - //key := frame[1:33] - - /* // Read from the database using the key - value, err := rh.db.Get(key) - if err != nil { - log.Printf("Error reading from database: %v", err) - c.SendTo([]byte("Error reading from database")) - return - } - - if len(value) == 0 { - log.Printf("No value found for key: %x", key) - c.SendTo([]byte("No value found for key")) - return - }*/ + key := frame[1:33] + + // Read from the database using the key + value, err := rh.db.Get(key) + if err != nil { + log.Printf("Error reading from database: %v", err) + c.SendTo([]byte("Error reading from database")) + return + } + + if len(value) == 0 { + log.Printf("No value found for key: %x", key) + c.SendTo([]byte("No value found for key")) + return + } // Send the value back to the client c.SendTo([]byte{0x01}) diff --git a/transports/tcp/tcp_server.go b/transports/tcp/server.go similarity index 100% rename from transports/tcp/tcp_server.go rename to transports/tcp/server.go diff --git a/transports/tcp/tcp_server_test.go b/transports/tcp/server_test.go similarity index 100% rename from transports/tcp/tcp_server_test.go rename to transports/tcp/server_test.go diff --git a/transports/udp/handler_read.go b/transports/udp/handler_read.go new file mode 100644 index 0000000..f68c3db --- /dev/null +++ b/transports/udp/handler_read.go @@ -0,0 +1,48 @@ +package transport_udp + +import ( + "github.com/panjf2000/gnet" + "github.com/unpackdev/fdb/db" + "log" +) + +// UDPReadHandler struct with MDBX database passed in +type UDPReadHandler struct { + db db.Provider // MDBX database instance +} + +// NewUDPReadHandler creates a new UDPReadHandler with an MDBX database +func NewUDPReadHandler(db db.Provider) *UDPReadHandler { + return &UDPReadHandler{ + db: db, + } +} + +// HandleMessage processes the incoming message using the UDPReadHandler +func (rh *UDPReadHandler) HandleMessage(c gnet.Conn, frame []byte) { + if len(frame) < 33 { // 1 byte action + 32-byte key + log.Printf("Invalid message length: %d, expected at least 33 bytes", len(frame)) + c.SendTo([]byte("Invalid message format")) + return + } + + // Extract the key (32 bytes starting from the second byte) + key := frame[1:33] + + // Read from the database using the key + value, err := rh.db.Get(key) + if err != nil { + log.Printf("Error reading from database: %v", err) + c.SendTo([]byte("Error reading from database")) + return + } + + if len(value) == 0 { + log.Printf("No value found for key: %x", key) + c.SendTo([]byte("No value found for key")) + return + } + + // Send the value back to the client + c.SendTo(value) +} diff --git a/transports/udp/udp_handler_read_test.go b/transports/udp/handler_read_test.go similarity index 99% rename from transports/udp/udp_handler_read_test.go rename to transports/udp/handler_read_test.go index 1777dd5..89cb73d 100644 --- a/transports/udp/udp_handler_read_test.go +++ b/transports/udp/handler_read_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_udp import ( "context" diff --git a/transports/udp/handler_write.go b/transports/udp/handler_write.go new file mode 100644 index 0000000..cbdc934 --- /dev/null +++ b/transports/udp/handler_write.go @@ -0,0 +1,44 @@ +package transport_udp + +import ( + "github.com/panjf2000/gnet" + "github.com/unpackdev/fdb/db" + "log" +) + +// UDPWriteHandler struct with MDBX database passed in +type UDPWriteHandler struct { + db db.Provider // MDBX database instance + writer *db.BatchWriter // Batch writer instance +} + +// NewUDPWriteHandler creates a new UDPWriteHandler with an MDBX database +func NewUDPWriteHandler(db db.Provider, batchWriter *db.BatchWriter) *UDPWriteHandler { + return &UDPWriteHandler{ + db: db, + writer: batchWriter, + } +} + +// HandleMessage processes the incoming message using the UDPWriteHandler +func (wh *UDPWriteHandler) HandleMessage(c gnet.Conn, frame []byte) { + // Check if the message is at least 34 bytes (1 byte for action, 32 bytes for key, and at least 1 byte for value) + if len(frame) < 34 { + log.Printf("Invalid message length: %d, expected at least 34 bytes", len(frame)) + c.SendTo([]byte{0x01}) + return + } + + // Create a [32]byte key from the frame without using the pool + var key [32]byte + copy(key[:], frame[1:33]) // Copy directly from frame + + // The remaining part is the value (from byte 33 onwards) + value := frame[33:] + + // Buffer the write request with the key as [32]byte + wh.writer.BufferWrite(key, value) + + // Send success response + c.SendTo([]byte{0x00}) +} diff --git a/transports/udp/udp_handler_write_test.go b/transports/udp/handler_write_test.go similarity index 99% rename from transports/udp/udp_handler_write_test.go rename to transports/udp/handler_write_test.go index 9fc825f..72c25b7 100644 --- a/transports/udp/udp_handler_write_test.go +++ b/transports/udp/handler_write_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_udp import ( "context" diff --git a/transports/udp/server.go b/transports/udp/server.go new file mode 100644 index 0000000..8f77474 --- /dev/null +++ b/transports/udp/server.go @@ -0,0 +1,169 @@ +package transport_udp + +import ( + "context" + "github.com/panjf2000/gnet" + "github.com/pkg/errors" + "github.com/unpackdev/fdb/config" + "github.com/unpackdev/fdb/types" + "go.uber.org/zap" + "time" +) + +// UDPHandler function type for UDP handlers +type UDPHandler func(c gnet.Conn, frame []byte) + +// Server struct represents the UDP server using gnet +type Server struct { + *gnet.EventServer + ctx context.Context + handlerRegistry map[types.HandlerType]UDPHandler + cnf config.UdpTransport + stopChan chan struct{} + started chan struct{} +} + +// NewServer creates a new UDP Server instance using the provided configuration +func NewServer(ctx context.Context, cnf config.UdpTransport) (*Server, error) { + server := &Server{ + ctx: ctx, + handlerRegistry: make(map[types.HandlerType]UDPHandler), + cnf: cnf, + stopChan: make(chan struct{}), + started: make(chan struct{}), + } + + return server, nil +} + +// Addr returns the UDP address as a string +func (s *Server) Addr() string { + return s.cnf.Addr() +} + +// Start starts the UDP server using the provided configuration +func (s *Server) Start() error { + s.stopChan = make(chan struct{}) + s.started = make(chan struct{}) // Initialize the started channel + listenAddr := "udp://" + s.cnf.Addr() + zap.L().Info("Starting UDP Server", zap.String("addr", listenAddr)) + + // Create an error channel to capture errors from the goroutine + errChan := make(chan error, 1) + + // Start the server asynchronously + go func() { + err := gnet.Serve( + s, listenAddr, + gnet.WithMulticore(true), + gnet.WithReusePort(true), + gnet.WithSocketRecvBuffer(1024*64), + gnet.WithLockOSThread(true), + gnet.WithTicker(true), + ) + if err != nil { + errChan <- err + return + } + close(errChan) // No error, close the channel + }() + + // Wait until OnInitComplete sends a signal or an error occurs + select { + case <-s.started: + close(s.started) + zap.L().Info("UDP Server successfully started", zap.String("addr", listenAddr)) + return nil + case err := <-errChan: + if err != nil { + return errors.Wrap(err, "failed to start UDP server") + } + return nil + case <-time.After(2 * time.Second): // Wait for up to 2 seconds + return errors.New("UDP server did not start in time") + } +} + +// Tick is called periodically by gnet +func (s *Server) Tick() (delay time.Duration, action gnet.Action) { + select { + case <-s.stopChan: + return 0, gnet.Shutdown + default: + return time.Second, gnet.None + } +} + +// Stop stops the UDP server +func (s *Server) Stop() error { + zap.L().Info("Stopping UDP Server", zap.String("addr", s.cnf.Addr())) + close(s.stopChan) + + zap.L().Info("UDP Server stopped successfully", zap.String("addr", s.cnf.Addr())) + return nil +} + +// WaitStarted returns the started channel for waiting until the server starts +func (s *Server) WaitStarted() <-chan struct{} { + return s.started +} + +// OnInitComplete is called when the server starts +func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action) { + zap.L().Info("UDP Server is listening", zap.String("addr", server.Addr.String())) + s.started <- struct{}{} // Signal that the server has started + return gnet.None +} + +// React handles incoming data +func (s *Server) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { + if len(frame) < 1 { + zap.L().Warn("Invalid action received", zap.String("addr", c.RemoteAddr().String())) + return []byte("ERROR: Invalid action"), gnet.None + } + + // Parse the action type + actionType, err := s.parseActionType(frame) + if err != nil { + //zap.L().Warn("Failed to parse action type", zap.Error(err), zap.String("addr", c.RemoteAddr().String())) + return []byte("ERROR: Invalid action"), gnet.None + } + + // Check if the handler exists + handler, exists := s.handlerRegistry[actionType] + if !exists { + zap.L().Warn("Unknown action type", zap.Int("action_type", int(actionType)), zap.String("addr", c.RemoteAddr().String())) + return []byte("ERROR: Unknown action"), gnet.None + } + + // Call the handler + handler(c, frame) + return nil, gnet.None +} + +// parseActionType parses the action type from the frame +func (s *Server) parseActionType(frame []byte) (types.HandlerType, error) { + if len(frame) < 1 { + return 0, errors.New("invalid action: frame too short") + } + + var actionType types.HandlerType + err := actionType.FromByte(frame[0]) + if err != nil { + return 0, err + } + + return actionType, nil +} + +// RegisterHandler registers a handler for a specific action +func (s *Server) RegisterHandler(actionType types.HandlerType, handler UDPHandler) { + zap.L().Debug("Registering handler", zap.Int("action_type", int(actionType))) + s.handlerRegistry[actionType] = handler +} + +// DeregisterHandler deregisters a handler for a specific action +func (s *Server) DeregisterHandler(actionType types.HandlerType) { + zap.L().Debug("Deregistering handler", zap.Int("action_type", int(actionType))) + delete(s.handlerRegistry, actionType) +} diff --git a/transports/udp/udp_server_test.go b/transports/udp/server_test.go similarity index 99% rename from transports/udp/udp_server_test.go rename to transports/udp/server_test.go index a404a2d..466b8e1 100644 --- a/transports/udp/udp_server_test.go +++ b/transports/udp/server_test.go @@ -1,4 +1,4 @@ -package fdb +package transport_udp import ( "context" diff --git a/transports/udp/udp_handler_read.go b/transports/udp/udp_handler_read.go deleted file mode 100644 index 0505053..0000000 --- a/transports/udp/udp_handler_read.go +++ /dev/null @@ -1,39 +0,0 @@ -package fdb - -import ( - "github.com/panjf2000/gnet" - "github.com/unpackdev/fdb/db" - "log" -) - -// ReadHandler struct with MDBX database passed in -type ReadHandler struct { - db db.Provider // MDBX database instance -} - -// NewReadHandler creates a new ReadHandler with an MDBX database -func NewReadHandler(db db.Provider) *ReadHandler { - return &ReadHandler{ - db: db, - } -} - -// HandleMessage processes the incoming message using the ReadHandler -func (rh *ReadHandler) HandleMessage(c gnet.Conn, frame []byte) { - if len(frame) < 33 { // 1 byte action + 32-byte key - log.Printf("Invalid message length: %d, expected at least 33 bytes", len(frame)) - c.SendTo([]byte("Invalid message format")) - return - } - - // Read from the database - value, err := rh.db.Get(frame[1:33]) - if err != nil { - log.Printf("Error reading from database: %v", err) - c.SendTo([]byte("Error reading from database")) - return - } - - // Send the value back to the client - c.SendTo(value) -} diff --git a/transports/udp/udp_handler_write.go b/transports/udp/udp_handler_write.go deleted file mode 100644 index b11968d..0000000 --- a/transports/udp/udp_handler_write.go +++ /dev/null @@ -1,41 +0,0 @@ -package fdb - -import ( - "github.com/panjf2000/gnet" - "github.com/unpackdev/fdb/db" - "log" -) - -// WriteHandler struct with MDBX database passed in -type WriteHandler struct { - db db.Provider // Pass the MDBX database instance here -} - -// NewWriteHandler creates a new WriteHandler with an MDBX database -func NewWriteHandler(db db.Provider) *WriteHandler { - return &WriteHandler{ - db: db, - } -} - -// HandleMessage processes the incoming message using the WriteHandler -func (wh *WriteHandler) HandleMessage(c gnet.Conn, frame []byte) { - - // Check frame length - if len(frame) < 34 { // 1 byte action + 32-byte key + at least 1 byte value - log.Printf("Invalid message length: %d, expected at least 34 bytes", len(frame)) - c.SendTo([]byte("Invalid message format")) - return - } - - // Write to the database - err := wh.db.Set(frame[1:33], frame[33:]) - if err != nil { - log.Printf("Error writing to database: %v", err) - c.SendTo([]byte("Error writing to database")) - return - } - - // Send success response - c.SendTo([]byte("Message written to database")) -} diff --git a/transports/udp/udp_server.go b/transports/udp/udp_server.go deleted file mode 100644 index ab54c18..0000000 --- a/transports/udp/udp_server.go +++ /dev/null @@ -1,160 +0,0 @@ -package fdb - -import ( - "fmt" - "github.com/panjf2000/gnet" - "github.com/pkg/errors" - "log" - "net" - "time" -) - -// Handler function type -type Handler func(c gnet.Conn, frame []byte) - -// UdpServer struct represents the UDP server using gnet -type UdpServer struct { - *gnet.EventServer - handlerRegistry map[HandlerType]Handler - addr *net.UDPAddr - stopChan chan struct{} - started chan struct{} -} - -// NewUdpServer creates a new UdpServerGnet instance -func NewUdpServer(ip string, port int) (*UdpServer, error) { - listenAddr := fmt.Sprintf("%s:%d", ip, port) - netAddr, err := net.ResolveUDPAddr("udp", listenAddr) - if err != nil { - return nil, errors.Wrap(err, "failed to resolve address") - } - - server := &UdpServer{ - handlerRegistry: make(map[HandlerType]Handler), - addr: netAddr, - } - - return server, nil -} - -func (s *UdpServer) Addr() *net.UDPAddr { - return s.addr -} - -// Start starts the UDP server -func (s *UdpServer) Start() error { - s.stopChan = make(chan struct{}) - s.started = make(chan struct{}) // Initialize the started channel - listenAddr := "udp://" + s.addr.String() - log.Printf("UDP Server started on %s", listenAddr) - return gnet.Serve( - s, listenAddr, - gnet.WithMulticore(true), - gnet.WithReusePort(true), - gnet.WithSocketRecvBuffer(1024*64), - gnet.WithLockOSThread(true), - gnet.WithTicker(true), - ) -} - -// Tick is called periodically by gnet -func (s *UdpServer) Tick() (delay time.Duration, action gnet.Action) { - select { - case <-s.stopChan: - return 0, gnet.Shutdown - default: - return time.Second, gnet.None - } -} - -// Stop stops the UDP server -func (s *UdpServer) Stop() { - close(s.stopChan) -} - -func (s *UdpServer) WaitStarted() <-chan struct{} { - return s.started -} - -// OnInitComplete is called when the server starts -func (s *UdpServer) OnInitComplete(server gnet.Server) (action gnet.Action) { - log.Printf("UDP Server is listening on %s", server.Addr.String()) - close(s.started) // Signal that the server has started - log.Println("") - return -} - -// React handles incoming data -/*func (s *UdpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { - if len(frame) < 1 { - c.SendTo([]byte("ERROR: Invalid action")) - return - } - - actionType, err := s.parseActionType(frame) - if err != nil { - c.SendTo([]byte("ERROR: Invalid action")) - return - } - - handler, exists := s.handlerRegistry[actionType] - if !exists { - c.SendTo([]byte("ERROR: Unknown action")) - return - } - - // Call the handler - handler(c, frame) - - return -}*/ - -func (s *UdpServer) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { - /* if len(frame) < 1 { - // Directly return without calling SendTo inside React - return []byte("ERROR: Invalid action"), gnet.None - } - - // Use switch for faster action parsing - actionType, err := s.parseActionType(frame) - if err != nil { - return []byte("ERROR: Invalid action"), gnet.None - } - - // Check handler existence - handler, exists := s.handlerRegistry[actionType] - if !exists { - return []byte("ERROR: Unknown action"), gnet.None - } - - // Call the handler directly, no blocking operations - handler(c, frame)*/ - - // No output to send from React - return nil, gnet.None -} - -// parseActionType parses the action type from the frame -func (s *UdpServer) parseActionType(frame []byte) (HandlerType, error) { - if len(frame) < 1 { - return 0, errors.New("invalid action: frame too short") - } - - var actionType HandlerType - err := actionType.FromByte(frame[0]) - if err != nil { - return 0, err - } - - return actionType, nil -} - -// RegisterHandler registers a handler for a specific action -func (s *UdpServer) RegisterHandler(actionType HandlerType, handler Handler) { - s.handlerRegistry[actionType] = handler -} - -// DeregisterHandler deregisters a handler for a specific action -func (s *UdpServer) DeregisterHandler(actionType HandlerType) { - delete(s.handlerRegistry, actionType) -} diff --git a/transports/uds/uds_server.go b/transports/uds/server.go similarity index 100% rename from transports/uds/uds_server.go rename to transports/uds/server.go diff --git a/transports/uds/uds_server_test.go b/transports/uds/server_test.go similarity index 100% rename from transports/uds/uds_server_test.go rename to transports/uds/server_test.go diff --git a/types/types.go b/types/types.go index 7d7014e..e0d9deb 100644 --- a/types/types.go +++ b/types/types.go @@ -13,6 +13,8 @@ func (t TransportType) String() string { return "quic" case TCPTransportType: return "tcp" + case UDPTransportType: + return "udp" case DummyTransportType: return "dummy" default: @@ -29,6 +31,8 @@ func ParseTransportType(s string) (TransportType, error) { return QUICTransportType, nil case "tcp": return TCPTransportType, nil + case "udp": + return UDPTransportType, nil case "dummy": return DummyTransportType, nil default: