Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-12893]: Adding first implementation of streams API #6496

Merged
merged 42 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
f91b5bb
Streaming POC
buger May 18, 2024
c100416
Directly manage streams instead of benthos rest api
buger May 19, 2024
649246f
Add TYk integration
buger May 20, 2024
f836206
Add fan-out mechanism
buger May 20, 2024
448600e
Add support for redis streams
buger May 21, 2024
5085d46
fixed panic where redis client wasn't ready when server came up
lonelycode May 23, 2024
f9a8ada
fixd panic where if messages were still in redis stream when client d…
lonelycode May 23, 2024
2a35bde
Fix bugs
buger May 23, 2024
95102b9
Add portal support
buger May 23, 2024
155e860
Fix router issue
buger Jun 27, 2024
ed4f297
Refactor streaming functionality to become a standard middleware
buger Jun 29, 2024
9fa9e11
Fix consumer group and some panics
buger Jun 29, 2024
2e7fd38
Add new Labs configuration option, and ensure that streaming runs onl…
buger Jun 29, 2024
5ac6fb8
Fix linter
buger Jun 30, 2024
fe874e7
Do not load unsafe components
buger Jun 30, 2024
ed19b44
Fix gomod
buger Jun 30, 2024
47772fc
Add mutex to unload function
buger Jun 30, 2024
8495893
refactor: Implement direct user-to-stream connections in streaming mi…
buger Aug 11, 2024
a0bf8a0
feat: initialize default consumer group manager in StreamingMiddleware
buger Aug 12, 2024
a310bc7
refactor: optimize streaming middleware request processing
buger Aug 12, 2024
d096c8d
Rename a few methods
buger Aug 21, 2024
a254a72
refactor: update streaming implementation to use single Stream object
buger Aug 21, 2024
5e94fee
refactor: update Stream struct and methods in manager.go
buger Aug 21, 2024
74c65ef
refactor: optimize stream reset in StreamingMiddleware Unload method
buger Aug 21, 2024
bb554c5
refactor: update stream counting in StreamingMiddleware Unload method
buger Aug 21, 2024
e5c9184
refactor: improve stream stopping mechanism
buger Aug 21, 2024
1398fa7
feat: add time import to streaming manager
buger Aug 21, 2024
81c48b1
feat: enhance AsyncAPI test robustness and logging
buger Aug 21, 2024
f8d36a4
feat: enhance AsyncAPI test with increased timeouts and logging
buger Aug 21, 2024
19519ae
test: increase timeouts and add more logging in AsyncAPI test
buger Aug 21, 2024
3b0e40c
feat: optimize AsyncAPI HTTP test for faster execution
buger Aug 21, 2024
a3956df
refactor: simplify streaming middleware configuration
buger Aug 21, 2024
444fa16
refactor: remove GC job and simplify resource management in streaming…
buger Aug 21, 2024
8c70245
Small refactor
buger Aug 21, 2024
ec949b5
More unused constants
buger Aug 21, 2024
22b77ea
modified manager test
kofoworola Sep 3, 2024
0c2e70d
nats stream
kofoworola Sep 4, 2024
75689c4
fixed tests
kofoworola Sep 10, 2024
c4f9fc9
fixed syntax error
kofoworola Sep 11, 2024
dd96a6a
gofmt run
kofoworola Sep 11, 2024
6766c86
fix schema
kofoworola Sep 12, 2024
1cc0698
fix go imports
kofoworola Sep 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ tyk-gateway.pid
*.go-e

tyk_linux_*
.aider*
/dist/

.terraform**
.terraform.lock.hcl
.task/
*.test

main
main
2 changes: 1 addition & 1 deletion apidef/oas/linter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestXTykGateway_Lint(t *testing.T) {
settings.Server.EventHandlers[i].Webhook.CoolDownPeriod = ReadableDuration(time.Second * 20)
}

