Skip to content

Commit

Permalink
Adds initial metrics and request duration percentile logic
Browse files Browse the repository at this point in the history
  • Loading branch information
vladComan0 committed Jun 2, 2024
1 parent 9cb25a6 commit ab7ca50
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 14 deletions.
3 changes: 2 additions & 1 deletion cmd/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ func (app *application) createWorker(w http.ResponseWriter, r *http.Request) {
input.Concurrency,
input.RequestsPerTask,
input.HTTPMethod,
input.Body,
environment,
app.infoLog,
app.errorLog,
Expand All @@ -293,7 +294,7 @@ func (app *application) createWorker(w http.ResponseWriter, r *http.Request) {
worker.CreatedAt = workerFromDB.CreatedAt

wg := &sync.WaitGroup{}
go worker.Start(wg, app.workers)
go worker.Start(wg, app.workers.UpdateStatus)

// Make the application aware of that new location -> add the headers to the right json helper function
headers := make(http.Header)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.2
require (
github.com/go-sql-driver/mysql v1.8.1
github.com/justinas/alice v1.2.0
github.com/montanaflynn/stats v0.7.1
github.com/rs/cors v1.11.0
github.com/spf13/viper v1.18.2
github.com/vladComan0/tasty-byte v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
91 changes: 91 additions & 0 deletions internal/data/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package data

import (
"github.com/montanaflynn/stats"
"sync"
"time"
)

type Metrics struct {
MaxLatency time.Duration `json:"max_latency,omitempty"`
Percentiles map[float64]float64 `json:"Percentiles,omitempty"`
TotalRequests int `json:"total_requests,omitempty"`
FailedRequests int `json:"failed_requests,omitempty"`
latencies []time.Duration
mu sync.Mutex
}

func NewMetrics() *Metrics {
return &Metrics{
Percentiles: make(map[float64]float64),
}
}

func (m *Metrics) AddLatency(latency time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()

m.latencies = append(m.latencies, latency)
}

func (m *Metrics) CalculateMaxLatency() {
m.mu.Lock()
defer m.mu.Unlock()

var maximum time.Duration
for _, latency := range m.latencies {
maximum = max(maximum, latency)
}
m.MaxLatency = maximum
}

func (m *Metrics) CalculatePercentiles(percentileRanks ...float64) error {
m.mu.Lock()
defer m.mu.Unlock()

latencies := make([]float64, len(m.latencies))
for i, latency := range m.latencies {
latencies[i] = float64(latency)
}

for _, rank := range percentileRanks {
result, err := calculatePercentile(latencies, rank)
if err != nil {
return err
}
m.Percentiles[rank] = result
}

return nil
}

func calculatePercentile(latencies []float64, rank float64) (float64, error) {
result, err := stats.Percentile(latencies, rank)
if err != nil {
return 0, err
}
return result, nil
}

func (m *Metrics) IncrementTotalRequests() {
m.mu.Lock()
defer m.mu.Unlock()
m.TotalRequests++
}

func (m *Metrics) IncrementFailedRequests() {
m.mu.Lock()
defer m.mu.Unlock()
m.FailedRequests++
}

func (m *Metrics) CalculateErrorRate() float64 {
m.mu.Lock()
defer m.mu.Unlock()

if m.TotalRequests == 0 {
return 0
}

return float64(m.FailedRequests) / float64(m.TotalRequests)
}
91 changes: 78 additions & 13 deletions internal/data/worker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package data

import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"github.com/vladComan0/performance-analyzer/internal/custom_errors"
"github.com/vladComan0/performance-analyzer/pkg/tokens"
Expand Down Expand Up @@ -31,23 +33,28 @@ type Worker struct {
RequestsPerTask int `json:"requests_per_task"`
Report string `json:"report"`
HTTPMethod string `json:"http_method"`
Body *json.RawMessage `json:"body"`
Status Status `json:"status"`
CreatedAt time.Time `json:"-"`
Metrics *Metrics `json:"-"`
Environment *Environment `json:"-"`
TokenManager *tokens.TokenManager `json:"-"`
infoLog *log.Logger
errorLog *log.Logger
mu sync.Mutex
}

// NewWorker creates a new Worker with the given options.
func NewWorker(environmentID, concurrency, requestsPerTask int, httpMethod string, environment *Environment, infoLog *log.Logger, errorLog *log.Logger, options ...WorkerOption) *Worker {
func NewWorker(environmentID, concurrency, requestsPerTask int, httpMethod string, body *json.RawMessage, environment *Environment, infoLog *log.Logger, errorLog *log.Logger, options ...WorkerOption) *Worker {
worker := &Worker{
EnvironmentID: environmentID,
Concurrency: concurrency,
RequestsPerTask: requestsPerTask,
Environment: environment,
HTTPMethod: httpMethod,
Body: body,
Status: StatusCreated,
Metrics: NewMetrics(),
infoLog: infoLog,
errorLog: errorLog,
}
Expand All @@ -59,8 +66,8 @@ func NewWorker(environmentID, concurrency, requestsPerTask int, httpMethod strin
return worker
}

func (w *Worker) Start(wg *sync.WaitGroup, workerStorage WorkerStorageInterface) {
if err := workerStorage.UpdateStatus(w.ID, StatusRunning); err != nil {
func (w *Worker) Start(wg *sync.WaitGroup, updateFunc func(id int, status Status) error) {
if err := updateFunc(w.ID, StatusRunning); err != nil {
w.errorLog.Printf("Error updating status to running: %s", err)
return
}
Expand All @@ -72,11 +79,26 @@ func (w *Worker) Start(wg *sync.WaitGroup, workerStorage WorkerStorageInterface)
}
wg.Wait()

if err := workerStorage.UpdateStatus(w.ID, StatusFinished); err != nil {
if err := updateFunc(w.ID, StatusFinished); err != nil {
w.errorLog.Printf("Error updating status to finished: %s", err)
return
}
w.SetStatus(StatusFinished)

ranks := []float64{50, 95, 99, 99.9}
if err := w.Metrics.CalculatePercentiles(ranks...); err != nil {
w.errorLog.Printf("Error calculating Percentiles: %s", err)
return
}

w.infoLog.Printf("p50 latency: %.6f s", w.Metrics.Percentiles[50]/1e9)
w.infoLog.Printf("p95 latency: %.6f s", w.Metrics.Percentiles[95]/1e9)
w.infoLog.Printf("p99 latency: %.6f s", w.Metrics.Percentiles[99]/1e9)
w.infoLog.Printf("p999 latency: %.6f s", w.Metrics.Percentiles[99.9]/1e9)

w.Metrics.CalculateMaxLatency()
w.infoLog.Printf("Max latency: %.6f s", float64(w.Metrics.MaxLatency)/1e9)
w.infoLog.Printf("Error rate: %.2f%%", 100*w.Metrics.CalculateErrorRate())
}

func (w *Worker) run(wg *sync.WaitGroup) {
Expand All @@ -86,18 +108,37 @@ func (w *Worker) run(wg *sync.WaitGroup) {
case http.MethodGet:
w.get(w.Environment.Endpoint)
case http.MethodPost:
// implement
case http.MethodPut:
// implement
case http.MethodDelete:
// implement
w.post(w.Environment.Endpoint)
}
}
}

func (w *Worker) get(url string) {
client := &http.Client{}
req, err := http.NewRequest("GET", url, nil)
req, err := w.createRequest("GET", url)
if err != nil {
w.errorLog.Printf("Error creating request with HTTP method %s on the URL %s: %s", w.HTTPMethod, url, err)
return
}

start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start)
w.Metrics.IncrementTotalRequests()

if err != nil {
w.errorLog.Printf("Error sending request with HTTP method %s to %s: %s", w.HTTPMethod, url, err)
w.Metrics.IncrementFailedRequests()
return
}
defer resp.Body.Close()

w.Metrics.AddLatency(latency)
}

func (w *Worker) post(url string) {
client := &http.Client{}
req, err := http.NewRequest("POST", url, bytes.NewReader(*w.Body))
if err != nil {
w.errorLog.Printf("Error creating request with HTTP method %s on the URL %s: %s", w.HTTPMethod, url, err)
return
Expand Down Expand Up @@ -126,15 +167,35 @@ func (w *Worker) get(url string) {
log.Printf("Status code: %d", resp.StatusCode)
}

func (w *Worker) createRequest(method, url string) (*http.Request, error) {
req, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}

if w.TokenManager != nil {
token, err := w.TokenManager.GetToken()
if err != nil {
w.errorLog.Printf("Error fetching token on the URL %s: %s ", w.Environment.TokenEndpoint, err)
return nil, err
}
w.infoLog.Printf("Token: %s", token)
req.Header.Add("Authorization", "Bearer "+token)
}

req.Header.Add("Content-Type", "application/json")
return req, nil
}

func (m *WorkerStorage) Insert(worker *Worker) (int, error) {
var workerID int

err := transactions.WithTransaction(m.DB, func(tx transactions.Transaction) error {
stmt := `
INSERT INTO workers (environment_id, concurrency, requests_per_task, report, http_method, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, UTC_TIMESTAMP())
INSERT INTO workers (environment_id, concurrency, requests_per_task, report, http_method, body, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, UTC_TIMESTAMP())
`
result, err := tx.Exec(stmt, worker.EnvironmentID, worker.Concurrency, worker.RequestsPerTask, worker.Report, worker.HTTPMethod, StatusCreated)
result, err := tx.Exec(stmt, worker.EnvironmentID, worker.Concurrency, worker.RequestsPerTask, worker.Report, worker.HTTPMethod, worker.Body, StatusCreated)
if err != nil {
return err
}
Expand Down Expand Up @@ -163,6 +224,7 @@ func (m *WorkerStorage) GetAll() ([]*Worker, error) {
requests_per_task,
report,
http_method,
body,
status,
created_at
FROM workers
Expand Down Expand Up @@ -191,6 +253,7 @@ func (m *WorkerStorage) GetAll() ([]*Worker, error) {
&worker.RequestsPerTask,
&worker.Report,
&worker.HTTPMethod,
&worker.Body,
&worker.Status,
&worker.CreatedAt,
)
Expand Down Expand Up @@ -243,6 +306,7 @@ func (m *WorkerStorage) getWithTx(tx transactions.Transaction, id int) (*Worker,
requests_per_task,
report,
http_method,
body,
status,
created_at
FROM
Expand All @@ -257,6 +321,7 @@ func (m *WorkerStorage) getWithTx(tx transactions.Transaction, id int) (*Worker,
&worker.RequestsPerTask,
&worker.Report,
&worker.HTTPMethod,
&worker.Body,
&worker.Status,
&worker.CreatedAt,
)
Expand Down
2 changes: 2 additions & 0 deletions internal/data/worker_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
)

func (w *Worker) SetStatus(s Status) {
w.mu.Lock()
defer w.mu.Unlock()
switch s {
case StatusCreated, StatusRunning, StatusFinished:
w.Status = s
Expand Down

0 comments on commit ab7ca50

Please sign in to comment.