Skip to content

Commit

Permalink
Add Realtime Viewership API (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Mar 13, 2024
1 parent 1798dc0 commit 3cf1819
Show file tree
Hide file tree
Showing 9 changed files with 697 additions and 158 deletions.
110 changes: 82 additions & 28 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func (h *apiHandler) viewershipHandler() chi.Router {
h.withMetrics(router, "query_application_viewership").
With(h.cache(true)).
MethodFunc("GET", `/query`, h.queryViewership(true))
// realtime viewership
h.withMetrics(router, "query_realtime_viewership").
MethodFunc("GET", `/active`, h.queryRealtimeViewership())

return router
}
Expand Down Expand Up @@ -272,37 +275,14 @@ func ensureIsCreatorQuery(next http.Handler) http.Handler {

func (h *apiHandler) queryViewership(detailed bool) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
var (
from, err1 = parseInputTimestamp(r.URL.Query().Get("from"))
to, err2 = parseInputTimestamp(r.URL.Query().Get("to"))
)
if errs := nonNilErrs(err1, err2); len(errs) > 0 {
respondError(rw, http.StatusBadRequest, errs...)
querySpec, httpErroCode, errs := h.resolveViewershipQuerySpec(r)
if len(errs) > 0 {
respondError(rw, httpErroCode, errs...)
return
}
querySpec.Detailed = detailed

userId := callerUserId(r)
if userId == "" {
respondError(rw, http.StatusInternalServerError, errors.New("request not authenticated"))
return
}

qs := r.URL.Query()
assetID, streamID := qs.Get("assetId"), qs.Get("streamId")
query := views.QuerySpec{
From: from,
To: to,
TimeStep: qs.Get("timeStep"),
Filter: views.QueryFilter{
UserID: userId,
PlaybackID: qs.Get("playbackId"),
CreatorID: qs.Get("creatorId"),
},
BreakdownBy: qs["breakdownBy[]"],
Detailed: detailed,
}

metrics, err := h.views.QueryEvents(r.Context(), query, assetID, streamID)
metrics, err := h.views.QueryEvents(r.Context(), querySpec)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
Expand Down Expand Up @@ -406,6 +386,80 @@ func (h *apiHandler) queryTotalUsage() http.HandlerFunc {
}
}

func (h *apiHandler) queryRealtimeViewership() http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
querySpec, httpErrorCode, errs := h.resolveRealtimeViewershipQuerySpec(r)
if len(errs) > 0 {
respondError(rw, httpErrorCode, errs...)
return
}
metrics, err := h.views.QueryRealtimeEvents(r.Context(), querySpec)
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}
respondJson(rw, http.StatusOK, metrics)
}
}

func (h *apiHandler) resolveViewershipQuerySpec(r *http.Request) (views.QuerySpec, int, []error) {
var (
from, err1 = parseInputTimestamp(r.URL.Query().Get("from"))
to, err2 = parseInputTimestamp(r.URL.Query().Get("to"))
)
if errs := nonNilErrs(err1, err2); len(errs) > 0 {
return views.QuerySpec{}, http.StatusBadRequest, errs
}

userId := callerUserId(r)
if userId == "" {
return views.QuerySpec{}, http.StatusInternalServerError, []error{errors.New("request not authenticated")}
}

qs := r.URL.Query()
assetID, streamID := qs.Get("assetId"), qs.Get("streamId")
spec := views.QuerySpec{
From: from,
To: to,
TimeStep: qs.Get("timeStep"),
Filter: views.QueryFilter{
UserID: userId,
PlaybackID: qs.Get("playbackId"),
CreatorID: qs.Get("creatorId"),
},
BreakdownBy: qs["breakdownBy[]"],
}
spec, err := h.views.ResolvePlaybackId(spec, assetID, streamID)
if err != nil {
return views.QuerySpec{}, http.StatusInternalServerError, []error{err}
}

return spec, 0, []error{}
}

