Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network-tunnel: add CloudSQLProxy as a variant of network tunnel #2119

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions go/network-tunnel/gcloud_sql_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package network_tunnel

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"syscall"
"time"

log "github.com/sirupsen/logrus"
)

type CloudSQLProxyConfig struct {
InstanceConnectionName string
Credentials []byte
Port string
}

type CloudSQLProxy struct {
Config *CloudSQLProxyConfig
ctx context.Context
Cmd *exec.Cmd
tmpFileName string
cancel context.CancelFunc
}

func (c *CloudSQLProxyConfig) CreateTunnel() *CloudSQLProxy {
var ctx, cancel = context.WithCancel(context.Background())
return &CloudSQLProxy{
Config: c,
ctx: ctx,
cancel: cancel,
tmpFileName: "",
Cmd: nil,
}
}

// Start tunnel and wait until READY signal
func (t *CloudSQLProxy) Start() error {
if t.Cmd != nil {
return errors.New("This tunnel has already been started.")
}

var tmpKeyFile, err = os.CreateTemp("", "flow-cloud-sql-proxy")
if err != nil {
return fmt.Errorf("creating temporary file for credentials file: %w", err)
}
t.tmpFileName = tmpKeyFile.Name()

if _, err := tmpKeyFile.Write(t.Config.Credentials); err != nil {
return fmt.Errorf("writing credentials to temporary file: %w", err)
}

log.WithFields(log.Fields{
"instance-connection-name": t.Config.InstanceConnectionName,
"port": t.Config.Port,
}).Info("starting cloud-sql-proxy")

logLevel := log.GetLevel().String()
if logLevel == "warning" {
// Adapt the logrus level to a compatible `env_filter` level of the
// cloud-sql-proxy `tracing_subscriber`.
logLevel = "warn"
}

t.Cmd = exec.CommandContext(
t.ctx,
"cloud-sql-proxy",
"--port", t.Config.Port,
"--credentials-file", tmpKeyFile.Name(),
"--address", "127.0.0.1",
"--auto-iam-authn",
// Enables health check endpoints which we use to confirm the proxy is up
"--health-check",
t.Config.InstanceConnectionName,
)
// Assign process gid to this process and its children
// so we can kill them together in the end
t.Cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

stdout, err := t.Cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("could not get stdout pipe for cloud-sql-proxy: %w", err)
}

stderr, err := t.Cmd.StderrPipe()
if err != nil {
return fmt.Errorf("could not get stderr pipe for cloud-sql-proxy: %w", err)
}
go func() {
_, err = io.Copy(os.Stderr, stderr)
if err != nil {
log.WithField("error", err).Info("error copying stderr of proxy")
}
}()

go func() {
_, err = io.Copy(os.Stderr, stdout)
if err != nil {
log.WithField("error", err).Info("error copying stdout of proxy")
}
}()

if err := t.Cmd.Start(); err != nil {
return fmt.Errorf("starting cloud-sql-proxy: %w", err)
}

var startupTimeout = 10 * time.Second
var start = time.Now()
for {
resp, err := http.Get("http://localhost:9090/startup")
if err != nil {
if time.Since(start) > startupTimeout {
return fmt.Errorf("cloud-sql-proxy startup timeout: %w", err)
}
continue
}
defer resp.Body.Close()

var okBuf = make([]byte, 2)
var ok = []byte("ok")
if _, err = io.ReadFull(resp.Body, okBuf); err != nil {
return fmt.Errorf("cloud-sql-proxy startup reading output: %w", err)
} else if !bytes.Equal(okBuf, ok) {
return fmt.Errorf("cloud-sql-proxy returned %v instead of ok", okBuf)
}

break
}

log.Info("cloud-sql-proxy ready")

return nil
}

func (t *CloudSQLProxy) Stop() {
if t.Cmd != nil {
// Using the negative pid signals kill to a process group.
// This ensures the children of the process are also killed
syscall.Kill(-t.Cmd.Process.Pid, syscall.SIGKILL)
}
t.cancel()
// cleanup key file
if t.tmpFileName != "" {
os.Remove(t.tmpFileName)
}
}
File renamed without changes.
24 changes: 23 additions & 1 deletion materialize-mysql/.snapshots/TestSpecification
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@
"privateKey"
],
"title": "SSH Forwarding"
},
"cloud_sql_proxy": {
"properties": {
"instance_connection_name": {
"type": "string",
"title": "Instance Connection Name",
"description": "In the format of PROJECT:REGION:INSTANCE."
},
"credentials": {
"type": "string",
"title": "Service Account Credentials",
"description": "Credentials JSON file of a service account with access to the instance.",
"multiline": true,
"secret": true
}
},
"additionalProperties": false,
"type": "object",
"required": [
"instance_connection_name",
"credentials"
],
"title": "Cloud SQL Proxy"
}
},
"additionalProperties": false,
Expand All @@ -177,7 +200,6 @@
"required": [
"address",
"user",
"password",
"database"
],
"title": "SQL Connection"
Expand Down
1 change: 1 addition & 0 deletions materialize-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ WORKDIR /connector
ENV PATH="/connector:$PATH"

