Skip to content

Commit

Permalink
feat: add blob size as a metric when GET/PUT
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Sep 6, 2024
1 parent b5e3a07 commit 8cb04ac
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
20 changes: 20 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Metricer interface {
RecordInfo(version string)
RecordUp()
RecordRPCServerRequest(method string) func(status string, commitmentMode string, version string)
RecordBlobSize(method string, mode string, ver string, size int)

Document() []metrics.DocumentedMetric
}
Expand All @@ -42,6 +43,7 @@ type Metrics struct {
HTTPServerRequestsTotal *prometheus.CounterVec
HTTPServerBadRequestHeader *prometheus.CounterVec
HTTPServerRequestDurationSeconds *prometheus.HistogramVec
BlobSizeBytes *prometheus.HistogramVec

registry *prometheus.Registry
factory metrics.Factory
Expand Down Expand Up @@ -101,6 +103,16 @@ func NewMetrics(subsystem string) *Metrics {
}, []string{
"method", "commitment_mode", "commitment_version", // no status on histograms because those are very expensive
}),
BlobSizeBytes: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "blob_size_bytes",
Help: "Histogram of blob size in bytes",
Buckets: prometheus.ExponentialBucketsRange(1, 1000000000, 20),
}, []string{
//Todo: add store as a label
"method", "commitment_mode", "certificate_version",
}),
registry: registry,
factory: factory,
}
Expand Down Expand Up @@ -131,6 +143,11 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode
}
}

// RecordBlobSize records the size of a blob in bytes
func (m *Metrics) RecordBlobSize(method string, mode string, ver string, size int) {
m.BlobSizeBytes.WithLabelValues(method, mode, ver).Observe(float64(size))
}

// StartServer starts the metrics server on the given hostname and port.
func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
addr := net.JoinHostPort(hostname, strconv.Itoa(port))
Expand Down Expand Up @@ -162,3 +179,6 @@ func (n *noopMetricer) RecordUp() {
func (n *noopMetricer) RecordRPCServerRequest(string) func(status, mode, ver string) {
return func(string, string, string) {}
}

func (n *noopMetricer) RecordBlobSize(string, string, string, int) {
}
39 changes: 23 additions & 16 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type Server struct {
listener net.Listener
}

type ServiceResult struct {
meta commitments.CommitmentMeta
bodyLength uint
}

func NewServer(host string, port int, router store.IRouter, log log.Logger,
m metrics.Metricer) *Server {
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
Expand All @@ -63,14 +68,16 @@ func NewServer(host string, port int, router store.IRouter, log log.Logger,
}

// WithMetrics is a middleware that records metrics for the route path.
func WithMetrics(handleFn func(http.ResponseWriter, *http.Request) (commitments.CommitmentMeta, error),
func WithMetrics(handleFn func(http.ResponseWriter, *http.Request) (ServiceResult, error),
m metrics.Metricer) func(http.ResponseWriter, *http.Request) error {
return func(w http.ResponseWriter, r *http.Request) error {
recordDur := m.RecordRPCServerRequest(r.Method)

meta, err := handleFn(w, r)
res, err := handleFn(w, r)

// we assume that every route will set the status header
recordDur(w.Header().Get("status"), string(meta.Mode), meta.CertVersion)
recordDur(w.Header().Get("status"), string(res.meta.Mode), res.meta.CertVersion)
m.RecordBlobSize(r.Method, string(res.meta.Mode), res.meta.CertVersion, int(res.bodyLength))
return err
}
}
Expand Down Expand Up @@ -149,47 +156,47 @@ func (svr *Server) Health(w http.ResponseWriter, _ *http.Request) error {
return nil
}

func (svr *Server) HandleGet(w http.ResponseWriter, r *http.Request) (commitments.CommitmentMeta, error) {
func (svr *Server) HandleGet(w http.ResponseWriter, r *http.Request) (ServiceResult, error) {
meta, err := ReadCommitmentMeta(r)
if err != nil {
svr.WriteBadRequest(w, invalidCommitmentMode)
return meta, err
return ServiceResult{meta: meta}, err
}
key := path.Base(r.URL.Path)
comm, err := commitments.StringToDecodedCommitment(key, meta.Mode)
if err != nil {
svr.log.Info("failed to decode commitment", "err", err, "commitment", comm)
w.WriteHeader(http.StatusBadRequest)
return meta, err
return ServiceResult{meta: meta}, err
}

input, err := svr.router.Get(r.Context(), comm, meta.Mode)
if err != nil && errors.Is(err, ErrNotFound) {
svr.WriteNotFound(w, err.Error())
return meta, err
return ServiceResult{meta: meta}, err
}

if err != nil {
svr.WriteInternalError(w, err)
return meta, err
return ServiceResult{meta: meta}, err
}

svr.WriteResponse(w, input)
return meta, nil
return ServiceResult{meta: meta, bodyLength: uint(len(input))}, nil
}

func (svr *Server) HandlePut(w http.ResponseWriter, r *http.Request) (commitments.CommitmentMeta, error) {
func (svr *Server) HandlePut(w http.ResponseWriter, r *http.Request) (ServiceResult, error) {
meta, err := ReadCommitmentMeta(r)
if err != nil {
svr.WriteBadRequest(w, invalidCommitmentMode)
return meta, err
return ServiceResult{meta: meta}, err
}

input, err := io.ReadAll(r.Body)
if err != nil {
svr.log.Error("Failed to read request body", "err", err)
w.WriteHeader(http.StatusBadRequest)
return meta, err
return ServiceResult{meta: meta}, err
}

key := path.Base(r.URL.Path)
Expand All @@ -200,27 +207,27 @@ func (svr *Server) HandlePut(w http.ResponseWriter, r *http.Request) (commitment
if err != nil {
svr.log.Info("failed to decode commitment", "err", err, "key", key)
w.WriteHeader(http.StatusBadRequest)
return meta, err
return ServiceResult{meta: meta}, err
}
}

commitment, err := svr.router.Put(r.Context(), meta.Mode, comm, input)
if err != nil {
svr.WriteInternalError(w, err)
return meta, err
return ServiceResult{meta: meta}, err
}

responseCommit, err := commitments.EncodeCommitment(commitment, meta.Mode)
if err != nil {
svr.log.Info("failed to encode commitment", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return meta, err
return ServiceResult{meta: meta}, err
}

svr.log.Info(fmt.Sprintf("write commitment: %x\n", comm))
// write out encoded commitment
svr.WriteResponse(w, responseCommit)
return meta, nil
return ServiceResult{meta: meta, bodyLength: uint(len(input))}, nil
}

func (svr *Server) WriteResponse(w http.ResponseWriter, data []byte) {
Expand Down

0 comments on commit 8cb04ac

Please sign in to comment.