Merge pull request #17 from civitaspo/develop
v0.1.0
civitaspo authored Nov 17, 2019
2 parents 67535f0 + eee59a4 commit 860ce66
Showing 19 changed files with 898 additions and 103 deletions.
1 change: 1 addition & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
github: civitaspo
40 changes: 40 additions & 0 deletions .github/workflows/release.yml
@@ -0,0 +1,40 @@
name: Release CI

on:
  pull_request:
    branches:
      - master
    types:
      - closed

jobs:
  release:

    runs-on: ubuntu-latest
    services:
      localstack:
        image: localstack/localstack
        ports:
          - 4572:4572
        env:
          SERVICES: s3

    steps:
      - uses: actions/checkout@v1
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8
      - name: Test with Gradle
        if: github.event.pull_request.merged == true
        run: ./gradlew test
      - name: Release the new gem
        if: github.event.pull_request.merged == true
        run: |
          mkdir -p $HOME/.gem
          touch $HOME/.gem/credentials
          chmod 0600 $HOME/.gem/credentials
          printf -- "---\n:rubygems_api_key: ${RUBYGEMS_API_KEY}\n" > $HOME/.gem/credentials
          ./gradlew gemPush
        env:
          RUBYGEMS_API_KEY: ${{secrets.RUBYGEMS_API_KEY}}
26 changes: 26 additions & 0 deletions .github/workflows/test.yml
@@ -0,0 +1,26 @@
name: Test CI

on:
  - push

jobs:
  test:

    runs-on: ubuntu-latest
    services:
      localstack:
        image: localstack/localstack
        ports:
          - 4572:4572
        env:
          SERVICES: s3

    steps:
      - uses: actions/checkout@v1
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8
      - name: Test with Gradle
        run: ./gradlew test

9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
0.1.0 (2019-11-17)
==================

* [New Feature] Support the older representations of Logical Types (OriginalTypes) #12
* [Enhancement] Add GitHub Actions CI settings #13
* [Enhancement] Support LogicalTypes for Glue Data Catalog #14
* [Enhancement] Update dependencies #15
* [New Feature] Support `auth_method: web_identity_token` #15

0.0.3 (2019-07-17)
==================

51 changes: 44 additions & 7 deletions README.md
@@ -1,5 +1,7 @@
# S3 Parquet output plugin for Embulk

![Release CI Status Badge](https://github.com/civitaspo/embulk-output-s3_parquet/workflows/Release%20CI/badge.svg) ![Test CI Status Badge](https://github.com/civitaspo/embulk-output-s3_parquet/workflows/Test%20CI/badge.svg)

[Embulk](https://github.com/embulk/embulk/) output plugin to dump records as [Apache Parquet](https://parquet.apache.org/) files on S3.

## Overview
@@ -22,12 +24,13 @@
- **column_options**: a map whose keys are column names and whose values are configurations with the following parameters (optional)
- **timezone**: timezone if type of this column is timestamp. If not set, **default_timezone** is used. (string, optional)
- **format**: timestamp format if type of this column is timestamp. If not set, **default_timestamp_format** is used. (string, optional)
- **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int8`, `int16`, `int32`, `int64`, `uint8`, `uint16`, `uint32`, `uint64`) (string, optional)
- **canned_acl**: grants one of [canned ACLs](https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL) for created objects (string, default: `private`)
- **block_size**: The block size is the size of a row group being buffered in memory. This limits the memory usage when writing. Larger values will improve the I/O when reading but consume more memory when writing. (int, default: `134217728` (128MB))
- **page_size**: The page size is for compression. When reading, each page can be decompressed independently. A block is composed of pages. The page is the smallest unit that must be read fully to access a single record. If this value is too small, the compression will deteriorate. (int, default: `1048576` (1MB))
- **max_padding_size**: The max size (bytes) to write as padding and the min size of a row group (int, default: `8388608` (8MB))
- **enable_dictionary_encoding**: The boolean value is to enable/disable dictionary encoding. (boolean, default: `true`)
- **auth_method**: name of mechanism to authenticate requests (`"basic"`, `"env"`, `"instance"`, `"profile"`, `"properties"`, `"anonymous"`, or `"session"`, default: `"default"`)
- **auth_method**: name of mechanism to authenticate requests (`"basic"`, `"env"`, `"instance"`, `"profile"`, `"properties"`, `"anonymous"`, `"session"`, `"web_identity_token"`, default: `"default"`)
- `"basic"`: uses **access_key_id** and **secret_access_key** to authenticate.
- `"env"`: uses `AWS_ACCESS_KEY_ID` (or `AWS_ACCESS_KEY`) and `AWS_SECRET_KEY` (or `AWS_SECRET_ACCESS_KEY`) environment variables.
- `"instance"`: uses EC2 instance profile or attached ECS task role.
@@ -44,6 +47,7 @@
- `"anonymous"`: uses anonymous access. This auth method can access only public files.
- `"session"`: uses temporary-generated **access_key_id**, **secret_access_key** and **session_token**.
- `"assume_role"`: uses temporary-generated credentials by assuming **role_arn** role.
- `"web_identity_token"`: uses temporary-generated credentials by assuming **role_arn** role with web identity.
- `"default"`: uses AWS SDK's default strategy to look up available credentials from runtime environment. This method behaves like the combination of the following methods.
1. `"env"`
1. `"properties"`
@@ -54,17 +58,42 @@
- **access_key_id**: aws access key id. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional)
- **secret_access_key**: aws secret access key. this is required when **auth_method** is `"basic"` or `"session"`. (string, optional)
- **session_token**: aws session token. this is required when **auth_method** is `"session"`. (string, optional)
- **role_arn**: arn of the role to assume. this is required for **auth_method** is `"assume_role"`. (string, optional)
- **role_session_name**: an identifier for the assumed role session. this is required when **auth_method** is `"assume_role"`. (string, optional)
- **role_arn**: arn of the role to assume. this is required when **auth_method** is `"assume_role"` or `"web_identity_token"`. (string, optional)
- **role_session_name**: an identifier for the assumed role session. this is required when **auth_method** is `"assume_role"` or `"web_identity_token"`. (string, optional)
- **role_external_id**: a unique identifier that is used by third parties when assuming roles in their customers' accounts. this is optionally used for **auth_method**: `"assume_role"`. (string, optional)
- **role_session_duration_seconds**: duration, in seconds, of the role session. this is optionally used for **auth_method**: `"assume_role"`. (int, optional)
- **role_session_duration_seconds**: duration, in seconds, of the role session. this is optionally used for **auth_method**: `"assume_role"`. (int, optional)
- **web_identity_token_file**: the absolute path to the web identity token file. this is required when **auth_method** is `"web_identity_token"`. (string, optional)
- **scope_down_policy**: an iam policy in json format. this is optionally used for **auth_method**: `"assume_role"`. (string, optional)
- **catalog**: Register a table if this option is specified (optional)
- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
- **database**: The name of the database (string, required)
- **table**: The name of the table (string, required)
- **column_options**: key-value pairs where the key is a column name and the value is the options for the column. (string to options map, default: `{}`)
- **type**: type of a column when this plugin creates new tables (e.g. `STRING`, `BIGINT`) (string, default: depends on input column type. `BIGINT` if input column type is `long`, `BOOLEAN` if boolean, `DOUBLE` if `double`, `STRING` if `string`, `STRING` if `timestamp`, `STRING` if `json`)
- **type**: type of the column when this plugin creates new tables (e.g. `string`, `bigint`) (string, default: depends on the input embulk column type or the parquet logical type; see the tables below)

|embulk column type|glue data type|
|:---|:---|
|long|bigint|
|boolean|boolean|
|double|double|
|string|string|
|timestamp|string|
|json|string|

|parquet logical type|glue data type|note|
|:---|:---|:---|
|timestamp-millis|timestamp||
|timestamp-micros|bigint|Glue cannot recognize timestamp-micros.|
|int8|tinyint||
|int16|smallint||
|int32|int||
|int64|bigint||
|uint8|smallint|Glue tinyint ranges from -2^7 to 2^7-1, so it cannot hold every uint8 value.|
|uint16|int|Glue smallint ranges from -2^15 to 2^15-1, so it cannot hold every uint16 value.|
|uint32|bigint|Glue int ranges from -2^31 to 2^31-1, so it cannot hold every uint32 value.|
|uint64|ConfigException|Glue bigint is a 64-bit signed integer, so it cannot hold every uint64 value. Define the type with `catalog.column_options` instead.|
|json|string||

- **operation_if_exists**: operation if the table already exists. Available operations are `"delete"` and `"skip"` (string, default: `"delete"`)
- **endpoint**: The AWS Service endpoint (string, optional)
- **region**: The AWS region (string, optional)
@@ -75,6 +104,8 @@
- **user** proxy user (string, optional)
- **password** proxy password (string, optional)
- **buffer_dir**: buffer directory for parquet files to be uploaded on S3 (string, default: Create a Temporary Directory)
- **type_options**: a map whose keys are names of embulk types (`boolean`, `long`, `double`, `string`, `timestamp`, `json`) and whose values are configurations with the following parameters (optional)
- **logical_type**: a Parquet logical type name (`timestamp-millis`, `timestamp-micros`, `json`, `int8`, `int16`, `int32`, `int64`, `uint8`, `uint16`, `uint32`, `uint64`) (string, optional)
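
The `"web_identity_token"` parameters described above could be combined roughly as follows. This is only a sketch: the bucket name, role ARN, session name, and token file location are hypothetical placeholders, not values taken from this repository.

```yaml
out:
  type: s3_parquet
  bucket: my-bucket                    # hypothetical bucket name
  path_prefix: path/to/my-obj.
  file_ext: snappy.parquet
  compression_codec: snappy
  auth_method: web_identity_token
  # Hypothetical role ARN and session name; replace with your own.
  role_arn: arn:aws:iam::123456789012:role/example-embulk-role
  role_session_name: embulk-s3-parquet
  # Must be an absolute path. This location is an assumption (a typical
  # place for a projected service account token on EKS); adjust as needed.
  web_identity_token_file: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
```

According to the parameter list, **role_arn**, **role_session_name**, and **web_identity_token_file** are all required for this auth method, so the sketch sets all three.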
## Example
@@ -92,7 +123,8 @@ out:

## Note

* The current implementation does not support [LogicalTypes](https://github.com/apache/parquet-format/blob/2b38663/LogicalTypes.md). I will implement them later as **column_options**. So, currently **timestamp** type and **json** type are stored as UTF-8 String. Please be careful.
* The current Parquet [LogicalTypes](https://github.com/apache/parquet-format/blob/2b38663/LogicalTypes.md) implementation supports only the old representation (OriginalTypes).
* Some LogicalTypes may not be supported by your middleware. Be careful when choosing a logical type name.
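
As an illustration of the caution above, the following sketch combines a `uint64` logical type with Glue catalog registration. Because `uint64` has no automatic Glue mapping (the plugin raises a `ConfigException`), the Glue type is set explicitly under `catalog.column_options`. The bucket, database, and table names are placeholders, and `bigint` is only an illustrative choice: whether your query engine reads such a column correctly is an assumption you need to verify yourself.

```yaml
out:
  type: s3_parquet
  bucket: my-bucket                    # placeholder bucket name
  path_prefix: path/to/my-obj.
  file_ext: snappy.parquet
  compression_codec: snappy
  column_options:
    id:
      logical_type: "uint64"           # Parquet-side logical type for this column
  type_options:
    timestamp:
      logical_type: "timestamp-millis" # applies to every timestamp column
  catalog:
    database: example_db               # placeholder database name
    table: example_tbl                 # placeholder table name
    column_options:
      id:
        # Explicit Glue type; without it, uint64 cannot be converted automatically.
        # bigint is signed 64-bit, so values above 2^63-1 would not fit.
        type: bigint
```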

## Development

@@ -106,6 +138,8 @@ $ embulk run example/config.yml -Ilib
### Run test:

```shell
## Run fake S3 with localstack
$ docker run -it --rm -p 4572:4572 -e SERVICES=s3 localstack/localstack
$ ./gradlew test
```

@@ -121,9 +155,12 @@ Fix [build.gradle](./build.gradle), then

```shell
$ ./gradlew gemPush

```

## ChangeLog

[CHANGELOG.md](./CHANGELOG.md)

## Contributors

- @syucream
15 changes: 7 additions & 8 deletions build.gradle
@@ -13,18 +13,18 @@ configurations {
provided
}

version = "0.0.3"
version = "0.1.0"

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
compile "org.embulk:embulk-core:0.9.17"
provided "org.embulk:embulk-core:0.9.17"
compile "org.embulk:embulk-core:0.9.20"
provided "org.embulk:embulk-core:0.9.20"

compile 'org.scala-lang:scala-library:2.13.0'
compile 'org.scala-lang:scala-library:2.13.1'
['glue', 's3', 'sts'].each { v ->
compile "com.amazonaws:aws-java-sdk-${v}:1.11.592"
compile "com.amazonaws:aws-java-sdk-${v}:1.11.676"
}
['column', 'common', 'encoding', 'format', 'hadoop', 'jackson'].each { v ->
compile "org.apache.parquet:parquet-${v}:1.10.1"
@@ -33,9 +33,8 @@ dependencies {
compile 'org.xerial.snappy:snappy-java:1.1.7.3'

testCompile 'org.scalatest:scalatest_2.13:3.0.8'
testCompile 'org.embulk:embulk-test:0.9.17'
testCompile 'org.embulk:embulk-standards:0.9.17'
testCompile 'cloud.localstack:localstack-utils:0.1.15'
testCompile 'org.embulk:embulk-test:0.9.20'
testCompile 'org.embulk:embulk-standards:0.9.20'
testCompile 'org.apache.parquet:parquet-tools:1.10.1'
testCompile 'org.apache.hadoop:hadoop-client:2.9.2'
}
36 changes: 36 additions & 0 deletions example/with_catalog.yml
@@ -0,0 +1,36 @@

in:
  type: file
  path_prefix: ./example/data.tsv
  parser:
    type: csv
    delimiter: "\t"
    skip_header_lines: 0
    null_string: ""
    columns:
      - { name: id, type: long }
      - { name: description, type: string }
      - { name: name, type: string }
      - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z"}
      - { name: payload, type: json}
    stop_on_invalid_record: true

out:
  type: s3_parquet
  bucket: dev-baikal-workspace
  path_prefix: path/to/my-obj-2.
  file_ext: snappy.parquet
  compression_codec: snappy
  default_timezone: Asia/Tokyo
  canned_acl: bucket-owner-full-control
  column_options:
    id:
      logical_type: "int64"
    payload:
      logical_type: "json"
  type_options:
    timestamp:
      logical_type: "timestamp-millis"
  catalog:
    database: example_db
    table: example_tbl
31 changes: 31 additions & 0 deletions example/with_logicaltypes.yml
@@ -0,0 +1,31 @@

in:
  type: file
  path_prefix: ./example/data.tsv
  parser:
    type: csv
    delimiter: "\t"
    skip_header_lines: 0
    null_string: ""
    columns:
      - { name: id, type: long }
      - { name: description, type: string }
      - { name: name, type: string }
      - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z"}
      - { name: payload, type: json}
    stop_on_invalid_record: true

out:
  type: s3_parquet
  bucket: my-bucket
  path_prefix: path/to/my-obj-2.
  file_ext: snappy.parquet
  compression_codec: snappy
  default_timezone: Asia/Tokyo
  canned_acl: bucket-owner-full-control
  column_options:
    id:
      logical_type: "uint64"
  type_options:
    timestamp:
      logical_type: "timestamp-millis"
@@ -51,9 +51,10 @@ object CatalogRegistrator
schema: Schema,
location: String,
compressionCodec: CompressionCodecName,
loggerOption: Option[Logger] = None): CatalogRegistrator =
loggerOption: Option[Logger] = None,
parquetColumnLogicalTypes: Map[String, String] = Map.empty): CatalogRegistrator =
{
new CatalogRegistrator(aws, task, schema, location, compressionCodec, loggerOption)
new CatalogRegistrator(aws, task, schema, location, compressionCodec, loggerOption, parquetColumnLogicalTypes)
}
}

@@ -62,7 +63,8 @@ class CatalogRegistrator(aws: Aws,
schema: Schema,
location: String,
compressionCodec: CompressionCodecName,
loggerOption: Option[Logger] = None)
loggerOption: Option[Logger] = None,
parquetColumnLogicalTypes: Map[String, String] = Map.empty)
{
val logger: Logger = loggerOption.getOrElse(LoggerFactory.getLogger(classOf[CatalogRegistrator]))

@@ -150,14 +152,36 @@ class CatalogRegistrator(aws: Aws,
schema.getColumns.asScala.toSeq.map { c =>
val cType: String =
if (columnOptions.contains(c.getName)) columnOptions(c.getName).getType
else convertEmbulkType2GlueType(c.getType)
else if (parquetColumnLogicalTypes.contains(c.getName)) convertParquetLogicalTypeToGlueType(parquetColumnLogicalTypes(c.getName))
else convertEmbulkTypeToGlueType(c.getType)
new Column()
.withName(c.getName)
.withType(cType)
}
}

private def convertEmbulkType2GlueType(t: Type): String =
private def convertParquetLogicalTypeToGlueType(t: String): String =
{
t match {
case "timestamp-millis" => "timestamp"
case "timestamp-micros" => "bigint" // Glue cannot recognize timestamp-micros.
case "int8" => "tinyint"
case "int16" => "smallint"
case "int32" => "int"
case "int64" => "bigint"
case "uint8" => "smallint" // Glue tinyint is a minimum value of -2^7 and a maximum value of 2^7-1
case "uint16" => "int" // Glue smallint is a minimum value of -2^15 and a maximum value of 2^15-1.
case "uint32" => "bigint" // Glue int is a minimum value of-2^31 and a maximum value of 2^31-1.
case "uint64" => throw new ConfigException("Cannot convert uint64 to Glue data types automatically" +
" because the Glue bigint supports a 64-bit signed integer." +
" Please use `catalog.column_options` to define the type.")
case "json" => "string"
case _ => throw new ConfigException(s"Unsupported a parquet logical type: $t. Please use `catalog.column_options` to define the type.")
}

}

private def convertEmbulkTypeToGlueType(t: Type): String =
{
t match {
case _: BooleanType => "boolean"