COPY --from=builder /builder/connector ./materialize-mysql
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel

# Avoid running the connector as root.
Expand Down
4 changes: 0 additions & 4 deletions materialize-mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ func TestMySQLConfig(t *testing.T) {
noUser.User = ""
require.Error(t, noUser.Validate(), "expected validation error")

var noPass = validConfig
noPass.Password = ""
require.Error(t, noPass.Validate(), "expected validation error")

var noDatabase = validConfig
noDatabase.Database = ""
require.Error(t, noDatabase.Validate(), "expected validation error")
Expand Down
24 changes: 21 additions & 3 deletions materialize-mysql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ type sshForwarding struct {
PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"`
}

type cloudSQLProxy struct {
InstanceConnectionName string `json:"instance_connection_name" jsonschema:"title=Instance Connection Name,description=In the format of PROJECT:REGION:INSTANCE."`
Credentials string `json:"credentials" jsonschema:"title=Service Account Credentials,description=Credentials JSON file of a service account with access to the instance." jsonschema_extras:"secret=true,multiline=true"`
}

type tunnelConfig struct {
SshForwarding *sshForwarding `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"`
CloudSQLProxy *cloudSQLProxy `json:"cloud_sql_proxy,omitempty" jsonschema:"title=Cloud SQL Proxy"`
}

// config represents the endpoint configuration for mysql.
type config struct {
Address string `json:"address" jsonschema:"title=Address,description=Host and port of the database (in the form of host[:port]). Port 3306 is used as the default if no specific port is provided." jsonschema_extras:"order=0"`
User string `json:"user" jsonschema:"title=User,description=Database user to connect as." jsonschema_extras:"order=1"`
Password string `json:"password" jsonschema:"title=Password,description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"`
Password string `json:"password,omitempty" jsonschema:"title=Password,description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"`
Database string `json:"database" jsonschema:"title=Database,description=Name of the logical database to materialize to." jsonschema_extras:"order=3"`
Timezone string `json:"timezone,omitempty" jsonschema:"title=Timezone,description=Timezone to use when materializing datetime columns. Should normally be left blank to use the database's 'time_zone' system variable. Only required if the 'time_zone' system variable cannot be read. Must be a valid IANA time zone name or +HH:MM offset. Takes precedence over the 'time_zone' system variable if both are set." jsonschema_extras:"order=4"`
HardDelete bool `json:"hardDelete,omitempty" jsonschema:"title=Hard Delete,description=If this option is enabled items deleted in the source will also be deleted from the destination. By default is disabled and _meta/op in the destination will signify whether rows have been deleted (soft-delete).,default=false" jsonschema_extras:"order=5"`
Expand All @@ -65,7 +71,6 @@ func (c *config) Validate() error {
var requiredProperties = [][]string{
{"address", c.Address},
{"user", c.User},
{"password", c.Password},
{"database", c.Database},
}
for _, req := range requiredProperties {
Expand Down Expand Up @@ -160,7 +165,7 @@ func (c *config) ToURI() string {
var address = c.Address
// If SSH Tunnel is configured, we are going to create a tunnel from localhost:3306
// to address through the bastion server, so we use the tunnel's address
if c.NetworkTunnel != nil && c.NetworkTunnel.SshForwarding != nil && c.NetworkTunnel.SshForwarding.SshEndpoint != "" {
if c.NetworkTunnel != nil && ((c.NetworkTunnel.SshForwarding != nil && c.NetworkTunnel.SshForwarding.SshEndpoint != "") || (c.NetworkTunnel.CloudSQLProxy != nil && c.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "")) {
address = "localhost:3306"
}

Expand Down Expand Up @@ -242,6 +247,19 @@ func newMysqlDriver() *sql.Driver {
}
var tunnel = sshConfig.CreateTunnel()

// FIXME/question: do we need to shut down the tunnel manually if it is a child process?
// at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down.
if err := tunnel.Start(); err != nil {
return err
}
} else if cfg.NetworkTunnel != nil && cfg.NetworkTunnel.CloudSQLProxy != nil && cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "" {
var sshConfig = &networkTunnel.CloudSQLProxyConfig{
InstanceConnectionName: cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName,
Credentials: []byte(cfg.NetworkTunnel.CloudSQLProxy.Credentials),
Port: "3306",
}
var tunnel = sshConfig.CreateTunnel()

// FIXME/question: do we need to shut down the tunnel manually if it is a child process?
// at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down.
if err := tunnel.Start(); err != nil {
Expand Down
23 changes: 23 additions & 0 deletions materialize-postgres/.snapshots/TestSpecification
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,29 @@
"privateKey"
],
"title": "SSH Forwarding"
},
"cloud_sql_proxy": {
"properties": {
"instance_connection_name": {
"type": "string",
"title": "Instance Connection Name",
"description": "In the format of PROJECT:REGION:INSTANCE."
},
"credentials": {
"type": "string",
"title": "Service Account Credentials",
"description": "Credentials JSON file of a service account with access to the instance.",
"multiline": true,
"secret": true
}
},
"additionalProperties": false,
"type": "object",
"required": [
"instance_connection_name",
"credentials"
],
"title": "Cloud SQL Proxy"
}
},
"additionalProperties": false,
Expand Down
1 change: 1 addition & 0 deletions materialize-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ WORKDIR /connector
ENV PATH="/connector:$PATH"

COPY --from=builder /builder/connector ./materialize-postgres
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel

# Avoid running the connector as root.
Expand Down
19 changes: 19 additions & 0 deletions materialize-postgres/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type sshForwarding struct {
PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"`
}

type cloudSQLProxy struct {
InstanceConnectionName string `json:"instance_connection_name" jsonschema:"title=Instance Connection Name,description=In the format of PROJECT:REGION:INSTANCE."`
Credentials string `json:"credentials" jsonschema:"title=Service Account Credentials,description=Credentials JSON file of a service account with access to the instance." jsonschema_extras:"secret=true,multiline=true"`
}

type tunnelConfig struct {
SshForwarding *sshForwarding `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"`
CloudSQLProxy *cloudSQLProxy `json:"cloud_sql_proxy,omitempty" jsonschema:"title=Cloud SQL Proxy"`
}

// config represents the endpoint configuration for postgres.
Expand Down Expand Up @@ -200,6 +206,19 @@ func newPostgresDriver() *sql.Driver {
}
var tunnel = sshConfig.CreateTunnel()

// FIXME/question: do we need to shut down the tunnel manually if it is a child process?
// at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down.
if err := tunnel.Start(); err != nil {
return err
}
} else if cfg.NetworkTunnel != nil && cfg.NetworkTunnel.CloudSQLProxy != nil && cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "" {
var sshConfig = &networkTunnel.CloudSQLProxyConfig{
InstanceConnectionName: cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName,
Credentials: []byte(cfg.NetworkTunnel.CloudSQLProxy.Credentials),
Port: "5432",
}
var tunnel = sshConfig.CreateTunnel()

// FIXME/question: do we need to shut down the tunnel manually if it is a child process?
// at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down.
if err := tunnel.Start(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions materialize-sqlserver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ WORKDIR /connector
ENV PATH="/connector:$PATH"

COPY --from=builder /builder/connector ./materialize-sqlserver
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel

# Avoid running the connector as root.
Expand Down
1 change: 1 addition & 0 deletions source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh

# Bring in the compiled connector artifact from the builder.
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=builder /builder/connector ./source-mysql

# Avoid running the connector as root.
Expand Down
1 change: 1 addition & 0 deletions source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh
# Bring in the compiled connector artifacts from the builder.
COPY --from=builder /builder/connector ./source-postgres
COPY --from=builder /lib/x86_64-linux-gnu/libgcc_s.so.1 /lib/x86_64-linux-gnu/
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel

# Avoid running the connector as root.
Expand Down
1 change: 1 addition & 0 deletions source-sqlserver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh
# Bring in the compiled connector artifacts from the builder.
COPY --from=builder /builder/connector ./source-sqlserver
COPY --from=builder /lib/x86_64-linux-gnu/libgcc_s.so.1 /lib/x86_64-linux-gnu/
COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy
COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel

# Avoid running the connector as root.
Expand Down
Loading