Skip to content

Commit

Permalink
Merge pull request #56 from teepark/master
Browse files Browse the repository at this point in the history
add a TCP listener
  • Loading branch information
markrechler committed May 6, 2015
2 parents 7c44467 + 6fc8f8b commit c19b47a
Show file tree
Hide file tree
Showing 3 changed files with 388 additions and 197 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ Usage of ./statsdaemon:
-postfix="": Postfix for all stats
-delete-gauges=true: don't send values to graphite for inactive gauges, as opposed to sending the previous value
-receive-counter="": Metric name for total metrics received per interval
-tcpaddr="": TCP service address, if set
-version=false: print version string
```
320 changes: 210 additions & 110 deletions statsdaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"io"
"log"
"math"
"net"
Expand All @@ -21,6 +22,7 @@ import (

const (
MAX_UNPROCESSED_PACKETS = 1000
TCP_READ_SIZE = 4096
)

var signalchan chan os.Signal
Expand Down Expand Up @@ -79,18 +81,19 @@ func sanitizeBucket(bucket string) string {
}

var (
serviceAddress = flag.String("address", ":8125", "UDP service address")
maxUdpPacketSize = flag.Int64("max-udp-packet-size", 1472, "Maximum UDP packet size")
graphiteAddress = flag.String("graphite", "127.0.0.1:2003", "Graphite service address (or - to disable)")
flushInterval = flag.Int64("flush-interval", 10, "Flush interval (seconds)")
debug = flag.Bool("debug", false, "print statistics sent to graphite")
showVersion = flag.Bool("version", false, "print version string")
deleteGauges = flag.Bool("delete-gauges", true, "don't send values to graphite for inactive gauges, as opposed to sending the previous value")
persistCountKeys = flag.Int64("persist-count-keys", 60, "number of flush-intervals to persist count keys")
receiveCounter = flag.String("receive-counter", "", "Metric name for total metrics received per interval")
percentThreshold = Percentiles{}
prefix = flag.String("prefix", "", "Prefix for all stats")
postfix = flag.String("postfix", "", "Postfix for all stats")
serviceAddress = flag.String("address", ":8125", "UDP service address")
tcpServiceAddress = flag.String("tcpaddr", "", "TCP service address, if set")
maxUdpPacketSize = flag.Int64("max-udp-packet-size", 1472, "Maximum UDP packet size")
graphiteAddress = flag.String("graphite", "127.0.0.1:2003", "Graphite service address (or - to disable)")
flushInterval = flag.Int64("flush-interval", 10, "Flush interval (seconds)")
debug = flag.Bool("debug", false, "print statistics sent to graphite")
showVersion = flag.Bool("version", false, "print version string")
deleteGauges = flag.Bool("delete-gauges", true, "don't send values to graphite for inactive gauges, as opposed to sending the previous value")
persistCountKeys = flag.Int64("persist-count-keys", 60, "number of flush-intervals to persist count keys")
receiveCounter = flag.String("receive-counter", "", "Metric name for total metrics received per interval")
percentThreshold = Percentiles{}
prefix = flag.String("prefix", "", "Prefix for all stats")
postfix = flag.String("postfix", "", "Postfix for all stats")
)

func init() {
Expand Down Expand Up @@ -365,122 +368,210 @@ func processTimers(buffer *bytes.Buffer, now int64, pctls Percentiles) int64 {
return num
}

func parseMessage(data []byte) []*Packet {
var (
output []*Packet
input []byte
)
type MsgParser struct {
reader io.Reader
buffer []byte
partialReads bool
done bool
}

for _, line := range bytes.Split(data, []byte("\n")) {
if len(line) == 0 {
continue
}
input = line
func NewParser(reader io.Reader, partialReads bool) *MsgParser {
return &MsgParser{reader, []byte{}, partialReads, false}
}

index := bytes.IndexByte(input, ':')
if index < 0 || index == len(input)-1 {
if *debug {
log.Printf("ERROR: failed to parse line: %s\n", string(line))
}
continue
func (mp *MsgParser) Next() (*Packet, bool) {
buf := mp.buffer

for {
line, rest := mp.lineFrom(buf)

if line != nil {
mp.buffer = rest
return parseLine(line), true
}

name := input[:index]
if mp.done {
return parseLine(rest), false
}

index++
input = input[index:]
idx := len(buf)
end := idx
if mp.partialReads {
end += TCP_READ_SIZE
} else {
end += int(*maxUdpPacketSize)
}
if cap(buf) >= end {
buf = buf[:end]
} else {
tmp := buf
buf = make([]byte, end)
copy(buf, tmp)
}

index = bytes.IndexByte(input, '|')
if index < 0 || index == len(input)-1 {
if *debug {
log.Printf("ERROR: failed to parse line: %s\n", string(line))
n, err := mp.reader.Read(buf[idx:])
buf = buf[:idx+n]
if err != nil {
if err != io.EOF {
log.Printf("ERROR: %s", err)
}
continue
}

val := input[:index]
index++
mp.done = true

var mtypeStr string
line, rest = mp.lineFrom(buf)
if line != nil {
mp.buffer = rest
return parseLine(line), len(rest) > 0
}

if input[index] == 'm' {
index++
if index >= len(input) || input[index] != 's' {
if *debug {
log.Printf("ERROR: failed to parse line: %s\n", string(line))
}
continue
if len(rest) > 0 {
return parseLine(rest), false
}
mtypeStr = "ms"
} else {
mtypeStr = string(input[index])

return nil, false
}
}
}

index++
input = input[index:]
func (mp *MsgParser) lineFrom(input []byte) ([]byte, []byte) {
split := bytes.SplitAfterN(input, []byte("\n"), 2)
if len(split) == 2 {
return split[0][:len(split[0])-1], split[1]
}

var (
value interface{}
err error
)
if !mp.partialReads {
if len(input) == 0 {
input = nil
}
return input, []byte{}
}

if mtypeStr[0] == 'c' {
value, err = strconv.ParseInt(string(val), 10, 64)
if err != nil {
log.Printf("ERROR: failed to ParseInt %s - %s", string(val), err)
continue
}
} else if mtypeStr[0] == 'g' {
var relative, negative bool
var stringToParse string

switch val[0] {
case '+', '-':
relative = true
negative = val[0] == '-'
stringToParse = string(val[1:])
default:
relative = false
negative = false
stringToParse = string(val)
}
if bytes.HasSuffix(input, []byte("\n")) {
return input[:len(input)-1], []byte{}
}

gaugeValue, err := strconv.ParseUint(stringToParse, 10, 64)
if err != nil {
log.Printf("ERROR: failed to ParseUint %s - %s", string(val), err)
continue
}
return nil, input
}

value = GaugeData{relative, negative, gaugeValue}
} else if mtypeStr[0] == 's' {
value = string(val)
} else {
value, err = strconv.ParseUint(string(val), 10, 64)
func parseLine(line []byte) *Packet {
split := bytes.SplitN(line, []byte{'|'}, 3)
if len(split) < 2 {
logParseFail(line)
return nil
}

keyval := split[0]
typeCode := string(split[1])

sampling := float32(1)
if strings.HasPrefix(typeCode, "c") || strings.HasPrefix(typeCode, "ms") {
if len(split) == 3 && len(split[2]) > 0 && split[2][0] == '@' {
f64, err := strconv.ParseFloat(string(split[2][1:]), 32)
if err != nil {
log.Printf("ERROR: failed to ParseUint %s - %s", string(val), err)
continue
log.Printf(
"ERROR: failed to ParseFloat %s - %s",
string(split[2][1:]),
err,
)
return nil
}
sampling = float32(f64)
}
}

var sampleRate float32 = 1
split = bytes.SplitN(keyval, []byte{':'}, 2)
if len(split) < 2 {
logParseFail(line)
return nil
}
name := string(split[0])
val := split[1]
if len(val) == 0 {
logParseFail(line)
return nil
}

if len(input) > 0 && bytes.HasPrefix(input, []byte("|@")) {
input = input[2:]
rate, err := strconv.ParseFloat(string(input), 32)
if err == nil {
sampleRate = float32(rate)
}
var (
err error
value interface{}
)

switch typeCode {
case "c":
value, err = strconv.ParseInt(string(val), 10, 64)
if err != nil {
log.Printf("ERROR: failed to ParseInt %s - %s", string(val), err)
return nil
}
case "g":
var rel, neg bool
var s string

switch val[0] {
case '+':
rel = true
neg = false
s = string(val[1:])
case '-':
rel = true
neg = true
s = string(val[1:])
default:
rel = false
neg = false
s = string(val)
}

packet := &Packet{
Bucket: sanitizeBucket(*prefix + string(name) + *postfix),
Value: value,
Modifier: mtypeStr,
Sampling: sampleRate,
value, err = strconv.ParseUint(s, 10, 64)
if err != nil {
log.Printf("ERROR: failed to ParseUint %s - %s", string(val), err)
return nil
}

value = GaugeData{rel, neg, value.(uint64)}
case "s":
value = string(val)
case "ms":
value, err = strconv.ParseUint(string(val), 10, 64)
if err != nil {
log.Printf("ERROR: failed to ParseUint %s - %s", string(val), err)
return nil
}
default:
log.Printf("ERROR: unrecognized type code %q", typeCode)
return nil
}

return &Packet{
Bucket: sanitizeBucket(*prefix + string(name) + *postfix),
Value: value,
Modifier: typeCode,
Sampling: sampling,
}
}

func logParseFail(line []byte) {
if *debug {
log.Printf("ERROR: failed to parse line: %q\n", string(line))
}
}

func parseTo(conn io.ReadCloser, partialReads bool, out chan<- *Packet) {
defer conn.Close()

parser := NewParser(conn, partialReads)
for {
p, more := parser.Next()
if p == nil {
break
}

out <- p

if !more {
break
}
output = append(output, packet)
}
return output
}

func udpListener() {
Expand All @@ -490,19 +581,25 @@ func udpListener() {
if err != nil {
log.Fatalf("ERROR: ListenUDP - %s", err)
}

parseTo(listener, false, In)
}

func tcpListener() {
address, _ := net.ResolveTCPAddr("tcp", *tcpServiceAddress)
log.Printf("listening on %s", address)
listener, err := net.ListenTCP("tcp", address)
if err != nil {
log.Fatalf("ERROR: ListenTCP - %s", err)
}
defer listener.Close()

message := make([]byte, *maxUdpPacketSize)
for {
n, remaddr, err := listener.ReadFromUDP(message)
conn, err := listener.AcceptTCP()
if err != nil {
log.Printf("ERROR: reading UDP packet from %+v - %s", remaddr, err)
continue
}

for _, p := range parseMessage(message[:n]) {
In <- p
log.Fatalf("ERROR: AcceptTCP - %s", err)
}
go parseTo(conn, true, In)
}
}

Expand All @@ -518,5 +615,8 @@ func main() {
signal.Notify(signalchan, syscall.SIGTERM)

go udpListener()
if *tcpServiceAddress != "" {
go tcpListener()
}
monitor()
}
Loading

0 comments on commit c19b47a

Please sign in to comment.