From 356544835cfda12c7fa236829719cd4d410b4f63 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Thu, 28 Sep 2023 23:20:38 +0200 Subject: [PATCH 01/13] added hep buffer --- config/config.go | 52 ++++++++++++++------------- main.go | 55 +++++++++++++++++++++++++++- publish/hep.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 25 deletions(-) diff --git a/config/config.go b/config/config.go index 57fe395..ccb10f5 100644 --- a/config/config.go +++ b/config/config.go @@ -7,30 +7,34 @@ import ( var Cfg Config type Config struct { - Iface *InterfacesConfig - Logging *logp.Logging - Mode string - Dedup bool - Filter string - Discard string - DiscardMethod string - DiscardIP string - DiscardSrcIP string - DiscardDstIP string - Zip bool - HepCollector string - CollectOnlySip bool - HepServer string - HepNodePW string - HepNodeID uint - HepNodeName string - Network string - Protobuf bool - Reassembly bool - SipAssembly bool - SendRetries uint - Version bool - SkipVerify bool + Iface *InterfacesConfig + Logging *logp.Logging + Mode string + Dedup bool + Filter string + Discard string + DiscardMethod string + DiscardIP string + DiscardSrcIP string + DiscardDstIP string + Zip bool + HepCollector string + CollectOnlySip bool + HepServer string + HepNodePW string + HepNodeID uint + HepNodeName string + Network string + Protobuf bool + Reassembly bool + SipAssembly bool + SendRetries uint + Version bool + SkipVerify bool + HEPBufferEnable bool + HEPBufferSize string + HEPBufferFile string + MaxBufferSizeBytes int64 } type InterfacesConfig struct { diff --git a/main.go b/main.go index 0036489..794a693 100644 --- a/main.go +++ b/main.go @@ -3,8 +3,10 @@ package main import ( "flag" "fmt" + "math" "os" "strconv" + "strings" "sync" "github.com/negbie/logp" @@ -12,7 +14,7 @@ import ( "github.com/sipcapture/heplify/sniffer" ) -const version = "heplify 1.65.10" +const version = "heplify 1.65.11" func createFlags() { @@ -79,6 +81,10 @@ func createFlags() { flag.UintVar(&config.Cfg.SendRetries, "tcpsendretries", 0, "Number of retries for sending before giving up and reconnecting") flag.BoolVar(&config.Cfg.Version, "version", false, "Show heplify version") flag.BoolVar(&config.Cfg.SkipVerify, "skipverify", false, "skip certifcate validation") + flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer", false, "enable buffer messages if connection to HEP server broken") + flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, MB, GB, TB. By default - unlimited") + flag.StringVar(&config.Cfg.HEPBufferFile, "hep-buffer-file", "HEP-Buffer.dump", "filename and location for hep-buffer file") + flag.Parse() config.Cfg.Iface = &ifaceConfig @@ -99,6 +105,43 @@ func createFlags() { checkErr(err) } +func Human2FileSize(size string) (int64, error) { + + suffixes := [5]string{"B", "KB", "MB", "GB", "TB"} // Intialized with values + var bytesSize int64 + + for i, suffix := range suffixes { + + if i == 0 { + continue + } + + if strings.HasSuffix(size, suffix) { + dataBytes := strings.TrimSuffix(size, suffix) + baseVar, err := strconv.Atoi(dataBytes) + if err != nil { + return 0, err + } else { + bytesSize = int64(math.Pow(float64(1024), float64(i))) * int64(baseVar) + return int64(bytesSize), nil + } + } + } + + if strings.HasSuffix(size, "B") { + + dataBytes := strings.TrimSuffix(size, "B") + baseVar, err := strconv.Atoi(dataBytes) + if err != nil { + return 0, err + } else { + return int64(baseVar), nil + } + } + + return bytesSize, fmt.Errorf("not found a valid suffix") +} + func checkErr(err error) { if err != nil { fmt.Printf("\nError: %v\n\n", err) @@ -129,6 +172,16 @@ func main() { worker = config.Cfg.Iface.FanoutWorker } + if config.Cfg.HEPBufferSize != "0" && config.Cfg.HEPBufferSize != "" { + config.Cfg.MaxBufferSizeBytes, err = Human2FileSize(config.Cfg.HEPBufferSize) + if err != nil { + fmt.Println("couldn't convert buffer size to bytes", err) + os.Exit(1) + } else { + fmt.Println("Maximum HEP file size is ", config.Cfg.MaxBufferSizeBytes, "bytes. You provided: ", config.Cfg.HEPBufferSize) + } + } + var wg sync.WaitGroup for i := 0; i < worker; i++ { capture, err := sniffer.New(&config.Cfg) diff --git a/publish/hep.go b/publish/hep.go index 5ea7096..324b833 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "fmt" "net" + "os" + "runtime/debug" "strings" "unicode" @@ -58,6 +60,10 @@ func (h *HEPOutputer) ReConnect(n int) (err error) { return err } h.client[n].writer.Reset(h.client[n].conn) + + if _, err := h.copyHEPFileOut(n); err != nil { + logp.Err("Sending HEP from file error....:", err) + } //h.ReSendPingPacket() return err } @@ -137,8 +143,12 @@ func (h *HEPOutputer) Send(msg []byte) { err = h.client[n].writer.Flush() if err != nil { logp.Err("Bad resend: %v", err) + h.copyHEPbufftoFile(msg) + } } + } else { + h.copyHEPbufftoFile(msg) } } } @@ -150,6 +160,89 @@ func (h *HEPOutputer) Start() { } } +func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) { + + defer func() { + if r := recover(); r != nil { + logp.Err("copy hep file out panic:", r, debug.Stack()) + return + } + }() + + HEPFileData, HEPFileDataerr := os.ReadFile(config.Cfg.HEPBufferFile) + if HEPFileDataerr != nil { + logp.Err("Read HEP file error", HEPFileDataerr) + } + + if h.client[n].conn == nil { + logp.Err("connection is broken....") + return 0, fmt.Errorf("connection is broken") + } + + //Send Logged HEP upon reconnect out to backend + hl, err := h.client[n].conn.Write(HEPFileData) + if err != nil { + err = h.client[n].writer.Flush() + } + + if err != nil { + logp.Debug("collector", " ||-->X Send HEP from LOG error ", err) + } else { + + fi, err := os.Stat(config.Cfg.HEPBufferFile) + if err != nil { + logp.Debug("collector", " Cannot stat HEP log file ", err) + } + if fi.Size() > 0 { + logp.Debug("collector", " Send HEP from LOG OK: ", hl, " bytes") + //Recreate file, thus cleaning the content + os.Create(config.Cfg.HEPBufferFile) + } + } + + return hl, err +} + +func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) { + + defer func() { + if r := recover(); r != nil { + logp.Err("copy buffer to panic: %v,\n%s", r, debug.Stack()) + return + } + }() + + destination, err := os.OpenFile(config.Cfg.HEPBufferFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + fmt.Println("Open HEP file error", err) + } + + defer destination.Close() + + if config.Cfg.MaxBufferSizeBytes > 0 { + fi, err := destination.Stat() + if err != nil { + logp.Debug("collector", fmt.Sprintf("couldn't retrive stats from buffer file error: %v", err.Error())) + return 0, err + } else { + if fi.Size() >= config.Cfg.MaxBufferSizeBytes { + logp.Debug("collector", fmt.Sprintln("Buffer size has been excited error: Maxsize: ", config.Cfg.MaxBufferSizeBytes, " vs CurrentSize: ", fi.Size())) + return 0, fmt.Errorf("buffer size has been excited: %d", fi.Size()) + } + } + } + + nBytes, err := destination.Write(inbytes) + + if err != nil { + logp.Debug("collector", "File Send HEP from buffer to file error", err) + } else { + logp.Debug("collector", " File Send HEP from buffer to file OK") + } + + return int64(nBytes), err +} + func cutSpace(str string) string { return strings.Map(func(r rune) rune { if unicode.IsSpace(r) { From ef136b2ba24e37dca64eb1aa57c235dfce678fa4 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Thu, 28 Sep 2023 23:24:50 +0200 Subject: [PATCH 02/13] fixed param --- main.go | 2 +- publish/hep.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index 794a693..7b75882 100644 --- a/main.go +++ b/main.go @@ -172,7 +172,7 @@ func main() { worker = config.Cfg.Iface.FanoutWorker } - if config.Cfg.HEPBufferSize != "0" && config.Cfg.HEPBufferSize != "" { + if config.Cfg.HEPBufferEnable && (config.Cfg.HEPBufferSize != "0" && config.Cfg.HEPBufferSize != "") { config.Cfg.MaxBufferSizeBytes, err = Human2FileSize(config.Cfg.HEPBufferSize) if err != nil { fmt.Println("couldn't convert buffer size to bytes", err) diff --git a/publish/hep.go b/publish/hep.go index 324b833..fd72385 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -61,10 +61,12 @@ func (h *HEPOutputer) ReConnect(n int) (err error) { } h.client[n].writer.Reset(h.client[n].conn) - if _, err := h.copyHEPFileOut(n); err != nil { - logp.Err("Sending HEP from file error....:", err) + if config.Cfg.HEPBufferEnable { + if _, err := h.copyHEPFileOut(n); err != nil { + logp.Err("Sending HEP from file error: %v", err) + } } - //h.ReSendPingPacket() + return err } @@ -143,12 +145,16 @@ func (h *HEPOutputer) Send(msg []byte) { err = h.client[n].writer.Flush() if err != nil { logp.Err("Bad resend: %v", err) - h.copyHEPbufftoFile(msg) + if config.Cfg.HEPBufferEnable { + h.copyHEPbufftoFile(msg) + } } } } else { - h.copyHEPbufftoFile(msg) + if config.Cfg.HEPBufferEnable { + h.copyHEPbufftoFile(msg) + } } } } @@ -164,14 +170,14 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) { defer func() { if r := recover(); r != nil { - logp.Err("copy hep file out panic:", r, debug.Stack()) + logp.Err("copy hep file out panic: %v, %v", r, debug.Stack()) return } }() HEPFileData, HEPFileDataerr := os.ReadFile(config.Cfg.HEPBufferFile) if HEPFileDataerr != nil { - logp.Err("Read HEP file error", HEPFileDataerr) + logp.Err("Read HEP file error: %v", HEPFileDataerr) } if h.client[n].conn == nil { From de399e4222a41906850f0d101d081541e9526c24 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Fri, 29 Sep 2023 09:44:15 +0200 Subject: [PATCH 03/13] fixed non sip assembly process --- decoder/decoder.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/decoder/decoder.go b/decoder/decoder.go index db7c620..08289cf 100644 --- a/decoder/decoder.go +++ b/decoder/decoder.go @@ -3,13 +3,14 @@ package decoder import ( "bytes" "container/list" - "github.com/segmentio/encoding/json" "net" "strconv" "strings" "sync/atomic" "time" + "github.com/segmentio/encoding/json" + "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/tcpassembly" @@ -932,7 +933,6 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l case layers.LayerTypeTCP: pkt.SrcPort = uint16(tcp.SrcPort) pkt.DstPort = uint16(tcp.DstPort) - //pkt.Payload = tcp.Payload atomic.AddUint64(&d.tcpCount, 1) logp.Debug("payload", "TCP", pkt) @@ -947,13 +947,17 @@ func (d *Decoder) processTransport(foundLayerTypes *[]gopacket.LayerType, udp *l if !checkResult || payloadList.Len() <= 0 { return } - } else { + payloadList.PushBack(pkt.Payload) - } - for elem := payloadList.Front(); elem != nil; elem = elem.Next() { - extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, elem.Value.([]byte)) + for elem := payloadList.Front(); elem != nil; elem = elem.Next() { + extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, elem.Value.([]byte)) + } + } else { + pkt.Payload = tcp.Payload + extractCID(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, pkt.Payload) } + case layers.LayerTypeSCTP: pkt.SrcPort = uint16(sctp.SrcPort) pkt.DstPort = uint16(sctp.DstPort) From f3c605049dae805eafb28e51b1dfcaeee285155c Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Fri, 29 Sep 2023 16:10:25 +0200 Subject: [PATCH 04/13] fix for sipassembly --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 7b75882..7522294 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( "github.com/sipcapture/heplify/sniffer" ) -const version = "heplify 1.65.11" +const version = "heplify 1.65.12" func createFlags() { From 5d57bbe7d561110c51c8708a960da6680f45cb67 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Tue, 17 Oct 2023 11:24:06 +0200 Subject: [PATCH 05/13] fixed file check --- publish/hep.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/publish/hep.go b/publish/hep.go index fd72385..5815552 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -175,13 +175,19 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) { } }() + if _, err := os.Stat(config.Cfg.HEPBufferFile); err != nil { + logp.Debug("file doesn't exists", config.Cfg.HEPBufferFile) + return 0, fmt.Errorf("file doesn't exists") + } + HEPFileData, HEPFileDataerr := os.ReadFile(config.Cfg.HEPBufferFile) if HEPFileDataerr != nil { logp.Err("Read HEP file error: %v", HEPFileDataerr) + return 0, fmt.Errorf("bad read file") } if h.client[n].conn == nil { - logp.Err("connection is broken....") + logp.Err("connection is not up....") return 0, fmt.Errorf("connection is broken") } From b9da99a333e69c6fd6d590944f8cf1b635edc30d Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Tue, 17 Oct 2023 11:24:49 +0200 Subject: [PATCH 06/13] fixed file check --- publish/hep.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/publish/hep.go b/publish/hep.go index 5815552..5663c53 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -176,7 +176,7 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) { }() if _, err := os.Stat(config.Cfg.HEPBufferFile); err != nil { - logp.Debug("file doesn't exists", config.Cfg.HEPBufferFile) + logp.Debug("file doesn't exists: ", config.Cfg.HEPBufferFile) return 0, fmt.Errorf("file doesn't exists") } From 8e6ca6a978349fe8a869ffd602ea4f0d74c39cea Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Tue, 17 Oct 2023 11:55:31 +0200 Subject: [PATCH 07/13] added more debug --- publish/hep.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/publish/hep.go b/publish/hep.go index 5663c53..af55e6e 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -177,7 +177,7 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) { if _, err := os.Stat(config.Cfg.HEPBufferFile); err != nil { logp.Debug("file doesn't exists: ", config.Cfg.HEPBufferFile) - return 0, fmt.Errorf("file doesn't exists") + return 0, fmt.Errorf("file doesn't exists: ", config.Cfg.HEPBufferFile) } HEPFileData, HEPFileDataerr := os.ReadFile(config.Cfg.HEPBufferFile) @@ -226,7 +226,8 @@ func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) { destination, err := os.OpenFile(config.Cfg.HEPBufferFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { - fmt.Println("Open HEP file error", err) + logp.Err("open HEP file error: %v\n", err) + return 0, fmt.Errorf("open HEP file error: %v", err) } defer destination.Close() @@ -247,7 +248,8 @@ func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) { nBytes, err := destination.Write(inbytes) if err != nil { - logp.Debug("collector", "File Send HEP from buffer to file error", err) + logp.Err("file Send HEP from buffer to file error: %v", err.Error()) + return 0, fmt.Errorf("file Send HEP from buffer to file error: %v", err.Error()) } else { logp.Debug("collector", " File Send HEP from buffer to file OK") } From 34f90ad04d1cf9bc6f1d979fe56826869c7a40b2 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Wed, 18 Oct 2023 14:19:50 +0200 Subject: [PATCH 08/13] fixed some conditions --- config/config.go | 1 + main.go | 2 +- publish/hep.go | 11 ++++++++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index ccb10f5..4d03b94 100644 --- a/config/config.go +++ b/config/config.go @@ -31,6 +31,7 @@ type Config struct { SendRetries uint Version bool SkipVerify bool + HEPBufferDebug bool HEPBufferEnable bool HEPBufferSize string HEPBufferFile string diff --git a/main.go b/main.go index 6d95e4a..010a5de 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,6 @@ import ( const version = "heplify 1.65.12" - func createFlags() { flag.Usage = func() { @@ -83,6 +82,7 @@ func createFlags() { flag.BoolVar(&config.Cfg.Version, "version", false, "Show heplify version") flag.BoolVar(&config.Cfg.SkipVerify, "skipverify", false, "skip certifcate validation") flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer", false, "enable buffer messages if connection to HEP server broken") + flag.BoolVar(&config.Cfg.HEPBufferDebug, "hep-buffer-debug", false, "enable debug buffer messages") flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, MB, GB, TB. By default - unlimited") flag.StringVar(&config.Cfg.HEPBufferFile, "hep-buffer-file", "HEP-Buffer.dump", "filename and location for hep-buffer file") diff --git a/publish/hep.go b/publish/hep.go index af55e6e..d1d268f 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -123,6 +123,9 @@ func (h *HEPOutputer) Send(msg []byte) { var retry bool if config.Cfg.SendRetries > 0 { retry = (h.client[n].errCnt % config.Cfg.SendRetries) == 0 + if config.Cfg.HEPBufferEnable { + h.copyHEPbufftoFile(msg) + } } else { retry = true } @@ -130,6 +133,9 @@ func (h *HEPOutputer) Send(msg []byte) { h.client[n].errCnt = 0 if err = h.ReConnect(n); err != nil { logp.Err("reconnect error: %v", err) + if config.Cfg.HEPBufferEnable { + h.copyHEPbufftoFile(msg) + } return } else { if h.msgPing != nil { @@ -148,7 +154,6 @@ func (h *HEPOutputer) Send(msg []byte) { if config.Cfg.HEPBufferEnable { h.copyHEPbufftoFile(msg) } - } } } else { @@ -224,6 +229,10 @@ func (h *HEPOutputer) copyHEPbufftoFile(inbytes []byte) (int64, error) { } }() + if config.Cfg.HEPBufferDebug { + logp.Err("adding packet to BUFFER: %s\n", string(inbytes)) + } + destination, err := os.OpenFile(config.Cfg.HEPBufferFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { logp.Err("open HEP file error: %v\n", err) From b7974fd5b8d426b8857271d80f0bcccd1b217887 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Wed, 18 Oct 2023 22:19:23 +0200 Subject: [PATCH 09/13] fixed param --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 010a5de..00c8623 100644 --- a/main.go +++ b/main.go @@ -81,7 +81,7 @@ func createFlags() { flag.UintVar(&config.Cfg.SendRetries, "tcpsendretries", 0, "Number of retries for sending before giving up and reconnecting") flag.BoolVar(&config.Cfg.Version, "version", false, "Show heplify version") flag.BoolVar(&config.Cfg.SkipVerify, "skipverify", false, "skip certifcate validation") - flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer", false, "enable buffer messages if connection to HEP server broken") + flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer-activate", false, "enable buffer messages if connection to HEP server broken") flag.BoolVar(&config.Cfg.HEPBufferDebug, "hep-buffer-debug", false, "enable debug buffer messages") flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, MB, GB, TB. By default - unlimited") flag.StringVar(&config.Cfg.HEPBufferFile, "hep-buffer-file", "HEP-Buffer.dump", "filename and location for hep-buffer file") From 2af57d988ef46ae4ba31551a3d4c5cd7c86d7c64 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Fri, 17 Nov 2023 22:48:57 +0100 Subject: [PATCH 10/13] added description --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 00c8623..0813fa6 100644 --- a/main.go +++ b/main.go @@ -83,7 +83,7 @@ func createFlags() { flag.BoolVar(&config.Cfg.SkipVerify, "skipverify", false, "skip certifcate validation") flag.BoolVar(&config.Cfg.HEPBufferEnable, "hep-buffer-activate", false, "enable buffer messages if connection to HEP server broken") flag.BoolVar(&config.Cfg.HEPBufferDebug, "hep-buffer-debug", false, "enable debug buffer messages") - flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, MB, GB, TB. By default - unlimited") + flag.StringVar(&config.Cfg.HEPBufferSize, "hep-buffer-max-size", "0", "max buffer size, can be B, KB, MB, GB, TB. By default - unlimited") flag.StringVar(&config.Cfg.HEPBufferFile, "hep-buffer-file", "HEP-Buffer.dump", "filename and location for hep-buffer file") flag.Parse() From c81ffd78f9ae154983d325af789cdb893ddb762c Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Fri, 17 Nov 2023 23:40:25 +0100 Subject: [PATCH 11/13] added resend buffer on start --- main.go | 2 +- publish/hep.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 0813fa6..655bde7 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( "github.com/sipcapture/heplify/sniffer" ) -const version = "heplify 1.65.12" +const version = "heplify 1.65.13" func createFlags() { diff --git a/publish/hep.go b/publish/hep.go index d1d268f..5ec499b 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -39,6 +39,12 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) { if err := h.ConnectServer(n); err != nil { logp.Err("%v", err) errCnt++ + } else { + if config.Cfg.HEPBufferEnable { + if _, err := h.copyHEPFileOut(n); err != nil { + logp.Err("Sending HEP from file error: %v", err) + } + } } } if errCnt == l { From f235f317e11fa6573be6df2a601e8b9c4a894ef1 Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Fri, 17 Nov 2023 23:45:45 +0100 Subject: [PATCH 12/13] upgraded --- go.mod | 7 ++++--- go.sum | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index bb19cd0..2d774c7 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,11 @@ go 1.15 require ( github.com/cespare/xxhash v1.1.0 // indirect github.com/gogo/protobuf v1.3.2 - github.com/google/gopacket v1.1.18 + github.com/google/gopacket v1.1.19 github.com/negbie/freecache v1.1.0 github.com/negbie/logp v0.0.0-20190313141056-04cebff7f846 - github.com/segmentio/encoding v0.1.15 + github.com/segmentio/asm v1.2.0 // indirect + github.com/segmentio/encoding v0.3.6 github.com/stretchr/testify v1.6.1 - golang.org/x/net v0.7.0 + golang.org/x/net v0.18.0 ) diff --git a/go.sum b/go.sum index 0832212..360aebe 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/gopacket v1.1.18 h1:lum7VRA9kdlvBi7/v2p7/zcbkduHaCH/SVVyurs7OpY= github.com/google/gopacket v1.1.18/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/negbie/freecache v1.1.0 h1:21vPkO0aFvrSlh0j37cwXlIe3swtjYi3xt+InoxjOt4= @@ -16,8 +18,13 @@ github.com/negbie/logp v0.0.0-20190313141056-04cebff7f846 h1:PAr5hcOgvc2m71W4Slb github.com/negbie/logp v0.0.0-20190313141056-04cebff7f846/go.mod h1:xTKf9aKLuVL0r9wxWouR+/4ctK2ywm0rTPFY2klrnOg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/encoding v0.1.15 h1:btgfyAuFo3uLw7eOrRDPo8H4Bc881+bSPHzAEe0ukho= github.com/segmentio/encoding v0.1.15/go.mod h1:RWhr02uzMB9gQC1x+MfYxedtmBibb9cZ6Vv9VxRSSbw= +github.com/segmentio/encoding v0.3.6 h1:E6lVLyDPseWEulBmCmAKPanDd3jiyGDo5gMcugCRwZQ= +github.com/segmentio/encoding v0.3.6/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -30,43 +37,63 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190405154228-4b34438f7a67/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.14.0/go.mod h1:TySc+nGkYR6qt8km8wUhuFRTVSMIX3XPR58y2lC8vww= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From 1c077e6dda74a9d0229959e975cb07e86ab689fd Mon Sep 17 00:00:00 2001 From: Alexandr Dubovikov Date: Mon, 20 Nov 2023 11:31:31 +0100 Subject: [PATCH 13/13] fix retry --- publish/hep.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/publish/hep.go b/publish/hep.go index 5ec499b..dc97612 100644 --- a/publish/hep.go +++ b/publish/hep.go @@ -41,8 +41,10 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) { errCnt++ } else { if config.Cfg.HEPBufferEnable { - if _, err := h.copyHEPFileOut(n); err != nil { - logp.Err("Sending HEP from file error: %v", err) + if _, err := os.Stat(config.Cfg.HEPBufferFile); err == nil { + if _, err := h.copyHEPFileOut(n); err != nil { + logp.Err("Sending HEP from file error: %v", err) + } } } } @@ -124,14 +126,10 @@ func (h *HEPOutputer) Send(msg []byte) { h.client[n].writer.Write(msg) err := h.client[n].writer.Flush() if err != nil { - logp.Err("%v", err) h.client[n].errCnt++ var retry bool if config.Cfg.SendRetries > 0 { retry = (h.client[n].errCnt % config.Cfg.SendRetries) == 0 - if config.Cfg.HEPBufferEnable { - h.copyHEPbufftoFile(msg) - } } else { retry = true }