Skip to content

Commit

Permalink
Experimental testing
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Jul 23, 2024
1 parent a16dfc9 commit 9e07805
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 73 deletions.
151 changes: 78 additions & 73 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,10 @@ const (

// RunCli runs the app
func RunCli(supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) {
cfg, sentryEnabled, err := cmd.Init()
config, sentryEnabled, err := cmd.Init()
if err != nil {
exitWithError(err, sentryEnabled)
}

app := cli.NewApp()
app.Name = appName
app.Usage = appUsage
Expand Down Expand Up @@ -85,100 +84,106 @@ func RunCli(supportedSources []config.ConfigurationPair, supportedTransformation
}()
}

s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}
return RunApp(config, supportedSources, supportedTransformations)
}

tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
app.ExitErrHandler = func(context *cli.Context, err error) {
if err != nil {
return err
exitWithError(err, sentryEnabled)
}
}

t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()
app.Run(os.Args)
}

ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()
func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, supportedTransformations []config.ConfigurationPair) error {
s, err := sourceconfig.GetSource(cfg, supportedSources)
if err != nil {
return err
}

tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()
tr, err := transformconfig.GetTransformations(cfg, supportedTransformations)
if err != nil {
return err
}

stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)
t, err := cfg.GetTarget()
if err != nil {
return err
}
t.Open()

// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")
ft, err := cfg.GetFailureTarget(cmd.AppName, cmd.AppVersion)
if err != nil {
return err
}
ft.Open()

stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()
tags, err := cfg.GetTags()
if err != nil {
return err
}
o, err := cfg.GetObserver(tags)
if err != nil {
return err
}
o.Start()

select {
case <-stop:
log.Debug("source.Stop() finished successfully!")
stopTelemetry := telemetry.InitTelemetryWithCollector(cfg)

stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")
// Handle SIGTERM
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, os.Kill)
go func() {
<-sig
log.Warn("SIGTERM called, cleaning up and closing application ...")

t.Close()
ft.Close()
o.Stop()
stopTelemetry()
stop := make(chan struct{}, 1)
go func() {
s.Stop()
stop <- struct{}{}
}()

if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
select {
case <-stop:
log.Debug("source.Stop() finished successfully!")

os.Exit(1)
stopTelemetry()
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}
}()
case <-time.After(5 * time.Second):
log.Error("source.Stop() took more than 5 seconds, forcing shutdown ...")

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}
t.Close()
ft.Close()
o.Stop()
stopTelemetry()

// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
if err != nil {
log.Debugf(`error deleting tmp directory: %v`, err)
}

os.Exit(1)
}
}()

t.Close()
ft.Close()
o.Stop()
return nil
// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
}

err1 := app.Run(os.Args)
if err1 != nil {
exitWithError(err1, sentryEnabled)
// Read is a long running process and will only return when the source
// is exhausted or if an error occurs
err = s.Read(&sf)
if err != nil {
return err
}

t.Close()
ft.Close()
o.Stop()
return nil
}

// sourceWriteFunc builds the function which wraps the different objects together to handle:
Expand Down
116 changes: 116 additions & 0 deletions pkg/source/inmemory/in_memory_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright (c) 2020-present Snowplow Analytics Ltd.
* All rights reserved.
*
* This software is made available by Snowplow Analytics, Ltd.,
* under the terms of the Snowplow Limited Use License Agreement, Version 1.0
* located at https://docs.snowplow.io/limited-use-license-1.0
* BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION
* OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT.
*/

package inmemory

import (
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/models"
"github.com/snowplow/snowbridge/pkg/source/sourceiface"
"github.com/twinj/uuid"
)

type Configuration struct{}

type inMemorySource struct {
messages chan []string
log *log.Entry
exitSignal chan struct{}
}

func configfunction(messages chan []string) func(c *Configuration) (sourceiface.Source, error) {
return func(c *Configuration) (sourceiface.Source, error) {
return newInMemorySource(messages)
}
}

type adapter func(i interface{}) (interface{}, error)

func (f adapter) Create(i interface{}) (interface{}, error) {
return f(i)
}

