Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added hep buffer #265

Merged
merged 14 commits into from
Nov 20, 2023
52 changes: 28 additions & 24 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,34 @@ import (
var Cfg Config

type Config struct {
Iface *InterfacesConfig
Logging *logp.Logging
Mode string
Dedup bool
Filter string
Discard string
DiscardMethod string
DiscardIP string
DiscardSrcIP string
DiscardDstIP string
Zip bool
HepCollector string
CollectOnlySip bool
HepServer string
HepNodePW string
HepNodeID uint
HepNodeName string
Network string
Protobuf bool
Reassembly bool
SipAssembly bool
SendRetries uint
Version bool
SkipVerify bool
Iface *InterfacesConfig
Logging *logp.Logging
Mode string
Dedup bool
Filter string
Discard string
DiscardMethod string
DiscardIP string
DiscardSrcIP string
DiscardDstIP string
Zip bool
HepCollector string
CollectOnlySip bool
HepServer string
HepNodePW string
HepNodeID uint
HepNodeName string
Network string
Protobuf bool
Reassembly bool
SipAssembly bool
SendRetries uint
Version bool
SkipVerify bool
HEPBufferEnable bool
HEPBufferSize string
HEPBufferFile string
MaxBufferSizeBytes int64
}

type InterfacesConfig struct {
Expand Down
16 changes: 10 additions & 6 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package decoder
import (
"bytes"
"container/list"
"github.com/segmentio/encoding/json"
"net"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/segmentio/encoding/json"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/tcpassembly"
Expand Down Expand Up @@ -932,7 +933,6 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l
case layers.LayerTypeTCP:
pkt.SrcPort = uint16(tcp.SrcPort)
pkt.DstPort = uint16(tcp.DstPort)
//pkt.Payload = tcp.Payload
atomic.AddUint64(&d.tcpCount, 1)
logp.Debug("payload", "TCP", pkt)

Expand All @@ -947,13 +947,17 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l
if !checkResult || payloadList.Len() <= 0 {
return
}
} else {

payloadList.PushBack(pkt.Payload)
}

for elem := payloadList.Front(); elem != nil; elem = elem.Next() {
extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, elem.Value.([]byte))
for elem := payloadList.Front(); elem != nil; elem = elem.Next() {
extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, elem.Value.([]byte))
}
} else {
pkt.Payload = tcp.Payload
extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, pkt.Payload)
}

case layers.LayerTypeSCTP:
pkt.SrcPort = uint16(sctp.SrcPort)
pkt.DstPort = uint16(sctp.DstPort)
Expand Down
55 changes: 54 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package main
import (
"flag"
"fmt"
"math"
"os"
"strconv"
"strings"
"sync"

"github.com/negbie/logp"
"github.com/sipcapture/heplify/config"
"github.com/sipcapture/heplify/sniffer"
)

const version = "heplify 1.65.10"
const version = "heplify 1.65.12"

