diff --git a/api/authorization.go b/api/authorization.go index 281b227..6fc3544 100644 --- a/api/authorization.go +++ b/api/authorization.go @@ -39,6 +39,7 @@ var ( } userIdContextKey = &contextKeys{"userId"} + projectIdContextKey = &contextKeys{"projectId"} isCallerAdminContextKey = &contextKeys{"isCallerAdmin"} ) @@ -94,6 +95,11 @@ func authorization(authUrl string) middleware { r = r.WithContext(ctx) } + if projectID := authRes.Header.Get("X-Livepeer-Project-Id"); projectID != "" { + ctx := context.WithValue(r.Context(), projectIdContextKey, projectID) + r = r.WithContext(ctx) + } + if isCallerAdmin, err := strconv.ParseBool(authRes.Header.Get("X-Livepeer-Is-Caller-Admin")); err == nil { ctx := context.WithValue(r.Context(), isCallerAdminContextKey, isCallerAdmin) r = r.WithContext(ctx) @@ -139,6 +145,13 @@ func callerUserId(r *http.Request) string { return "" } +func callerProjectId(r *http.Request) string { + if val, ok := r.Context().Value(projectIdContextKey).(string); ok { + return val + } + return "" +} + func isCallerAdmin(r *http.Request) bool { if val, ok := r.Context().Value(isCallerAdminContextKey).(bool); ok { return val diff --git a/api/handler.go b/api/handler.go index a91804d..6690946 100644 --- a/api/handler.go +++ b/api/handler.go @@ -433,6 +433,7 @@ func (h *apiHandler) resolveViewershipQuerySpec(r *http.Request) (views.QuerySpe if userId == "" { return views.QuerySpec{}, http.StatusInternalServerError, []error{errors.New("request not authenticated")} } + projectId := callerProjectId(r) qs := r.URL.Query() assetID, streamID := qs.Get("assetId"), qs.Get("streamId") @@ -442,6 +443,7 @@ func (h *apiHandler) resolveViewershipQuerySpec(r *http.Request) (views.QuerySpe TimeStep: qs.Get("timeStep"), Filter: views.QueryFilter{ UserID: userId, + ProjectID: projectId, PlaybackID: qs.Get("playbackId"), CreatorID: qs.Get("creatorId"), }, diff --git a/go.mod b/go.mod index 4f16602..2d98159 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golang/glog v1.1.1 github.com/google/uuid v1.6.0 github.com/ipfs/go-cid v0.4.1 - github.com/livepeer/go-api-client v0.4.6 + github.com/livepeer/go-api-client v0.4.23 github.com/peterbourgon/ff v1.7.1 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/common v0.42.0 diff --git a/go.sum b/go.sum index a568dd9..e89b892 100644 --- a/go.sum +++ b/go.sum @@ -193,6 +193,10 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhR github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/livepeer/go-api-client v0.4.6 h1:Eo9mq5k9gnu8fgclbT7ibQdTgBV7a/NDIBIuYXHMzV0= github.com/livepeer/go-api-client v0.4.6/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.23-0.20240523110406-c26cb9ff5975 h1:uJ1aVcuuWLP7efVffq0pqPRZiEqkh1r3iEDSsw+1+xw= +github.com/livepeer/go-api-client v0.4.23-0.20240523110406-c26cb9ff5975/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.23 h1:buvWJGxLzwsmvkXaxdI2bOUOAiYZJtcTMfFWMqnOrco= +github.com/livepeer/go-api-client v0.4.23/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= diff --git a/views/clickhouse.go b/views/clickhouse.go index fc324c3..16237a2 100644 --- a/views/clickhouse.go +++ b/views/clickhouse.go @@ -96,6 +96,9 @@ func buildRealtimeViewsEventsQuery(spec QuerySpec) (string, []interface{}, error From("viewership_current_counts"). Where("user_id = ?", spec.Filter.UserID). Limit(maxClickhouseResultRows + 1) + if spec.Filter.ProjectID != "" { + query = query.Where("project_id = ?", spec.Filter.ProjectID) + } return toSqlWithFiltersAndBreakdown(query, spec) } @@ -110,6 +113,9 @@ func buildTimeSeriesRealtimeViewsEventsQuery(spec QuerySpec) (string, []interfac GroupBy("timestamp_ts"). OrderBy("timestamp_ts desc"). Limit(maxClickhouseResultRows + 1) + if spec.Filter.ProjectID != "" { + query = query.Where("project_id = ?", spec.Filter.ProjectID) + } if spec.From != nil { // timestamp_ts is DateTime, but it's automatically converted to seconds query = query.Where("timestamp_ts >= ?", spec.From.UnixMilli()/1000) diff --git a/views/client.go b/views/client.go index 99960b8..f52a488 100644 --- a/views/client.go +++ b/views/client.go @@ -177,6 +177,9 @@ func (c *Client) ResolvePlaybackId(spec QuerySpec, assetID, streamID string) (Qu if res.Filter.UserID != asset.UserID { return QuerySpec{}, fmt.Errorf("error getting asset: verify that asset exists and you are using proper credentials") } + if res.Filter.ProjectID != "" && asset.ProjectID != "" && res.Filter.ProjectID != asset.ProjectID { + return QuerySpec{}, fmt.Errorf("error getting asset: verify that asset exists and you are using proper credentials related to the project") + } } } else if streamID != "" { var stream *livepeer.Stream @@ -186,6 +189,9 @@ func (c *Client) ResolvePlaybackId(spec QuerySpec, assetID, streamID string) (Qu if res.Filter.UserID != stream.UserID { return QuerySpec{}, fmt.Errorf("error getting stream: verify that stream exists and you are using proper credentials") } + if res.Filter.ProjectID != "" && stream.ProjectID != "" && res.Filter.ProjectID != stream.UserID { + return QuerySpec{}, fmt.Errorf("error getting stream: verify that stream exists and you are using proper credentials related to the project") + } } } diff --git a/views/query_spec.go b/views/query_spec.go index 17bea05..f1dfd4c 100644 --- a/views/query_spec.go +++ b/views/query_spec.go @@ -10,6 +10,7 @@ type QueryFilter struct { PlaybackID string CreatorID string UserID string + ProjectID string } type QuerySpec struct {