Skip to content

Commit dbdf2ef

Browse files
Merge pull request #6 from Cleafy/feature/mixed-improvements
Feature/mixed improvements
2 parents 880d284 + 3cfe99d commit dbdf2ef

File tree

4 files changed

+132
-44
lines changed

4 files changed

+132
-44
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ usage: promplay [<flags>]
6262
6363
Flags:
6464
--help Show context-sensitive help (also try --help-long and --help-man).
65-
--debug Enable debug mode.
65+
--debug Enable debug mode. (VERY VERBOSE!)
66+
--verbose (-v) Enable info-level message
6667
--nopromcfg Disable the generation of the prometheus cfg file (prometheus.yml)
6768
-d, --dir="/tmp" Input directory.
6869
--version Show application version.

bin/promplay/main.go

Lines changed: 116 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,47 @@ import (
88
"io/ioutil"
99
"net/http"
1010
"os"
11+
"path/filepath"
1112
"sort"
13+
"strings"
1214
"time"
1315

14-
cm "github.com/cleafy/promqueen/model"
16+
cm "github.com/Cleafy/promqueen/model"
17+
1518
"github.com/mattetti/filebuffer"
1619
dto "github.com/prometheus/client_model/go"
1720
"github.com/prometheus/common/expfmt"
1821
"github.com/prometheus/common/model"
1922
"github.com/prometheus/prometheus/storage/local"
2023
"github.com/sirupsen/logrus"
2124
kingpin "gopkg.in/alecthomas/kingpin.v2"
25+
pb "gopkg.in/cheggaaa/pb.v2"
2226
filetype "gopkg.in/h2non/filetype.v1"
2327
)
2428

2529
var replayType = filetype.NewType("rep", "application/replay")
2630

