Skip to content

Commit

Permalink
Enabling Job-Distributor container module (#1123)
Browse files Browse the repository at this point in the history
  • Loading branch information
AnieeG authored Sep 9, 2024
1 parent 5e0b15b commit d7929ca
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 4 deletions.
193 changes: 193 additions & 0 deletions lib/docker/test_env/job_distributor/jd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package job_distributor

import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
tc "github.com/testcontainers/testcontainers-go"
tcwait "github.com/testcontainers/testcontainers-go/wait"

"github.com/smartcontractkit/chainlink-testing-framework/lib/docker"
"github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env"
"github.com/smartcontractkit/chainlink-testing-framework/lib/logging"
"github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext"
)

const (
JDContainerName string = "job-distributor"
DEAFULTJDContainerPort string = "42242"
DEFAULTCSAKeyEncryptionKey string = "!PASsword000!"
DEAFULTWSRPCContainerPort string = "8080"
)

type Option = func(j *Component)

type Component struct {
test_env.EnvComponent
Grpc string
Wsrpc string
InternalGRPC string
InternalWSRPC string
l zerolog.Logger
t *testing.T
dbConnection string
containerPort string
wsrpcPort string
csaKeyEncryptionKey string
}

func (j *Component) startOrRestartContainer(withReuse bool) error {
req := j.getContainerRequest()
l := logging.GetTestContainersGoTestLogger(j.t)
c, err := docker.StartContainerWithRetry(j.l, tc.GenericContainerRequest{
ContainerRequest: *req,
Started: true,
Reuse: withReuse,
Logger: l,
})
if err != nil {
return err
}
j.Container = c
ctx := testcontext.Get(j.t)
host, err := test_env.GetHost(ctx, c)
if err != nil {
return errors.Wrapf(err, "cannot get host for container %s", j.ContainerName)
}

p, err := c.MappedPort(ctx, test_env.NatPort(j.containerPort))
if err != nil {
return errors.Wrapf(err, "cannot get container mapped port for container %s", j.ContainerName)
}
j.Grpc = fmt.Sprintf("%s:%s", host, p.Port())

p, err = c.MappedPort(ctx, test_env.NatPort(j.wsrpcPort))
if err != nil {
return errors.Wrapf(err, "cannot get wsrpc mapped port for container %s", j.ContainerName)
}
j.Wsrpc = fmt.Sprintf("%s:%s", host, p.Port())
j.InternalGRPC = fmt.Sprintf("%s:%s", j.ContainerName, j.containerPort)

j.InternalWSRPC = fmt.Sprintf("%s:%s", j.ContainerName, j.wsrpcPort)
j.l.Info().
Str("containerName", j.ContainerName).
Str("grpcURI", j.Grpc).
Str("wsrpcURI", j.Wsrpc).
Str("internalGRPC", j.InternalGRPC).
Str("internalWSRPC", j.InternalWSRPC).
Msg("Started Job Distributor container")

return nil
}

func (j *Component) getContainerRequest() *tc.ContainerRequest {
return &tc.ContainerRequest{
Name: j.ContainerName,
Image: fmt.Sprintf("%s:%s", j.ContainerImage, j.ContainerVersion),
ExposedPorts: []string{
test_env.NatPortFormat(j.containerPort),
test_env.NatPortFormat(j.wsrpcPort),
},
Env: map[string]string{
"DATABASE_URL": j.dbConnection,
"PORT": j.containerPort,
"NODE_RPC_PORT": j.wsrpcPort,
"CSA_KEY_ENCRYPTION_SECRET": j.csaKeyEncryptionKey,
},
Networks: j.Networks,
WaitingFor: tcwait.ForAll(
tcwait.ForListeningPort(test_env.NatPort(j.containerPort)),
tcwait.ForListeningPort(test_env.NatPort(j.wsrpcPort)),
),
LifecycleHooks: []tc.ContainerLifecycleHooks{
{
PostStarts: j.PostStartsHooks,
PostStops: j.PostStopsHooks,
},
},
}
}

func (j *Component) StartContainer() error {
return j.startOrRestartContainer(false)
}

func (j *Component) RestartContainer() error {
return j.startOrRestartContainer(true)
}

