Skip to content

mqsrr/snowy

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

13 Commits
 
 
 
 
 
 
 
 

Repository files navigation

Snowy

Create new automations by dragging components onto the canvas and connecting them to define the execution steps.

snowy-intro-ezgif com-video-to-gif-converter

Configure each component to set up your credentials or adjust local settings.

snowy-second-ezgif com-video-to-gif-converter

This project was inspired by n8n, so if you are looking for an automation builder, you should definitely check it out. However, I have invested most of my time in developing an easy way to add new integrations into an existing application. If you are interested in architecture and design patterns, you might find some interesting choices here.

Documentation

The backend is divided into two separate packages: http and workflow. As the name suggests, the http package contains everything related to HTTP, including repositories and database connections, while the workflow package implements the main workflow engine that runs automations in the background. It’s important to note that the workflow package doesn’t have knowledge of the http package, allowing for the possibility of extracting it into a separate project.

image

To make it easier to add or remove external integrations (such as Discord, Slack, etc.), I have implemented an abstract factory. The workflow package provides various interfaces to facilitate the setup of a new service, and additionally, it provides services that these new integrations can utilize.

Adding a New Integration

Let’s say you want to add an integration with Discord. To keep everything organized, you should create a new folder named after your integration inside the plugins folder. This folder will hold the implementation of nodes and triggers specific to this integration. You can always define sub-packages within the nodes folder to further organize them (similar to the system nodes).

image

To register a new factory in the registry, create a struct that inherits from NamespacedScopedFactory.

type Factory struct {
	*runtime.NamespacedScopedFactory // base struct
}

func NewDiscordFactory() *Factory {
	return &Factory{                                                      
		NamespacedScopedFactory: runtime.NewNamespacedScopedFactory(nodes.NewDiscordFactory(), triggers.NewDiscordTriggerFactory()), // register node and trigger factories for this namespace (discord namespace)
	}
}

func RegisterFactory(factory *runtime.FactoryRegistry) { // self registration in the registry
	systemFactory := NewDiscordFactory()
	factory.RegisterFactory(systemFactory)
}

func (f *Factory) Namespace() string {
	return "discord"
}

Once you have a namespaced factory, you need to define a component factory. Inside the nodes or triggers folder, there should be another factory definition that registers components. You can see an example inside the system namespace.

func NewSystemNodeFactory() *runtime.NodeFactory {
	f := runtime.NewNodeFactory()

	f.RegisterNode("http", http.NewHttpNode, http.NewHttpNodeSchema(), http.NewHTTPDependencies())
	f.RegisterNode("waiter", waiter.NewWaiterNode, waiter.NewWaiterNodeSchema(), nil)
	return f
}
func NewSystemTriggerFactory() *runtime.TriggerFactory {
	f := runtime.NewTriggerFactory()

	f.RegisterTrigger("click", click.NewClickTrigger, click.NewClickTriggerSchema(), nil)
	f.RegisterTrigger("webhook", webhook.NewWebhookTrigger, webhook.NewWebhookSchema(), webhook.NewWebhookDependencies())
	return f
}

Each integration is defined in the namespaced factory, which contains the node and trigger factory. See the diagram if you are confused. When creating a new node or trigger, simply implement the relevant interface and define your behavior.

type sendMessageNode struct {
	id     uuid.UUID
	client *http.Client
	config *Config // custom config defined in this package

	alias         string
	credentialRef *uuid.UUID
	credProvider  model.CredentialProvider // service provided during factory init
	registrar     runtime.HttpRegistrar // service provided by factory registry

	contentTpl *template.Template
	prev       []model.Node
	next       []model.Node
}

