Skip to content

Commit

Permalink
[TT-12893]: Adding first implementation of streams API (#6496)
Browse files Browse the repository at this point in the history
### **User description**
<!-- Provide a general summary of your changes in the Title above -->

## Description
Based off this POC
[here](#6291)
[TT-12893](https://tyktech.atlassian.net/browse/TT-12893)

<!-- Describe your changes in detail -->

## Related Issue

<!-- This project only accepts pull requests related to open issues. -->
<!-- If suggesting a new feature or change, please discuss it in an
issue first. -->
<!-- If fixing a bug, there should be an issue describing it with steps
to reproduce. -->
<!-- OSS: Please link to the issue here. Tyk: please create/link the
JIRA ticket. -->

## Motivation and Context

<!-- Why is this change required? What problem does it solve? -->

## How This Has Been Tested

<!-- Please describe in detail how you tested your changes -->
<!-- Include details of your testing environment, and the tests -->
<!-- you ran to see how your change affects other areas of the code,
etc. -->
<!-- This information is helpful for reviewers and QA. -->

## Screenshots (if appropriate)

## Types of changes

<!-- What types of changes does your code introduce? Put an `x` in all
the boxes that apply: -->

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Refactoring or add test (improvements in base code or adds test
coverage to functionality)

## Checklist

<!-- Go over all the following points, and put an `x` in all the boxes
that apply -->
<!-- If there are no documentation updates required, mark the item as
checked. -->
<!-- Raise up any additional concerns not covered by the checklist. -->

- [ ] I ensured that the documentation is up to date
- [ ] I explained why this PR updates go.mod in detail with reasoning
why it's required
- [ ] I would like a code coverage CI quality gate exception and have
explained why


[TT-12893]:
https://tyktech.atlassian.net/browse/TT-12893?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ


___

### **PR Type**
Enhancement, Tests


___

### **Description**
- Implemented `StreamingMiddleware` to handle streaming functionality,
including stream management and integration with API specifications.
- Added comprehensive tests for streaming API, covering single and
multiple client scenarios, asynchronous API, and WebSocket connections.
- Developed `Stream` struct for managing streaming configurations, with
methods for starting, stopping, and resetting streams.
- Introduced `PortalClient` for interacting with the developer portal,
including methods to list webhook credentials and fetch app details.
- Implemented `portalOutput` for sending messages to webhooks, with
configuration and connection handling.
- Enhanced middleware interface to support unloading functionality, with
hooks for unloading middleware when API specs change.
- Updated dependencies to support new streaming and portal
functionalities, including Kafka, NATS, and Benthos integration.


___



### **Changes walkthrough** 📝
<table><thead><tr><th></th><th align="left">Relevant
files</th></tr></thead><tbody><tr><td><strong>Tests</strong></td><td><details><summary>3
files</summary><table>
<tr>
  <td>
    <details>
<summary><strong>mw_streaming_test.go</strong><dd><code>Add
comprehensive tests for streaming API functionality</code>&nbsp; &nbsp;
</dd></summary>
<hr>

gateway/mw_streaming_test.go

<li>Added tests for streaming API with single and multiple clients.<br>
<li> Implemented setup functions for streaming API tests.<br> <li>
Included test cases for asynchronous API and WebSocket connections.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-a0d1bd0196a741537a3c850e340225c8993e49d709c838af0f1b48b9893af1da">+670/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>manager_test.go</strong><dd><code>Implement tests for
Stream management and safety checks</code>&nbsp; &nbsp; </dd></summary>
<hr>

internal/streaming/manager_test.go

<li>Added tests for starting and stopping streams.<br> <li> Tested
removal and whitelisting of unsafe components.<br> <li> Verified stream
configuration handling.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-c3ed576ae7d6430b9ac57b46c39d64981c274188c807ff4f0cfc4f15753e67af">+151/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>portal_test.go</strong><dd><code>Add tests for
PortalClient webhook credential listing</code>&nbsp; &nbsp; &nbsp;
&nbsp; </dd></summary>
<hr>

internal/portal/portal_test.go

<li>Added mock server for testing portal client interactions.<br> <li>
Implemented tests for listing webhook credentials.<br> <li> Verified
correct handling of multiple apps and webhooks.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-2d5d9f02d0c6c149d531f5471b69936ccbf414a02d977813803fc3eafe15052d">+76/-0</a>&nbsp;
&nbsp; </td>

</tr>                    

</table></details></td></tr><tr><td><strong>Enhancement</strong></td><td><details><summary>6
files</summary><table>
<tr>
  <td>
    <details>
<summary><strong>mw_streaming.go</strong><dd><code>Implement
StreamingMiddleware for API streaming
functionality</code></dd></summary>
<hr>

gateway/mw_streaming.go

<li>Implemented <code>StreamingMiddleware</code> for handling streaming
functionality.<br> <li> Added methods for initializing, creating, and
removing streams.<br> <li> Integrated stream management with API
specifications.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8">+298/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>manager.go</strong><dd><code>Develop Stream struct for
managing streaming configurations</code></dd></summary>
<hr>

internal/streaming/manager.go

<li>Created <code>Stream</code> struct for managing streaming
configurations.<br> <li> Added methods for starting, stopping, and
resetting streams.<br> <li> Implemented safety checks for removing
unsafe components.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-8cda02479026085d17a6c5559a15bf59638174d16fb5f3ad65bebcecb25ad8d8">+236/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>portal_client.go</strong><dd><code>Add PortalClient for
developer portal interactions</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; </dd></summary>
<hr>

internal/portal/portal_client.go

<li>Introduced <code>PortalClient</code> for interacting with the
developer portal.<br> <li> Added methods to list webhook credentials and
fetch app details.<br> <li> Defined structures for app and webhook
details.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-0752a118f5d1513887f5ebf9930ae9c382a16798c3a4a301dac34523b5010289">+152/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>portal_output.go</strong><dd><code>Implement
portalOutput for webhook message delivery</code>&nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; </dd></summary>
<hr>

internal/portal/portal_output.go

<li>Implemented <code>portalOutput</code> for sending messages to
webhooks.<br> <li> Added configuration and connection handling for
portal output.<br> <li> Registered output plugin with Benthos
service.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-5225b7c3a3d3677eed2edb98be754e5cc70a124b0af26485511a0528f535048b">+128/-0</a>&nbsp;
</td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>api_loader.go</strong><dd><code>Integrate
StreamingMiddleware into API loading process</code>&nbsp; &nbsp; &nbsp;
</dd></summary>
<hr>

gateway/api_loader.go

<li>Integrated <code>StreamingMiddleware</code> into the API loading
process.<br> <li> Adjusted logic for unloading API specifications.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-cdf0b7f176c9d18e1a314b78ddefc2cb3a94b3de66f1f360174692c915734c68">+9/-5</a>&nbsp;
&nbsp; &nbsp; </td>

</tr>                    

<tr>
  <td>
    <details>
<summary><strong>middleware.go</strong><dd><code>Enhance middleware
interface with unload functionality</code>&nbsp; &nbsp; &nbsp;
</dd></summary>
<hr>

gateway/middleware.go

<li>Enhanced middleware interface with unload functionality.<br> <li>
Added hooks for unloading middleware when API specs change.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-703054910891a4db633eca0f42ed779d6b4fa75cd9b3aa4c503e681364201c1b">+15/-1</a>&nbsp;
&nbsp; </td>

</tr>                    
</table></details></td></tr><tr><td><strong>Configuration
changes</strong></td><td><details><summary>1 files</summary><table>
<tr>
  <td>
    <details>
<summary><strong>config.go</strong><dd><code>Add StreamingConfig to
global configuration settings</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
</dd></summary>
<hr>

config/config.go

<li>Added <code>StreamingConfig</code> struct to configuration.<br> <li>
Enabled streaming configuration in global settings.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-fe44f09c4d5977b5f5eaea29170b6a0748819c9d02271746a20d81a5f3efca17">+26/-0</a>&nbsp;
&nbsp; </td>

</tr>                    

</table></details></td></tr><tr><td><strong>Dependencies</strong></td><td><details><summary>1
files</summary><table>
<tr>
  <td>
    <details>
<summary><strong>go.mod</strong><dd><code>Update dependencies for
streaming and portal integration</code>&nbsp; </dd></summary>
<hr>

go.mod

<li>Updated dependencies for streaming and portal functionalities.<br>
<li> Added new modules for Kafka, NATS, and Benthos integration.<br>


</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+307/-23</a></td>

</tr>                    
</table></details></td></tr><tr><td><strong>Additional files
(token-limit)</strong></td><td><details><summary>1
files</summary><table>
<tr>
  <td>
    <details>
<summary><strong>go.sum</strong><dd><code>...</code>&nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
&nbsp; </dd></summary>
<hr>

go.sum

...



</details>


  </td>
<td><a
href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1134/-65</a></td>

</tr>                    
</table></details></td></tr></tr></tbody></table>

___

> 💡 **PR-Agent usage**:
>Comment `/help` on the PR to get a list of all available PR-Agent tools
and their descriptions

---------

Co-authored-by: Leonid Bugaev <leonsbox@gmail.com>
Co-authored-by: Martin Buhr <martin@tyk.io>
  • Loading branch information
3 people authored Sep 12, 2024
1 parent 6f61900 commit 954e48c
Show file tree
Hide file tree
Showing 22 changed files with 3,338 additions and 100 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ tyk-gateway.pid
*.go-e

tyk_linux_*
.aider*
/dist/

.terraform**
.terraform.lock.hcl
.task/
*.test

main
main
2 changes: 1 addition & 1 deletion apidef/oas/linter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestXTykGateway_Lint(t *testing.T) {
settings.Server.EventHandlers[i].Webhook.CoolDownPeriod = ReadableDuration(time.Second * 20)
}

for idx, _ := range settings.Middleware.Operations {
for idx := range settings.Middleware.Operations {
settings.Middleware.Operations[idx].CircuitBreaker.Threshold = 0.5
}

Expand Down
11 changes: 11 additions & 0 deletions benthos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
input:
stdin: {}

output:
http_server:
address: "0.0.0.0:4196"
path: /get
stream_path: /get/stream
ws_path: /get/ws
allowed_verbs:
- GET
19 changes: 19 additions & 0 deletions cli/linter/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,23 @@
}
}
}
},
"Streaming": {
"type": "object",
"properties": {
"enabled": {
"type": "boolean"
},
"allow_unsafe": {
"type": "array"
}
}
}
},
"properties": {
"streaming": {
"$ref": "#/definitions/Streaming"
},
"allow_insecure_configs": {
"type": "boolean"
},
Expand Down Expand Up @@ -1203,6 +1217,11 @@
"oas_config": {
"validate_examples": false,
"validate_schema_defaults": false
},
"labs": {
"type": ["object", "null"],
"additionalProperties": true,
"properties": {}
}
}
}
26 changes: 26 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
LivenessCheck: LivenessCheckConfig{
CheckDuration: time.Second * 10,
},
Streaming: StreamingConfig{
Enabled: false,
AllowUnsafe: []string{},
},
}
)

