diff --git a/go/network-tunnel/gcloud_sql_proxy.go b/go/network-tunnel/gcloud_sql_proxy.go new file mode 100644 index 0000000000..b9d996a2de --- /dev/null +++ b/go/network-tunnel/gcloud_sql_proxy.go @@ -0,0 +1,152 @@ +package network_tunnel + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "syscall" + "time" + + log "github.com/sirupsen/logrus" +) + +type CloudSQLProxyConfig struct { + InstanceConnectionName string + Credentials []byte + Port string +} + +type CloudSQLProxy struct { + Config *CloudSQLProxyConfig + ctx context.Context + Cmd *exec.Cmd + tmpFileName string + cancel context.CancelFunc +} + +func (c *CloudSQLProxyConfig) CreateTunnel() *CloudSQLProxy { + var ctx, cancel = context.WithCancel(context.Background()) + return &CloudSQLProxy{ + Config: c, + ctx: ctx, + cancel: cancel, + tmpFileName: "", + Cmd: nil, + } +} + +// Start tunnel and wait until READY signal +func (t *CloudSQLProxy) Start() error { + if t.Cmd != nil { + return errors.New("This tunnel has already been started.") + } + + var tmpKeyFile, err = os.CreateTemp("", "flow-cloud-sql-proxy") + if err != nil { + return fmt.Errorf("creating temporary file for credentials file: %w", err) + } + t.tmpFileName = tmpKeyFile.Name() + + if _, err := tmpKeyFile.Write(t.Config.Credentials); err != nil { + return fmt.Errorf("writing credentials to temporary file: %w", err) + } + + log.WithFields(log.Fields{ + "instance-connection-name": t.Config.InstanceConnectionName, + "port": t.Config.Port, + }).Info("starting cloud-sql-proxy") + + logLevel := log.GetLevel().String() + if logLevel == "warning" { + // Adapt the logrus level to a compatible `env_filter` level of the + // cloud-sql-proxy `tracing_subscriber`. + logLevel = "warn" + } + + t.Cmd = exec.CommandContext( + t.ctx, + "cloud-sql-proxy", + "--port", t.Config.Port, + "--credentials-file", tmpKeyFile.Name(), + "--address", "127.0.0.1", + "--auto-iam-authn", + // Enables health check endpoints which we use to confirm the proxy is up + "--health-check", + t.Config.InstanceConnectionName, + ) + // Assign process gid to this process and its children + // so we can kill them together in the end + t.Cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + stdout, err := t.Cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("could not get stdout pipe for cloud-sql-proxy: %w", err) + } + + stderr, err := t.Cmd.StderrPipe() + if err != nil { + return fmt.Errorf("could not get stderr pipe for cloud-sql-proxy: %w", err) + } + go func() { + _, err = io.Copy(os.Stderr, stderr) + if err != nil { + log.WithField("error", err).Info("error copying stderr of proxy") + } + }() + + go func() { + _, err = io.Copy(os.Stderr, stdout) + if err != nil { + log.WithField("error", err).Info("error copying stdout of proxy") + } + }() + + if err := t.Cmd.Start(); err != nil { + return fmt.Errorf("starting cloud-sql-proxy: %w", err) + } + + var startupTimeout = 10 * time.Second + var start = time.Now() + for { + resp, err := http.Get("http://localhost:9090/startup") + if err != nil { + if time.Since(start) > startupTimeout { + return fmt.Errorf("cloud-sql-proxy startup timeout: %w", err) + } + continue + } + defer resp.Body.Close() + + var okBuf = make([]byte, 2) + var ok = []byte("ok") + if _, err = io.ReadFull(resp.Body, okBuf); err != nil { + return fmt.Errorf("cloud-sql-proxy startup reading output: %w", err) + } else if !bytes.Equal(okBuf, ok) { + return fmt.Errorf("cloud-sql-proxy returned %v instead of ok", okBuf) + } + + break + } + + log.Info("cloud-sql-proxy ready") + + return nil +} + +func (t *CloudSQLProxy) Stop() { + if t.Cmd != nil { + // Using the negative pid signals kill to a process group. + // This ensures the children of the process are also killed + syscall.Kill(-t.Cmd.Process.Pid, syscall.SIGKILL) + } + t.cancel() + // cleanup key file + if t.tmpFileName != "" { + os.Remove(t.tmpFileName) + } +} diff --git a/go/network-tunnel/network_tunnel.go b/go/network-tunnel/ssh_tunnel.go similarity index 100% rename from go/network-tunnel/network_tunnel.go rename to go/network-tunnel/ssh_tunnel.go diff --git a/materialize-mysql/.snapshots/TestSpecification b/materialize-mysql/.snapshots/TestSpecification index c9d37f820a..ba25e5096a 100644 --- a/materialize-mysql/.snapshots/TestSpecification +++ b/materialize-mysql/.snapshots/TestSpecification @@ -165,6 +165,29 @@ "privateKey" ], "title": "SSH Forwarding" + }, + "cloud_sql_proxy": { + "properties": { + "instance_connection_name": { + "type": "string", + "title": "Instance Connection Name", + "description": "In the format of PROJECT:REGION:INSTANCE." + }, + "credentials": { + "type": "string", + "title": "Service Account Credentials", + "description": "Credentials JSON file of a service account with access to the instance.", + "multiline": true, + "secret": true + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "instance_connection_name", + "credentials" + ], + "title": "Cloud SQL Proxy" } }, "additionalProperties": false, @@ -177,7 +200,6 @@ "required": [ "address", "user", - "password", "database" ], "title": "SQL Connection" diff --git a/materialize-mysql/Dockerfile b/materialize-mysql/Dockerfile index 0ee4199c0d..51f60ae5d3 100644 --- a/materialize-mysql/Dockerfile +++ b/materialize-mysql/Dockerfile @@ -33,6 +33,7 @@ WORKDIR /connector ENV PATH="/connector:$PATH" COPY --from=builder /builder/connector ./materialize-mysql +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel # Avoid running the connector as root. diff --git a/materialize-mysql/config_test.go b/materialize-mysql/config_test.go index 19cc37ee55..136f968b3f 100644 --- a/materialize-mysql/config_test.go +++ b/materialize-mysql/config_test.go @@ -36,10 +36,6 @@ func TestMySQLConfig(t *testing.T) { noUser.User = "" require.Error(t, noUser.Validate(), "expected validation error") - var noPass = validConfig - noPass.Password = "" - require.Error(t, noPass.Validate(), "expected validation error") - var noDatabase = validConfig noDatabase.Database = "" require.Error(t, noDatabase.Validate(), "expected validation error") diff --git a/materialize-mysql/driver.go b/materialize-mysql/driver.go index 5208c4293f..6752ffcde9 100644 --- a/materialize-mysql/driver.go +++ b/materialize-mysql/driver.go @@ -32,15 +32,21 @@ type sshForwarding struct { PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"` } +type cloudSQLProxy struct { + InstanceConnectionName string `json:"instance_connection_name" jsonschema:"title=Instance Connection Name,description=In the format of PROJECT:REGION:INSTANCE."` + Credentials string `json:"credentials" jsonschema:"title=Service Account Credentials,description=Credentials JSON file of a service account with access to the instance." jsonschema_extras:"secret=true,multiline=true"` +} + type tunnelConfig struct { SshForwarding *sshForwarding `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"` + CloudSQLProxy *cloudSQLProxy `json:"cloud_sql_proxy,omitempty" jsonschema:"title=Cloud SQL Proxy"` } // config represents the endpoint configuration for mysql. type config struct { Address string `json:"address" jsonschema:"title=Address,description=Host and port of the database (in the form of host[:port]). Port 3306 is used as the default if no specific port is provided." jsonschema_extras:"order=0"` User string `json:"user" jsonschema:"title=User,description=Database user to connect as." jsonschema_extras:"order=1"` - Password string `json:"password" jsonschema:"title=Password,description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"` + Password string `json:"password,omitempty" jsonschema:"title=Password,description=Password for the specified database user." jsonschema_extras:"secret=true,order=2"` Database string `json:"database" jsonschema:"title=Database,description=Name of the logical database to materialize to." jsonschema_extras:"order=3"` Timezone string `json:"timezone,omitempty" jsonschema:"title=Timezone,description=Timezone to use when materializing datetime columns. Should normally be left blank to use the database's 'time_zone' system variable. Only required if the 'time_zone' system variable cannot be read. Must be a valid IANA time zone name or +HH:MM offset. Takes precedence over the 'time_zone' system variable if both are set." jsonschema_extras:"order=4"` HardDelete bool `json:"hardDelete,omitempty" jsonschema:"title=Hard Delete,description=If this option is enabled items deleted in the source will also be deleted from the destination. By default is disabled and _meta/op in the destination will signify whether rows have been deleted (soft-delete).,default=false" jsonschema_extras:"order=5"` @@ -65,7 +71,6 @@ func (c *config) Validate() error { var requiredProperties = [][]string{ {"address", c.Address}, {"user", c.User}, - {"password", c.Password}, {"database", c.Database}, } for _, req := range requiredProperties { @@ -160,7 +165,7 @@ func (c *config) ToURI() string { var address = c.Address // If SSH Tunnel is configured, we are going to create a tunnel from localhost:3306 // to address through the bastion server, so we use the tunnel's address - if c.NetworkTunnel != nil && c.NetworkTunnel.SshForwarding != nil && c.NetworkTunnel.SshForwarding.SshEndpoint != "" { + if c.NetworkTunnel != nil && ((c.NetworkTunnel.SshForwarding != nil && c.NetworkTunnel.SshForwarding.SshEndpoint != "") || (c.NetworkTunnel.CloudSQLProxy != nil && c.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "")) { address = "localhost:3306" } @@ -242,6 +247,19 @@ func newMysqlDriver() *sql.Driver { } var tunnel = sshConfig.CreateTunnel() + // FIXME/question: do we need to shut down the tunnel manually if it is a child process? + // at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down. + if err := tunnel.Start(); err != nil { + return err + } + } else if cfg.NetworkTunnel != nil && cfg.NetworkTunnel.CloudSQLProxy != nil && cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "" { + var sshConfig = &networkTunnel.CloudSQLProxyConfig{ + InstanceConnectionName: cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName, + Credentials: []byte(cfg.NetworkTunnel.CloudSQLProxy.Credentials), + Port: "3306", + } + var tunnel = sshConfig.CreateTunnel() + // FIXME/question: do we need to shut down the tunnel manually if it is a child process? // at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down. if err := tunnel.Start(); err != nil { diff --git a/materialize-postgres/.snapshots/TestSpecification b/materialize-postgres/.snapshots/TestSpecification index 697667f60e..5bae4eec00 100644 --- a/materialize-postgres/.snapshots/TestSpecification +++ b/materialize-postgres/.snapshots/TestSpecification @@ -146,6 +146,29 @@ "privateKey" ], "title": "SSH Forwarding" + }, + "cloud_sql_proxy": { + "properties": { + "instance_connection_name": { + "type": "string", + "title": "Instance Connection Name", + "description": "In the format of PROJECT:REGION:INSTANCE." + }, + "credentials": { + "type": "string", + "title": "Service Account Credentials", + "description": "Credentials JSON file of a service account with access to the instance.", + "multiline": true, + "secret": true + } + }, + "additionalProperties": false, + "type": "object", + "required": [ + "instance_connection_name", + "credentials" + ], + "title": "Cloud SQL Proxy" } }, "additionalProperties": false, diff --git a/materialize-postgres/Dockerfile b/materialize-postgres/Dockerfile index b22a291010..f1bdd0f98c 100644 --- a/materialize-postgres/Dockerfile +++ b/materialize-postgres/Dockerfile @@ -33,6 +33,7 @@ WORKDIR /connector ENV PATH="/connector:$PATH" COPY --from=builder /builder/connector ./materialize-postgres +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel # Avoid running the connector as root. diff --git a/materialize-postgres/driver.go b/materialize-postgres/driver.go index da1f691a14..0db2ac90de 100644 --- a/materialize-postgres/driver.go +++ b/materialize-postgres/driver.go @@ -44,8 +44,14 @@ type sshForwarding struct { PrivateKey string `json:"privateKey" jsonschema:"title=SSH Private Key,description=Private key to connect to the remote SSH server." jsonschema_extras:"secret=true,multiline=true"` } +type cloudSQLProxy struct { + InstanceConnectionName string `json:"instance_connection_name" jsonschema:"title=Instance Connection Name,description=In the format of PROJECT:REGION:INSTANCE."` + Credentials string `json:"credentials" jsonschema:"title=Service Account Credentials,description=Credentials JSON file of a service account with access to the instance." jsonschema_extras:"secret=true,multiline=true"` +} + type tunnelConfig struct { SshForwarding *sshForwarding `json:"sshForwarding,omitempty" jsonschema:"title=SSH Forwarding"` + CloudSQLProxy *cloudSQLProxy `json:"cloud_sql_proxy,omitempty" jsonschema:"title=Cloud SQL Proxy"` } // config represents the endpoint configuration for postgres. @@ -200,6 +206,19 @@ func newPostgresDriver() *sql.Driver { } var tunnel = sshConfig.CreateTunnel() + // FIXME/question: do we need to shut down the tunnel manually if it is a child process? + // at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down. + if err := tunnel.Start(); err != nil { + return err + } + } else if cfg.NetworkTunnel != nil && cfg.NetworkTunnel.CloudSQLProxy != nil && cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName != "" { + var sshConfig = &networkTunnel.CloudSQLProxyConfig{ + InstanceConnectionName: cfg.NetworkTunnel.CloudSQLProxy.InstanceConnectionName, + Credentials: []byte(cfg.NetworkTunnel.CloudSQLProxy.Credentials), + Port: "5432", + } + var tunnel = sshConfig.CreateTunnel() + // FIXME/question: do we need to shut down the tunnel manually if it is a child process? // at the moment tunnel.Stop is not being called anywhere, but if the connector shuts down, the child process also shuts down. if err := tunnel.Start(); err != nil { diff --git a/materialize-sqlserver/Dockerfile b/materialize-sqlserver/Dockerfile index 666ae5569d..9659713717 100644 --- a/materialize-sqlserver/Dockerfile +++ b/materialize-sqlserver/Dockerfile @@ -40,6 +40,7 @@ WORKDIR /connector ENV PATH="/connector:$PATH" COPY --from=builder /builder/connector ./materialize-sqlserver +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel # Avoid running the connector as root. diff --git a/source-mysql/Dockerfile b/source-mysql/Dockerfile index 3572b96a5f..001b1e3250 100644 --- a/source-mysql/Dockerfile +++ b/source-mysql/Dockerfile @@ -37,6 +37,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh # Bring in the compiled connector artifact from the builder. COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=builder /builder/connector ./source-mysql # Avoid running the connector as root. diff --git a/source-postgres/Dockerfile b/source-postgres/Dockerfile index 51fdfb9019..a274d76a4b 100644 --- a/source-postgres/Dockerfile +++ b/source-postgres/Dockerfile @@ -43,6 +43,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh # Bring in the compiled connector artifacts from the builder. COPY --from=builder /builder/connector ./source-postgres COPY --from=builder /lib/x86_64-linux-gnu/libgcc_s.so.1 /lib/x86_64-linux-gnu/ +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel # Avoid running the connector as root. diff --git a/source-sqlserver/Dockerfile b/source-sqlserver/Dockerfile index 5d69492f88..b3ece9b311 100644 --- a/source-sqlserver/Dockerfile +++ b/source-sqlserver/Dockerfile @@ -38,6 +38,7 @@ COPY --from=busybox:latest /bin/sh /bin/sh # Bring in the compiled connector artifacts from the builder. COPY --from=builder /builder/connector ./source-sqlserver COPY --from=builder /lib/x86_64-linux-gnu/libgcc_s.so.1 /lib/x86_64-linux-gnu/ +COPY --from=gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.14.0 /cloud-sql-proxy /usr/bin/cloud-sql-proxy COPY --from=ghcr.io/estuary/network-tunnel:dev /flow-network-tunnel /usr/bin/flow-network-tunnel # Avoid running the connector as root.