Skip to content

Commit

Permalink
Add in memory source
Browse files Browse the repository at this point in the history
This kind of source can be useful in tests. This commit adds example test for HTTP target with in memory source in `cmd/inmemory_tests`.
  • Loading branch information
pondzix committed Jul 24, 2024
1 parent a16dfc9 commit 54ccb75
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 74 deletions.
154 changes: 80 additions & 74 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ const (
appCopyright = "(c) 2020-present Snowplow Analytics Ltd. All rights reserved."
)

// RunCli runs the app
// RunCli allows running application from cli
func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) {
cfg, sentryEnabled, err := cmd.Init()
config, sentryEnabled, err := cmd.Init()
if err != nil {
exitWithError(err, sentryEnabled)
}

app := cli.NewApp()
app.Name = appName
app.Usage = appUsage
Expand Down Expand Up @@ -85,100 +84,107 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
}()
}

s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}
return RunApp(config, supportedSources, supportedTransformations)
}

tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
app.ExitErrHandler = func(context *cli.Context, err error) {
if err != nil {
return err
exitWithError(err, sentryEnabled)
}
}

t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()
app.Run(os.Args)
}

ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()
// RunApp runs application (without cli stuff)
func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) error {
s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}

tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()
tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
if err != nil {
return err
}

stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)
t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()

// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")
ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()

stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()
tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()

select {
case <-stop:
log.Debug("source.Stop() finished successfully!")
stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")
// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")

t.Close()
ft.Close()
o.Stop()
stopTelemetry()
stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()

if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
select {
case <-stop:
log.Debug("source.Stop() finished successfully!")

os.Exit(1)
stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
}()
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}
t.Close()
ft.Close()
o.Stop()
stopTelemetry()

// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}

os.Exit(1)
}
}()

t.Close()
ft.Close()
o.Stop()
return nil
// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}

err1 := app.Run(os.Args)
if err1 != nil {
exitWithError(err1, sentryEnabled)
// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
}

t.Close()
ft.Close()
o.Stop()
return nil
}

// sourceWriteFunc builds the function which wraps the different objects together to handle:
Expand Down
91 changes: 91 additions & 0 deletions cmd/inmemory_tests/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cmd

import (
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/cmd/cli"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/source/inmemory"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
"github.com/stretchr/testify/assert"
)

func TestHTTPTarget(t *testing.T) {
inputChannel := make(chan []string)
outputBuffer := []string{}
wg := sync.WaitGroup{}
server := createHTTPServer(&outputBuffer, &wg)
t.Setenv("SNOWBRIDGE_CONFIG_FILE", "./test_config.hcl")
t.Setenv("SERVER_URL", server.URL)
defer server.Close()

runTest(inputChannel)

wg.Add(3) // 3 batches -> 3 output HTTP requests
inputChannel <- []string{`{"field": "mes1"}`, `{"field": "mes2"}`}
time.Sleep(2 * time.Second)
inputChannel <- []string{`{"field": "mes3"}`}
time.Sleep(2 * time.Second)
inputChannel <- []string{`{"field": "mes4"}`, `{"field": "mes5"}`}

if ok := withTimeout(time.Second, &wg); !ok {
assert.Fail(t, "Timeout! No expected data within configured time")
}

expectedOutput := []string{
`[{"field":"mes1"},{"field":"mes2"}]`,
`[{"field":"mes3"}]`,
`[{"field":"mes4"},{"field":"mes5"}]`,
}

assert.Equal(t, expectedOutput, outputBuffer)
}

func runTest(input chan []string) {
sourceConfigPairs := []config.ConfigurationPair{inmemory.ConfigPair(input)}

config, _, err := cmd.Init()

if err != nil {
panic(err)
}

go func() {
err := cli.RunApp(config, sourceConfigPairs, transformconfig.SupportedTransformations)
if err != nil {
panic(err)
}
}()
}

func createHTTPServer(output *[]string, wg *sync.WaitGroup) *httptest.Server{
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := io.ReadAll(req.Body)
if err != nil {
panic(err)
}
*output = append(*output, string(data))
defer wg.Done()
}))
}

func withTimeout(timeout time.Duration, wg *sync.WaitGroup) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return true
case <-time.After(timeout):
return false
}
}
13 changes: 13 additions & 0 deletions cmd/inmemory_tests/test_config.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
target {
use "http" {
url = env.SERVER_URL
}
}

source {
use "inMemory" {}
}

license {
accept = true
}
Loading

0 comments on commit 54ccb75

Please sign in to comment.