Expand Down Expand Up @@ -663,6 +667,12 @@ func (pwl *PortsWhiteList) Decode(value string) error {
return nil
}

// Add this new struct definition
type StreamingConfig struct {
Enabled bool `json:"enabled"`
AllowUnsafe []string `json:"allow_unsafe"`
}

// Config is the configuration object used by Tyk to set up various parameters.
type Config struct {
// Force your Gateway to work only on a specific domain name. Can be overridden by API custom domain.
Expand Down Expand Up @@ -1131,6 +1141,22 @@ type Config struct {

// OAS holds the configuration for various OpenAPI-specific functionalities
OAS OASConfig `json:"oas_config"`

Streaming StreamingConfig `json:"streaming"`

Labs labsConfig `json:"labs"`
}

type labsConfig map[string]interface{}

func (lc *labsConfig) Decode(value string) error {
var temp map[string]interface{}
if err := json.Unmarshal([]byte(value), &temp); err != nil {
log.Error("Error unmarshalling labsConfig: ", err)
return err
}
*lc = temp
return nil
}

// OASConfig holds the configuration for various OpenAPI-specific functionalities
Expand Down
16 changes: 15 additions & 1 deletion gateway/api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ type APISpec struct {
AnalyticsPluginConfig *GoAnalyticsPlugin

middlewareChain *ChainObject
unloadHooks []func()

network analytics.NetworkStats

Expand All @@ -237,8 +238,16 @@ func (a *APISpec) GetSessionLifetimeRespectsKeyExpiration() bool {
return a.SessionLifetimeRespectsKeyExpiration
}

// AddUnloadHook adds a function to be called when the API spec is unloaded
func (s *APISpec) AddUnloadHook(hook func()) {
s.unloadHooks = append(s.unloadHooks, hook)
}

// Release releases all resources associated with API spec
func (s *APISpec) Release() {
func (s *APISpec) Unload() {
s.Lock()
defer s.Unlock()

// release circuit breaker resources
for _, path := range s.RxPaths {
for _, urlSpec := range path {
Expand Down Expand Up @@ -267,6 +276,11 @@ func (s *APISpec) Release() {
s.HTTPTransport.transport.CloseIdleConnections()
s.HTTPTransport = nil
}

for _, hook := range s.unloadHooks {
hook()
}
s.unloadHooks = nil
}

// Validate returns nil if s is a valid spec and an error stating why the spec is not valid.
Expand Down
14 changes: 9 additions & 5 deletions gateway/api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,

gw.mwAppendEnabled(&chainArray, &RateLimitForAPI{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &StreamingMiddleware{BaseMiddleware: baseMid})

if !spec.UseKeylessAccess {
gw.mwAppendEnabled(&chainArray, &GraphQLComplexityMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLGranularAccessMiddleware{BaseMiddleware: baseMid})
Expand Down Expand Up @@ -946,7 +948,6 @@ func (gw *Gateway) loadApps(specs []*APISpec) {
gw.loadControlAPIEndpoints(router)

muxer.setRouter(port, "", router, gw.GetConfig())

gs := gw.prepareStorage()
shouldTrace := trace.IsEnabled()

Expand Down Expand Up @@ -996,13 +997,15 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.DefaultProxyMux.swap(muxer, gw)

var specsToRelease []*APISpec
var specsToUnload []*APISpec

gw.apisMu.Lock()

for _, spec := range specs {
curSpec, ok := gw.apisByID[spec.APIID]
if ok && curSpec != nil && shouldReloadSpec(curSpec, spec) {
specsToRelease = append(specsToRelease, curSpec)
mainLog.Debugf("Spec %s has changed and needs to be reloaded", curSpec.APIID)
specsToUnload = append(specsToUnload, curSpec)
}

// Bind versions to base APIs again
Expand All @@ -1018,8 +1021,9 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.apisMu.Unlock()

for _, spec := range specsToRelease {
spec.Release()
for _, spec := range specsToUnload {
mainLog.Debugf("Unloading spec %s", spec.APIID)
spec.Unload()
}

mainLog.Debug("Checker host list")
Expand Down
16 changes: 15 additions & 1 deletion gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type TykMiddleware interface {
ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
EnabledForSpec() bool
Name() string

GetSpec() *APISpec

Unload()
}

type TraceMiddleware struct {
Expand Down Expand Up @@ -124,6 +128,9 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
mw.SetName(mw.Name())
mw.Logger().Debug("Init")

spec := mw.GetSpec()
spec.AddUnloadHook(actualMW.Unload)

// Pull the configuration
mwConf, err := mw.Config()
if err != nil {
Expand Down Expand Up @@ -158,14 +165,15 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
}

startTime := time.Now()
mw.Logger().WithField("ts", startTime.UnixNano()).Debug("Started")
mw.Logger().WithField("ts", startTime.UnixNano()).WithField("mw", mw.Name()).Debug("Started")

if mw.Base().Spec.CORS.OptionsPassthrough && r.Method == "OPTIONS" {
h.ServeHTTP(w, r)
return
}

err, errCode := mw.ProcessRequest(w, r, mwConf)

if err != nil {
writeResponse := true
// Prevent double error write
Expand Down Expand Up @@ -271,6 +279,12 @@ func (t *BaseMiddleware) Config() (interface{}, error) {
return nil, nil
}

func (t *BaseMiddleware) Unload() {}

func (t *BaseMiddleware) GetSpec() *APISpec {
return t.Spec
}

func (t *BaseMiddleware) OrgSession(orgID string) (user.SessionState, bool) {

if rpc.IsEmergencyMode() {
Expand Down
9 changes: 8 additions & 1 deletion gateway/mw_context_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (m *MiddlewareContextVars) Name() string {
}

func (m *MiddlewareContextVars) EnabledForSpec() bool {
return m.Spec.EnableContextVars
return true
}

// ProcessRequest will run any checks on the request on the way through the system, return an error to have the chain fail
Expand Down Expand Up @@ -46,6 +46,13 @@ func (m *MiddlewareContextVars) ProcessRequest(w http.ResponseWriter, r *http.Re
contextDataObject[name] = c.Value
}

for key, vals := range r.Form {
name := "request_data_" + strings.Replace(key, "-", "_", -1)
if len(vals) > 0 {
contextDataObject[name] = vals[0]
}
}

ctxSetData(r, contextDataObject)

return nil, http.StatusOK
Expand Down
Loading

0 comments on commit 954e48c

Please sign in to comment.