func New(networks []string, opts ...Option) *Component {
id, _ := uuid.NewRandom()
j := &Component{
EnvComponent: test_env.EnvComponent{
ContainerName: fmt.Sprintf("%s-%s", JDContainerName, id.String()[0:8]),
Networks: networks,
StartupTimeout: 2 * time.Minute,
},
containerPort: DEAFULTJDContainerPort,
wsrpcPort: DEAFULTWSRPCContainerPort,
csaKeyEncryptionKey: DEFAULTCSAKeyEncryptionKey,
l: log.Logger,
}
j.SetDefaultHooks()
for _, opt := range opts {
opt(j)
}
return j
}

func WithTestInstance(t *testing.T) Option {
return func(j *Component) {
j.l = logging.GetTestLogger(t)
j.t = t
}
}

func WithContainerPort(port string) Option {
return func(j *Component) {
j.containerPort = port
}
}

func WithWSRPCContainerPort(port string) Option {
return func(j *Component) {
j.wsrpcPort = port
}
}

func WithDBURL(db string) Option {
return func(j *Component) {
if db != "" {
j.dbConnection = db
}
}
}

func WithContainerName(name string) Option {
return func(j *Component) {
j.ContainerName = name
}
}

func WithImage(image string) Option {
return func(j *Component) {
j.ContainerImage = image
}
}

func WithVersion(version string) Option {
return func(j *Component) {
j.ContainerVersion = version
}
}

func WithCSAKeyEncryptionKey(key string) Option {
return func(j *Component) {
j.csaKeyEncryptionKey = key
}
}
54 changes: 54 additions & 0 deletions lib/docker/test_env/job_distributor/jd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package job_distributor

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/smartcontractkit/chainlink-testing-framework/lib/docker"
"github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env"
"github.com/smartcontractkit/chainlink-testing-framework/lib/logging"
)

func TestJDSpinUp(t *testing.T) {
t.Skipf("TODO enable this when jd image is available in ci")
l := logging.GetTestLogger(t)
network, err := docker.CreateNetwork(l)
require.NoError(t, err)

// create a postgres first
pg, err := test_env.NewPostgresDb(
[]string{network.Name},
test_env.WithPostgresDbName("jd-db"),
test_env.WithPostgresImageVersion("14.1"))
require.NoError(t, err)
err = pg.StartContainer()
require.NoError(t, err)

jd := New([]string{network.Name},
//TODO: replace with actual image
WithImage("localhost:5001/jd"),
WithVersion("latest"),
WithDBURL(pg.InternalURL.String()),
)

err = jd.StartContainer()
require.NoError(t, err)
// create a jd connection
_, err = newConnection(jd.Grpc)
require.NoError(t, err)
}

func newConnection(target string) (*grpc.ClientConn, error) {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(target, opts...)
if err != nil {
return nil, fmt.Errorf("Failed to connect to service at %s. Err: %w", target, err)
}

return conn, nil
}
4 changes: 2 additions & 2 deletions lib/docker/test_env/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func (pg *PostgresDb) startOrRestartContainer(withReuse bool) error {
internalUrl, err := url.Parse(fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
pg.User, pg.Password, pg.ContainerName, "5432", pg.DbName))
if err != nil {
return fmt.Errorf("error parsing mercury db internal url: %w", err)
return errors.Wrap(err, "error parsing db internal url")
}
pg.InternalURL = internalUrl
externalUrl, err := url.Parse(fmt.Sprintf("postgres://%s:%s@127.0.0.1:%s/%s?sslmode=disable",
pg.User, pg.Password, externalPort.Port(), pg.DbName))
if err != nil {
return fmt.Errorf("error parsing mercury db external url: %w", err)
return errors.Wrap(err, "error parsing db external url")
}
pg.ExternalURL = externalUrl

Expand Down
2 changes: 1 addition & 1 deletion lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ require (
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion wasp/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *Profile) printDashboardLink() {
if err != nil {
log.Warn().Msgf("could not get dashboard link: %s", err)
} else {
log.Info().Msgf("Dashboard URL: " + url)
log.Info().Msgf("Dashboard URL: %s", url)
}
}

Expand Down

0 comments on commit d7929ca

Please sign in to comment.