From b45fe33fbff2e5e8b7e94abbc85e2debb40e883d Mon Sep 17 00:00:00 2001 From: Nevio Date: Tue, 17 Sep 2024 21:25:03 +0200 Subject: [PATCH] Local sync --- Makefile | 36 ++--- README.md | 126 ++++++++++++--- c/Makefile | 47 ++++++ c/obj/ebpf_program.o | Bin 0 -> 10384 bytes c/src/ebpf_program.c | 70 ++++++++ client/.gitkeep | 0 client/client.go | 78 +++++++++ client/client_tcp_test.go | 124 ++++++++++++++ client/config.go | 13 ++ client/handlers.go | 16 ++ client/tcp.go | 153 ++++++++++++++++++ client/todo.md | 10 ++ client/transport.go | 18 +++ client/types.go | 12 ++ cmd/ebpf.go | 150 +++++++++++++++++ Dockerfile => deployments/Dockerfile | 0 .../otel-collector-config.yaml | 9 +- deployments/prometheus.yaml | 7 + docker-compose.yml => docker-compose.yaml | 39 ++++- entrypoint/main.go | 1 + go.mod | 1 + go.sum | 3 + helpers/ebpf.go | 43 +++++ messages/message.go | 40 +++++ transports/ebf/handler_write.go | 21 +++ transports/ebf/server.go | 100 ++++++++++++ transports/tcp/handler_read.go | 10 +- transports/tcp/handler_write.go | 6 +- transports/tcp/server.go | 111 +++++++++---- 29 files changed, 1152 insertions(+), 92 deletions(-) create mode 100644 c/Makefile create mode 100644 c/obj/ebpf_program.o create mode 100644 c/src/ebpf_program.c delete mode 100644 client/.gitkeep create mode 100644 client/client.go create mode 100644 client/client_tcp_test.go create mode 100644 client/config.go create mode 100644 client/handlers.go create mode 100644 client/tcp.go create mode 100644 client/todo.md create mode 100644 client/transport.go create mode 100644 client/types.go create mode 100644 cmd/ebpf.go rename Dockerfile => deployments/Dockerfile (100%) rename otel-collector-config.yaml => deployments/otel-collector-config.yaml (57%) create mode 100644 deployments/prometheus.yaml rename docker-compose.yml => docker-compose.yaml (51%) create mode 100644 helpers/ebpf.go create mode 100644 transports/ebf/handler_write.go create mode 100644 transports/ebf/server.go diff --git a/Makefile b/Makefile index d04ecff..403b34b 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +# Main Makefile + .DEFAULT_GOAL := help BIN_NAME := build/fdb @@ -8,6 +10,9 @@ UNAME_S_LOWERCASE := $(shell echo $(UNAME_S) | tr A-Z a-z) BUILD_TARGET := build-$(UNAME_S_LOWERCASE) COMMIT_HASH := $(shell git rev-parse HEAD) +# Include the eBPF Makefile (adjust the path as needed) +include c/Makefile + .PHONY: submodule submodule: ## Update submodules git submodule update --init --recursive @@ -16,24 +21,16 @@ submodule: ## Update submodules help: ## Display this help @awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST) +# Other commands as before... + .PHONY: deps deps: ## Install dependencies ifeq ($(UNAME_S),Linux) - sudo apt-get update && sudo apt-get install -y golang sqlite3 redis-server pipx - sudo -v ; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b $(go env GOPATH)/bin v1.58.1 - sudo -v ; curl https://rclone.org/install.sh | sudo bash - pipx ensurepath - pipx install slither-analyzer -endif -ifeq ($(UNAME_S),Darwin) - brew install go sqlite golangci-lint redis pipx + sudo apt-get update && sudo apt-get install -y golang sqlite3 redis-server pipx clang llvm libelf-dev gcc make linux-tools-$(uname -r) sudo apt install linux-tools-common iproute2 +##sudo -v ; curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b $(go env GOPATH)/bin v1.58.1 sudo -v ; curl https://rclone.org/install.sh | sudo bash - pipx ensurepath - pipx install slither-analyzer -endif -ifeq ($(OS),Windows_NT) - choco install golang sqlite golangci-lint redis endif +# MacOS and Windows dependencies... .PHONY: lint lint: ## Lint the Go code using golangci-lint @@ -46,13 +43,7 @@ build: build-linux ## Build the binary for the current OS/Arch build-linux: ## Build the binary for Linux @GOOS=linux GOARCH=amd64 go build -o ./$(BIN_NAME) -ldflags "-X main.Version=$(VERSION) -X main.CommitHash=$(COMMIT_HASH)" ./entrypoint/main.go -.PHONY: build-darwin -build-darwin: ## Build the binary for MacOS - GOOS=darwin GOARCH=amd64 go build -o ./$(BIN_NAME) -ldflags "-X main.Version=$(VERSION) -X main.CommitHash=$(COMMIT_HASH)" ./entrypoint/main.go - -.PHONY: build-windows -build-windows: ## Build the binary for Windows - GOOS=windows GOARCH=amd64 go build -o ./$(BIN_NAME).exe -ldflags "-X main.Version=$(VERSION) -X main.CommitHash=$(COMMIT_HASH)" ./entrypoint/main.go +# More build targets... .PHONY: run run: build ## Run the binary @@ -80,4 +71,7 @@ ifeq ($(OS),Windows_NT) # Windows del /Q $(BIN_NAME).exe else rm -f $(BIN_NAME) -endif \ No newline at end of file +endif + +# eBPF-specific commands wrapped under ebpf namespace +.PHONY: ebpf-build ebpf-load ebpf-unload ebpf-clean diff --git a/README.md b/README.md index 2055ff3..5a54d9b 100644 --- a/README.md +++ b/README.md @@ -10,19 +10,77 @@ # (f)db -**NOTE: At this moment I am adding all possible faster ways, including TCP, to be able to do proper benchmarking first. -In the future, I may drop transports that prove to be inefficient based on benchmark results. At the same time I will slowly start to write wrappers around the packages for convenient usage incl. deployments.** +**NOTE: At the moment, I am exploring and integrating all potential high-performance transport options, including +TCP, to establish a solid foundation for benchmarking. In the future, I may discard any transport methods that +prove inefficient based on the benchmarking results. Concurrently, I will begin writing convenient wrappers around +the underlying packages to streamline usage and deployments.** -**UNDER ACTIVE DEVELOPMENT** +**CURRENTLY UNDER ACTIVE DEVELOPMENT** -This is currently a prototype, with the idea of building incredibly fast transport layers on -top of key-value (KV) databases. The goal is to allow one or multiple instances of these -databases to be started and cross-shared in user space or accessed remotely bypassing general locks -that are enforced in KV databases or some of the OLAP databases such as DuckDB. +**Meaning everything about this repository can be changed. Best to keep track, not use the code yet besides to play with it...** + +(f)db is born from frustrations experienced while working with key-value (KV) databases. +The goal is to build extremely fast transport layers on top of KV databases. This project is going to be either f**k databases or fast database... There is no third solution... -Though this will be hard to achieve without DPDK. Will not overkill the prototype with it for now... +### Why is this necessary, and is it just another over-engineered solution in the web world? + +It might not be essential for every use case, but it becomes particularly interesting if you require a distributed key-value database with replicated data across multiple nodes. +Additionally, it offers the ability to access and manage data efficiently across multiple servers, each sharing a single connection endpoint, +to offload data to different databases and/or nodes. + +For instance, imagine using the same client to seamlessly push data to either an MDBX or DuckDB instance, +regardless of whether they are hosted on the same or different servers. +This abstraction provides flexibility in data management and replication across distributed systems. + +### Why I need this? + +I require a solution that enables different services to write to and read from different databases while using a unified transport layer. Achieving this with existing tools is complex, if not impossible, without a dedicated transport layer on top of the database. This transport layer allows for cross-service communication and interaction with the underlying database. + +This project addresses that gap by building a high-performance, flexible transport layer that enables seamless interaction with various databases in a distributed environment. + +### Why Multiple Transports? + +One core feature of (f)db is its support for multiple transport protocols, including UDP, UDS, TCP, and QUIC. The reason for this is simple: no single transport protocol is perfect. Each excels in different areas, such as throughput, latency, reliability, or efficiency under specific network conditions. + +By offering multiple transport options, (f)db provides the flexibility to fine-tune and optimize performance depending on your specific needs and environment. This allows users to select the most appropriate transport based on their use case, whether it's high-throughput, low-latency, or optimized for specific infrastructure. + +### Networking + +This project uses [gnet](https://github.com/panjf2000/gnet), an event-driven, high-performance networking library built for Go. gnet is much faster than Go’s standard net package for several reasons: + +- Event-Driven Model: gnet uses an event-driven approach, meaning it reacts to specific network events (like data being available to read or connections being closed) instead of continually polling or blocking threads like the standard net package. This reduces CPU overhead and leads to faster processing. +- Efficient Use of Goroutines: Unlike Go’s net package, gnet minimizes the use of goroutines, which helps reduce the context-switching overhead that can slow down highly concurrent applications. Instead, gnet directly handles network I/O in an optimized way. +- Zero-Copy Data Handling: gnet offers zero-copy mechanisms for data processing, meaning that memory is not repeatedly copied between kernel and user space, significantly improving throughput for high-performance networking applications. +- Multi-Event Processing: gnet allows the processing of multiple events within a single loop, which can lead to higher efficiency when handling a large number of simultaneous connections. This feature is particularly valuable for real-time, high-frequency applications that need to handle many clients at once. + +By leveraging gnet’s capabilities, (f)db can react quickly to network events, optimize throughput, and minimize latencies, making it ideal for distributed systems where performance is critical. + + +### eBPF Integration + +As part of the ongoing development of high-performance transport layers for (f)db, eBPF (Extended Berkeley Packet Filter) has been integrated to enhance packet processing and monitoring capabilities within the network stack. + +By leveraging eBPF, we can build efficient, scalable, and secure transport layers that directly interact with network traffic, providing real-time insights and optimizations for packet flow, without adding latency or reducing throughput. + +- Real-Time Packet Filtering: With eBPF, (f)db can selectively filter out unnecessary traffic at the kernel level, significantly reducing the amount of data that needs to be processed in user space. +- Network Performance Insights: eBPF programs can monitor performance metrics, such as packet drops, latencies, and bandwidth usage, allowing for dynamic tuning of transport protocols based on real-time traffic conditions. +- Low-Latency Processing: eBPF’s ability to operate within the kernel ensures that packet processing happens as close to the hardware as possible, minimizing delays and improving overall system responsiveness. +- Ring Buffer for Data Handling: eBPF uses a ring buffer to store and transfer network data efficiently to user-space applications, ensuring fast and reliable communication between the kernel and (f)db. + +**Right now some basic program exists that can be loaded but it does not do anything more then writing into ring buffer** + +This is something to be dealt with later on... + +For example, can be used for mass writes without ACK, different types of discoveries or for example DDoS detection or non-whitelisted server ip access, idk... + +### Future Considerations + +While achieving all these goals without using advanced techniques like DPDK (Data Plane Development Kit) will be challenging, the initial prototype will focus on building a solid foundation. Advanced optimizations can be layered on top later, depending on the project's needs and performance demands. + +This approach ensures that (f)db remains both adaptable and scalable, with the potential to handle a variety of use cases while maintaining high performance. + ## Diagram @@ -168,6 +226,48 @@ docker-compose up -d This will bring up the [(f)db](https://github.com/unpackdev/fdb) instance, [OpenTelemetry](https://opentelemetry.io/docs/languages/go/) collector, and [Jaeger](https://www.jaegertracing.io/) for tracing, making the system ready for production-level monitoring and telemetry. +### Loading the eBPF Program + +To get started, you'll first need to download and update your local environment by installing the necessary dependencies. +Additionally, ensure that you have the (f)db project built to properly load and unload the eBPF program. + +``` +make deps +make build +sudo setcap cap_bpf+ep c/obj/ebpf_program.o +``` + +Next, compile the eBPF program located at [program](./c/src/ebpf_program.c). + +``` +make ebpf-build +``` + +Before loading the program, identify the network interface you want to bind the eBPF program to. +The default interface is typically eth0, but it can vary based on your system configuration. +To inspect your available network interfaces, use the following command: + +``` +./build/fdb ebpf interfaces +``` + +Finally, load the eBPF program onto your desired network interface: + +``` +sudo make ebpf-load INTERFACE={interface-name} +``` + +Once you’ve completed the above steps, your output should look something like this: + +``` +xxx:xx$ make ebpf-build && sudo make ebpf-load INTERFACE=xxx +eBPF program compiled successfully. +sudo ip link set dev enp73s0 xdp obj c/obj/ebpf_program.o sec xdp +eBPF program loaded onto interface xxx. +``` + +With that, your eBPF program is successfully loaded and running on the specified interface! + ### Running the Binary For a more custom or direct deployment, you can build and run the fdb binary manually. @@ -190,16 +290,6 @@ make build Ensure that your configuration file is tuned for production, including settings for transports, database paths, logging levels, and performance profiling. By default, this will start the server with all the transports and services configured in the config.yaml, ready for high-performance and production use. -## GNET - -gnet is a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go. - -gnet is an event-driven networking framework that is ultra-fast and lightweight. It is built from scratch by exploiting epoll and kqueue and it can achieve much higher performance with lower memory consumption than Go net in many specific scenarios. - -https://github.com/panjf2000/gnet - - - ## QUIC (HTTP/3) https://github.com/quic-go/quic-go/wiki/UDP-Buffer-Sizes diff --git a/c/Makefile b/c/Makefile new file mode 100644 index 0000000..e30c9b4 --- /dev/null +++ b/c/Makefile @@ -0,0 +1,47 @@ +# eBPF Makefile for handling eBPF-related tasks + +# Variables +CLANG := clang +BPFTOOL := bpftool +SRC_DIR := c/src +OBJ_DIR := c/obj +SRC := $(SRC_DIR)/ebpf_program.c +OBJ := $(OBJ_DIR)/ebpf_program.o +INTERFACE := eth0 # Replace with the network interface you want to attach to + +# eBPF targets +.PHONY: ebpf-all ebpf-build ebpf-load ebpf-unload ebpf-status ebpf-clean + +# Default eBPF target +ebpf-all: ebpf-build + +# Create object directory if it doesn't exist +$(OBJ_DIR): + mkdir -p $(OBJ_DIR) + +# Compile the eBPF program +$(OBJ): $(SRC) | $(OBJ_DIR) + $(CLANG) -O2 -g -target bpf -c $(SRC) -o $(OBJ) + +# Build the eBPF program +ebpf-build: $(OBJ) + @echo "eBPF program compiled successfully." + +# Load the eBPF program onto the interface +ebpf-load: $(OBJ) + sudo ip link set dev $(INTERFACE) xdp obj $(OBJ) sec xdp + @echo "eBPF program loaded onto interface $(INTERFACE)." + +# Unload the eBPF program from the interface +ebpf-unload: + sudo ip link set dev $(INTERFACE) xdp off + @echo "eBPF program unloaded from interface $(INTERFACE)." + +# Check if the eBPF program is loaded +ebpf-status: + $(BPFTOOL) prog + +# Clean up the compiled objects +ebpf-clean: + rm -rf $(OBJ_DIR) + @echo "Cleaned up eBPF compiled files." \ No newline at end of file diff --git a/c/obj/ebpf_program.o b/c/obj/ebpf_program.o new file mode 100644 index 0000000000000000000000000000000000000000..94035e43b384663ff0756bc1e304200520a32ba0 GIT binary patch literal 10384 zcmb_i3ve69dEUbTAVBbae1M`vooG=eX%V0vp%gtRQX(u-A|;3ftw$V#AaEoQfdGXA zRXUYz){$M=qZzxNv{DF?+!SuXgAa8Kl9!G-@pIAuiM?*JK%xP@SP4xVwOnkzgUr3R>m%`D(d}Q?Po3U ze8J6_13cDb6vo2QrR4ExHQkxs)|h59>Y+V1FJZ@3t{WagK_UWNSbx^^CL zJXgt6it|7_u2*n=U_DRRxI^W3dwd(Ry6aYOhvPY;VdD;&p{eYWZp4+zjP;VA;^zj+ zucdrjJGv;puAb-ng+X}%?V)zRfO1G40=@uTqDDd*JVv`O{hq-sf2)4-KTduOg+OQ= zN$mc_$&cV-Mny~IpF;kn30xq5Xe75>cZY}BURYfiEvkxo-3qh=cK|e;TL35XO9z>= z=A0}YV$Pn;(kOHKKaj|_Af4vMa}K13nN%yaw<|8WJ=avz)Y4wtQs>s2&PW(;$3`Z% z4=FpExA?aD9H>c_q#*@muVO>WCLbEH8V-FvA5=G~K3^ZyGDY(FC&c(mSURDf&@UPS{6zrC5UN*DL4szx`e8==eXMrk!`V5x18$f?c=Z>rGLYiWl5q{%;UX}lxh$2&LEZV#f68qFqg;W z^gd`A%c(qw#C+v{g48il;XRME;v2YB^2;|#qYJC5{##rtq?ImTna5Z8Dv0A+x%Wq8 zd=nSL|0-#}!iBl2T%PLdP->(WS9O`w`)ksDwK6*BI91{GAgwJ{!hCf+Cs$ENL4K#) z=T{V_)JsZ(Lur(iCMT<>9H(=I(#(`)lG5T(mOGUdZl%?utgKMfT4hzE(zZgmZI!aR zQ)v$y?fTO6T25*FDM|lgip@r8}(zbR~F7QhH8H%DP7+W&LB4vf-wrY<$?E zeCqcd%=;BI&DHCXyceJ}Y_D>C&9mtTWcxbvZ05>ex;>x%8CjmIU^2P$@pZH6e}TTu ztyaH{bgSfdZ7FkjZ$NJtsV_U!*moA2P1zAAYwYLRQK!>0z_pm<*+~VKO*%ZgxDt1I z2Dy@OdqUht^LXy$%9Pi$`#hC2UEVZGX|lqPyWeyV*I5~K&(Jdl@$54cC-V&7|B3mS zG1QYCQUmYB9`~ARx|@65ZOwdEN=NrIh33*Sc7S2K!2FlOtl>`))8aV9nn%bYA7qW( z;ylW3dxo-HpJBZ;m6qSly8a!8<)39i@?hR_1gyBrFAw=!!&H;J*UwrHa&4dA**eO# zVZWz!jBES-6|E7j9q`w*j&tpxzn)c(qnsn&Wqh(7_H?n6C>@KA%PL&y=|C&xC=dm7 z-~oVk+ta`ofoFkl11|$V1>OMO2Hpc`UN!=4z#YIwU@Nc(7z6GBjsf$)L%<&ZPXT`c zd=q#X_&z{)^DW?g;3nY1?PvtrfOWtQU>|T8(10B9An+Kl2wVWZ1$-BH4fr|mCU6b- z2#_!i9-taf06(AttAVvZ5ZDZC146()U<^0{OaN0r8pr_)z$xGn;0fSK-~#Y%;8ozq zz|VoVfcJp+0SN~NCon!am&wnm@pLRRt)9?wdMcYy*YyT^0=;U-x=(eocu>#9gWBZm zRCG3%ozBH(0&x~hW@ofuMmv$p1{2ze;9O=l7C#o8N=&k7Iu+M4y2j$kSdK-b1EZq@ zN1|gxM?=xbk#Hy)Wi$G8G?&UuPtHxT{JpapOJ(wGCN>|{GWlFe(;3xeIz)4tuH{Z> zY%ZgxrZZYXO=r98cRfvX}OG+X566=LlwY#!}^k<6Dd6&%Oy%| z`rPD9D$foNhNA;J4~#}agIpgRJrL$fcwlUdE0M!I84V2%jfQqb*nDC(8jI&KdTb~h z9z75_5FH8^)m@%%h|@-cqF3P>2r zj0O|d2&TUtO--dT32mO`=A*~wv^gzmP^~nVX5dVM8XHUV=As{%_#4(o^Q@qto6K5T zI+}pRH_P(5SVo`4kVKPMLRyY3$3{v=){m9*?@0VCOAd?J9SOh2u3`#K^>k3-LK_WJ z3zTs$Y=Af$;ei8~yT9cJ3m1oZG$&FL1I~*^8O|CI9QCZa_}QAu>j&sP0#a3mi`f?9 zj&fY+DTDHzbRORh&^eD}CqTLi>@)R#q~oSOfmAc~1*CaXUqt!^0OFy;;3Hh+n=CJj zDB4g-{r)4s3(z4BwhDS<`Dflk#@`4r>@aiTRytVISP_0<_*GHA$>|+%q~ub#U6l7r zo1NaTNGP6fwM*^Ed>^*&!S+qsoLT=r)7039OSWcO1Es41HGaY~Cb;A?33FX7DWyZt z=jP&h)o8xjg=#w0-(@PKP(AGzY8UlP-L7^B=FSd!jC3NdyN_J9eq1cA+utId+>A0( zQ))+Hp{m=rt3B$;lWNh@r&=3ebR;s~$?K(s56l?H)2(fPlZ zj@|vYD}65UF{Pi#rV?sb$!3KyUgB)fvBox8*OrXjTD7OMIO3~H3M=-(Jlm8WAv#K& z#bRkBI<476Gs@mB^I##CfTpNwFsKe1=MFWW)Kv3$lTi6)tl}QiDFz$AIu%Q$b$;XR zMH}anB}!)ZnFlWG_!XFm&FaizSo*YQtF#9|>P=K&^H6CcfzWnOCu@`P??JSekd0E3 z3jm$A*|6bH+=+lq5yOXgKls+e#l%ei7VzjLq{T z?clGN`FpX6wu4+X`Lp0HucHq5k^g^!d(lYh7x60Wh2CpMyX-t{=8sbTb;f2*{yew~ zojPrDIyp-FG149nj5ovUroIEukZhl zy6>*eJPM;D-}b8eAKrNQou_Xk|NOp3e-wNYMmhJ#2OqjCa`B1R{_($`{?bq0{kx~h z_?veJKJ)xj7ccxx^4)*RIzH@thK!9nA|EZg@r#xJHc(yvj{n0Ge?`Vi=U=RPY~qOr z-h1kirm?ZPuYQY+^=IcV{A>Bj8=kMdGQZ>UzE1LV?oIs6`{IX_i_aw2Zu#Dsly-@X z`+xZMNbb-N9@fs@e))4}zchhoj~BPa5V=lrZ7OebZC&N^uW`!>RbIZAUjr_$TP|1K zd*!35ye8nPB<5;&t8!SCdjjq@x4d7K*9TlJ?$s{u^lCVURJXhh7~JM^x#gFc>XLRi z92nwHctZ)!>A7Gk6Hm`2v|vo1>7LdyS}ujR8hZv-U8y0R%FNA|nkG|ued!k@JdC>2 z>E4ZjB!9g~YUx=mhhyoKZdh$#TKsRpwc6la#ORo=y_}c`EEM5k=w_~o5*mo zU*23QUp9_SC0Wa83c^lv+b)V}r!{65+8@ZD)|y@D?V9YgW<-%0ow#llxo|=$&65kK zFVSv%%PqB1t@Ne(R*`R6jfRiR8gF%mMJ}9Ri`i{Z>yTrKwVuBYL#=gEg>LPKv@B}* zmt7-*pRu4?<3+VS$GQ zj>i)htA5&TNi2T1K?{6B;0b{z1)di8tibaEpBMN^fiDRBl)z65{EWaK7x)(ieoo*| z3Vc!Ee3Em7X|*3!2e$0uL%760>3Qq*988$z~2z~6@kAg@T&s9 zCh+S5|CPW$5cqEdPR9fiYktx>!p`Y?yqz2SbkW~Dt`y?y1%Em#+2fZBoX$@6{M7>Q z5cpbw_XwQ6!rIH*Ebwgt?-%%>!1oAzSm0rSM+AOY;CBmrLf{F3Ck37sxb|JHZlrvsf-rooT@OZrthd!4t)-?m6ys)WmzDdJ~0#JA68S- zCBG>Yeu1!s;P9^5i@8I1|QY6MV!J~qh$`n@weHG9j3ckb+oe%eSj8Ogd0W@_DP zb434d$@>VSdnw9SRKwC`-XAMpp!@?M*=*lTxof`_Af&Wz>~aR=lj5!Y#fq`^CySqk zLg(0i%V4HgjK6^t_OE5IQ$MZv8q@C(QmS7ZgV)WDT7IT`A>QKjYv5AxSIu~_zoyv= zXFXq>ys+Rc=AZ6ez#%OV!_RZmd`M_YR|edY=*N|EyU*-3PloYC1Lu zH_)ozN>3ul((Uo*X>*6D{93$-zVpahs{JbcSqEaZi^h-kSF!zNC{Qeu;$AZAf8Ojr z*+_)$@L#Q$sy}Sjukt8k{A!?!_1k{~F-kFkry|Ar ze+rvW1uXd1hdc2 z*>@1XRQ-$Ai?|s;_0!h`vHql4zco)lxlEcb_CK@?W@^=MrHio3!#A@?7Jz~*s)lMU lf2yCviZdAgQ2@o% literal 0 HcmV?d00001 diff --git a/c/src/ebpf_program.c b/c/src/ebpf_program.c new file mode 100644 index 0000000..bf7c231 --- /dev/null +++ b/c/src/ebpf_program.c @@ -0,0 +1,70 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ETHERNET_MTU 1500 // Ethernet MTU size + +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 1 << 24); // 16MB ring buffer size +} msg_ringbuf SEC(".maps"); + +SEC("xdp") +int handle_packet(struct xdp_md *ctx) { + // Get the start and end pointers for the packet data + unsigned char *data = (unsigned char *)(long)ctx->data; + unsigned char *data_end = (unsigned char *)(long)ctx->data_end; + + // Calculate the packet length directly from ctx + uint32_t total_len = data_end - data; + + // Bound the packet length by Ethernet MTU + if (total_len == 0 || total_len > ETHERNET_MTU) + return XDP_PASS; // We pass the packet instead of dropping it + + // Ensure the packet has enough data for the Ethernet header + if (data + sizeof(struct ethhdr) > data_end) + return XDP_PASS; // Not enough data for Ethernet header + + struct ethhdr *eth = (struct ethhdr *)data; + + // Only process IPv4 packets + if (__constant_htons(eth->h_proto) != ETH_P_IP) + return XDP_PASS; // We only handle IPv4, pass the rest + + // Ensure the packet has enough data for the IP header + struct iphdr *ip = (struct iphdr *)(data + sizeof(struct ethhdr)); + if ((unsigned char *)ip + sizeof(struct iphdr) > data_end) + return XDP_PASS; // Not enough data for IP header + + // Check if the packet has enough data for TCP or UDP headers + unsigned char *transport_header = data + sizeof(struct ethhdr) + sizeof(struct iphdr); + if (ip->protocol == IPPROTO_TCP && (transport_header + sizeof(struct tcphdr) > data_end)) + return XDP_PASS; // Not enough data for TCP header + if (ip->protocol == IPPROTO_UDP && (transport_header + sizeof(struct udphdr) > data_end)) + return XDP_PASS; // Not enough data for UDP header + + // Reserve space in the ring buffer for the packet data + void *ringbuf_space = bpf_ringbuf_reserve(&msg_ringbuf, ETHERNET_MTU, 0); + if (!ringbuf_space) + return XDP_PASS; // If we can't reserve space, pass the packet + + // Copy the entire packet into the ring buffer + if (bpf_probe_read_kernel(ringbuf_space, ETHERNET_MTU, data)) { + bpf_ringbuf_discard(ringbuf_space, 0); // Discard the reserved space if reading fails + return XDP_ABORTED; // Abort if copying the packet fails + } + + // Submit the packet to the ring buffer + bpf_ringbuf_submit(ringbuf_space, 0); + + return XDP_PASS; // Pass the packet after processing +} + +char _license[] SEC("license") = "GPL"; diff --git a/client/.gitkeep b/client/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..2f2619f --- /dev/null +++ b/client/client.go @@ -0,0 +1,78 @@ +package client + +import ( + "context" + "errors" + "fmt" + "sync" +) + +// Client manages multiple transports and handlers using the config +type Client struct { + transports map[string]Transport + ctx context.Context + mu sync.RWMutex +} + +// NewClient creates a new Client using the provided config +func NewClient(ctx context.Context, cfg *Config) *Client { + return &Client{ + ctx: ctx, + transports: cfg.Transports, + } +} + +// RegisterTransport adds a transport to the config +func (c *Client) RegisterTransport(name string, transport Transport) error { + if _, exists := c.transports[name]; exists { + return fmt.Errorf("transport %s already registered", name) + } + c.transports[name] = transport + return nil +} + +// GetTransport retrieves a registered transport by name +func (c *Client) GetTransport(name string) (Transport, error) { + c.mu.RLock() + defer c.mu.RUnlock() + transport, exists := c.transports[name] + if !exists { + return nil, errors.New("transport not found") + } + return transport, nil +} + +// SendMessage sends a message using the specified transport +func (c *Client) SendMessage(name string, data []byte) error { + transport, err := c.GetTransport(name) + if err != nil { + return err + } + return transport.Send(data) +} + +// Start starts all transports in the client +func (c *Client) Start(ctx context.Context) error { + c.mu.RLock() + defer c.mu.RUnlock() + for _, transport := range c.transports { + err := transport.Connect(ctx) + if err != nil { + return err + } + } + return nil +} + +// Close shuts down all transports in the client +func (c *Client) Close() error { + c.mu.RLock() + defer c.mu.RUnlock() + for _, transport := range c.transports { + err := transport.Close() + if err != nil { + return err + } + } + return nil +} diff --git a/client/client_tcp_test.go b/client/client_tcp_test.go new file mode 100644 index 0000000..9a07551 --- /dev/null +++ b/client/client_tcp_test.go @@ -0,0 +1,124 @@ +package client_test + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" + + "github.com/panjf2000/gnet/v2" + "github.com/unpackdev/fdb/client" + "github.com/unpackdev/fdb/messages" + "github.com/unpackdev/fdb/types" + "go.uber.org/zap" +) + +func TestTCPClientSendMessage(t *testing.T) { + ctx := context.Background() + logger, _ := zap.NewDevelopment() + + // Create configuration + cfg := client.NewConfig() + + // Create a new client + c := client.NewClient(ctx, cfg) + + // Create a new TCP transport with gnet options + tcpTransport := client.NewTCPTransport("127.0.0.1:5011", logger, + gnet.WithMulticore(true), + gnet.WithTCPNoDelay(gnet.TCPNoDelay), + ) + + // Register the transport with the client + err := c.RegisterTransport("tcp", tcpTransport) + require.NoError(t, err, "Failed to register transport") + + // Start the client (connects all registered transports) + err = c.Start(ctx) + require.NoError(t, err, "failed to start client") + + // Define test cases for table-driven tests + testCases := []struct { + name string + messageType client.MessageType + expectedResp string + handlerType types.HandlerType + expectedError error + }{ + { + name: "Valid Write Message", + handlerType: types.WriteHandlerType, + messageType: client.WriteSuccessMessageType, + expectedResp: "", + }, + /* Uncomment and add more test cases as needed */ + // { + // name: "Invalid Action Message", + // messageType: client.InvalidActionMessageType, + // expectedResp: "Invalid Action Response", + // handlerType: client.InvalidActionMessageType, + // }, + // { + // name: "Unknown Handler Message", + // messageType: 0, // Assuming 0 is not mapped to any known handler + // expectedResp: "Unknown Handler Response", + // handlerType: 0, + // }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Start the client (connects all transports) + err = c.Start(ctx) + require.NoError(t, err, "Failed to start client") + + // Wait for connection to establish + time.Sleep(1 * time.Second) + + // Generate a random message based on the message type in the test case + msg, msgErr := messages.GenerateRandomMessage(tc.handlerType) + assert.NoError(t, msgErr) + require.NotNil(t, msg) + + // Encode the message + encodedMsg, err := msg.Encode() + require.NoError(t, err, "Failed to encode message") + + // Capture the time before sending the message + sentTime := time.Now() + + // Register a handler for the expected response + tcpTransport.RegisterHandler(tc.messageType, func(c gnet.Conn, data []byte) error { + // Log the time when the response is received + receivedTime := time.Now() + duration := receivedTime.Sub(sentTime) + + t.Logf("Received response: %s", string(data)) + t.Logf("Time taken for response: %s", duration) + + // Check if the received response matches the expected one + assert.Equal(t, tc.expectedResp, string(data)) + + return nil + }) + + // Send the message + err = tcpTransport.Send(encodedMsg) + if tc.expectedError != nil { + require.Equal(t, tc.expectedError, err) + } else { + require.NoError(t, err, "Failed to send message") + } + + // Wait for potential responses + time.Sleep(2 * time.Second) + + }) + } + + // Close the client + err = c.Close() + require.NoError(t, err, "Failed to close all clients") +} diff --git a/client/config.go b/client/config.go new file mode 100644 index 0000000..29a316d --- /dev/null +++ b/client/config.go @@ -0,0 +1,13 @@ +package client + +// Config holds the configuration for the Client, including transports +type Config struct { + Transports map[string]Transport +} + +// NewConfig creates and initializes a Config instance +func NewConfig() *Config { + return &Config{ + Transports: make(map[string]Transport), + } +} diff --git a/client/handlers.go b/client/handlers.go new file mode 100644 index 0000000..b4a2136 --- /dev/null +++ b/client/handlers.go @@ -0,0 +1,16 @@ +package client + +import ( + "github.com/panjf2000/gnet/v2" + "go.uber.org/zap" +) + +var messageRegistry = map[MessageType]func(c gnet.Conn, data []byte) error{ + InvalidActionMessageType: func(c gnet.Conn, data []byte) error { + zap.L().Error( + "Invalid action....", + zap.String("action", string(data)), + ) + return nil + }, +} diff --git a/client/tcp.go b/client/tcp.go new file mode 100644 index 0000000..35e2cf9 --- /dev/null +++ b/client/tcp.go @@ -0,0 +1,153 @@ +package client + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/panjf2000/gnet/v2" + "go.uber.org/zap" +) + +// TCPTransport implements the Transport interface using gnet +type TCPTransport struct { + address string + opts []gnet.Option + handlers map[MessageType]HandlerFunc + client *gnet.Client + conn gnet.Conn + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + logger *zap.Logger +} + +// NewTCPTransport creates a new TCPTransport +func NewTCPTransport(address string, logger *zap.Logger, opts ...gnet.Option) *TCPTransport { + return &TCPTransport{ + address: address, + opts: opts, + handlers: make(map[MessageType]HandlerFunc), + logger: logger, + } +} + +// Connect establishes the TCP connection +func (t *TCPTransport) Connect(ctx context.Context) error { + t.ctx, t.cancel = context.WithCancel(ctx) + + // Initialize gnet client + client, err := gnet.NewClient(&tcpEventHandler{ + transport: t, + }, t.opts...) + if err != nil { + return err + } + t.client = client + + // Start the client + go func() { + if err := t.client.Start(); err != nil { + t.logger.Error("Failed to start gnet client", zap.Error(err)) + } + }() + + // Dial the server + conn, err := t.client.Dial("tcp", t.address) + if err != nil { + return err + } + + t.conn = conn + + return nil +} + +// Send sends a message over the TCP connection +func (t *TCPTransport) Send(data []byte) error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.conn == nil { + return errors.New("no active connection") + } + + return t.conn.AsyncWrite(data, nil) +} + +// Close closes the TCP connection +func (t *TCPTransport) Close() error { + if t.client != nil { + t.cancel() + return t.client.Stop() + } + return nil +} + +// RegisterHandler registers a handler for a specific message type +func (t *TCPTransport) RegisterHandler(messageType MessageType, handler HandlerFunc) { + t.handlers[messageType] = handler +} + +// tcpEventHandler implements gnet.EventHandler for the TCPTransport +type tcpEventHandler struct { + transport *TCPTransport +} + +// OnBoot is called when the client starts +func (h *tcpEventHandler) OnBoot(eng gnet.Engine) gnet.Action { + h.transport.logger.Info("TCP client started") + return gnet.None +} + +// OnShutdown is called when the client is shutting down +func (h *tcpEventHandler) OnShutdown(eng gnet.Engine) { + h.transport.logger.Info("TCP client shutting down") +} + +// OnOpen is called when a new connection is established +func (h *tcpEventHandler) OnOpen(c gnet.Conn) ([]byte, gnet.Action) { + h.transport.logger.Info("Connected to server", zap.String("remote", c.RemoteAddr().String())) + h.transport.conn = c // Store the connection + return nil, gnet.None +} + +// OnClose is called when the connection is closed +func (h *tcpEventHandler) OnClose(c gnet.Conn, err error) gnet.Action { + h.transport.logger.Info("Connection closed", zap.Error(err)) + h.transport.conn = nil + return gnet.None +} + +// OnTraffic is called when data is received +func (h *tcpEventHandler) OnTraffic(c gnet.Conn) gnet.Action { + data, err := c.Next(-1) + if err != nil { + h.transport.logger.Error("Error reading data", zap.Error(err)) + return gnet.Close + } + + if len(data) < 1 { + h.transport.logger.Warn("Received empty data") + return gnet.None + } + + messageType := MessageType(data[0]) + handler, exists := h.transport.handlers[messageType] + if exists { + if err := handler(c, data[1:]); err != nil { + h.transport.logger.Error("Handler error", zap.Error(err)) + } + } else { + h.transport.logger.Warn("No handler for message type", zap.Uint64("type", messageType.Uint64())) + } + + return gnet.None +} + +// OnTick is called periodically +func (h *tcpEventHandler) OnTick() (time.Duration, gnet.Action) { + // Implement if needed + return time.Second, gnet.None +} diff --git a/client/todo.md b/client/todo.md new file mode 100644 index 0000000..fe1d592 --- /dev/null +++ b/client/todo.md @@ -0,0 +1,10 @@ +# Client TODOs + +- [ ] There should be standard handlers that are loaded with each client and those are basically for general message passthrough... +- [ ] Each type of transport should have its own type like TCPTransport is... +- [ ] You should be able to quickly select the transport and send or read from the server. +- [ ] There should be a way to just do write-and-forget for 1Mil+ req/s +- [ ] Write client examples +- [ ] Implement this client approach directly into the benchmark instead of what we're doing now utilising go/net +- [ ] See what to do with QUIC as it's slow... +- \ No newline at end of file diff --git a/client/transport.go b/client/transport.go new file mode 100644 index 0000000..0fe2287 --- /dev/null +++ b/client/transport.go @@ -0,0 +1,18 @@ +package client + +import ( + "context" + + "github.com/panjf2000/gnet/v2" +) + +// HandlerFunc defines the function signature for handlers +type HandlerFunc func(c gnet.Conn, data []byte) error + +// Transport interface defines the methods that all transports must implement +type Transport interface { + Connect(ctx context.Context) error + Send(data []byte) error + Close() error + RegisterHandler(messageType MessageType, handler HandlerFunc) +} diff --git a/client/types.go b/client/types.go new file mode 100644 index 0000000..ba5edfc --- /dev/null +++ b/client/types.go @@ -0,0 +1,12 @@ +package client + +type MessageType byte + +func (t MessageType) Uint64() uint64 { + return uint64(t) +} + +var ( + InvalidActionMessageType MessageType = 0x69 + WriteSuccessMessageType MessageType = 0x00 +) diff --git a/cmd/ebpf.go b/cmd/ebpf.go new file mode 100644 index 0000000..f6c7f88 --- /dev/null +++ b/cmd/ebpf.go @@ -0,0 +1,150 @@ +package cmd + +import ( + "fmt" + "github.com/unpackdev/fdb/helpers" + "net" + "strings" + + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/link" + "github.com/urfave/cli/v2" +) + +// EbpfCommands returns a cli.Command that manages eBPF programs using cilium/ebpf +func EbpfCommands() *cli.Command { + return &cli.Command{ + Name: "ebpf", + Usage: "Manage eBPF programs", + Subcommands: []*cli.Command{ + { + Name: "load", + Usage: "Load an eBPF program onto a network interface using Cilium", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "interface", + Usage: "Network interface to attach the eBPF program to (e.g., eth0)", + Required: true, + }, + &cli.StringFlag{ + Name: "obj", + Usage: "Path to the compiled eBPF object file", + Value: "c/obj/ebpf_program.o", // Default location of compiled object file + }, + &cli.StringFlag{ + Name: "section", + Usage: "eBPF section to load (e.g., xdp)", + Value: "xdp", // Default section + }, + }, + Action: func(c *cli.Context) error { + iface := c.String("interface") + objPath := c.String("obj") + section := c.String("section") + + // Load the compiled eBPF object file + spec, err := ebpf.LoadCollectionSpec(objPath) + if err != nil { + return fmt.Errorf("failed to load eBPF object: %w", err) + } + + // Load the eBPF collection + coll, err := ebpf.NewCollection(spec) + if err != nil { + return fmt.Errorf("failed to load eBPF collection: %w", err) + } + defer coll.Close() + + // Select the program by section + prog := coll.Programs[section] + if prog == nil { + return fmt.Errorf("program section '%s' not found in eBPF object", section) + } + + // Attach the program to the specified interface using XDP + l, err := link.AttachXDP(link.XDPOptions{ + Program: prog, + Interface: helpers.IfaceIndex(iface), + }) + if err != nil { + return fmt.Errorf("failed to attach eBPF program to interface %s: %w", iface, err) + } + defer l.Close() + + fmt.Printf("eBPF program successfully loaded onto interface %s\n", iface) + return nil + }, + }, + { + Name: "interfaces", + Usage: "List all available network interfaces and their essential details", + Action: func(c *cli.Context) error { + ifaces, err := net.Interfaces() + if err != nil { + return fmt.Errorf("failed to list network interfaces: %w", err) + } + + fmt.Println("Available network interfaces:") + + for _, iface := range ifaces { + // Get interface flags + status := "down" + if iface.Flags&net.FlagUp != 0 { + status = "up" + } + + // Get associated IP addresses + addrs, err := iface.Addrs() + if err != nil { + fmt.Printf(" Error getting addresses for %s: %v\n", iface.Name, err) + continue + } + + // Collect IP addresses + var ipList []string + for _, addr := range addrs { + ipList = append(ipList, addr.String()) + } + + // Display interface details with indentation + fmt.Printf("Name: %s\n", iface.Name) + fmt.Printf(" Index: %d\n", iface.Index) + fmt.Printf(" MTU: %d\n", iface.MTU) + fmt.Printf(" HardwareAddr: %s\n", iface.HardwareAddr) + fmt.Printf(" Status: %s\n", status) + fmt.Printf(" IPs: %s\n\n", strings.Join(ipList, ", ")) + } + return nil + }, + }, + { + Name: "status", + Usage: "Verify if an eBPF program is loaded on a specific interface", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "interface", + Usage: "Network interface to check (e.g., eth0)", + Required: true, + }, + }, + Action: func(c *cli.Context) error { + iface := c.String("interface") + + // Check if the eBPF program is attached to the interface + xdpAttached, info, err := helpers.IsXDPAttached(iface) + if err != nil { + return fmt.Errorf("failed to check XDP status on interface %s: %w", iface, err) + } + + if xdpAttached { + fmt.Printf("An eBPF program is loaded on interface %s\n", iface) + fmt.Println(info) + } else { + fmt.Printf("No eBPF program is loaded on interface %s\n", iface) + } + return nil + }, + }, + }, + } +} diff --git a/Dockerfile b/deployments/Dockerfile similarity index 100% rename from Dockerfile rename to deployments/Dockerfile diff --git a/otel-collector-config.yaml b/deployments/otel-collector-config.yaml similarity index 57% rename from otel-collector-config.yaml rename to deployments/otel-collector-config.yaml index a8e76c1..48cc58c 100644 --- a/otel-collector-config.yaml +++ b/deployments/otel-collector-config.yaml @@ -5,12 +5,13 @@ receivers: http: exporters: - jaeger: - endpoint: "http://jaeger:14250" - insecure: true + otlp: + endpoint: "http://jaeger:4317" + tls: + insecure: true service: pipelines: traces: receivers: [otlp] - exporters: [jaeger] \ No newline at end of file + exporters: [otlp] \ No newline at end of file diff --git a/deployments/prometheus.yaml b/deployments/prometheus.yaml new file mode 100644 index 0000000..7876fc5 --- /dev/null +++ b/deployments/prometheus.yaml @@ -0,0 +1,7 @@ +global: + scrape_interval: 15s + +scrape_configs: + - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:55680'] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yaml similarity index 51% rename from docker-compose.yml rename to docker-compose.yaml index 3033d69..f40517a 100644 --- a/docker-compose.yml +++ b/docker-compose.yaml @@ -4,12 +4,12 @@ services: fdb: build: context: . - dockerfile: Dockerfile + dockerfile: deployments/Dockerfile ports: - - "4434:4434" # Mapping the dummy service port - - "4433:4433" # Mapping the QUIC service port - - "5011:5011" # Mapping the TCP service port - - "5022:5022" # Mapping the UDP service port + - "4435:4434" # Mapping the dummy service port + - "4436:4433" # Mapping the QUIC service port + - "5012:5011" # Mapping the TCP service port + - "5023:5022" # Mapping the UDP service port - "4060:4060" # Mapping for pprof restart: always environment: @@ -20,6 +20,10 @@ services: command: ["./fdb", "serve", "--config", "/fdb/config.yaml"] depends_on: - otel-collector # Ensure fdb service starts after OpenTelemetry collector + deploy: + resources: + limits: + memory: 16G # Adjust as necessary otel-collector: image: otel/opentelemetry-collector:latest @@ -29,7 +33,7 @@ services: - "55680:55680" # OpenTelemetry protocol receiver command: ["--config=/etc/otel-collector-config.yaml"] volumes: - - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml # Mounting custom config for OpenTelemetry collector + - ./deployments/otel-collector-config.yaml:/etc/otel-collector-config.yaml # Mounting custom config for OpenTelemetry collector jaeger: image: jaegertracing/all-in-one:1.29 @@ -39,3 +43,26 @@ services: - "6831:6831/udp" # Jaeger UDP port for traces environment: - COLLECTOR_ZIPKIN_HTTP_PORT=9411 # Zipkin compatibility mode + + grafana: + image: grafana/grafana:latest + ports: + - "3500:3000" # Grafana web interface + environment: + - GF_SECURITY_ADMIN_PASSWORD=admin # Default admin password + volumes: + - grafana-storage:/var/lib/grafana # Persist Grafana data + depends_on: + - otel-collector # Start Grafana after OpenTelemetry + + prometheus: + image: prom/prometheus:latest + ports: + - "9690:9090" + volumes: + - ./deployments/prometheus.yml:/etc/prometheus/prometheus.yaml + command: + - '--config.file=/etc/prometheus/prometheus.yaml' + +volumes: + grafana-storage: diff --git a/entrypoint/main.go b/entrypoint/main.go index ccf556e..81a9395 100644 --- a/entrypoint/main.go +++ b/entrypoint/main.go @@ -14,6 +14,7 @@ func main() { Commands: []*cli.Command{ cmd.CertsCommand(), // Command for handling certificates cmd.BenchmarkCommand(), // Command for running benchmarks + cmd.EbpfCommands(), // Command for running eBPF specific workload cmd.ServeCommand(), // Command to start the server }, } diff --git a/go.mod b/go.mod index 381ccd3..1853b6a 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( ) require ( + github.com/cilium/ebpf v0.16.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect diff --git a/go.sum b/go.sum index 72d044e..ad8a748 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= +github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -17,6 +19,7 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= diff --git a/helpers/ebpf.go b/helpers/ebpf.go new file mode 100644 index 0000000..633013a --- /dev/null +++ b/helpers/ebpf.go @@ -0,0 +1,43 @@ +package helpers + +import ( + "fmt" + "net" + "os/exec" + "regexp" + "strings" +) + +// IfaceIndex Helper to get interface index by name +func IfaceIndex(iface string) int { + i, err := net.InterfaceByName(iface) + if err != nil { + panic(fmt.Sprintf("could not find interface %s: %v", iface, err)) + } + return i.Index +} + +// IsXDPAttached Helper to check if XDP program is attached to interface and return additional information +func IsXDPAttached(iface string) (bool, string, error) { + cmd := exec.Command("ip", "link", "show", iface) + output, err := cmd.Output() + if err != nil { + return false, "", err + } + + outputStr := string(output) + + // Check if the output contains "xdp" which indicates an XDP program is attached + if strings.Contains(outputStr, "xdp") { + // Use a regex to extract the XDP program ID (if available) + re := regexp.MustCompile(`prog/xdp id (\d+)`) + match := re.FindStringSubmatch(outputStr) + if len(match) > 1 { + progID := match[1] // XDP program ID + return true, fmt.Sprintf("XDP program ID: %s", progID), nil + } + return true, "XDP program is attached, but no ID found.", nil + } + + return false, "No XDP program attached.", nil +} diff --git a/messages/message.go b/messages/message.go index 07481e2..eae0e0f 100644 --- a/messages/message.go +++ b/messages/message.go @@ -1,9 +1,11 @@ package messages import ( + "crypto/rand" "encoding/binary" "fmt" "github.com/unpackdev/fdb/types" + "io" ) // Message struct represents a UDP message @@ -91,3 +93,41 @@ func Decode(data []byte) (*Message, error) { return msg, nil } + +// GenerateRandomMessage generates a Message with a random handler and key, and no data. +func GenerateRandomMessage(handler types.HandlerType) (*Message, error) { + key, err := generateRandomKey() + if err != nil { + return nil, err + } + + return &Message{ + Handler: handler, + Key: key, + Data: nil, // No data for this message + }, nil +} + +// GenerateRandomMessageWithData generates a Message with a key, and a specified data payload. +func GenerateRandomMessageWithData(handler types.HandlerType, data []byte) (*Message, error) { + key, err := generateRandomKey() + if err != nil { + return nil, err + } + + return &Message{ + Handler: handler, + Key: key, + Data: data, + }, nil +} + +// Helper function to generate a random 32-byte key. +func generateRandomKey() ([32]byte, error) { + var key [32]byte + _, err := io.ReadFull(rand.Reader, key[:]) + if err != nil { + return [32]byte{}, fmt.Errorf("failed to generate random key: %w", err) + } + return key, nil +} diff --git a/transports/ebf/handler_write.go b/transports/ebf/handler_write.go new file mode 100644 index 0000000..7ca9e5c --- /dev/null +++ b/transports/ebf/handler_write.go @@ -0,0 +1,21 @@ +package transport_ebpf + +import ( + "github.com/unpackdev/fdb/db" +) + +// EbpfWriteHandler struct for handling write operations from eBPF ring buffer +type EbpfWriteHandler struct { +} + +// NewEbpfWriteHandler creates a new handler for write operations via eBPF ring buffer +func NewEbpfWriteHandler(db db.Provider) *EbpfWriteHandler { + return &EbpfWriteHandler{} +} + +// HandleMessage processes the incoming message from the eBPF ring buffer +func (rh *EbpfWriteHandler) HandleMessage(frame []byte) { + // Handle the incoming message from eBPF + // For example, log or process the packet here + // You could also interact with a database if needed +} diff --git a/transports/ebf/server.go b/transports/ebf/server.go new file mode 100644 index 0000000..12ab457 --- /dev/null +++ b/transports/ebf/server.go @@ -0,0 +1,100 @@ +package transport_ebpf + +/* +import ( + "context" + "github.com/cilium/ebpf" + "github.com/cilium/ebpf/ringbuf" + "github.com/unpackdev/fdb/config" + "log" +) + +type EbpfHandler func(frame []byte) + +type EbpfServer struct { + ctx context.Context + cnf config.EbpfTransport + handlerRegistry map[uint8]EbpfHandler // Register handlers by action type + stopChan chan struct{} + started chan struct{} + ringBuffer *ringbuf.Reader +} + +// NewEbpfServer initializes the eBPF server with the config and context +func NewEbpfServer(ctx context.Context, cnf config.EbpfTransport) (*EbpfServer, error) { + server := &EbpfServer{ + ctx: ctx, + cnf: cnf, + handlerRegistry: make(map[uint8]EbpfHandler), + stopChan: make(chan struct{}), + started: make(chan struct{}), + } + + return server, nil +} + +// Start begins listening to the eBPF ring buffer for packets +func (s *EbpfServer) Start() error { + s.stopChan = make(chan struct{}) + s.started = make(chan struct{}, 1) + + // Load the pinned ring buffer map (assuming it's pinned by the eBPF program) + rb, err := ebpf.LoadPinnedMap(s.cnf.PinnedRingBuf, nil) + if err != nil { + return err + } + + s.ringBuffer, err = ringbuf.NewReader(rb) + if err != nil { + return err + } + + go func() { + for { + select { + case <-s.stopChan: + return + default: + record, err := s.ringBuffer.Read() + if err != nil { + if err == ringbuf.ErrClosed { + return + } + log.Printf("Error reading from ring buffer: %s", err) + continue + } + + // Handle the packet by calling the appropriate handler + actionType := record[0] // Assuming the first byte is the action type + handler, exists := s.handlerRegistry[actionType] + if exists { + handler(record) + } else { + log.Printf("Unknown action type: %d", actionType) + } + } + } + }() + + // Wait for the signal to start processing + close(s.started) + return nil +} + +// Stop gracefully stops the eBPF server +func (s *EbpfServer) Stop() { + close(s.stopChan) + if s.ringBuffer != nil { + s.ringBuffer.Close() + } +} + +// RegisterHandler adds a new handler for a specific action type +func (s *EbpfServer) RegisterHandler(actionType uint8, handler EbpfHandler) { + s.handlerRegistry[actionType] = handler +} + +// DeregisterHandler removes a handler for a specific action type +func (s *EbpfServer) DeregisterHandler(actionType uint8) { + delete(s.handlerRegistry, actionType) +}*/ diff --git a/transports/tcp/handler_read.go b/transports/tcp/handler_read.go index 3560a4d..c89ae47 100644 --- a/transports/tcp/handler_read.go +++ b/transports/tcp/handler_read.go @@ -1,7 +1,7 @@ package transport_tcp import ( - "github.com/panjf2000/gnet" + "github.com/panjf2000/gnet/v2" "github.com/unpackdev/fdb/db" "log" ) @@ -22,7 +22,7 @@ func NewTCPReadHandler(db db.Provider) *TCPReadHandler { func (rh *TCPReadHandler) HandleMessage(c gnet.Conn, frame []byte) { if len(frame) < 33 { // 1 byte action + 32-byte key log.Printf("Invalid message length: %d, expected at least 33 bytes", len(frame)) - c.SendTo([]byte("Invalid message format")) + c.AsyncWrite([]byte("Invalid message format"), nil) return } @@ -33,16 +33,16 @@ func (rh *TCPReadHandler) HandleMessage(c gnet.Conn, frame []byte) { value, err := rh.db.Get(key) if err != nil { log.Printf("Error reading from database: %v", err) - c.SendTo([]byte("Error reading from database")) + c.AsyncWrite([]byte("Error reading from database"), nil) return } if len(value) == 0 { log.Printf("No value found for key: %x", key) - c.SendTo([]byte("No value found for key")) + c.AsyncWrite([]byte("No value found for key"), nil) return } // Send the value back to the client - c.SendTo([]byte{0x01}) + c.AsyncWrite(value, nil) } diff --git a/transports/tcp/handler_write.go b/transports/tcp/handler_write.go index edcd874..551bacb 100644 --- a/transports/tcp/handler_write.go +++ b/transports/tcp/handler_write.go @@ -1,7 +1,7 @@ package transport_tcp import ( - "github.com/panjf2000/gnet" + "github.com/panjf2000/gnet/v2" "github.com/unpackdev/fdb/db" "log" ) @@ -25,7 +25,7 @@ func (wh *TCPWriteHandler) HandleMessage(c gnet.Conn, frame []byte) { // Check if the message is at least 34 bytes (1 byte for action, 32 bytes for key, and at least 1 byte for value) if len(frame) < 34 { log.Printf("Invalid message length: %d, expected at least 34 bytes", len(frame)) - c.SendTo([]byte{0x01}) + c.AsyncWrite([]byte{0x01}, nil) // Error code return } @@ -40,5 +40,5 @@ func (wh *TCPWriteHandler) HandleMessage(c gnet.Conn, frame []byte) { wh.writer.BufferWrite(key, value) // Send success response - c.SendTo([]byte{0x00}) + c.AsyncWrite([]byte{0x00}, nil) // Success code } diff --git a/transports/tcp/server.go b/transports/tcp/server.go index 51cd168..679d0fe 100644 --- a/transports/tcp/server.go +++ b/transports/tcp/server.go @@ -2,12 +2,14 @@ package transport_tcp import ( "context" - "github.com/panjf2000/gnet" + "io" + "time" + + "github.com/panjf2000/gnet/v2" "github.com/pkg/errors" "github.com/unpackdev/fdb/config" "github.com/unpackdev/fdb/types" "go.uber.org/zap" - "time" ) // TCPHandler function type for TCP handlers @@ -15,12 +17,12 @@ type TCPHandler func(c gnet.Conn, frame []byte) // Server struct represents the TCP server using gnet type Server struct { - *gnet.EventServer ctx context.Context handlerRegistry map[types.HandlerType]TCPHandler cnf config.TcpTransport stopChan chan struct{} started chan struct{} + eng gnet.Engine } // NewServer creates a new TCP Server instance using the provided configuration @@ -45,7 +47,7 @@ func (s *Server) Addr() string { func (s *Server) Start(ctx context.Context) error { s.stopChan = make(chan struct{}) s.started = make(chan struct{}) // Initialize the started channel - listenAddr := s.cnf.Addr() + listenAddr := "tcp://" + s.cnf.Addr() zap.L().Info("Starting TCP Server", zap.String("addr", listenAddr)) // Create an error channel to capture errors from the goroutine @@ -53,7 +55,7 @@ func (s *Server) Start(ctx context.Context) error { // Start the server asynchronously go func() { - err := gnet.Serve( + err := gnet.Run( s, listenAddr, gnet.WithMulticore(true), gnet.WithReusePort(true), @@ -69,7 +71,7 @@ func (s *Server) Start(ctx context.Context) error { close(errChan) // No error, close the channel }() - // Wait until OnInitComplete sends a signal or an error occurs + // Wait until OnBoot sends a signal or an error occurs select { case <-s.started: close(s.started) @@ -85,61 +87,100 @@ func (s *Server) Start(ctx context.Context) error { } } -// Tick is called periodically by gnet -func (s *Server) Tick() (delay time.Duration, action gnet.Action) { - select { - case <-s.stopChan: - return 0, gnet.Shutdown - default: - return time.Second, gnet.None - } -} +// OnBoot is called when the server starts +func (s *Server) OnBoot(eng gnet.Engine) (action gnet.Action) { + s.eng = eng // Store the engine -// Stop stops the TCP server -func (s *Server) Stop() error { - zap.L().Info("Stopping TCP Server", zap.String("addr", s.cnf.Addr())) - close(s.stopChan) + zap.L().Info("TCP Server is listening", zap.String("addr", s.cnf.Addr())) - zap.L().Info("TCP Server stopped successfully", zap.String("addr", s.cnf.Addr())) - return nil + s.started <- struct{}{} // Signal that the server has started + return gnet.None } -// WaitStarted returns the started channel for waiting until the server starts -func (s *Server) WaitStarted() <-chan struct{} { - return s.started +// OnShutdown is called when the server is shutting down +func (s *Server) OnShutdown(eng gnet.Engine) { + zap.L().Info("TCP Server is shutting down", zap.String("addr", s.cnf.Addr())) } -// OnInitComplete is called when the server starts -func (s *Server) OnInitComplete(server gnet.Server) (action gnet.Action) { - zap.L().Info("TCP Server is listening", zap.String("addr", server.Addr.String())) - s.started <- struct{}{} // Signal that the server has started +// OnOpen is called when a new connection is opened +func (s *Server) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + return nil, gnet.None +} + +// OnClose is called when a connection is closed +func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action) { + if err != nil && !errors.Is(err, io.EOF) { + zap.L().Error( + "Connection closed", + zap.Error(err), + zap.String("addr", c.RemoteAddr().String()), + ) + } return gnet.None } -// React handles incoming data -func (s *Server) React(frame []byte, c gnet.Conn) (out []byte, action gnet.Action) { +// OnTraffic handles incoming data +func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action) { + // Read all available data from the connection buffer + frame, err := c.Next(-1) + if err != nil { + zap.L().Error("Error reading data", zap.Error(err)) + return gnet.Close + } + if len(frame) < 1 { zap.L().Warn("Invalid action received", zap.String("addr", c.RemoteAddr().String())) - return []byte("ERROR: Invalid action"), gnet.None + c.AsyncWrite([]byte("ERROR: Invalid action"), nil) + return gnet.None } // Parse the action type actionType, err := s.parseActionType(frame) if err != nil { - //zap.L().Warn("Failed to parse action type", zap.Error(err), zap.String("addr", c.RemoteAddr().String())) - return []byte("ERROR: Invalid action"), gnet.None + c.AsyncWrite([]byte("ERROR: Invalid action"), nil) + return gnet.None } // Check if the handler exists handler, exists := s.handlerRegistry[actionType] if !exists { zap.L().Warn("Unknown action type", zap.Int("action_type", int(actionType)), zap.String("addr", c.RemoteAddr().String())) - return []byte("ERROR: Unknown action"), gnet.None + c.AsyncWrite([]byte("ERROR: Unknown action"), nil) + return gnet.None } // Call the handler handler(c, frame) - return nil, gnet.None + return gnet.None +} + +// OnTick is called periodically by gnet +func (s *Server) OnTick() (delay time.Duration, action gnet.Action) { + select { + case <-s.stopChan: + return 0, gnet.Shutdown + default: + return time.Second, gnet.None + } +} + +// Stop stops the TCP server +func (s *Server) Stop() error { + zap.L().Info("Stopping TCP Server", zap.String("addr", s.cnf.Addr())) + + err := s.eng.Stop(s.ctx) + if err != nil { + zap.L().Error("Error stopping TCP server", zap.Error(err)) + return err + } + + zap.L().Info("TCP Server stopped successfully", zap.String("addr", s.cnf.Addr())) + return nil +} + +// WaitStarted returns the started channel for waiting until the server starts +func (s *Server) WaitStarted() <-chan struct{} { + return s.started } // parseActionType parses the action type from the frame