Skip to content

Commit

Permalink
🔖 Initial Release
Browse files Browse the repository at this point in the history
  • Loading branch information
Templum authored Dec 19, 2018
2 parents a5f4a7e + ae05902 commit 0ac5be7
Show file tree
Hide file tree
Showing 29 changed files with 4,203 additions and 482 deletions.
13 changes: 11 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
language: go

services:
- docker

go:
- "1.10.x"

before_install:
- curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
- dep ensure
- docker pull rabbitmq:3.7.4
- docker run -p 5672:5672 -e RABBITMQ_DEFAULT_USER="user" -e RABBITMQ_DEFAULT_PASS="pass" -d --restart always --name ci_rabbitmq rabbitmq:3.7.4

script:
- go test -race -v ./...
- cd pkg && cd rabbitmq
- go test -integration=true integration_test.go
- cd .. && cd ..
- go build

after_success:
- bash <(curl -s https://codecov.io/bash)
after_script:
- docker stop ci_rabbitmq
- docker rm ci_rabbitmq
22 changes: 17 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[prune]
go-tests = true
unused-packages = true
non-go = true

[[constraint]]
branch = "master"
Expand All @@ -10,3 +11,6 @@
name = "github.com/openfaas/faas"
version = "0.9.11"

[[constraint]]
name = "github.com/docker/docker"
version = "v1.13.1"
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {
// TODO: Wait at least for the first map sync

connector := rabbitmq.MakeConnector(config.GenerateRabbitMQUrl(), controller)
connector.StartConnector()
connector.Start()
defer connector.Close()

signalChannel := make(chan os.Signal, 2)
Expand Down
158 changes: 103 additions & 55 deletions pkg/rabbitmq/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,26 @@ import (
"log"
"math"
"runtime"
"strings"
"time"

"github.com/Templum/rabbitmq-connector/pkg/config"
"github.com/openfaas-incubator/connector-sdk/types"
"github.com/streadway/amqp"
)


type connector struct {
uri string
closed bool

con *amqp.Connection
client *types.Controller
client Invoker

// Consumers
workers []*worker

// Sig Channel
errorChannel chan *amqp.Error
}

func MakeConnector(uri string, client *types.Controller) *connector {
func MakeConnector(uri string, client Invoker) Connector {
return &connector{
uri,
false,
Expand All @@ -36,91 +34,141 @@ func MakeConnector(uri string, client *types.Controller) *connector {
client,

nil,
nil,
}
}

func (c *connector) StartConnector() {
log.Println("Starting Connector")

c.init()
// Invoker is the Interface used by the OpenFaaS Connector SDK to perform invocations
// of Lambdas based on a provided topic and message
type Invoker interface {
Invoke(topic string, message *[]byte)
}

func (c *connector) Close() {
if !c.closed {
log.Println("Shutting down Connector")
c.closed = true
defer c.con.Close()
type Recoverer interface {
recover(receivedError *amqp.Error)
}

for _, worker := range c.workers {
worker.Close()
}
c.workers = nil
}
type Connector interface {
Start()
Close()
Recoverer
}

func (c *connector) init() {
func (c *connector) Start() {
log.Println("Starting Init Process of Connector")
var err error
c.con, err = connectToRabbitMQ(c.uri, 3)
if err != nil {
log.Panicf("Failed to connect to %s, recieved %s", c.uri, err)
} else {
log.Printf("Successfully connected to %s", c.uri)
log.Panicf("Failed to connect to %s after 3 retries", SanitizeConnectionUri(c.uri))
}

log.Printf("Successfully established connection with %s", SanitizeConnectionUri(c.uri))

// Related to Self Healing
c.errorChannel = make(chan *amqp.Error)
c.con.NotifyClose(c.errorChannel)
go c.registerSelfHealer() // Leaking Go Routine ?
oneTimeErrorChannel := make(chan *amqp.Error) // Maybe switch later to an struct holding the channel
c.con.NotifyClose(oneTimeErrorChannel)
go Healer(c, oneTimeErrorChannel)

// Queues: 1 Topic === 1 Queue
topics := config.GetTopics()
amountOfTopics := len(topics)
for _, topic := range topics {
workerCount := int(math.Round(float64(runtime.NumCPU()*2)/float64(amountOfTopics))) + 1
log.Printf("Spawning %d Workers for Topic: %s", workerCount, topic)
c.spawnWorkers(config.GetTopics())

for i := 0; i < workerCount; i++ {
worker := NewWorker(c.con, c.client, topic)
worker.Start()
// TODO: Maybe add Thread Lock here
c.workers = append(c.workers, worker)
}
log.Println("Connector finished Init Process and is now running")
}

func (c *connector) recover(receivedError *amqp.Error) {
if c.closed {
return
}

if receivedError.Recover {
log.Printf("Performing a recovery from following recoverable error: [Status: %d Reason: %s]", receivedError.Code, receivedError.Reason)
} else {
log.Panicf("Recieved unrecovarable error: [Status: %d Reason: %s]", receivedError.Code, receivedError.Reason)
}

log.Println("Will now clear worker pool")
c.clearWorkers()

c.Start()
}

func (c *connector) registerSelfHealer() {
for {
err := <-c.errorChannel
if !c.closed {
log.Printf("Recieved following error %s", err)
func (c *connector) Close() {
if !c.closed && c.con != nil {
log.Println("Shutting down Connector")
c.closed = true
defer c.con.Close()
log.Println("Clearing worker pool")
c.clearWorkers()
}
}

time.Sleep(30 * time.Second)
c.recover()
// spawnWorkers will spawn Workers per topic based on the available resources. Further it
// assigns the newly created worker to the worker pool before starting them.
func (c *connector) spawnWorkers(topics []string) {
amountOfTopics := len(topics)
workerCount := CalculateWorkerCount(amountOfTopics)
log.Printf("%d Topics are registered. Will be spawning %d Workers per Topic. ", amountOfTopics, workerCount)

for _, topic := range topics {
for i := 0; i < workerCount; i++ {
worker := NewWorker(c.con, c.client, topic)
c.workers = append(c.workers, worker)
go worker.Start()
}
}
}

func (c *connector) recover() {
log.Printf("Performing a Recovery")

for _, topicQueue := range c.workers {
topicQueue.Close()
// clearWorkers will stop Workers from the worker pool and afterwards cleanup the references
func (c *connector) clearWorkers() {
for _, worker := range c.workers {
worker.Close()
}
c.workers = nil

c.init()
}

// Helper Functions

// TODO: Documentation & Eventuell Type für Connection (AMK)
func connectToRabbitMQ(uri string, retries int) (*amqp.Connection, error) {
con, err := amqp.Dial(uri)

if err != nil && retries > 0 {
log.Printf("Failed to connect to %s with error %s. Retries left %d", uri, err, retries)
log.Printf("Failed to connect to %s with error %s. Retries left %d", SanitizeConnectionUri(uri), err, retries)
time.Sleep(5 * time.Second)
return connectToRabbitMQ(uri, retries-1)
}

return con, err
}

// Healer will take the reference to something "Recoverable" along with an error channel for amqp errors.
// The method is only meant for one time usage.
func Healer(c Recoverer, errorStream chan *amqp.Error) {
err := <-errorStream
// Give it some time before beginning recovering
time.Sleep(30 * time.Second)
c.recover(err)
}

// CalculateWorkerCount will calculate the amount of workers that will be spawned
// based on the following formula: Available CPU's * 2 / Amount of Topics
func CalculateWorkerCount(amountOfTopics int) int {
targetGoRoutines := float64(runtime.NumCPU() * 2)

if int(targetGoRoutines) < amountOfTopics {
return int(math.Floor(targetGoRoutines)/float64(amountOfTopics)) + 1
} else {
return int(math.Floor(targetGoRoutines) / float64(amountOfTopics))
}

}

// SanitizeConnectionUri takes an uri in the format user:pass@domain and removes the sensitive credentials.
// Prior it will perform a check for @, if it is not included it will return the unchanged uri
func SanitizeConnectionUri(uri string) string {
const SEPARATOR = "@"

if idx := strings.Index(uri, SEPARATOR); idx != -1 {
return strings.Split(uri, SEPARATOR)[1]
} else {
return uri
}
}
Loading

0 comments on commit 0ac5be7

Please sign in to comment.