2731
func replayMatcher(buf []byte) bool {
2832
header, err := cm.ReadFrameHeader(filebuffer.New(buf))
29-
if err != nil {
30-
return false
33+
if err == nil {
34+
return cm.CheckVersion(header)
3135
}
32-
return cm.CheckVersion(header)
36+
logrus.Errorf("Malformed frame header!")
37+
return false
3338
}
3439

3540
var (
36-
debug = kingpin.Flag("debug", "Enable debug mode.").Bool()
37-
nopromcfg = kingpin.Flag("nopromcfg", "Disable the generation of the prometheus cfg file (prometheus.yml)").Bool()
38-
dir = kingpin.Flag("dir", "Input directory.").Short('d').OverrideDefaultFromEnvar("INPUT_DIRECTORY").Default(".").String()
39-
framereader = make(<-chan cm.Frame)
40-
Version = "unversioned"
41-
cfgMemoryStorage = local.MemorySeriesStorageOptions{
42-
MemoryChunks: 1024,
43-
MaxChunksToPersist: 1024,
41+
debug = kingpin.Flag("debug", "Enable debug mode. More verbose than --verbose").Default("false").Bool()
42+
verbose = kingpin.Flag("verbose", "Enable info-only mode").Short('v').Default("false").Bool()
43+
nopromcfg = kingpin.Flag("nopromcfg", "Disable the generation of the prometheus cfg file (prometheus.yml)").Bool()
44+
dir = kingpin.Flag("dir", "Input directory.").Short('d').OverrideDefaultFromEnvar("INPUT_DIRECTORY").Default(".").String()
45+
memoryChunk = kingpin.Flag("memoryChunk", "Maximum number of chunks in memory").Default("100000000").Int()
46+
maxChunkToPersist = kingpin.Flag("mexChunkToPersist", "Maximum number of chunks waiting, in memory, to be written on the disk").Default("100000000").Int()
47+
framereader = make(<-chan cm.Frame)
48+
Version = "unversioned"
49+
cfgMemoryStorage = local.MemorySeriesStorageOptions{
50+
MemoryChunks: 0,
51+
MaxChunksToPersist: 0,
4452
//PersistenceStoragePath:
4553
//PersistenceRetentionPeriod:
4654
//CheckpointInterval: time.Minute*30,
@@ -59,44 +67,62 @@ func osfile2fname(fss []os.FileInfo, dir string) []string {
5967
return out
6068
}
6169

62-
func generateFramereader() {
70+
func generateFramereader() int {
71+
defer func() {
72+
if e := recover(); e != nil {
73+
logrus.Errorf("Frame reader generation failed!, MESSAGE: %v", e)
74+
}
75+
}()
76+
77+
logrus.Infoln("Preliminary file read started...")
78+
var count int = 0
6379
// 1. Check for every file that is GZip or csave format and create the filemap
6480
files, err := ioutil.ReadDir(*dir)
6581
if err != nil {
66-
logrus.Fatalf("generateFilemap: %v", err)
82+
panic(err)
6783
}
6884
readers := make([]io.Reader, 0)
6985

7086
fnames := osfile2fname(files, *dir)
71-
sort.Sort(cm.ByNumber(fnames))
87+
sort.Sort(sort.Reverse(cm.ByNumber(fnames)))
7288

7389
logrus.Debugf("fnames: %v", fnames)
7490

75-
for _, filepath := range fnames {
76-
logrus.Debugf("filepath: %v", filepath)
77-
ftype, err := filetype.MatchFile(filepath)
91+
for _, path := range fnames {
92+
logrus.Debugf("filepath: %v", path)
93+
ftype, err := filetype.MatchFile(path)
7894
if err != nil {
7995
logrus.Debugf("err %v", err)
8096
}
8197
if ftype.MIME.Value == "application/replay" {
82-
f, _ := os.Open(filepath)
98+
f, _ := os.Open(path)
99+
100+
count += len(cm.ReadAll(f).Data)
101+
f.Seek(0, 0)
102+
83103
readers = append(readers, f)
84104
}
85-
86105
if ftype.MIME.Value == "application/gzip" {
87-
f, _ := os.Open(filepath)
88-
logrus.Debugf("reading header: %v", filepath)
89-
gz, _ := gzip.NewReader(f)
90-
header, err := cm.ReadFrameHeader(gz)
91-
if err == nil && cm.CheckVersion(header) {
92-
f.Seek(0, io.SeekStart)
93-
gz, _ = gzip.NewReader(f)
94-
readers = append(readers, gz)
95-
}
106+
filename := filepath.Base(path)
107+
ungzip(path, "./tmp/"+trimSuffix(filename, ".gz"))
108+
109+
f, _ := os.Open("./tmp/" + trimSuffix(filename, ".gz"))
110+
111+
count += len(cm.ReadAll(f).Data)
112+
f.Seek(0, 0)
113+
114+
readers = append(readers, f)
96115
}
97116
}
98-
99117
framereader = cm.NewMultiReader(readers)
118+
return count
119+
}
120+
121+
func trimSuffix(s, suffix string) string {
122+
if strings.HasSuffix(s, suffix) {
123+
s = s[:len(s)-len(suffix)]
124+
}
125+
return s
100126
}
101127

102128
func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader) io.Reader {
@@ -137,7 +163,7 @@ func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader
137163

138164
enc.Encode(&metrics)
139165

140-
count += 1
166+
count++
141167
}
142168

143169
logrus.Printf("%d metrics unmarshalled for %s", count, url)
@@ -147,7 +173,40 @@ func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader
147173
return pr
148174
}
149175

176+
func ungzip(source, target string) {
177+
defer func() {
178+
if e := recover(); e != nil {
179+
logrus.Errorf("Errors during decompression of %v", source)
180+
}
181+
}()
182+
183+
reader, err := os.Open(source)
184+
if err != nil {
185+
panic(err)
186+
}
187+
defer reader.Close()
188+
189+
archive, err := gzip.NewReader(reader)
190+
if err != nil {
191+
panic(err)
192+
}
193+
defer archive.Close()
194+
195+
target = filepath.Join(target, archive.Name)
196+
writer, err := os.Create(target)
197+
if err != nil {
198+
panic(err)
199+
}
200+
defer writer.Close()
201+
202+
_, err = io.Copy(writer, archive)
203+
if err != nil {
204+
panic(err)
205+
}
206+
}
207+
150208
func main() {
209+
151210
kingpin.Version(Version)
152211

153212
kingpin.Flag("storage.path", "Directory path to create and fill the data store under.").Default("data").StringVar(&cfgMemoryStorage.PersistenceStoragePath)
@@ -163,7 +222,20 @@ func main() {
163222
flag.Set("log.level", "debug")
164223
}
165224

225+
if !*verbose {
226+
logrus.SetLevel(logrus.ErrorLevel)
227+
flag.Set("log.level", "error")
228+
}
229+
230+
// create temp directory to store ungzipped files
231+
os.Mkdir("./tmp", 0700)
232+
defer os.RemoveAll("./tmp")
233+
166234
logrus.Infoln("Prefilling into", cfgMemoryStorage.PersistenceStoragePath)
235+
236+
cfgMemoryStorage.MaxChunksToPersist = *maxChunkToPersist
237+
cfgMemoryStorage.MemoryChunks = *memoryChunk
238+
167239
localStorage := local.NewMemorySeriesStorage(&cfgMemoryStorage)
168240

169241
sampleAppender := localStorage
@@ -181,19 +253,24 @@ func main() {
181253

182254
filetype.AddMatcher(replayType, replayMatcher)
183255

184-
generateFramereader()
256+
count := generateFramereader()
257+
185258
logrus.Debug("frameReader %+v", framereader)
186259

187260
sout := bufio.NewWriter(os.Stdout)
188261
defer sout.Flush()
189262

190263
r := &http.Request{}
191264

265+
bar := pb.ProgressBarTemplate(`{{ red "Frames processed:" }} {{bar . | green}} {{rtime . "ETA %s" | blue }} {{percent . }}`).Start(count)
266+
defer bar.Finish()
267+
192268
for frame := range framereader {
269+
bar.Increment()
193270
response, err := http.ReadResponse(bufio.NewReader(filebuffer.New(frame.Data)), r)
194271
if err != nil {
195-
logrus.Error(err)
196-
return
272+
logrus.Errorf("Errors occured while reading frame %d, MESSAGE: %v", frame.NameString, err)
273+
continue
197274
}
198275
bytesReader := updateURLTimestamp(frame.Header.Timestamp, frame.NameString(), frame.URIString(), response.Body)
199276

@@ -214,7 +291,7 @@ func main() {
214291
logrus.Infoln("Ingested", len(decSamples), "metrics")
215292

216293
for sampleAppender.NeedsThrottling() {
217-
logrus.Debugln("Waiting 100ms for appender to be ready for more data")
294+
logrus.Debugln("THROTTLING: Waiting 100ms for appender to be ready for more data")
218295
time.Sleep(time.Millisecond * 100)
219296
}
220297

@@ -224,25 +301,26 @@ func main() {
224301
)
225302

226303
for _, s := range model.Samples(decSamples) {
304+
227305
if err := sampleAppender.Append(s); err != nil {
228306
switch err {
229307
case local.ErrOutOfOrderSample:
230308
numOutOfOrder++
231309
logrus.WithFields(logrus.Fields{
232310
"sample": s,
233311
"error": err,
234-
}).Info("Sample discarded")
312+
}).Error("Sample discarded")
235313
case local.ErrDuplicateSampleForTimestamp:
236314
numDuplicates++
237315
logrus.WithFields(logrus.Fields{
238316
"sample": s,
239317
"error": err,
240-
}).Info("Sample discarded")
318+
}).Error("Sample discarded")
241319
default:
242320
logrus.WithFields(logrus.Fields{
243321
"sample": s,
244322
"error": err,
245-
}).Info("Sample discarded")
323+
}).Error("Sample discarded")
246324
}
247325
}
248326
}

model/reader.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ func NewMultiReader(r []io.Reader) <-chan Frame {
3030
}
3131
logrus.Infof("Frames ended")
3232
}()
33-
3433
return chframe
3534
}
3635