func (f adapter) ProvideDefault() (interface{}, error) {
cfg := &Configuration{}

return cfg, nil
}

func adapterGenerator(f func(c *Configuration) (sourceiface.Source, error)) adapter {
return func(i interface{}) (interface{}, error) {
cfg, ok := i.(*Configuration)
if !ok {
return nil, errors.New("invalid input")
}

return f(cfg)
}
}

func ConfigPair(messages chan []string) config.ConfigurationPair {
return config.ConfigurationPair{
Name: "inMemory",
Handle: adapterGenerator(configfunction(messages)),
}
}

func newInMemorySource(messages chan []string) (*inMemorySource, error) {
return &inMemorySource{
log: log.WithFields(log.Fields{"source": "in_memory"}),
messages: messages,
exitSignal: make(chan struct{}),
}, nil
}

func (ss *inMemorySource) Read(sf *sourceiface.SourceFunctions) error {
ss.log.Infof("Reading messages from in memory buffer")

processing:
for {
select {
case <-ss.exitSignal:
break processing
case msgs := <-ss.messages:
timeNow := time.Now().UTC()
var mods []*models.Message
for _, m := range msgs {
mod := models.Message{
Data: []byte(m),
PartitionKey: uuid.NewV4().String(),
TimeCreated: timeNow,
TimePulled: timeNow,
}
mods = append(mods, &mod)
}

err := sf.WriteToTarget(mods)
if err != nil {
ss.log.WithFields(log.Fields{"error": err}).Error(err)
}
}
}

ss.log.Infof("Done with processing")
return nil
}

func (ss *inMemorySource) Stop() {
ss.log.Warn("Stopping in memory source")
ss.exitSignal <- struct{}{}
}

func (ss *inMemorySource) GetID() string {
return "inMemory"
}
70 changes: 70 additions & 0 deletions tests_experimental/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package experimental

import (
"io"
"sync"
"testing"
"time"
"net/http"

"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/cmd/cli"
"github.com/snowplow/snowbridge/config"
inmemorysource "github.com/snowplow/snowbridge/pkg/source/inmemory"
"github.com/snowplow/snowbridge/pkg/transform/transformconfig"
"github.com/stretchr/testify/assert"
)

const url = "localhost:10000"

func TestApp1(t *testing.T) {
assert := assert.New(t)
t.Setenv("SNOWBRIDGE_CONFIG_FILE", "./test_config.hcl")
inputMessages := make(chan []string)
outputMessages := []string{}
wg := sync.WaitGroup{}

run(inputMessages, &outputMessages, &wg)

wg.Add(5)
inputMessages <- []string{"mes1", "mes2"}
time.Sleep(2 * time.Second)
inputMessages <- []string{"mes3"}
time.Sleep(2 * time.Second)
inputMessages <- []string{"mes4", "mes5"}
wg.Wait()

assert.Equal([]string{"mes1", "mes2", "mes3", "mes4", "mes5"}, outputMessages)
}

func run(input chan []string, output *[]string, wg *sync.WaitGroup) {
runHTTPServer(output, wg)

sourceConfigPairs := []config.ConfigurationPair{inmemorysource.ConfigPair(input)}

config, _, _ := cmd.Init()
//TODO handle cancellation/stopping better
go func() {
err := cli.RunApp(config, sourceConfigPairs, transformconfig.SupportedTransformations)
panic(err)
}()
}

//Use httptest
func runHTTPServer(output *[]string, wg *sync.WaitGroup) {
mutex := &sync.Mutex{}
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := io.ReadAll(req.Body)
if err != nil {
panic(err)
}
mutex.Lock()
*output = append(*output, string(data))
mutex.Unlock()
defer wg.Done()
})
go http.ListenAndServe(url, nil)
}


13 changes: 13 additions & 0 deletions tests_experimental/test_config.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
target {
use "http" {
url = "http://localhost:10000"
}
}

source {
use "inMemory" {}
}

license {
accept = true
}

0 comments on commit 9e07805

Please sign in to comment.