-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[TT-12893]: Adding first implementation of streams API #6496
Conversation
API Changes --- prev.txt 2024-09-12 15:16:42.513965515 +0000
+++ current.txt 2024-09-12 15:16:39.493961087 +0000
@@ -5155,6 +5155,10 @@
LivenessCheck: LivenessCheckConfig{
CheckDuration: time.Second * 10,
},
+ Streaming: StreamingConfig{
+ Enabled: false,
+ AllowUnsafe: []string{},
+ },
}
)
var Global func() Config
@@ -5765,6 +5769,10 @@
// OAS holds the configuration for various OpenAPI-specific functionalities
OAS OASConfig `json:"oas_config"`
+
+ Streaming StreamingConfig `json:"streaming"`
+
+ Labs labsConfig `json:"labs"`
}
Config is the configuration object used by Tyk to set up various parameters.
@@ -6363,6 +6371,12 @@
func (config *StorageOptionsConf) HostAddrs() (addrs []string)
HostAddrs returns a sanitized list of hosts to connect to.
+type StreamingConfig struct {
+ Enabled bool `json:"enabled"`
+ AllowUnsafe []string `json:"allow_unsafe"`
+}
+ Add this new struct definition
+
type Tracer struct {
// The name of the tracer to initialize. For instance appdash, to use appdash tracer
Name string `json:"name"`
@@ -7641,6 +7655,9 @@
The name for event handlers as defined in the API Definition JSON/BSON
format
+const (
+ ExtensionTykStreaming = "x-tyk-streaming"
+)
const ListDetailed = "detailed"
const LoopScheme = "tyk"
const OIDPREFIX = "openid"
@@ -7904,6 +7921,9 @@
func CloneAPI(a *APISpec) *APISpec
+func (s *APISpec) AddUnloadHook(hook func())
+ AddUnloadHook adds a function to be called when the API spec is unloaded
+
func (a *APISpec) CheckSpecMatchesStatus(r *http.Request, rxPaths []URLSpec, mode URLStatus) (bool, interface{})
CheckSpecMatchesStatus checks if a URL spec has a specific status.
Deprecated: The function doesn't follow go return conventions (T, ok);
@@ -7925,9 +7945,6 @@
func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Handler)
-func (s *APISpec) Release()
- Release releases all resources associated with API spec
-
func (a *APISpec) RequestValid(r *http.Request) (bool, RequestStatus)
RequestValid will check if an incoming request has valid version data and
return a RequestStatus that describes the status of the request
@@ -7947,6 +7964,9 @@
func (a *APISpec) URLAllowedAndIgnored(r *http.Request, rxPaths []URLSpec, whiteListStatus bool) (RequestStatus, interface{})
URLAllowedAndIgnored checks if a url is allowed and ignored.
+func (s *APISpec) Unload()
+ Release releases all resources associated with API spec
+
func (s *APISpec) Validate(oasConfig config.OASConfig) error
Validate returns nil if s is a valid spec and an error stating why the spec
is not valid.
@@ -8039,6 +8059,8 @@
FireEvent is added to the BaseMiddleware object so it is available across
the entire stack
+func (t *BaseMiddleware) GetSpec() *APISpec
+
func (t *BaseMiddleware) Init()
func (t *BaseMiddleware) Logger() (logger *logrus.Entry)
@@ -8053,6 +8075,8 @@
func (t *BaseMiddleware) SetRequestLogger(r *http.Request)
+func (t *BaseMiddleware) Unload()
+
func (t *BaseMiddleware) UpdateRequestSession(r *http.Request) bool
type BaseTykResponseHandler struct {
@@ -10305,6 +10329,29 @@
type StatsDSinkSanitizationFunc func(*bytes.Buffer, string)
+type StreamManager struct {
+ // Has unexported fields.
+}
+
+type StreamingMiddleware struct {
+ *BaseMiddleware
+
+ // Has unexported fields.
+}
+ StreamingMiddleware is a middleware that handles streaming functionality
+
+func (s *StreamingMiddleware) EnabledForSpec() bool
+
+func (s *StreamingMiddleware) Init()
+ Init initializes the middleware
+
+func (s *StreamingMiddleware) Name() string
+
+func (s *StreamingMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
+ ProcessRequest will handle the streaming functionality
+
+func (s *StreamingMiddleware) Unload()
+
type StripAuth struct {
*BaseMiddleware
}
@@ -10535,6 +10582,10 @@
ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
EnabledForSpec() bool
Name() string
+
+ GetSpec() *APISpec
+
+ Unload()
}
type TykOsinServer struct { |
PR Reviewer Guide 🔍
|
PR Code Suggestions ✨
|
40e900d
to
af4484f
Compare
@@ -614,6 +618,12 @@ func (pwl *PortsWhiteList) Decode(value string) error { | |||
return nil | |||
} | |||
|
|||
// Add this new struct definition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should be changed
@@ -18,7 +18,7 @@ func (m *MiddlewareContextVars) Name() string { | |||
} | |||
|
|||
func (m *MiddlewareContextVars) EnabledForSpec() bool { | |||
return m.Spec.EnableContextVars | |||
return true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does MiddlewareContextVars.EnabledForSpec
return always true
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea for streaming is to have context vars always enabled. There won't be an option to disable it in UI
return true | ||
}) | ||
|
||
s.streamManagers = sync.Map{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to protect the fields of StreamingMiddleware
struct with a lock? Members of the struct have been modified by different methods. How can be sure about thread-safety? It's hard and error prone to follow the execution flow with eye. We can use the race detector but it might be useful with an integration test that crafted for this purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agree, i will be changing it in the next Pr that comes with a refactor.
This commit refactors the `internal/streaming/manager.go` file to update the `Stream` struct and its methods. The main changes include: 1. Remove unused imports: `crypto/sha256` and `sync`. 2. Update `addMetadata`, `GetHTTPPaths`, `removeUnsafe`, and `removeConsumerGroup` methods to work with the `Stream` struct instead of `StreamManager`. 3. Remove references to `StreamManager` and update method receivers to use `Stream`. 4. Update `GetHTTPPaths` to use the `streamConfig` field of the `Stream` struct instead of loading from a map. These changes align the `Stream` struct with the new design, allowing it to manage a single stream configuration and its associated operations.
This commit enhances the Stream.Stop() method to make it more robust and prevent nil pointer dereferences. The changes include: 1. Adding a nil check for the stream before attempting to stop it. 2. Implementing a timeout mechanism using context to prevent indefinite hanging. 3. Using a goroutine and channel for asynchronous stream stopping. 4. Improving logging to provide more detailed information about the stopping process. These modifications aim to resolve issues related to stopping non-existent streams and make the overall process more reliable.
This commit improves the `testAsyncAPIHttp` function in the `gateway/mw_streaming_test.go` file to increase test reliability and provide more detailed logging. The changes include: 1. Increase initial WebSocket connection delay from 1 to 2 seconds 2. Extend overall timeout from 10 to 30 seconds 3. Increase inactivity timeout from 2 to 5 seconds 4. Add final log message showing total received messages These modifications aim to give the test more time to receive messages and offer more comprehensive logging information, facilitating better diagnosis of potential issues in message reception.
This commit improves the `testAsyncAPIHttp` function in the `mw_streaming_test.go` file to provide better debugging information and increase the chances of successful message reception. The changes include: 1. Increase initial WebSocket connection stabilization delay to 5 seconds 2. Add logging for WebSocket connection stabilization 3. Extend overall timeout to 60 seconds 4. Increase inactivity timeout to 10 seconds 5. Add a 5-second delay before closing WebSocket connections 6. Improve logging throughout the test execution These modifications aim to address potential timing issues and provide more detailed information about the test's progress, facilitating easier debugging of any remaining issues.
194dcbc
to
6766c86
Compare
Quality Gate failedFailed conditions |
Labs labsConfig `json:"labs"` | ||
} | ||
|
||
type labsConfig map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a public symbol as it's part of the Config data model
This reverts commit 954e48c.
### **User description** Reverts #6496 ___ ### **PR Type** Bug fix, Enhancement, Dependencies ___ ### **Description** - Removed `StreamingMiddleware` from the middleware chain and updated spec handling methods in `gateway/api_loader.go`. - Simplified the `TykMiddleware` interface by removing unused methods and updated middleware creation in `gateway/middleware.go`. - Refactored API spec resource management by renaming and removing methods in `gateway/api_definition.go`. - Fixed a bug in `gateway/mw_context_vars.go` by updating the condition for enabling context variables. - Corrected type declarations in tests and improved code readability in `internal/graphql/graphql_request_test.go` and `apidef/oas/linter_test.go`. - Updated dependencies in `go.mod`, including downgrading `google.golang.org/grpc` and removing unnecessary indirect dependencies. ___ ### **Changes walkthrough** 📝 <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Enhancement</strong></td><td><details><summary>5 files</summary><table> <tr> <td> <details> <summary><strong>api_loader.go</strong><dd><code>Remove streaming middleware and update spec handling</code> </dd></summary> <hr> gateway/api_loader.go <li>Removed <code>StreamingMiddleware</code> from middleware chain.<br> <li> Renamed <code>specsToUnload</code> to <code>specsToRelease</code>.<br> <li> Changed method from <code>Unload</code> to <code>Release</code>.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-cdf0b7f176c9d18e1a314b78ddefc2cb3a94b3de66f1f360174692c915734c68">+5/-9</a> </td> </tr> <tr> <td> <details> <summary><strong>middleware.go</strong><dd><code>Simplify middleware interface and creation process</code> </dd></summary> <hr> gateway/middleware.go <li>Removed <code>GetSpec</code> and <code>Unload</code> methods from <code>TykMiddleware</code> interface.<br> <li> Removed <code>AddUnloadHook</code> call in <code>createMiddleware</code>.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-703054910891a4db633eca0f42ed779d6b4fa75cd9b3aa4c503e681364201c1b">+1/-15</a> </td> </tr> <tr> <td> <details> <summary><strong>api_definition.go</strong><dd><code>Refactor API spec resource management</code> </dd></summary> <hr> gateway/api_definition.go <li>Removed <code>unloadHooks</code> from <code>APISpec</code>.<br> <li> Renamed <code>Unload</code> method to <code>Release</code>.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-0cf80174bbafb36f6d4f4308ebbd971b2833b76a936bad568220aa1a4ba0ee8b">+1/-15</a> </td> </tr> <tr> <td> <details> <summary><strong>linter_test.go</strong><dd><code>Improve loop readability in linter test</code> </dd></summary> <hr> apidef/oas/linter_test.go - Added explicit index variable in loop. </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-b92239afd81e77a829fe7fe8410044dfd4dfda525d17dbf5f8811714a9c986d3">+1/-1</a> </td> </tr> <tr> <td> <details> <summary><strong>graphql_request.go</strong><dd><code>Enhance loop clarity in GraphQL request</code> </dd></summary> <hr> internal/graphql/graphql_request.go - Added explicit index variable in loop. </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-8cc52a1c92c2035fddfc3c896e8028361b656a29a37c155ad262e9351ea8d540">+1/-1</a> </td> </tr> </table></details></td></tr><tr><td><strong>Bug fix</strong></td><td><details><summary>2 files</summary><table> <tr> <td> <details> <summary><strong>mw_context_vars.go</strong><dd><code>Update context variable enabling condition</code> </dd></summary> <hr> gateway/mw_context_vars.go - Changed `EnabledForSpec` to check `EnableContextVars`. </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-73366a9f9970b8f9250a3fc204167d712626e646fb761a3b9295b7d74b176b55">+1/-8</a> </td> </tr> <tr> <td> <details> <summary><strong>graphql_request_test.go</strong><dd><code>Fix type declaration in GraphQL stats test</code> </dd></summary> <hr> internal/graphql/graphql_request_test.go - Updated map initialization to use explicit type declaration. </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-70a41285c5f3c784f5725d991841d45e6345c9dcba57465dd7c63d0257168897">+2/-2</a> </td> </tr> </table></details></td></tr><tr><td><strong>Dependencies</strong></td><td><details><summary>1 files</summary><table> <tr> <td> <details> <summary><strong>go.mod</strong><dd><code>Update dependencies and clean up go.mod</code> </dd></summary> <hr> go.mod <li>Downgraded <code>google.golang.org/grpc</code> version.<br> <li> Removed several indirect dependencies.<br> </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+23/-307</a></td> </tr> </table></details></td></tr><tr><td><strong>Additional files (token-limit)</strong></td><td><details><summary>1 files</summary><table> <tr> <td> <details> <summary><strong>go.sum</strong><dd><code>...</code> </dd></summary> <hr> go.sum ... </details> </td> <td><a href="https://github.com/TykTechnologies/tyk/pull/6509/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+65/-1134</a></td> </tr> </table></details></td></tr></tr></tbody></table> ___ > 💡 **PR-Agent usage**: >Comment `/help` on the PR to get a list of all available PR-Agent tools and their descriptions
User description
Description
Based off this POC here
TT-12893
Related Issue
Motivation and Context
How This Has Been Tested
Screenshots (if appropriate)
Types of changes
Checklist
PR Type
Enhancement, Tests
Description
StreamingMiddleware
to handle streaming functionality, including stream management and integration with API specifications.Stream
struct for managing streaming configurations, with methods for starting, stopping, and resetting streams.PortalClient
for interacting with the developer portal, including methods to list webhook credentials and fetch app details.portalOutput
for sending messages to webhooks, with configuration and connection handling.Changes walkthrough 📝
3 files
mw_streaming_test.go
Add comprehensive tests for streaming API functionality
gateway/mw_streaming_test.go
manager_test.go
Implement tests for Stream management and safety checks
internal/streaming/manager_test.go
portal_test.go
Add tests for PortalClient webhook credential listing
internal/portal/portal_test.go
6 files
mw_streaming.go
Implement StreamingMiddleware for API streaming functionality
gateway/mw_streaming.go
StreamingMiddleware
for handling streaming functionality.manager.go
Develop Stream struct for managing streaming configurations
internal/streaming/manager.go
Stream
struct for managing streaming configurations.portal_client.go
Add PortalClient for developer portal interactions
internal/portal/portal_client.go
PortalClient
for interacting with the developer portal.portal_output.go
Implement portalOutput for webhook message delivery
internal/portal/portal_output.go
portalOutput
for sending messages to webhooks.api_loader.go
Integrate StreamingMiddleware into API loading process
gateway/api_loader.go
StreamingMiddleware
into the API loading process.middleware.go
Enhance middleware interface with unload functionality
gateway/middleware.go
1 files
config.go
Add StreamingConfig to global configuration settings
config/config.go
StreamingConfig
struct to configuration.1 files
go.mod
Update dependencies for streaming and portal integration
go.mod
1 files
go.sum
...
go.sum
...