Skip to content

Commit

Permalink
Merge pull request #5 from Clever/deip-615-add-snowflake
Browse files Browse the repository at this point in the history
Add snowflake client and configs
  • Loading branch information
jansenclever authored Nov 1, 2021
2 parents 1959984 + 0d49c1f commit 9e260c4
Show file tree
Hide file tree
Showing 20 changed files with 452 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ executors:
common-executor:
working_directory: /go/src/github.com/Clever/analytics-latency-config-service
docker:
- image: circleci/golang:1.13-stretch-node
- image: circleci/golang:1.16-stretch-node
- image: circleci/postgres:9.4-alpine-ram
- image: circleci/mongo:3.2.20-jessie-ram
environment:
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PKGS := $(shell go list ./... | grep -v /vendor | grep -v /gen-go | grep -v /too
# Temporarily pin to wag 6.4.5 until after migrated to go mod and Go 1.16
WAG_VERSION := v6.4.5

$(eval $(call golang-version-check,1.13))
$(eval $(call golang-version-check,1.16))

export POSTGRES_USER?=postgres
export POSTGRES_HOST?=localhost
Expand Down Expand Up @@ -41,5 +41,5 @@ generate: wag-generate-deps

install_deps:
go mod vendor
go build -o bin/mockgen ./vendor/github.com/golang/mock/mockgen
go build -o bin/go-bindata ./vendor/github.com/kevinburke/go-bindata/go-bindata
go build -o bin/mockgen -mod=vendor github.com/golang/mock/mockgen
go build -o bin/go-bindata -mod=vendor github.com/kevinburke/go-bindata/go-bindata
17 changes: 17 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ var (
RDSExternalUsername string
RDSExternalPassword string

// We have one Snowflake instance
SnowflakeUsername string
SnowflakePassword string
SnowflakeAccount string
SnowflakeDatabase string
SnowflakeWarehouse string
SnowflakeRole string
SnowflakeAuthenticator string

DefaultOwner string
globalDefaultLatency string
GlobalDefaultThresholds models.Thresholds
Expand Down Expand Up @@ -93,6 +102,14 @@ func InitDBs() {
RDSExternalDatabase = requiredEnv("RDS_EXTERNAL_DATABASE")
RDSExternalUsername = requiredEnv("RDS_EXTERNAL_USER")
RDSExternalPassword = requiredEnv("RDS_EXTERNAL_PASSWORD")

SnowflakeUsername = requiredEnv("SNOWFLAKE_USER")
SnowflakePassword = requiredEnv("SNOWFLAKE_PASSWORD")
SnowflakeAccount = requiredEnv("SNOWFLAKE_ACCOUNT")
SnowflakeDatabase = requiredEnv("SNOWFLAKE_DATABASE")
SnowflakeWarehouse = requiredEnv("SNOWFLAKE_WAREHOUSE")
SnowflakeRole = requiredEnv("SNOWFLAKE_ROLE")
SnowflakeAuthenticator = os.Getenv("SNOWFLAKE_AUTHENTICATOR") // for local development
}

// InitConfig reads environment latency config variables and initializes the config.
Expand Down
1 change: 1 addition & 0 deletions config/config_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func validateLatencyConfig(configs models.AnalyticsLatencyConfigs) {
models.AnalyticsDatabaseRedshiftFast,
models.AnalyticsDatabaseRdsInternal,
models.AnalyticsDatabaseRdsExternal,
models.AnalyticsDatabaseSnowflake,
}

for _, db := range dbs {
Expand Down
19 changes: 14 additions & 5 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (

// Controller implements server.Controller
type Controller struct {
redshiftProdConnection db.PostgresClient
redshiftFastConnection db.PostgresClient
rdsExternalConnection db.PostgresClient
rdsInternalConnection db.PostgresClient
redshiftProdConnection db.DBClient
redshiftFastConnection db.DBClient
rdsExternalConnection db.DBClient
rdsInternalConnection db.DBClient
snowflakeConnection db.DBClient
configChecks models.AnalyticsLatencyConfigs
}

func (c *Controller) getDatabaseConnection(database models.AnalyticsDatabase) (db.PostgresClient, error) {
func (c *Controller) getDatabaseConnection(database models.AnalyticsDatabase) (db.DBClient, error) {
switch database {
case models.AnalyticsDatabaseRedshiftProd:
return c.redshiftProdConnection, nil
Expand All @@ -31,6 +32,8 @@ func (c *Controller) getDatabaseConnection(database models.AnalyticsDatabase) (d
return c.rdsInternalConnection, nil
case models.AnalyticsDatabaseRdsExternal:
return c.rdsExternalConnection, nil
case models.AnalyticsDatabaseSnowflake:
return c.snowflakeConnection, nil
default:
return nil, fmt.Errorf("unexpected database")
}
Expand Down Expand Up @@ -58,6 +61,11 @@ func New() (*Controller, error) {
mErrors = multierror.Append(mErrors, fmt.Errorf("rds-external-failed-init: %s", err.Error()))
}

snowflakeConnection, err := db.NewSnowflakeProdClient()
if err != nil {
mErrors = multierror.Append(mErrors, fmt.Errorf("snowflake-failed-init: %s", err.Error()))
}

configChecks := config.ParseChecks()

if v := mErrors.ErrorOrNil(); v != nil {
Expand All @@ -69,6 +77,7 @@ func New() (*Controller, error) {
redshiftFastConnection: redshiftFastConnection,
rdsExternalConnection: rdsExternalConnection,
rdsInternalConnection: rdsInternalConnection,
snowflakeConnection: snowflakeConnection,
configChecks: configChecks,
}, nil
}
Expand Down
16 changes: 16 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package db

import "database/sql"

type DBClient interface {
GetClusterName() string
GetSession() *sql.DB
QueryTableMetadata(schemaName string) (map[string]TableMetadata, error)
QueryLatencyTable(schemaName, tableName string) (int64, bool, error)
}

// TableMetadata contains information about a table in Postgres
type TableMetadata struct {
TableName string
TimestampColumn string
}
15 changes: 1 addition & 14 deletions db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,13 @@ import (
_ "github.com/Clever/pq"
)

// PostgresClient provides a default implementation of PostgresClient
// Postgres provides an implementation of DBClient
// that contains the postgres client connection.
type Postgres struct {
session *sql.DB
clusterName string
}

type PostgresClient interface {
GetClusterName() string
GetSession() *sql.DB
QueryTableMetadata(schemaName string) (map[string]TableMetadata, error)
QueryLatencyTable(schemaName, tableName string) (int64, bool, error)
}

// PostgresCredentials contains the postgres credentials/information.
type PostgresCredentials struct {
Host string
Expand Down Expand Up @@ -144,12 +137,6 @@ func (c *Postgres) QueryLatencyTable(schemaName, tableName string) (int64, bool,
return hourDiff, latency.Valid, nil
}

// TableMetadata contains information about a table in Postgres
type TableMetadata struct {
TableName string
TimestampColumn string
}

// QueryTableMetadata returns a map of tables
// belonging to a given schema in Postgres, indexed
// by table name.
Expand Down
130 changes: 130 additions & 0 deletions db/snowflake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package db

import (
"database/sql"
"fmt"
"time"

"github.com/Clever/analytics-latency-config-service/config"

_ "github.com/snowflakedb/gosnowflake" // Snowflake driver
)

// SnowflakeCredentials provides an implementation of DBClient
// that contains the postgres client connection.
type Snowflake struct {
session *sql.DB
}

// SnowflakeCredentials contains the Snowflake credentials/information.
type SnowflakeCredentials struct {
Username string
Password string
Account string
Database string
Role string
Warehouse string
Authenticator string
}

// NewSnowflakeClient creates a Snowflake db client.
func NewSnowflakeClient(info SnowflakeCredentials) (*Snowflake, error) {
connectionParams := fmt.Sprintf("%s:%s@%s/%s?role=%s&warehouse=%s",
info.Username, info.Password, info.Account, info.Database, info.Role, info.Warehouse)
// Connection for local development
if info.Authenticator != "" {
connectionParams = fmt.Sprintf("%s@%s/%s?role=%s&warehouse=%s&authenticator=%s",
info.Username, info.Account, info.Database, info.Role, info.Warehouse, info.Authenticator)
}
session, err := sql.Open("snowflake", connectionParams)

if err != nil {
return nil, err
}

return &Snowflake{session}, nil
}

// NewProdSnowflakeClient initializes a client to Snowflake
func NewSnowflakeProdClient() (*Snowflake, error) {
info := SnowflakeCredentials{
Account: config.SnowflakeAccount,
Username: config.SnowflakeUsername,
Password: config.SnowflakePassword,
Database: config.SnowflakeDatabase,
Warehouse: config.SnowflakeWarehouse,
Role: config.SnowflakeRole,
Authenticator: config.SnowflakeAuthenticator,
}

return NewSnowflakeClient(info)
}

// GetClusterName returns "snowflake" (more relevant for postgres/redshift)
func (c *Snowflake) GetClusterName() string {
return "snowflake"
}

// GetSession returns the Snowflake session
func (c *Snowflake) GetSession() *sql.DB {
return c.session
}

func (c *Snowflake) QueryLatencyTable(schemaName, tableName string) (int64, bool, error) {
check := fmt.Sprintf("'%s.%s'", schemaName, tableName)
if schemaName == "PUBLIC" {
// We don't always prepend the schema name if it's "public". Check both.
check = fmt.Sprintf("%s OR name = '%s'", check, tableName)
}

latencyQuery := fmt.Sprintf("SELECT extract(epoch from last_update) FROM latencies WHERE name = %s", check)
var latency sql.NullFloat64
err := c.GetSession().QueryRow(latencyQuery).Scan(&latency)
if err != nil {
if err == sql.ErrNoRows {
return 0, false, nil
}
return 0, false, fmt.Errorf("error scanning latency table for %s: %s", check, err)
}
if !latency.Valid {
return 0, false, nil
}

hourDiff := (time.Now().Unix() - int64(latency.Float64)) / 3600
return hourDiff, latency.Valid, nil
}

// QueryTableMetadata returns a map of tables
// belonging to a given schema in Postgres, indexed
// by table name.
// It also attempts to infer the timestamp column, by
// choosing the alphabetically highest column with a
// timestamp type. We use this as a heuristic since a
// lot of our timestamp columns are prefixed with "_".
func (c *Snowflake) QueryTableMetadata(schemaName string) (map[string]TableMetadata, error) {
query := fmt.Sprintf(`
SELECT table_name, max("column_name")
FROM information_schema.columns
WHERE table_schema ILIKE '%s'
AND data_type ILIKE '%%timestamp%%'
GROUP BY table_name
`, schemaName)

tableMetadata := make(map[string]TableMetadata)
rows, err := c.GetSession().Query(query)
if err != nil {
return nil, err
}
defer rows.Close()

for rows.Next() {
var row TableMetadata
if err := rows.Scan(&row.TableName, &row.TimestampColumn); err != nil {
return tableMetadata, fmt.Errorf("Unable to scan row for schema %s: %s", schemaName, err)
}

tableMetadata[row.TableName] = row
}

return tableMetadata, nil
}
2 changes: 1 addition & 1 deletion gen-go/client/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion gen-go/models/analytics_database.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions gen-go/models/analytics_latency_configs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gen-js/index.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gen-js/index.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9e260c4

Please sign in to comment.