for idx, _ := range settings.Middleware.Operations {
for idx := range settings.Middleware.Operations {
settings.Middleware.Operations[idx].CircuitBreaker.Threshold = 0.5
}

Expand Down
11 changes: 11 additions & 0 deletions benthos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
input:
stdin: {}

output:
http_server:
address: "0.0.0.0:4196"
path: /get
stream_path: /get/stream
ws_path: /get/ws
allowed_verbs:
- GET
19 changes: 19 additions & 0 deletions cli/linter/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,23 @@
}
}
}
},
"Streaming": {
"type": "object",
"properties": {
"enabled": {
"type": "boolean"
},
"allow_unsafe": {
"type": "array"
}
}
}
},
"properties": {
"streaming": {
"$ref": "#/definitions/Streaming"
},
"allow_insecure_configs": {
"type": "boolean"
},
Expand Down Expand Up @@ -1203,6 +1217,11 @@
"oas_config": {
"validate_examples": false,
"validate_schema_defaults": false
},
"labs": {
"type": ["object", "null"],
"additionalProperties": true,
"properties": {}
}
}
}
26 changes: 26 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
LivenessCheck: LivenessCheckConfig{
CheckDuration: time.Second * 10,
},
Streaming: StreamingConfig{
Enabled: false,
AllowUnsafe: []string{},
},
}
)

Expand Down Expand Up @@ -663,6 +667,12 @@ func (pwl *PortsWhiteList) Decode(value string) error {
return nil
}

// Add this new struct definition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should be changed

type StreamingConfig struct {
Enabled bool `json:"enabled"`
AllowUnsafe []string `json:"allow_unsafe"`
}