func NewSendMessageNode(ctx *model.NodeRuntimeContext) (model.Node, error) { // runtime context contains all deps and configuration for this node based on the schema and deps. At this point the node just parses it into more managable struct
	config, err := parseSendMessageConfig(ctx.Args)
	if err != nil {
		return nil, fmt.Errorf("failed to parse send message config: %w", err)
	}

	credProvider, ok := deps.GetDep(ctx.Container, model.CredentialProviderKey)
	if !ok {
		return nil, errors.New("failed to find credential provider")
	}

	registrar, ok := deps.GetDep(ctx.Container, runtime.HttpRegistrarKey)
	if !ok {
		return nil, fmt.Errorf("unable to find registrar for %s", ctx.Alias)
	}

	if len(ctx.Alias) == 0 {
		ctx.Alias = "httpRequest"
	}

	node := &sendMessageNode{
		id:            ctx.ID,
		client:        &http.Client{},
		config:        config,
		alias:         ctx.Alias,
		credentialRef: ctx.CredentialRef,
		credProvider:  credProvider,
		registrar:     registrar,
	}

	if err := node.parseTemplates(); err != nil {
		return nil, err
	}

	node.SetNext(ctx.Next)
	return node, nil
}

func (s *sendMessageNode) ID() uuid.UUID {
	return s.id
}

func (s *sendMessageNode) Alias() string {
	return s.alias
}

func (s *sendMessageNode) Run(ctx context.Context, input map[string]any) (output map[string]any, next []model.Node, err error) {
	botToken, err := s.prepareNode(ctx)
	if err != nil {
		return nil, s.next, err
	}
	if len(s.config.userID) > 0 {
		output, err = s.sendDmMessage(ctx, botToken, input)
		return output, s.next, err
	}

	output, err = s.sendMessage(ctx, s.config.channelID, botToken, input)
	return output, s.next, err
}

func (s *sendMessageNode) TestRun(ctx context.Context, input map[string]any) (output map[string]any, err error) {
	botToken, err := s.prepareNode(ctx)
	if err != nil {
		return nil, err
	}

	if len(s.config.userID) > 0 {
		return s.sendDmMessage(ctx, botToken, input)
	}

	return s.sendMessage(ctx, s.config.channelID, botToken, input)
}

func (s *sendMessageNode) SetNext(next []model.Node) {
	s.next = next
	for _, n := range next {
		if en, ok := n.(*sendMessageNode); ok {
			en.prev = append(en.prev, s)
		}
	}
}

func (s *sendMessageNode) prepareNode(ctx context.Context) (string, error) {
	credId := s.credentialRef
	if credId == nil {
		return "", errors.New(fmt.Sprintf("failed to find credential id for %s", s.id.String()))
	}

	secrets, err := s.credProvider.GetSecretById(ctx, *credId)
	if err != nil {
		return "", err
	}

	botToken, ok := secrets.Values["botToken"]
	if !ok {
		return "", errors.New(fmt.Sprintf("failed to find bot token for %s", s.id.String()))
	}

	return botToken, nil
}

Each component defines the schema and a required list of dependencies. The schema informs the factory and client of the arguments and authentication it supports. It may also include validation in case there are additional requirements for component arguments. For instance, the Discord Send Message node requires the client to provide only one of the following arguments: channelId or userId. If both are provided, validation will fail.

func NewSendMessageSchema() *model.Schema {
	return &model.Schema{
		Args: map[string]*model.ArgumentSchema{
			"channelId": {
				Type:        model.ArgString,
				Required:    false,
				Description: "Discord channel ID. Use channelId to send guild messages",
			},
			"userId": {
				Type:        model.ArgString,
				Required:    false,
				Description: "Discord user ID. Use userId to send DM",
			},
			"content": {
				Type:        model.ArgString,
				Required:    true,
				Description: "Message content",
			},
		},
		Validation: &model.ArgumentValidation{
			OneOf:             []string{"channelId", "userId"},
			MutuallyExclusive: []string{"channelId", "userId"},
		},
		Auth: &model.AuthSchema{
			Required:    true,
			Description: "Discord authentication is required to send message",
			Options: []*credentials.CredentialOption{
				credentials.BearerToken("botToken", "Discord Bot Token"),
			},
		},
	}
}

The validation is automatically handled by the factory registry, so every time this node is created, it will check if the validation succeeds.

For dependencies, a component can request a service interface that it relies on. This is useful when you want to use other services that should remain abstracted (e.g., using HTTP or calling a database. The Discord node should not be aware of those operations). In this case, you would define the dependencies by including the keys of the required services.