func (h *apiHandler) resolveRealtimeViewershipQuerySpec(r *http.Request) (views.QuerySpec, int, []error) {
spec, httpErrorCode, errs := h.resolveViewershipQuerySpec(r)
if spec.TimeStep != "" {
return views.QuerySpec{}, http.StatusBadRequest, []error{errors.New("timeStep is not supported for Realtime Viewership API")}
}
if spec.From == nil && spec.To != nil {
return views.QuerySpec{}, http.StatusBadRequest, []error{errors.New("param 'to' cannot be specified if 'from' is not defined")}
}
if spec.From != nil || spec.To != nil {
// If using time range, then we allow to query max 1 min before now(),
// because the current "per minute" aggregation may not be finalized yet
lastToAllowed := time.Now().Add(-1 * time.Minute)
if spec.To == nil || spec.To.After(lastToAllowed) {
spec.To = &lastToAllowed
}
if spec.To.Sub(*spec.From) > 3*time.Hour {
return views.QuerySpec{}, http.StatusBadRequest, []error{errors.New("requested time range cannot exceed 3 hours")}
}
}

return spec, httpErrorCode, errs
}

func (h *apiHandler) queryActiveUsersUsage() http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {

Expand Down
4 changes: 4 additions & 0 deletions cmd/analyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func parseFlags(version string) cliFlags {
fs.StringVar(&cli.usageOpts.DailyUsageTable, "daily-data-table", "livepeer-analytics.staging.explorer_day_data", "BigQuery table to read total usage metrics from")
fs.StringVar(&cli.usageOpts.UsersTable, "users-table", "livepeer-analytics.staging.studio_users", "BigQuery table to read studio users from")
fs.Int64Var(&cli.viewsOpts.MaxBytesBilledPerBigQuery, "max-bytes-billed-per-big-query", 100*1024*1024 /* 100 MB */, "Max bytes billed configuration to use for the queries to BigQuery")
fs.StringVar(&cli.viewsOpts.ClickhouseOptions.Addr, "clickhouse-addr", "", "Address of Clickhouse Database")
fs.StringVar(&cli.viewsOpts.ClickhouseOptions.Database, "clickhouse-database", "", "Database name in Clickhouse")
fs.StringVar(&cli.viewsOpts.ClickhouseOptions.User, "clickhouse-user", "", "Clickhouse User")
fs.StringVar(&cli.viewsOpts.ClickhouseOptions.Password, "clickhouse-password", "", "Clickhouse Password")

flag.Set("logtostderr", "true")
glogVFlag := flag.Lookup("v")
Expand Down
65 changes: 39 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,50 +1,56 @@
module github.com/livepeer/livepeer-data

go 1.20
go 1.21

require (
cloud.google.com/go/bigquery v1.50.0
cloud.google.com/go/bigquery v1.52.0
github.com/ClickHouse/clickhouse-go/v2 v2.20.0
github.com/Masterminds/squirrel v1.5.4
github.com/go-chi/chi/v5 v5.0.8
github.com/golang/glog v1.1.1
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/ipfs/go-cid v0.4.1
github.com/livepeer/go-api-client v0.4.6
github.com/peterbourgon/ff v1.7.1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.42.0
github.com/rabbitmq/amqp091-go v1.8.0
github.com/rabbitmq/rabbitmq-stream-go-client v1.1.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.4
github.com/victorspringer/http-cache v0.0.0-20221205073845-df6d061f29cb
golang.org/x/sync v0.1.0
google.golang.org/api v0.114.0
golang.org/x/sync v0.6.0
google.golang.org/api v0.126.0
)

require (
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go v0.110.4 // indirect
cloud.google.com/go/compute v1.21.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/v11 v11.0.0 // indirect
cloud.google.com/go/iam v1.1.1 // indirect
github.com/ClickHouse/ch-go v0.61.3 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/apache/arrow/go/v12 v12.0.0 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/goccy/go-json v0.9.11 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/googleapis/gax-go/v2 v2.11.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand All @@ -60,26 +66,33 @@ require (
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multihash v0.0.15 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 3cf1819

Please sign in to comment.