diff --git a/templates/kudu-table-ddl/Makefile b/templates/kudu-table-ddl/Makefile index 2735ea2..2e2c819 100644 --- a/templates/kudu-table-ddl/Makefile +++ b/templates/kudu-table-ddl/Makefile @@ -25,6 +25,11 @@ kudu-table-clean: kudu-table-drop.sql ## Drop Kudu table tables-clean: kudu-table-clean ## Drop all tables +kudu-table-alter: kudu-table-alter.sql ## Alter Kudu table + $(impala-cmd) kudu-table-alter.sql + +tables-alter: kudu-table-alter ## Alter all tables + tables: kudu-table ## Create all tables integration-test: # Run integration-tests diff --git a/templates/kudu-table-ddl/Makefile.meta b/templates/kudu-table-ddl/Makefile.meta index 67d4792..f1df4e2 100644 --- a/templates/kudu-table-ddl/Makefile.meta +++ b/templates/kudu-table-ddl/Makefile.meta @@ -26,6 +26,13 @@ tables-clean-{{ table.id }}: $(MAKE) tables-clean -C {{ table.id }} {%- endfor %} +tables-alter-all: {%- for table in tables %} tables-alter-{{ table.id }} {%- endfor %} + +{%- for table in tables %} +tables-alter-{{ table.id }}: + $(MAKE) tables-alter -C {{ table.id }} +{%- endfor %} + tables-compute-stats-all: {%- for table in tables %} tables-compute-stats-{{ table.id }} {%- endfor %} {%- for table in tables %} diff --git a/templates/kudu-table-ddl/compute-stats.sql b/templates/kudu-table-ddl/compute-stats.sql index 66e5333..430fc06 100644 --- a/templates/kudu-table-ddl/compute-stats.sql +++ b/templates/kudu-table-ddl/compute-stats.sql @@ -14,5 +14,5 @@ -#} -- Compute table stats for optimized joins USE {{ conf.staging_database.name }}; -COMPUTE STATS {{ table.destination.name }}_kudu; +COMPUTE STATS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %}; diff --git a/templates/kudu-table-ddl/imports b/templates/kudu-table-ddl/imports index aac3e13..20f4512 100644 --- a/templates/kudu-table-ddl/imports +++ b/templates/kudu-table-ddl/imports @@ -3,3 +3,4 @@ ../shared/run-with-logging.sh ../shared/kudu-table-drop.sql ../shared/kudu-table-create.sql +../shared/kudu-table-alter.sql diff --git a/templates/shared/kudu-table-alter.sql b/templates/shared/kudu-table-alter.sql new file mode 100644 index 0000000..6471dd0 --- /dev/null +++ b/templates/shared/kudu-table-alter.sql @@ -0,0 +1,26 @@ +{#- Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +-#} +-- Create a Kudu table in Impala +USE {{ conf.staging_database.name }}; +ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %} +SET TBLPROPERTIES( +{%- if table.metadata %} + {%- for key, value in table.metadata.items() %} + '{{ key }}' = '{{ value }}', + {%- endfor %} +{%- endif %} +{%- for column in table.columns -%} + "{{ cleanse_column(column.name)|lower }}" = "{{ column.comment }}"{%- if not loop.last -%},{% endif %} +{%- endfor -%}) diff --git a/templates/shared/kudu-table-create.sql b/templates/shared/kudu-table-create.sql index 0ebc0af..fde9a24 100644 --- a/templates/shared/kudu-table-create.sql +++ b/templates/shared/kudu-table-create.sql @@ -14,14 +14,25 @@ -#} -- Create a Kudu table in Impala USE {{ conf.staging_database.name }}; -CREATE TABLE IF NOT EXISTS {{ table.destination.name }}_kudu +CREATE TABLE IF NOT EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %} {%- set ordered_columns = order_columns(table.primary_keys,table.columns) -%} ({%- for column in ordered_columns %} - {{ column.name }} {{ map_datatypes(column).kudu }} + `{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'kudu') }} {%- if not loop.last -%},{% endif %} {%- endfor %}, primary key ({{ table.primary_keys|join(', ') }})) -PARTITION BY HASH({{ table.kudu.hash_by|join(', ') }}) PARTITIONS {{ table.kudu.num_partitions }} +{%- if table.kudu.hash_by or table.kudu.range %} + PARTITION BY +{%- endif %} +{%- if table.kudu.hash_by %} + HASH({{ table.kudu.hash_by|join(', ') }}) PARTITIONS {{ table.kudu.num_partitions }} {%- if table.kudu.range %} ,{%- endif %} +{%- endif %} +{%- if table.kudu.range %} + RANGE ({{ table.kudu.range|join(', ') }}) + ( + {{ table.kudu.ranges|join(', ') }} + ) +{%- endif %} COMMENT '{{ table.comment }}' STORED AS KUDU TBLPROPERTIES( @@ -31,5 +42,5 @@ TBLPROPERTIES( {%- endfor %} {%- endif %} {%- for column in table.columns -%} - '{{ column.name|lower }}' = "{{ column.comment }}"{%- if not loop.last -%},{% endif %} + "{{ cleanse_column(column.name)|lower }}" = "{{ column.comment }}"{%- if not loop.last -%},{% endif %} {%- endfor -%}) diff --git a/templates/shared/kudu-table-drop.sql b/templates/shared/kudu-table-drop.sql index 1124935..90d5a0d 100644 --- a/templates/shared/kudu-table-drop.sql +++ b/templates/shared/kudu-table-drop.sql @@ -15,4 +15,4 @@ -- Drop the Kudu table USE {{ conf.staging_database.name }}; -DROP TABLE IF EXISTS {{ table.destination.name }}_kudu PURGE; +DROP TABLE IF EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %} PURGE; diff --git a/templates/shared/kudu-table-rowcount.sql b/templates/shared/kudu-table-rowcount.sql index 0b11d69..5c2f4d1 100644 --- a/templates/shared/kudu-table-rowcount.sql +++ b/templates/shared/kudu-table-rowcount.sql @@ -13,5 +13,5 @@ limitations under the License. #} USE {{ conf.staging_database.name }}; -INVALIDATE METADATA {{ table.destination.name }}_kudu; -SELECT COUNT(*) FROM {{ table.destination.name }}_kudu; \ No newline at end of file +INVALIDATE METADATA {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %}; +SELECT COUNT(*) FROM {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.kudu_suffix is defined %}{{ conf.user_defined.kudu_suffix }}{% endif %}; diff --git a/templates/shared/parquet-refresh.sql b/templates/shared/parquet-refresh.sql index fe7a9bf..7cc40a0 100644 --- a/templates/shared/parquet-refresh.sql +++ b/templates/shared/parquet-refresh.sql @@ -15,4 +15,4 @@ -- Create a Parquet table in Impala SET SYNC_DDL=1; USE {{ conf.staging_database.name }}; -REFRESH {{ table.destination.name }}_parquet; +REFRESH {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}; diff --git a/templates/shared/parquet-table-alter.sql b/templates/shared/parquet-table-alter.sql new file mode 100644 index 0000000..727f81f --- /dev/null +++ b/templates/shared/parquet-table-alter.sql @@ -0,0 +1,31 @@ +{#- Copyright 2017 Cargill Incorporated + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. #} + +-- Create a Parquet table in Impala +set sync_ddl=1; +USE {{ conf.staging_database.name }}; +{%- for column in table.columns %} +ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %} +CHANGE {{ cleanse_column(column.name) }} {{ cleanse_column(column.name) }} {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}"; +{%- endfor %}) + +{%- if table.metadata %} +ALTER TABLE {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %} +SET TBLPROPERTIES ( + {%- for key, value in table.metadata.items() %} + '{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %} + {%- endfor %} +) +{%- endif %} + diff --git a/templates/shared/parquet-table-create.sql b/templates/shared/parquet-table-create.sql index 5e93111..e0c073e 100644 --- a/templates/shared/parquet-table-create.sql +++ b/templates/shared/parquet-table-create.sql @@ -15,9 +15,9 @@ -- Create a Parquet table in Impala set sync_ddl=1; USE {{ conf.staging_database.name }}; -CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_parquet ( +CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %} ( {%- for column in table.columns %} -{{ column.name }} {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}" +{{ cleanse_column(column.name) }} {{ map_datatypes_v2(column, 'parquet') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %} {%- endfor %}) COMMENT '{{ table.comment }}' diff --git a/templates/shared/parquet-table-drop.sql b/templates/shared/parquet-table-drop.sql index 5fb5364..8a07a64 100644 --- a/templates/shared/parquet-table-drop.sql +++ b/templates/shared/parquet-table-drop.sql @@ -14,4 +14,4 @@ -- Drop the Impala Parquet table USE {{ conf.staging_database.name }}; -DROP TABLE IF EXISTS {{ table.destination.name }}_parquet; +DROP TABLE IF EXISTS {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}; diff --git a/templates/shared/parquet-table-rowcount.sql b/templates/shared/parquet-table-rowcount.sql index a488d1a..ce4d8ec 100644 --- a/templates/shared/parquet-table-rowcount.sql +++ b/templates/shared/parquet-table-rowcount.sql @@ -14,6 +14,6 @@ -- Query Parquet table in Impala USE {{ conf.staging_database.name }}; -INVALIDATE METADATA {{ table.destination.name }}_parquet; -SELECT COUNT(*) FROM {{ table.destination.name }}_parquet; +INVALIDATE METADATA {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}; +SELECT COUNT(*) FROM {{ table.destination.name }}{% if conf.user_defined is defined and conf.user_defined.parquet_suffix is defined %}{{ conf.user_defined.parquet_suffix }}{% endif %}; diff --git a/templates/sqoop-parquet-full-load/avro-table-create.sql b/templates/sqoop-parquet-full-load/avro-table-create.sql index ed604ca..9198d3f 100644 --- a/templates/sqoop-parquet-full-load/avro-table-create.sql +++ b/templates/sqoop-parquet-full-load/avro-table-create.sql @@ -15,9 +15,9 @@ -- Create a Parquet table in Impala set sync_ddl=1; USE {{ conf.raw_database.name }}; -CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_avro ( +CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}_avro` ( {% for column in table.columns %} -`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).avro }} COMMENT "{{ column.comment }}" +`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'avro') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %} {%- endfor %}) COMMENT '{{ table.comment }}' diff --git a/templates/sqoop-parquet-full-load/copy-avsc.sh b/templates/sqoop-parquet-full-load/copy-avsc.sh index 8864540..c9aa11e 100755 --- a/templates/sqoop-parquet-full-load/copy-avsc.sh +++ b/templates/sqoop-parquet-full-load/copy-avsc.sh @@ -1,2 +1,2 @@ -hadoop fs -mkdir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/ -hadoop fs -put -f AutoGeneratedSchema.avsc {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/{{ table.destination.name }}.avsc \ No newline at end of file +hadoop fs -mkdir -p {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/ +hadoop fs -put -f AutoGeneratedSchema.avsc {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/.meta/{{ table.destination.name }}.avsc diff --git a/templates/sqoop-parquet-full-load/insert-overwrite.sql b/templates/sqoop-parquet-full-load/insert-overwrite.sql index c62ecce..14d1dde 100644 --- a/templates/sqoop-parquet-full-load/insert-overwrite.sql +++ b/templates/sqoop-parquet-full-load/insert-overwrite.sql @@ -1,9 +1,9 @@ -INSERT OVERWRITE TABLE {{ conf.raw_database.name }}.{{ table.destination.name }}_partitioned PARTITION (ingest_partition=${var:val}) +INSERT OVERWRITE TABLE `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_partitioned` PARTITION (ingest_partition=${var:val}) SELECT {% for column in table.columns %} -{%- if column["datatype"].lower() == "decimal" %} -cast (`{{ column.name.replace('/','_') }}` as decimal({{column.precision}}, {{column.scale}}) ) -{%- else %} `{{ column.name.replace('/','_') }}` -{% endif %} -{%- if not loop.last -%}, {% endif %} +{%- if column["datatype"].lower() == "decimal" %} cast (`{{ cleanse_column(column.name) }}` as decimal({{column.precision}}, {{column.scale}}) ) +{%- elif column["datatype"].lower() == "timestamp" %} cast (`{{ cleanse_column(column.name) }}`/1000 as timestamp ) AS `{{ cleanse_column(column.name) }}` +{%- else %} `{{ cleanse_column(column.name) }}` +{%- endif %} +{%- if not loop.last %}, {% endif %} {%- endfor %} - FROM {{ conf.raw_database.name }}.{{ table.destination.name }}_avro; + FROM `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_avro`; diff --git a/templates/sqoop-parquet-full-load/partitioned-table-create.sql b/templates/sqoop-parquet-full-load/partitioned-table-create.sql index fda89e4..8c48114 100644 --- a/templates/sqoop-parquet-full-load/partitioned-table-create.sql +++ b/templates/sqoop-parquet-full-load/partitioned-table-create.sql @@ -14,14 +14,10 @@ -- Create a Parquet table in Impala set sync_ddl=1; -USE {{ conf.raw_database.name }}; -CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }}_partitioned ( -{% for column in table.columns %} -{%- if column["datatype"].lower() == "decimal" %} -`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }}({{column.precision}},{{column.scale}}) COMMENT "{{ column.comment }}" -{%- else %} `{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}" -{% endif %} -{%- if not loop.last -%}, {% endif %} +USE `{{ conf.raw_database.name }}`; +CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}_partitioned` ( +{%- for column in table.columns %} +`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'parquet') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %} {%- endfor %}) PARTITIONED BY (ingest_partition int) COMMENT '{{ table.comment }}' @@ -33,4 +29,4 @@ TBLPROPERTIES( '{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %} {%- endfor %} ) -{%- endif %} +{%- endif %} \ No newline at end of file diff --git a/templates/sqoop-parquet-full-load/report-table-create.sql b/templates/sqoop-parquet-full-load/report-table-create.sql index 626c10b..16b3bf8 100644 --- a/templates/sqoop-parquet-full-load/report-table-create.sql +++ b/templates/sqoop-parquet-full-load/report-table-create.sql @@ -14,14 +14,10 @@ -- Create a Parquet table in Impala set sync_ddl=1; -USE {{ conf.staging_database.name }}; -CREATE EXTERNAL TABLE IF NOT EXISTS {{ table.destination.name }} ( -{% for column in table.columns %} -{%- if column["datatype"].lower() == "decimal" %} -`{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }}({{column.precision}},{{column.scale}}) COMMENT "{{ column.comment }}" -{%- else %} `{{ column.name.replace('/','_') }}` {{ map_datatypes(column).parquet }} COMMENT "{{ column.comment }}" -{% endif %} -{%- if not loop.last -%}, {% endif %} +USE `{{ conf.staging_database.name }}`; +CREATE EXTERNAL TABLE IF NOT EXISTS `{{ table.destination.name }}` ( +{%- for column in table.columns %} +`{{ cleanse_column(column.name) }}` {{ map_datatypes_v2(column, 'parquet') }} COMMENT "{{ column.comment }}" {%- if not loop.last -%}, {% endif %} {%- endfor %}) COMMENT '{{ table.comment }}' STORED AS PARQUET @@ -32,4 +28,4 @@ TBLPROPERTIES( '{{ key }}' = '{{ value }}'{%- if not loop.last -%}, {% endif %} {%- endfor %} ) -{%- endif %} +{%- endif %} \ No newline at end of file diff --git a/templates/sqoop-parquet-full-load/sqoop-import.sh b/templates/sqoop-parquet-full-load/sqoop-import.sh index bbef5ce..aa17c93 100755 --- a/templates/sqoop-parquet-full-load/sqoop-import.sh +++ b/templates/sqoop-parquet-full-load/sqoop-import.sh @@ -12,34 +12,20 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. #} -{# This function will put the --map-column-java col=String parameter for any clob data types.#} - {% macro map_clobs_macro(columns) -%} - {{ map_clobs(columns) }} - {%- endmacro -%} # Create a Sqoop job set -eu -{% set mapcolumn = [] %} -{%- for column in table.columns -%} -{%- if column["datatype"].lower() == "varbinary" or column["datatype"].lower() == "binary" or column["datatype"].lower() == "longvarbinary" -%} -{%- set mapcolumn = mapcolumn.append(column["name"]) -%} -{%- endif -%} -{%- endfor -%} sqoop import \ + -D 'mapred.job.name={{ conf.source_database.name }}.{{ table.source.name }}.{{ conf.sqoop_job_name_suffix }}' \ --connect '{{ conf.source_database.connection_string }}' \ --username '{{ conf.user_name }}' \ --password-file '{{ conf.sqoop_password_file }}' \ {%- if conf["sqoop_driver"] is defined %} --driver {{ conf.sqoop_driver }} \ {%- endif %} - {% if mapcolumn|length > 0 -%} - --map-column-java {% for column in mapcolumn -%} - {% if loop.last -%} - {{ '"{}"'.format(column) }}=String \ - {%- else -%} - {{ '"{}"'.format(column) }}=String, - {%- endif -%} - {% endfor %} - {% endif -%} + {%- set map_java_column = sqoop_map_java_column(table.columns,clean_column=True) %} + {%- if map_java_column %} + {{ map_java_column }} \ + {%- endif %} --delete-target-dir \ --target-dir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/ \ --temporary-rootdir {{ conf.raw_database.path }}/{{ table.destination.name }}_avro/ \ @@ -47,15 +33,16 @@ sqoop import \ --fetch-size {% if table.columns|length < 30 -%} 10000 {% else %} 5000 {% endif %} \ --compress \ --compression-codec snappy \ - -m 1 \ -{%- if conf["sqoop_driver"] is defined %} - {%- if "sqlserver" in conf["sqoop_driver"].lower() -%} - --query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ '"{}"'.format(column.name) }} {% else %} {{ '"{}",'.format(column.name) }} {% endif %} {% endfor %} FROM {{ table.source.name }} WHERE $CONDITIONS' - {%- elif "sap" in conf["sqoop_driver"].lower() -%} - --query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ '"{}"'.format(column.name) }} {% else %} {{ '"{}",'.format(column.name) }} {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' + {%- if table.num_mappers > 1 %} + --split-by {{ table.split_by_column }} \ + --boundary-query 'SELECT min({{ table.split_by_column }}), max({{ table.split_by_column }}) FROM {{ conf.source_database.name }}.{{ table.source.name }}' \{%- endif %} + -m {{ table.num_mappers or 1 }} \ + {% if conf["sqoop_driver"] is defined %} + {%- if "sqlserver" in conf["sqoop_driver"].lower() -%} + --query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' + {%- else -%} + --query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' + {% endif -%} {%- else -%} - --query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ column.name }} {% else %} {{ column.name }}, {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' - {% endif -%} -{%- else %} - --query 'SELECT {% for column in table.columns%} {% if loop.last %} {{ column.name }} {% else %} {{ column.name }}, {% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' -{%- endif -%} + --query 'SELECT {% for column in table.columns%} "{{ column.name }}" AS "{{ cleanse_column(column.name) }}"{% if loop.last %} {% else %},{% endif %} {% endfor %} FROM {{ conf.source_database.name }}.{{ table.source.name }} WHERE $CONDITIONS' + {%- endif -%} diff --git a/templates/sqoop-parquet-full-load/tables-drop.sql b/templates/sqoop-parquet-full-load/tables-drop.sql index 85b24df..c87a9b4 100644 --- a/templates/sqoop-parquet-full-load/tables-drop.sql +++ b/templates/sqoop-parquet-full-load/tables-drop.sql @@ -1,3 +1,3 @@ -DROP TABLE IF EXISTS {{ conf.raw_database.name }}.{{ table.destination.name }}_avro; -DROP TABLE IF EXISTS {{ conf.raw_database.name }}.{{ table.destination.name }}_partitioned; -DROP TABLE IF EXISTS {{ conf.staging_database.name }}.{{ table.destination.name }}; +DROP TABLE IF EXISTS `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_avro`; +DROP TABLE IF EXISTS `{{ conf.raw_database.name }}`.`{{ table.destination.name }}_partitioned`; +DROP TABLE IF EXISTS `{{ conf.staging_database.name }}`.`{{ table.destination.name }}`; diff --git a/templates/sqoop-parquet-full-load/test-rowcount.sh b/templates/sqoop-parquet-full-load/test-rowcount.sh index f236967..28af17b 100755 --- a/templates/sqoop-parquet-full-load/test-rowcount.sh +++ b/templates/sqoop-parquet-full-load/test-rowcount.sh @@ -16,8 +16,7 @@ set -e # Check parquet table AVRO=$({{ conf.impala_cmd }} avro-table-rowcount.sql -B 2> /dev/null) PARQUET=$({{ conf.impala_cmd }} report-table-rowcount.sql -B 2> /dev/null) -SOURCE=$(cat sourceCount.txt) - +SOURCE=$({{ conf.source_database.cmd }} source-table-rowcount.sql -s -r -N -B 2> /dev/null) echo "avro count: $AVRO" echo "report count: $PARQUET" echo "source count: $SOURCE" @@ -30,4 +29,4 @@ fi if [ "$PARQUET" -ne "$SOURCE" ]; then echo FINAL TABLE ROW COUNTS DO NOT MATCH exit 1 -fi +fi \ No newline at end of file diff --git a/templates/sqoop-parquet-full-load/type-mapping.yml b/templates/sqoop-parquet-full-load/type-mapping.yml index dd78224..379564b 100644 --- a/templates/sqoop-parquet-full-load/type-mapping.yml +++ b/templates/sqoop-parquet-full-load/type-mapping.yml @@ -73,7 +73,7 @@ date: timestamp: kudu: bigint impala: timestamp - parquet: bigint + parquet: timestamp avro: bigint datetime: kudu: bigint @@ -171,4 +171,4 @@ boolean: kudu: boolean impala: boolean parquet: boolean - avro: boolean + avro: boolean \ No newline at end of file diff --git a/templates/sqoop-parquet-hdfs-impala/Makefile b/templates/sqoop-parquet-hdfs-impala/Makefile index 51849a8..46d26b8 100644 --- a/templates/sqoop-parquet-hdfs-impala/Makefile +++ b/templates/sqoop-parquet-hdfs-impala/Makefile @@ -52,6 +52,8 @@ hdfs-clean: hdfs-delete.sh ## Delete parquet files from HDFS tables-clean: parquet-table-clean ## Drop all tables +tables-alter: parquet-table-alter ## Alter all tables + tables: parquet-table ## Create all tables update: sqoop-exec ## Insert data from source db into Kudu diff --git a/templates/sqoop-parquet-hdfs-impala/Makefile.meta b/templates/sqoop-parquet-hdfs-impala/Makefile.meta index 1cbaea5..661d974 100644 --- a/templates/sqoop-parquet-hdfs-impala/Makefile.meta +++ b/templates/sqoop-parquet-hdfs-impala/Makefile.meta @@ -48,3 +48,10 @@ integration-test-all: {%- for table in tables %} $(MAKE) integration-test -C {{ table.id }} {%- endfor %} + +tables-alter-all: {%- for table in tables %} tables-alter-{{ table.id }} {%- endfor %} + +{%- for table in tables %} +tables-alter-{{ table.id }}: + $(MAKE) tables-alter -C {{ table.id }} +{%- endfor %} \ No newline at end of file diff --git a/templates/sqoop-parquet-hdfs-impala/imports b/templates/sqoop-parquet-hdfs-impala/imports index 296e04f..c71781d 100644 --- a/templates/sqoop-parquet-hdfs-impala/imports +++ b/templates/sqoop-parquet-hdfs-impala/imports @@ -7,6 +7,7 @@ ../shared/parquet-table-drop.sql ../shared/parquet-table-create.sql ../shared/parquet-table-rowcount.sql +../shared/parquet-table-alter.sql ../shared/source-table-rowcount.sql ../shared/hdfs-unarchive.sh ../shared/hdfs-delete.sh