func NewSendMessageDeps() []deps.Key {
	return []deps.Key{
		model.CredentialProviderKey,
		runtime.HttpRegistrarKey,
	}
}

These keys are specified in the engine package and essentially act as wrappers for interfaces. You can define them in the same package if they are not intended for use in other namespaces.

var (
	HttpRegistrarKey = deps.NewDepKey[HttpRegistrar]("engine.httpRegistrar")
)

In this example, the Discord Send Message node requests the implementation of two interfaces:

type HttpRegistrar interface {
	RegisterHandler(workflowID uuid.UUID, pattern, method string, handler http.Handler)
	UnregisterHandler(workflowID uuid.UUID, pattern, method string)

	RegisterSSE(workflowID uuid.UUID, bufferSize int) error
	UnregisterSSE(workflowID uuid.UUID)
	GetSSEBroker(workflowID uuid.UUID) (*SSEBroker, bool)

	PublishExecutionEvent(workflowID uuid.UUID, event *model.ExecutionEvent) error
	ServeHTTP(w http.ResponseWriter, req *http.Request) error

	Shutdown()
}

type CredentialProvider interface {
	GetSecretById(ctx context.Context, id uuid.UUID) (secrets *credentials.Secrets, err error)
}
  1. Http Registrar: This is used to dynamically register the HTTP handler during runtime, primarily utilized by triggers (registering a webhook handler when the workflow is active and unregistering it when the workflow is deactivated).
  2. Credential Provider: This is typically used to fetch decrypted credentials during runtime (e.g., fetching the Discord bot token just before sending the Discord message).

Some of these services are automatically created by the factory registry. However, if a service requires an external connection, like a repository, you will register it during the factory initialization.

	credService := credential2.NewCredentialsService(repo.NewCredentialsRepository(pool), cfg.Encryption)
	hashEncoder, err := encoder.NewHashEncoder(cfg.Hash)
	if err != nil {
		panic(err)
	}

	factory, err := runtime.NewFactoryRegistryBuilder(
		runtime.WithServices( // provide interface implementations
			credService,
			hashEncoder,
		),
		runtime.RegisterFactories(systemplugin.RegisterFactory, discordplugin.RegisterFactory), // register namespace factories
	).BuildRegistry()
	if err != nil {
		panic(err)
	}

That's it! The factory registry will automatically connect all registered factories and their dependencies. Each factory will receive a copy of the dependency container, which includes the requested dependencies. As mentioned earlier, every time a workflow is created or updated, the components of that workflow will be validated against the defined schema.

You can add a new integration simply by creating a new folder and using the existing interfaces and structs. No changes in the engine package are required. If you want to disable certain components or factories, just unregister them.

How to run

You can run the backend in either a development or production environment. Depending on the environment, the API will retrieve secrets in different ways.

Secrets can be loaded from the configs folder located in the root of the project, which contains a .env file. This method works for both development and production environments.

Development

If you do not specify the environment using the ENVIRONMENT variable, the API will default to the development environment. It will read the configuration from the ./configs/development.json file.

  1. Create a folder named configs in the root of the project and add a JSON file called development.json inside it.
  2. Copy and paste the configuration below into your JSON file.
{
"server": {
  "port": 8080,
  "environment": "development"
},
"logger": {
  "level": "info",
  "addSource": true
},
"database": {
  "uri": "postgres://myUser:myPass@localhost:5432/db?sslmode=disable"
},
"encryption": {
  "key": "12345678901234567890123456789012"
},
"jwt": {
  "secret": "12345678901234567890123456789012",
  "issuer": "snowy-api",
  "audience": "snowy-app",
  "expire": 60
},
"hash": {
  "salt": "12345678901234567890123456789012"
}
}
  1. Start the app

Production

To run in production, you need to define the ENVIRONMENT variable. The backend will load environment variables from ./configs/.env, but you can also define environment variables directly inside the Docker container. It will attempt to load the .env file and will fall back to using the environmental variables if that file is not present.

For a list of required variables, refer to the JSON configuration mentioned above.

Example JWT_AUDIENCE, ENCRYPTION_KEY, DATABASE_URI

About

Workflow Automation Builder

Resources

Stars

Watchers

Forks