Feat: Schema Convertor Tool (#11)
* Feat: Schema Convertor Tool

* add headers

* Add double type in GetSpannerColumnType (#12)

Add double type in GetSpannerColumnType

* feat: add support for useRowTTL and useRowTimestamp options (#9)

* feat: add support for useRowTTL and useRowTimestamp options

* Fix update

* Fix update path

* rebase

* update README
mayurkale22 authored Oct 30, 2024
1 parent 3f443c6 commit 8ff7ae8
Showing 9 changed files with 950 additions and 4 deletions.
95 changes: 95 additions & 0 deletions schema_converter/README.md
@@ -0,0 +1,95 @@
# CQL to Spanner Table Converter

This script converts Cassandra `CREATE TABLE` queries from a CQL file into Spanner DDL and applies the generated DDL to a specified Spanner database. It also creates a `TableConfigurations` table in Spanner to store metadata about each column of every converted table.

## Features

- Converts Cassandra `CREATE TABLE` queries from a CQL file into Google Spanner `CREATE TABLE` queries.
- Executes the translated DDL on the specified Spanner database.
- Creates a `TableConfigurations` table to store details about each CQL query processed (if not already present).
- Offers an optional keyspace flattening flag for handling keyspaces in the converted schema.

## Requirements

- **Go**: Ensure that Go is installed on your system.
- **Google Cloud SDK**: Ensure `gcloud` is installed and authenticated with proper permissions.
- **Spanner Database**: You must have a Spanner instance and database ready to execute queries.

## Setup

### 1. Clone the Repository

```bash
git clone https://github.com/cloudspannerecosystem/cassandra-to-spanner-proxy.git
cd cassandra-to-spanner-proxy/schema_converter
```

### 2. Install Dependencies

Ensure that all necessary Go modules are installed:

```bash
go mod tidy
```

### 3. Set Up Google Cloud Credentials

Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of your service account key file.

```bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-file.json"
```

### 4. Configure Spanner Details

Pass the necessary Spanner details to the script as command-line flags:
- **Project ID**: Google Cloud project containing the Spanner instance.
- **Instance ID**: Spanner instance to connect to.
- **Database ID**: Spanner database where the DDL will be applied.
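
As a minimal sketch of how these three values fit together: the converter source in this commit joins them into one fully qualified database name before creating its clients. The helper name `databasePath` and the sample IDs below are illustrative assumptions, not part of the tool.

```go
package main

import "fmt"

// databasePath is a hypothetical helper that builds the fully qualified
// Spanner database name from the three IDs passed on the command line.
func databasePath(projectID, instanceID, databaseID string) string {
	return fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID)
}

func main() {
	// Placeholder IDs; substitute your own project, instance, and database.
	fmt.Println(databasePath("my-project", "my-instance", "my-database"))
}
```

If any of the three IDs is wrong, the resulting path points at a database that does not exist, and the DDL request is expected to fail rather than create anything.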

## Usage

### 1. Running the Script

To run the script, pass the necessary flags along with a CQL file containing the `CREATE TABLE` queries. The script will convert them into Spanner-compatible DDL and execute them on the specified Spanner database. Additionally, you can enable or disable keyspace flattening using the `--keyspaceFlatter` flag.

```bash
go run cql_to_spanner_schema_converter.go --project <PROJECT_ID> --instance <INSTANCE_ID> --database <DATABASE_ID> --cql <PATH_TO_CQL_FILE> [--keyspaceFlatter]
```

- `<PROJECT_ID>`: Google Cloud project ID.
- `<INSTANCE_ID>`: Spanner instance ID.
- `<DATABASE_ID>`: Spanner database ID.
- `<PATH_TO_CQL_FILE>`: Path to the CQL file containing `CREATE TABLE` queries.
- `[--keyspaceFlatter]`: Optional flag to enable keyspace flattening in the conversion process.
- `[--table]`: Optional flag to specify a different name for the `TableConfigurations` table.
- `[--enableUsingTimestamp]`: Optional flag to enable `USING TIMESTAMP` support; defaults to false.
- `[--enableUsingTTL]`: Optional flag to enable `USING TTL` support; defaults to false.
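
The required-flag check can be sketched roughly as follows. This is a simplified, standalone illustration that uses a private `flag.FlagSet` so it can be exercised without touching the process arguments. The function name `missingFlags` and the sample arguments are assumptions for illustration; the actual script defines its flags on the default flag set.

```go
package main

import (
	"flag"
	"fmt"
)

// requiredFlags lists the flags the converter cannot run without.
var requiredFlags = []string{"project", "instance", "database", "cql"}

// missingFlags parses args and returns the required flags that were left empty.
// It is a hypothetical helper, not a function from the converter itself.
func missingFlags(args []string) ([]string, error) {
	fs := flag.NewFlagSet("schema_converter", flag.ContinueOnError)
	values := make(map[string]*string, len(requiredFlags))
	for _, name := range requiredFlags {
		values[name] = fs.String(name, "", "required")
	}
	// Optional boolean flag; it takes no value on the command line.
	fs.Bool("keyspaceFlatter", false, "enable keyspace flattening")

	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	var missing []string
	for _, name := range requiredFlags {
		if *values[name] == "" {
			missing = append(missing, "--"+name)
		}
	}
	return missing, nil
}

func main() {
	// Sample invocation that omits --instance and --database.
	missing, err := missingFlags([]string{"--project", "my-project", "--cql", "schema.cql", "--keyspaceFlatter"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("Missing required flags:", missing)
}
```

Go's `flag` package accepts both `-project` and `--project`, so either spelling works on the command line.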

### 2. Example Commands

#### With Keyspace Flattening

```bash
go run cql_to_spanner_schema_converter.go --project cassandra-to-spanner --instance spanner-instance-dev --database cluster10 --cql /path/to/cql-file.cql --keyspaceFlatter
```

In this mode, keyspaces from the CQL schema are flattened.

#### Without Keyspace Flattening

```bash
go run cql_to_spanner_schema_converter.go --project cassandra-to-spanner --instance spanner-instance-dev --database cluster10 --cql /path/to/cql-file.cql
```

In this mode, keyspaces are preserved in the schema.

### 3. Important Notes

- The script **does not drop** tables before creating them. Ensure the schema is adjusted accordingly if needed.
- The `TableConfigurations` table is created to track schema metadata.
- The script supports the same data types as the proxy adaptor, as mentioned in the [FAQs](https://github.com/cloudspannerecosystem/cassandra-to-spanner-proxy/blob/develop/docs/faq.md#how-are-cql-data-types-mapped-with-cloud-spanner).
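
To make the tracked metadata more concrete, here is a small sketch of the per-column rows, assuming the column layout used by the converter source in this commit (`KeySpaceName`, `TableName`, `ColumnName`, `ColumnType`, `IsPrimaryKey`, `PK_Precedence`). The keyspace, table, and column names are made up for illustration, and `primaryKeyInfo` mirrors the script's lookup logic without being copied from it.

```go
package main

import "fmt"

// primaryKeyInfo reports whether column is part of the primary key and, if so,
// its 1-based position in the key. Non-key columns get position 0.
func primaryKeyInfo(column string, primaryKeys []string) (bool, int) {
	for i, pk := range primaryKeys {
		if column == pk {
			return true, i + 1
		}
	}
	return false, 0
}

func main() {
	// Hypothetical table app.users with a two-part primary key.
	primaryKeys := []string{"user_id", "created_at"}
	columns := [][2]string{ // {column name, CQL type}
		{"user_id", "uuid"},
		{"created_at", "timestamp"},
		{"email", "text"},
	}

	for _, c := range columns {
		isPK, precedence := primaryKeyInfo(c[0], primaryKeys)
		// One line per row that would be written to the metadata table.
		fmt.Printf("app | users | %s | %s | %t | %d\n", c[0], c[1], isPK, precedence)
	}
}
```

Each column of each converted table produces one such row, so a table with three columns contributes three rows.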

### Expected Output

The script prints any errors it encounters while processing the queries and reports when the DDL has been applied. On success, the Spanner tables are created and metadata about each translated table is stored in the `TableConfigurations` table.
277 changes: 277 additions & 0 deletions schema_converter/cql_to_spanner_schema_converter.go
@@ -0,0 +1,277 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package main

import (
	"bufio"
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"cloud.google.com/go/spanner"
	database "cloud.google.com/go/spanner/admin/database/apiv1"
	"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
	"github.com/cloudspannerecosystem/cassandra-to-spanner-proxy/third_party/datastax/parser"
	"github.com/cloudspannerecosystem/cassandra-to-spanner-proxy/translator"
)

type ColumnMetadata struct {
	ColumnName   string
	CQLType      string
	IsPrimary    bool
	PKPrecedence int
}

// checkGCPCredentials checks if Google Cloud credentials are set in the environment.
func checkGCPCredentials() error {
	credentials := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS")
	if credentials == "" {
		return fmt.Errorf("GCP credentials not found. Set GOOGLE_APPLICATION_CREDENTIALS environment variable")
	}
	return nil
}

// extractQueries reads a CQL file line by line and returns its queries.
// Lines are accumulated until one ends with a semicolon, which completes the
// current query. Blank lines and lines starting with "--" or "//" are skipped.
// Any text after the last semicolon is discarded.
func extractQueries(filePath string) ([]string, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var queries []string
	var currentQuery strings.Builder
	scanner := bufio.NewScanner(file)

	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// Skip empty lines and comments
		if line == "" || strings.HasPrefix(line, "--") || strings.HasPrefix(line, "//") {
			continue
		}

		currentQuery.WriteString(line + " ")

		// If the line ends with a semicolon, treat it as a full query
		if strings.HasSuffix(line, ";") {
			queries = append(queries, currentQuery.String())
			currentQuery.Reset()
		}
	}

	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return queries, nil
}

// getPrimaryKeyInfo reports whether a column is part of the primary key and,
// if so, its 1-based position in the key. Non-key columns return (false, 0).
func getPrimaryKeyInfo(columnName string, primaryKeys []string) (bool, int) {
	for i, pk := range primaryKeys {
		if columnName == pk {
			return true, i + 1
		}
	}
	return false, 0
}

// insertDataToTable applies the mutations (rows) to Spanner in a single commit.
// The target table is the one named in each mutation.
func insertDataToTable(ctx context.Context, client *spanner.Client, mutations []*spanner.Mutation) error {
	_, err := client.Apply(ctx, mutations)
	if err != nil {
		// Wrap with %w so callers can inspect the underlying error
		return fmt.Errorf("failed to insert data: %w", err)
	}
	return nil
}

// createSpannerTable sends the DDL statements to Spanner in one request and
// waits for the schema update to finish.
func createSpannerTable(ctx context.Context, queries []string, dbAdminClient *database.DatabaseAdminClient, db string) error {
	// Set a timeout for the DDL operation
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	req := &databasepb.UpdateDatabaseDdlRequest{
		Database:   db,
		Statements: queries,
	}

	// Initiate the DDL operation
	op, err := dbAdminClient.UpdateDatabaseDdl(ctx, req)
	if err != nil {
		return fmt.Errorf("failed to initiate DDL operation: %w", err)
	}

	// Wait for the DDL operation to complete
	if err := op.Wait(ctx); err != nil {
		return fmt.Errorf("failed to complete DDL operation: %w", err)
	}

	fmt.Println("DDL queries executed successfully")
	return nil
}

func main() {
	// Define command-line flags for the input parameters
	projectID := flag.String("project", "", "The project ID")
	instanceID := flag.String("instance", "", "The Spanner instance ID")
	databaseID := flag.String("database", "", "The Spanner database ID")
	cqlFile := flag.String("cql", "", "Path to the CQL file")
	keyspaceFlatter := flag.Bool("keyspaceFlatter", false, "Whether to enable keyspace flattening (default: false)")
	// The flag package already prints the default for non-empty string flags
	tableName := flag.String("table", "TableConfigurations", "The name of the metadata table")
	enableUsingTimestamp := flag.Bool("enableUsingTimestamp", false, "Whether to enable using timestamp (default: false)")
	enableUsingTTL := flag.Bool("enableUsingTTL", false, "Whether to enable TTL (default: false)")
	flag.Parse()

	// Collect the names of required flags that were not provided
	checkMissingFlags := func() []string {
		missingFlags := []string{}

		if *projectID == "" {
			missingFlags = append(missingFlags, "-project")
		}
		if *instanceID == "" {
			missingFlags = append(missingFlags, "-instance")
		}
		if *databaseID == "" {
			missingFlags = append(missingFlags, "-database")
		}
		if *cqlFile == "" {
			missingFlags = append(missingFlags, "-cql")
		}
		return missingFlags
	}

	// Exit with usage information if any required flag is missing
	if missingFlags := checkMissingFlags(); len(missingFlags) > 0 {
		fmt.Println("Missing required flags:", missingFlags)
		flag.PrintDefaults()
		os.Exit(1)
	}

	// Ensure that GCP credentials are set
	if err := checkGCPCredentials(); err != nil {
		log.Fatalf("Error: %v", err)
	}

	ctx := context.Background()

	// Construct the Spanner database path
	db := fmt.Sprintf("projects/%s/instances/%s/databases/%s", *projectID, *instanceID, *databaseID)

	// Create a Spanner Database Admin client
	adminClient, err := database.NewDatabaseAdminClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create admin client: %v", err)
	}
	defer adminClient.Close()

	// Define the query to create the TableConfigurations table
	createTableQuery := fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			KeySpaceName STRING(MAX),
			TableName STRING(MAX),
			ColumnName STRING(MAX),
			ColumnType STRING(MAX),
			IsPrimaryKey BOOL,
			PK_Precedence INT64
		) PRIMARY KEY(TableName, ColumnName, KeySpaceName)
	`, *tableName)

	// Send the DDL query to create the table in Spanner
	req := &databasepb.UpdateDatabaseDdlRequest{
		Database:   db,
		Statements: []string{createTableQuery},
	}

	op, err := adminClient.UpdateDatabaseDdl(ctx, req)
	if err != nil {
		log.Fatalf("Failed to create table: %v", err)
	}

	// Wait for the table creation to complete
	if err := op.Wait(ctx); err != nil {
		log.Fatalf("Failed to finish creating table: %v", err)
	}

	// Create a Spanner client to interact with the database
	spannerClient, err := spanner.NewClient(ctx, db)
	if err != nil {
		log.Fatalf("Failed to create spanner client: %v", err)
	}
	defer spannerClient.Close()

	// Extract all queries from the provided CQL file.
	// Exit with a non-zero status on failure, like the other fatal errors above.
	queries, err := extractQueries(*cqlFile)
	if err != nil {
		log.Fatalf("Error reading file: %v", err)
	}

	// Create a new Translator object
	translatorObj := translator.Translator{}

	var mutations []*spanner.Mutation
	var spannerCreateTableQueries []string
	// Process each query from the CQL file
	for _, query := range queries {
		// Check if the query is a CREATE TABLE query
		if parser.IsQueryCreateTableType(query) && strings.Contains(strings.ToLower(query), "create table") {
			// Convert the CQL query to Spanner DDL using the Translator.
			// A translation error aborts the whole run with a non-zero status.
			updateQueryMetadata, err := translatorObj.ToSpannerCreate(query, *keyspaceFlatter, *enableUsingTimestamp, *enableUsingTTL)
			if err != nil {
				log.Fatalf("Error parsing query: %v", err)
			}

			// Skip tables whose keyspace is missing from the metadata
			if updateQueryMetadata.Keyspace == "" {
				fmt.Printf("Error: Keyspace missing for table %s\n", updateQueryMetadata.Table)
				continue
			}

			// Prepare Spanner mutations to insert metadata into the TableConfigurations table
			for _, column := range updateQueryMetadata.Columns {
				if column.CQLType != "" {
					isPrimary, pkPrecedence := getPrimaryKeyInfo(column.Name, updateQueryMetadata.PrimaryKeys)
					mutation := spanner.InsertOrUpdate(
						*tableName,
						[]string{"KeySpaceName", "TableName", "ColumnName", "ColumnType", "IsPrimaryKey", "PK_Precedence"},
						[]interface{}{updateQueryMetadata.Keyspace, updateQueryMetadata.Table, column.Name, column.CQLType, isPrimary, pkPrecedence},
					)
					mutations = append(mutations, mutation)
				}
			}
			spannerCreateTableQueries = append(spannerCreateTableQueries, updateQueryMetadata.SpannerQuery)
		}
	}
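	// Nothing to apply if the file yielded no usable CREATE TABLE statements
	if len(spannerCreateTableQueries) == 0 {
		log.Println("No CREATE TABLE statements found; nothing to apply")
		return
	}

	// Insert metadata into the TableConfigurations table
	if err := insertDataToTable(ctx, spannerClient, mutations); err != nil {
		log.Fatalf("Error writing table metadata: %v", err)
	}
	// Execute the DDL statements to create the corresponding Spanner tables
	if err := createSpannerTable(ctx, spannerCreateTableQueries, adminClient, db); err != nil {
		log.Fatalf("Error executing create table query: %v", err)
	}
}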