
Commit

Merge pull request #11 from civitaspo/develop
v0.0.3
civitaspo authored Jul 17, 2019
2 parents c7b9843 + c731102 commit 67535f0
Showing 17 changed files with 1,022 additions and 682 deletions.
9 changes: 0 additions & 9 deletions .scalafmt.conf

This file was deleted.

6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
0.0.3 (2019-07-17)
==================

* [New Feature] Add a `catalog` option to register a new table for the data created by the `s3_parquet` plugin.
* [Enhancement] Update dependencies.

0.0.2 (2019-01-21)
==================

7 changes: 7 additions & 0 deletions README.md
@@ -59,6 +59,13 @@
- **role_external_id**: a unique identifier that is used by third parties when assuming roles in their customers' accounts. This is optionally used for **auth_method**: `"assume_role"`. (string, optional)
- **role_session_duration_seconds**: duration, in seconds, of the role session. This is optionally used for **auth_method**: `"assume_role"`. (int, optional)
- **scope_down_policy**: an IAM policy in JSON format. This is optionally used for **auth_method**: `"assume_role"`. (string, optional)
- **catalog**: Register a table in the Glue Data Catalog if this option is specified. See the example configuration after this list. (optional)
  - **catalog_id**: Glue Data Catalog ID if you use a catalog different from the account/region default catalog. (string, optional)
  - **database**: The name of the database. (string, required)
  - **table**: The name of the table. (string, required)
  - **column_options**: key-value pairs where the key is a column name and the value is options for the column. (string-to-options map, default: `{}`)
    - **type**: type of a column when this plugin creates a new table (e.g. `STRING`, `BIGINT`). (string, default: depends on the input column type: `BIGINT` if `long`, `BOOLEAN` if `boolean`, `DOUBLE` if `double`, `STRING` if `string`, `STRING` if `timestamp`, `STRING` if `json`)
  - **operation_if_exists**: operation to perform if the table already exists. Available operations are `"delete"` and `"skip"`. (string, default: `"delete"`)
- **endpoint**: The AWS Service endpoint (string, optional)
- **region**: The AWS region (string, optional)
- **http_proxy**: Indicate whether to use an HTTP proxy when accessing AWS. (optional)
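As a quick orientation (this example is not part of the commit), a minimal `out` section using the new `catalog` option might look like the sketch below. The database, table, and column names are placeholders, and the remaining `s3_parquet` options are elided.

```yaml
out:
  type: s3_parquet
  # ... other s3_parquet output options omitted ...
  catalog:
    # catalog_id: "123456789012"  # only needed for a non-default Glue Data Catalog (placeholder value)
    database: my_database         # placeholder database name
    table: my_table               # placeholder table name
    column_options:
      created_at:
        type: TIMESTAMP           # override the default STRING mapping for a timestamp column
    operation_if_exists: skip     # keep an existing table instead of deleting and recreating it
```
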
25 changes: 12 additions & 13 deletions build.gradle
@@ -2,7 +2,6 @@ plugins {
     id "scala"
     id "com.jfrog.bintray" version "1.1"
     id "com.github.jruby-gradle.base" version "1.5.0"
-    id "com.diffplug.gradle.spotless" version "3.13.0"
     id "com.adarshr.test-logger" version "1.6.0" // For Pretty test logging
 }
 import com.github.jrubygradle.JRubyExec
@@ -14,30 +13,30 @@ configurations {
     provided
 }

-version = "0.0.2"
+version = "0.0.3"

 sourceCompatibility = 1.8
 targetCompatibility = 1.8

 dependencies {
-    compile "org.embulk:embulk-core:0.9.12"
-    provided "org.embulk:embulk-core:0.9.12"
+    compile "org.embulk:embulk-core:0.9.17"
+    provided "org.embulk:embulk-core:0.9.17"

-    compile 'org.scala-lang:scala-library:2.12.8'
-    ['s3', 'sts'].each { v ->
-        compile "com.amazonaws:aws-java-sdk-${v}:1.11.479"
+    compile 'org.scala-lang:scala-library:2.13.0'
+    ['glue', 's3', 'sts'].each { v ->
+        compile "com.amazonaws:aws-java-sdk-${v}:1.11.592"
     }
     ['column', 'common', 'encoding', 'format', 'hadoop', 'jackson'].each { v ->
-        compile "org.apache.parquet:parquet-${v}:1.10.0"
+        compile "org.apache.parquet:parquet-${v}:1.10.1"
     }
     compile 'org.apache.hadoop:hadoop-common:2.9.2'
-    compile 'org.xerial.snappy:snappy-java:1.1.7.2'
+    compile 'org.xerial.snappy:snappy-java:1.1.7.3'

-    testCompile 'org.scalatest:scalatest_2.12:3.0.5'
-    testCompile 'org.embulk:embulk-test:0.9.12'
-    testCompile 'org.embulk:embulk-standards:0.9.12'
+    testCompile 'org.scalatest:scalatest_2.13:3.0.8'
+    testCompile 'org.embulk:embulk-test:0.9.17'
+    testCompile 'org.embulk:embulk-standards:0.9.17'
     testCompile 'cloud.localstack:localstack-utils:0.1.15'
-    testCompile 'org.apache.parquet:parquet-tools:1.8.0'
+    testCompile 'org.apache.parquet:parquet-tools:1.10.1'
     testCompile 'org.apache.hadoop:hadoop-client:2.9.2'
 }

178 changes: 178 additions & 0 deletions src/main/scala/org/embulk/output/s3_parquet/CatalogRegistrator.scala
@@ -0,0 +1,178 @@
package org.embulk.output.s3_parquet


import java.util.{Optional, Map => JMap}

import com.amazonaws.services.glue.model.{Column, CreateTableRequest, DeleteTableRequest, GetTableRequest, SerDeInfo, StorageDescriptor, TableInput}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.embulk.config.{Config, ConfigDefault, ConfigException}
import org.embulk.output.s3_parquet.aws.Aws
import org.embulk.output.s3_parquet.CatalogRegistrator.ColumnOptions
import org.embulk.spi.Schema
import org.embulk.spi.`type`.{BooleanType, DoubleType, JsonType, LongType, StringType, TimestampType, Type}
import org.slf4j.{Logger, LoggerFactory}

import scala.jdk.CollectionConverters._
import scala.util.Try


object CatalogRegistrator
{
    trait Task
        extends org.embulk.config.Task
    {
        @Config("catalog_id")
        @ConfigDefault("null")
        def getCatalogId: Optional[String]

        @Config("database")
        def getDatabase: String

        @Config("table")
        def getTable: String

        @Config("column_options")
        @ConfigDefault("{}")
        def getColumnOptions: JMap[String, ColumnOptions]

        @Config("operation_if_exists")
        @ConfigDefault("\"delete\"")
        def getOperationIfExists: String
    }

    trait ColumnOptions
    {
        @Config("type")
        def getType: String
    }

    def apply(aws: Aws,
              task: Task,
              schema: Schema,
              location: String,
              compressionCodec: CompressionCodecName,
              loggerOption: Option[Logger] = None): CatalogRegistrator =
    {
        new CatalogRegistrator(aws, task, schema, location, compressionCodec, loggerOption)
    }
}

class CatalogRegistrator(aws: Aws,
                         task: CatalogRegistrator.Task,
                         schema: Schema,
                         location: String,
                         compressionCodec: CompressionCodecName,
                         loggerOption: Option[Logger] = None)
{
    val logger: Logger = loggerOption.getOrElse(LoggerFactory.getLogger(classOf[CatalogRegistrator]))

    def run(): Unit =
    {
        if (doesTableExists()) {
            task.getOperationIfExists match {
                case "skip" =>
                    logger.info(s"Skip to register the table: ${task.getDatabase}.${task.getTable}")
                    return

                case "delete" =>
                    logger.info(s"Delete the table: ${task.getDatabase}.${task.getTable}")
                    deleteTable()

                case unknown =>
                    throw new ConfigException(s"Unsupported operation: $unknown")
            }
        }
        registerNewParquetTable()
        showNewTableInfo()
    }

    def showNewTableInfo(): Unit =
    {
        val req = new GetTableRequest()
        task.getCatalogId.ifPresent(cid => req.setCatalogId(cid))
        req.setDatabaseName(task.getDatabase)
        req.setName(task.getTable)

        val t = aws.withGlue(_.getTable(req)).getTable
        logger.info(s"Created a table: ${t.toString}")
    }

    def doesTableExists(): Boolean =
    {
        val req = new GetTableRequest()
        task.getCatalogId.ifPresent(cid => req.setCatalogId(cid))
        req.setDatabaseName(task.getDatabase)
        req.setName(task.getTable)

        Try(aws.withGlue(_.getTable(req))).isSuccess
    }

    def deleteTable(): Unit =
    {
        val req = new DeleteTableRequest()
        task.getCatalogId.ifPresent(cid => req.setCatalogId(cid))
        req.setDatabaseName(task.getDatabase)
        req.setName(task.getTable)
        aws.withGlue(_.deleteTable(req))
    }

    def registerNewParquetTable(): Unit =
    {
        logger.info(s"Create a new table: ${task.getDatabase}.${task.getTable}")
        val req = new CreateTableRequest()
        task.getCatalogId.ifPresent(cid => req.setCatalogId(cid))
        req.setDatabaseName(task.getDatabase)
        req.setTableInput(new TableInput()
            .withName(task.getTable)
            .withDescription("Created by embulk-output-s3_parquet")
            .withTableType("EXTERNAL_TABLE")
            .withParameters(Map("EXTERNAL" -> "TRUE",
                                "classification" -> "parquet",
                                "parquet.compression" -> compressionCodec.name()).asJava)
            .withStorageDescriptor(new StorageDescriptor()
                .withColumns(getGlueSchema: _*)
                .withLocation(location)
                .withCompressed(isCompressed)
                .withInputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")
                .withOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
                .withSerdeInfo(new SerDeInfo()
                    .withSerializationLibrary("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
                    .withParameters(Map("serialization.format" -> "1").asJava)
                )
            )
        )
        aws.withGlue(_.createTable(req))
    }

    private def getGlueSchema: Seq[Column] =
    {
        val columnOptions: Map[String, ColumnOptions] = task.getColumnOptions.asScala.toMap
        schema.getColumns.asScala.toSeq.map { c =>
            val cType: String =
                if (columnOptions.contains(c.getName)) columnOptions(c.getName).getType
                else convertEmbulkType2GlueType(c.getType)
            new Column()
                .withName(c.getName)
                .withType(cType)
        }
    }

    private def convertEmbulkType2GlueType(t: Type): String =
    {
        t match {
            case _: BooleanType => "boolean"
            case _: LongType => "bigint"
            case _: DoubleType => "double"
            case _: StringType => "string"
            case _: TimestampType => "string"
            case _: JsonType => "string"
            case unknown => throw new ConfigException(s"Unsupported embulk type: ${unknown.getName}")
        }
    }

    private def isCompressed: Boolean =
    {
        !compressionCodec.equals(CompressionCodecName.UNCOMPRESSED)
    }

}
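
For orientation, the new class would be driven from the plugin's own transaction code roughly as in the sketch below. This is not the actual call site (that code is outside this diff): the object and method names are invented for illustration, `catalogTask` stands for the parsed `catalog` section if one was configured, and `location` is assumed to be the `s3://bucket/prefix/` under which the Parquet files were written.

```scala
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.embulk.output.s3_parquet.CatalogRegistrator
import org.embulk.output.s3_parquet.aws.Aws
import org.embulk.spi.Schema

object CatalogRegistrationSketch
{
    // If the Embulk configuration contained a `catalog` section, register the
    // uploaded Parquet files (rooted at `location`) as a Glue table via the
    // new CatalogRegistrator; otherwise do nothing.
    def registerIfRequested(aws: Aws,
                            catalogTask: Option[CatalogRegistrator.Task],
                            schema: Schema,
                            location: String,
                            compressionCodec: CompressionCodecName): Unit =
    {
        catalogTask.foreach { task =>
            CatalogRegistrator(aws, task, schema, location, compressionCodec).run()
        }
    }
}
```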