@@ -87,11 +86,19 @@ func ReadFrameHeader(r io.Reader) (*FrameHeader, error) {
8786
// ReadFrame reads the next frame from the Reader or returns an error in
8887
// case it cannot interpret the Frame
8988
func ReadFrame(r io.Reader) (frame *Frame, err error) {
89+
defer func() {
90+
if e := recover(); e != nil {
91+
if e.(error).Error() != "EOF" {
92+
logrus.Errorf("Errors occured while reading frame %v, MESSAGE: %v", frame.NameString, e)
93+
}
94+
}
95+
}()
96+
9097
frame = NewEmptyFrame()
9198
frame.Header, err = ReadFrameHeader(r)
9299

93100
if err != nil {
94-
return
101+
panic(err)
95102
}
96103

97104
// generate the correct framesize for .Data
@@ -100,14 +107,16 @@ func ReadFrame(r io.Reader) (frame *Frame, err error) {
100107
// read the frame Data
101108
data, err := readNextBytes(r, int64(len(frame.Data)))
102109
if err != nil {
103-
return
110+
panic(err)
104111
}
112+
105113
buffer := bytes.NewBuffer(data)
106114

107115
err = binary.Read(buffer, binary.BigEndian, frame.Data)
108116
if err != nil {
109-
return
117+
panic(err)
110118
}
119+
111120
logrus.Debugf("ReadFrame: frame.Data %d", frame.Data)
112121

113122
return

model/sort.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
var reNumber *regexp.Regexp
99

1010
func init() {
11-
reNumber, _ = regexp.Compile("([0-9]*)$")
11+
reNumber, _ = regexp.Compile("[0-9]+")
1212
}
1313

1414
// ByNumber helper struct to sort by last number all the log files

0 commit comments

Comments
 (0)