// Config is the configuration object used by Tyk to set up various parameters.
type Config struct {
// Force your Gateway to work only on a specific domain name. Can be overridden by API custom domain.
Expand Down Expand Up @@ -1131,6 +1141,22 @@ type Config struct {

// OAS holds the configuration for various OpenAPI-specific functionalities
OAS OASConfig `json:"oas_config"`

Streaming StreamingConfig `json:"streaming"`

Labs labsConfig `json:"labs"`
}

type labsConfig map[string]interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a public symbol as it's part of the Config data model


func (lc *labsConfig) Decode(value string) error {
var temp map[string]interface{}
if err := json.Unmarshal([]byte(value), &temp); err != nil {
log.Error("Error unmarshalling labsConfig: ", err)
return err
}
*lc = temp
return nil
}

// OASConfig holds the configuration for various OpenAPI-specific functionalities
Expand Down
16 changes: 15 additions & 1 deletion gateway/api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ type APISpec struct {
AnalyticsPluginConfig *GoAnalyticsPlugin

middlewareChain *ChainObject
unloadHooks []func()

network analytics.NetworkStats

Expand All @@ -237,8 +238,16 @@ func (a *APISpec) GetSessionLifetimeRespectsKeyExpiration() bool {
return a.SessionLifetimeRespectsKeyExpiration
}

// AddUnloadHook adds a function to be called when the API spec is unloaded
func (s *APISpec) AddUnloadHook(hook func()) {
s.unloadHooks = append(s.unloadHooks, hook)
}

// Release releases all resources associated with API spec
func (s *APISpec) Release() {
func (s *APISpec) Unload() {
s.Lock()
defer s.Unlock()

// release circuit breaker resources
for _, path := range s.RxPaths {
for _, urlSpec := range path {
Expand Down Expand Up @@ -267,6 +276,11 @@ func (s *APISpec) Release() {
s.HTTPTransport.transport.CloseIdleConnections()
s.HTTPTransport = nil
}

for _, hook := range s.unloadHooks {
hook()
}
s.unloadHooks = nil
}

// Validate returns nil if s is a valid spec and an error stating why the spec is not valid.
Expand Down
14 changes: 9 additions & 5 deletions gateway/api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,

gw.mwAppendEnabled(&chainArray, &RateLimitForAPI{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &StreamingMiddleware{BaseMiddleware: baseMid})

if !spec.UseKeylessAccess {
gw.mwAppendEnabled(&chainArray, &GraphQLComplexityMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLGranularAccessMiddleware{BaseMiddleware: baseMid})
Expand Down Expand Up @@ -946,7 +948,6 @@ func (gw *Gateway) loadApps(specs []*APISpec) {
gw.loadControlAPIEndpoints(router)

muxer.setRouter(port, "", router, gw.GetConfig())

gs := gw.prepareStorage()
shouldTrace := trace.IsEnabled()

Expand Down Expand Up @@ -996,13 +997,15 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.DefaultProxyMux.swap(muxer, gw)

var specsToRelease []*APISpec
var specsToUnload []*APISpec

gw.apisMu.Lock()

for _, spec := range specs {
curSpec, ok := gw.apisByID[spec.APIID]
if ok && curSpec != nil && shouldReloadSpec(curSpec, spec) {
specsToRelease = append(specsToRelease, curSpec)
mainLog.Debugf("Spec %s has changed and needs to be reloaded", curSpec.APIID)
specsToUnload = append(specsToUnload, curSpec)
}

// Bind versions to base APIs again
Expand All @@ -1018,8 +1021,9 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.apisMu.Unlock()

for _, spec := range specsToRelease {
spec.Release()
for _, spec := range specsToUnload {
mainLog.Debugf("Unloading spec %s", spec.APIID)
spec.Unload()
}

mainLog.Debug("Checker host list")
Expand Down
16 changes: 15 additions & 1 deletion gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type TykMiddleware interface {
ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
EnabledForSpec() bool
Name() string

GetSpec() *APISpec

Unload()
}

type TraceMiddleware struct {
Expand Down Expand Up @@ -124,6 +128,9 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
mw.SetName(mw.Name())
mw.Logger().Debug("Init")

spec := mw.GetSpec()
spec.AddUnloadHook(actualMW.Unload)

// Pull the configuration
mwConf, err := mw.Config()
if err != nil {
Expand Down Expand Up @@ -158,14 +165,15 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
}

startTime := time.Now()
mw.Logger().WithField("ts", startTime.UnixNano()).Debug("Started")
mw.Logger().WithField("ts", startTime.UnixNano()).WithField("mw", mw.Name()).Debug("Started")

if mw.Base().Spec.CORS.OptionsPassthrough && r.Method == "OPTIONS" {
h.ServeHTTP(w, r)
return
}

err, errCode := mw.ProcessRequest(w, r, mwConf)

if err != nil {
writeResponse := true
// Prevent double error write
Expand Down Expand Up @@ -271,6 +279,12 @@ func (t *BaseMiddleware) Config() (interface{}, error) {
return nil, nil
}

func (t *BaseMiddleware) Unload() {}

func (t *BaseMiddleware) GetSpec() *APISpec {
return t.Spec
}

func (t *BaseMiddleware) OrgSession(orgID string) (user.SessionState, bool) {

if rpc.IsEmergencyMode() {
Expand Down
9 changes: 8 additions & 1 deletion gateway/mw_context_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (m *MiddlewareContextVars) Name() string {
}

func (m *MiddlewareContextVars) EnabledForSpec() bool {
return m.Spec.EnableContextVars
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does MiddlewareContextVars.EnabledForSpec return always true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea for streaming is to have context vars always enabled. There won't be an option to disable it in UI

}

// ProcessRequest will run any checks on the request on the way through the system, return an error to have the chain fail
Expand Down Expand Up @@ -46,6 +46,13 @@ func (m *MiddlewareContextVars) ProcessRequest(w http.ResponseWriter, r *http.Re
contextDataObject[name] = c.Value
}

for key, vals := range r.Form {
name := "request_data_" + strings.Replace(key, "-", "_", -1)
if len(vals) > 0 {
contextDataObject[name] = vals[0]
}
}

ctxSetData(r, contextDataObject)

return nil, http.StatusOK
Expand Down
Loading
Loading