-
Notifications
You must be signed in to change notification settings - Fork 3
/
slammer.go
194 lines (177 loc) · 6.37 KB
/
slammer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package main
import (
"bufio"
"database/sql"
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"strings"
"sync"
"time"
// Load the drivers
// MySQL
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
)
type config struct {
connString string
db string
pauseInterval time.Duration
workers int
debugMode bool
}
type result struct {
start time.Time
end time.Time
dbTime time.Duration
workCount int
errors int
}
func main() {
cfg, err := getConfig()
if err != nil {
log.Fatal(err)
}
// Open the connection to the DB - should each worker open it's own connection?
// Food for thought
db, err := sql.Open(cfg.db, cfg.connString)
if err != nil {
log.Fatal(err)
}
// Declare the channel we'll be using as a work queue
inputChan := make(chan (string))
// Declare the channel that will gather results
outputChan := make(chan (result), cfg.workers)
// Declare a waitgroup to help prevent log interleaving - I technically do not
// need one, but without it, I find there are stray log messages creeping into
// the final report. Setting sync() on STDOUT/ERR didn't seem to fix it.
var wg sync.WaitGroup
wg.Add(cfg.workers)
// Start the pool of workers up, reading from the channel
totalStart := time.Now()
startWorkers(cfg.workers, inputChan, outputChan, db, &wg, cfg.pauseInterval, cfg.debugMode)
// Warm up error and line so I can use error in the for loop with running into
// a shadowing issue
err = nil
line := ""
totalWorkCount := 0
// Read from STDIN in the main thread
input := bufio.NewReader(os.Stdin)
for err != io.EOF {
line, err = input.ReadString('\n')
if err == nil {
// Get rid of any unwanted stuff
line = strings.TrimRight(line, "\r\n")
// Push that onto the work queue
inputChan <- line
totalWorkCount++
} else if cfg.debugMode {
log.Println(err)
}
}
// Close the channel, since it's done receiving input
close(inputChan)
// As I mentioned above, because workers wont finish at the same time, I need
// to use a waitgroup, otherwise the output below gets potentially mixed in with
// debug or error messages from the workers. The waitgroup semaphore prevents this
// even though it probably looks redundant
wg.Wait()
totalEnd := time.Now()
wallTime := totalEnd.Sub(totalStart)
// Collect all results, report them. This will block and wait until all results
// are in
fmt.Println("Slammer Status:")
totalErrors := 0
var totalDbTime time.Duration
for i := 0; i < cfg.workers; i++ {
r := <-outputChan
workerDuration := r.end.Sub(r.start)
totalErrors += r.errors
totalDbTime += r.dbTime
fmt.Printf("---- Worker #%d ----\n", i)
fmt.Printf(" Started at %s , Ended at %s, Wall time %s, DB time %s\n", r.start.Format("2006-01-02 15:04:05"), r.end.Format("2006-01-02 15:04:05"), workerDuration.String(), r.dbTime)
fmt.Printf(" Units of work: %d, Percentage work: %f, Average work over DB time: %f\n", r.workCount, float64(r.workCount)/float64(totalWorkCount), float64(r.workCount)/float64(r.dbTime))
fmt.Printf(" Errors: %d , Percentage errors: %f, Average errors per second: %f\n", r.errors, float64(r.errors)/float64(r.workCount), float64(r.errors)/workerDuration.Seconds())
}
// TODO work on improving what we report here
fmt.Printf("---- Overall ----\n")
fmt.Printf(" Started at %s , Ended at %s, Wall time %s, DB time %s\n", totalStart.Format("2006-01-02 15:04:05"), totalEnd.Format("2006-01-02 15:04:05"), wallTime, totalDbTime)
fmt.Printf(" Units of work: %d, Average work over DB time: %f\n", totalWorkCount, float64(totalWorkCount)/float64(totalDbTime))
fmt.Printf(" Errors: %d, Percentage errors: %f\n", totalErrors, float64(totalErrors)/float64(totalWorkCount))
// Lets just be nice and tidy
close(outputChan)
}
func startWorkers(count int, ic <-chan string, oc chan<- result, db *sql.DB, wg *sync.WaitGroup, pause time.Duration, debugMode bool) {
// Start the pool of workers up, reading from the channel
for i := 0; i < count; i++ {
// register a signal chan for handling shutdown
sc := make(chan os.Signal)
signal.Notify(sc, os.Interrupt)
// Pass in everything it needs
go startWorker(i, ic, oc, sc, db, wg, pause, debugMode)
}
}
func startWorker(workerNum int, ic <-chan string, oc chan<- result, sc <-chan os.Signal, db *sql.DB, done *sync.WaitGroup, pause time.Duration, debugMode bool) {
// Prep the result object
r := result{start: time.Now()}
for line := range ic {
// First thing is first - do a non blocking read from the signal channel, and
// handle it if something came through the pipe
select {
case _ = <-sc:
// UGH I ACTUALLY ALMOST USED A GOTO HERE BUT I JUST CANT DO IT
// NO NO NO NO NO NO I WONT YOU CANT MAKE ME NO
// I could put it into an anonymous function defer, though...
r.end = time.Now()
oc <- r
done.Done()
return
default:
// NOOP
}
t := time.Now()
_, err := db.Exec(line)
r.dbTime += time.Since(t)
// TODO should this be after the err != nil? It counts towards work attempted
// but not work completed.
r.workCount++
if err != nil {
r.errors++
if debugMode {
log.Printf("Worker #%d: %s - %s", workerNum, line, err.Error())
}
} else {
// Sleep for the configured amount of pause time between each call
time.Sleep(pause)
}
}
// Let everyone know we're done, and bail out
r.end = time.Now()
oc <- r
done.Done()
}
func getConfig() (*config, error) {
p := flag.String("p", "1s", "The time to pause between each call to the database")
c := flag.String("c", "", "The connection string to use when connecting to the database")
db := flag.String("db", "mysql", "The database driver to load. Defaults to mysql")
w := flag.Int("w", 1, "The number of workers to use. A number greater than 1 will enable statements to be issued concurrently")
d := flag.Bool("d", false, "Debug mode - turn this on to have errors printed to the terminal")
// TODO support an "interactive" flag to drop you into a shell that outputs things like
// sparklines of the current worker throughputs
flag.Parse()
if *c == "" {
return nil, errors.New("You must provide a connection string using the -c option")
}
pi, err := time.ParseDuration(*p)
if err != nil {
return nil, errors.New("You must provide a proper duration value with -p")
}
if *w <= 0 {
return nil, errors.New("You must provide a worker count > 0 with -w")
}
return &config{db: *db, connString: *c, pauseInterval: pi, workers: *w, debugMode: *d}, nil
}