func createFlags() {

Expand Down Expand Up @@ -79,6 +81,10 @@ func createFlags() {
flag.UintVar(&config.Cfg.SendRetries, "tcpsendretries", 0, "Number of retries for sending before giving up and reconnecting")
flag.BoolVar(&config.Cfg.Version, "version", false, "Show heplify version")
flag.BoolVar(&config.Cfg.SkipVerify, "skipverify", false, "skip certifcate validation")
flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer", false, "enable buffer messages if connection to HEP server broken")
flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, MB, GB, TB. By default - unlimited")
flag.StringVar(&config.Cfg.HEPBufferFile, "hep-buffer-file", "HEP-Buffer.dump", "filename and location for hep-buffer file")

flag.Parse()

config.Cfg.Iface = &ifaceConfig
Expand All @@ -99,6 +105,43 @@ func createFlags() {
checkErr(err)
}

func Human2FileSize(size string) (int64, error) {

suffixes := [5]string{"B", "KB", "MB", "GB", "TB"} // Intialized with values
var bytesSize int64

for i, suffix := range suffixes {

if i == 0 {
continue
}

if strings.HasSuffix(size, suffix) {
dataBytes := strings.TrimSuffix(size, suffix)
baseVar, err := strconv.Atoi(dataBytes)
if err != nil {
return 0, err
} else {
bytesSize = int64(math.Pow(float64(1024), float64(i))) * int64(baseVar)
return int64(bytesSize), nil
}
}
}

if strings.HasSuffix(size, "B") {

dataBytes := strings.TrimSuffix(size, "B")
baseVar, err := strconv.Atoi(dataBytes)
if err != nil {
return 0, err
} else {
return int64(baseVar), nil
}
}

return bytesSize, fmt.Errorf("not found a valid suffix")
}

func checkErr(err error) {
if err != nil {
fmt.Printf("\nError: %v\n\n", err)
Expand Down Expand Up @@ -129,6 +172,16 @@ func main() {
worker = config.Cfg.Iface.FanoutWorker
}

if config.Cfg.HEPBufferEnable && (config.Cfg.HEPBufferSize != "0" && config.Cfg.HEPBufferSize != "") {
config.Cfg.MaxBufferSizeBytes, err = Human2FileSize(config.Cfg.HEPBufferSize)
if err != nil {
fmt.Println("couldn't convert buffer size to bytes", err)
os.Exit(1)
} else {
fmt.Println("Maximum HEP file size is ", config.Cfg.MaxBufferSizeBytes, "bytes. You provided: ", config.Cfg.HEPBufferSize)
}
}

var wg sync.WaitGroup
for i := 0; i < worker; i++ {
capture, err := sniffer.New(&config.Cfg)
Expand Down
107 changes: 106 additions & 1 deletion publish/hep.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/tls"
"fmt"
"net"
"os"
"runtime/debug"
"strings"
"unicode"

Expand Down Expand Up @@ -58,7 +60,13 @@ func (h *HEPOutputer) ReConnect(n int) (err error) {
return err
}
h.client[n].writer.Reset(h.client[n].conn)
//h.ReSendPingPacket()

if config.Cfg.HEPBufferEnable {
if _, err := h.copyHEPFileOut(n); err != nil {
logp.Err("Sending HEP from file error: %v", err)
}
}

return err
}

Expand Down Expand Up @@ -137,8 +145,16 @@ func (h *HEPOutputer) Send(msg []byte) {
err = h.client[n].writer.Flush()
if err != nil {
logp.Err("Bad resend: %v", err)
if config.Cfg.HEPBufferEnable {
h.copyHEPbufftoFile(msg)
}

}
}
} else {
if config.Cfg.HEPBufferEnable {
h.copyHEPbufftoFile(msg)
}
}
}
}
Expand All @@ -150,6 +166,95 @@ func (h *HEPOutputer) Start() {
}
}

func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) {

defer func() {
if r := recover(); r != nil {
logp.Err("copy hep file out panic: %v, %v", r, debug.Stack())
return
}
}()

if _, err := os.Stat(config.Cfg.HEPBufferFile); err != nil {
logp.Debug("file doesn't exists: ", config.Cfg.HEPBufferFile)
return 0, fmt.Errorf("file doesn't exists")
}

HEPFileData, HEPFileDataerr := os.ReadFile(config.Cfg.HEPBufferFile)
if HEPFileDataerr != nil {
logp.Err("Read HEP file error: %v", HEPFileDataerr)
return 0, fmt.Errorf("bad read file")
}

if h.client[n].conn == nil {
logp.Err("connection is not up....")
return 0, fmt.Errorf("connection is broken")
}

//Send Logged HEP upon reconnect out to backend
hl, err := h.client[n].conn.Write(HEPFileData)
if err != nil {
err = h.client[n].writer.Flush()
}

if err != nil {
logp.Debug("collector", " ||-->X Send HEP from LOG error ", err)
} else {

fi, err := os.Stat(config.Cfg.HEPBufferFile)
if err != nil {
logp.Debug("collector", " Cannot stat HEP log file ", err)
}
if fi.Size() > 0 {
logp.Debug("collector", " Send HEP from LOG OK: ", hl, " bytes")
//Recreate file, thus cleaning the content
os.Create(config.Cfg.HEPBufferFile)
}
}

return hl, err
}

func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) {

defer func() {
if r := recover(); r != nil {
logp.Err("copy buffer to panic: %v,\n%s", r, debug.Stack())
return
}
}()

destination, err := os.OpenFile(config.Cfg.HEPBufferFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
fmt.Println("Open HEP file error", err)
}

defer destination.Close()

if config.Cfg.MaxBufferSizeBytes > 0 {
fi, err := destination.Stat()
if err != nil {
logp.Debug("collector", fmt.Sprintf("couldn't retrive stats from buffer file error: %v", err.Error()))
return 0, err
} else {
if fi.Size() >= config.Cfg.MaxBufferSizeBytes {
logp.Debug("collector", fmt.Sprintln("Buffer size has been excited error: Maxsize: ", config.Cfg.MaxBufferSizeBytes, " vs CurrentSize: ", fi.Size()))
return 0, fmt.Errorf("buffer size has been excited: %d", fi.Size())
}
}
}

nBytes, err := destination.Write(inbytes)

if err != nil {
logp.Debug("collector", "File Send HEP from buffer to file error", err)
} else {
logp.Debug("collector", " File Send HEP from buffer to file OK")
}

return int64(nBytes), err
}

func cutSpace(str string) string {
return strings.Map(func(r rune) rune {
if unicode.IsSpace(r) {
Expand Down