Skip to content

Commit

Permalink
Add support for creating Iceberg datasets in S3 with Glue Catalog (#11)
Browse files Browse the repository at this point in the history
* Finalize Glue catalog support for iceberg
  • Loading branch information
dacort authored Jul 11, 2024
1 parent 8305098 commit 8b50557
Show file tree
Hide file tree
Showing 7 changed files with 955 additions and 13 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ Install with the `delta` module: `pip install faker-cli[delta]`
fake -n 10 pyint,user_name,date_this_year -f deltalake -o sample_data
```

### Iceberg

And, of course, Iceberg tables!

Writing to a Glue or generic SQL catalog is currently supported.

```bash
fake -n 10 pyint,user_name,date_this_year -f iceberg -C glue://default.iceberg_sample -o s3://YOUR_BUCKET/iceberg-data/
```

## Templates

The library includes a couple of templates that make it easier to generate certain types of fake data.
Expand Down
29 changes: 22 additions & 7 deletions faker_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,22 @@
@click.option(
"--format",
"-f",
type=click.Choice(["csv", "json", "parquet", "deltalake"]),
type=click.Choice(["csv", "json", "parquet", "deltalake", "iceberg"]),
default="csv",
help="Format of the output",
)
@click.option("--output", "-o", type=click.Path(writable=True))
@click.option("--columns", "-c", help="Column names", default=None, required=False)
@click.option("--template", "-t", help="Template to use", type=click.Choice(["s3access", "cloudfront"]), default=None)
@click.option("--catalog", "-C", help="Catalog URI", default=None, required=False)
@click.argument("column_types", required=False)
@click.option("--provider", "-p", help="Fake data provider", type=click.Choice(["faker", "mimesis"]), default="faker")
def main(num_rows, format, output, columns, template, column_types, provider):
def main(num_rows, format, output, columns, template, catalog, column_types, provider):
"""
Generate fake data, easily.
COLUMN_TYPES is a comma-seperated list of Faker property names, like
pyint,username,date_this_year
pyint,user_name,date_this_year
You can also use --template for real-world synthetic data.
"""
Expand All @@ -62,10 +63,12 @@ def main(num_rows, format, output, columns, template, column_types, provider):
raise click.BadArgumentUsage('templates are only supported with the "faker" provider.')

# Parquet output requires a filename
if format in ["parquet", "deltalake"] and output is None:
raise click.BadArgumentUsage("parquet | deltalake formats requires --output/-o filename parameter.")
if output is not None and format not in ["parquet", "deltalake"]:
if format in ["parquet", "deltalake", "iceberg"] and output is None:
raise click.BadArgumentUsage(f"{format} format requires --output/-o filename parameter.")
if output is not None and format not in ["parquet", "deltalake", "iceberg"]:
raise click.BadArgumentUsage("output files not supported for csv/json yet.")
if catalog and format not in ['iceberg']:
raise click.BadArgumentUsage("catalog option is only available for Iceberg formats")

# Optionally load additional features
if format == "parquet":
Expand All @@ -90,6 +93,17 @@ def main(num_rows, format, output, columns, template, column_types, provider):
"Make sure to install faker-cli using `pip install faker-cli[delta]`."
)

if format == "iceberg":
try:
from faker_cli.writers.iceberg import IcebergWriter

KLAS_MAPPER["iceberg"] = IcebergWriter
except ImportError:
raise click.ClickException(
"Using Iceberg writer, but the 'iceberg' package is not installed. "
"Make sure to install faker-cli using `pip install faker-cli[iceberg]`."
)

# If the user provides a template, we use that provider and writer and exit.
# We assume a template has a custom writer that may be different than CSV or JSON
if template:
Expand All @@ -108,7 +122,8 @@ def main(num_rows, format, output, columns, template, column_types, provider):
format_klas = KLAS_MAPPER.get(format)
if format_klas is None:
raise click.ClickException(f"Format {format} not supported.")
writer = format_klas(sys.stdout, headers, output)
# Fix in a better way - maybe passing **kwargs?
writer = format_klas(sys.stdout, headers, output, catalog)
for i in range(num_rows):
writer.write(fake.generate_row(col_types))
writer.close()
Expand Down
6 changes: 3 additions & 3 deletions faker_cli/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class Writer:
def __init__(self, output, headers, filename: Optional[str] = None):
def __init__(self, output, headers, filename: Optional[str] = None, catalog_uri: Optional[str] = None):
self.output = output
self.headers = headers
self.writer = None
Expand All @@ -17,7 +17,7 @@ def close(self):


class CSVWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.writer = csv.writer(self.output)
self.write(headers)
Expand All @@ -27,7 +27,7 @@ def write(self, row):


class JSONWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.writer = self.output

Expand Down
82 changes: 82 additions & 0 deletions faker_cli/writers/iceberg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from tempfile import TemporaryDirectory
from urllib.parse import urlparse

import click
import pyarrow as pa
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError

from faker_cli.writers.parquet import ParquetWriter


class CatalogManager:
    """Resolve a catalog URI into a pyiceberg catalog and create tables in it.

    The URI is expected to look like ``<scheme>://<database>.<table>``, where
    ``<scheme>`` is ``glue`` or ``sqlite``. The ``database.table`` pair is read
    from the network-location part of the URI.
    """

    def __init__(self, uri: str, location: str) -> None:
        """Parse ``uri`` and connect to the matching catalog.

        Args:
            uri: Catalog URI, e.g. ``glue://default.iceberg_sample``.
            location: Warehouse location; used as the ``warehouse`` of the
                SQL catalog.

        Raises:
            ValueError: If ``uri`` is empty or its network location is not of
                the form ``database.table``.
        """
        if not uri:
            raise ValueError("A catalog URI of the form '<scheme>://<database>.<table>' is required.")
        parts = urlparse(uri).netloc.split(".")
        if len(parts) != 2 or not all(parts):
            raise ValueError(
                f"Invalid catalog URI {uri!r}: expected '<scheme>://<database>.<table>'."
            )
        self.database, self.table = parts
        self.catalog = self._from_uri(uri, location)

    def _from_uri(self, uri: str, location: str) -> Catalog:
        """Build the catalog object selected by the scheme of ``uri``.

        Raises:
            click.ClickException: If the Glue catalog module cannot be imported.
            Exception: If the Glue table already exists, or the scheme is
                neither ``glue`` nor ``sqlite``.
        """
        scheme = urlparse(uri).scheme
        if scheme == "glue":
            try:
                from pyiceberg.catalog.glue import GlueCatalog
            except ImportError as err:
                raise click.ClickException(
                    "Using Iceberg writer with Glue catalog, but the 'boto3' package is not installed. "
                    "Make sure to install faker-cli using `pip install faker-cli[iceberg,glue]`."
                ) from err
            glue = GlueCatalog(self.database)
            try:
                glue.load_namespace_properties(self.database)
                glue.load_table(f"{self.database}.{self.table}")
            except NoSuchNamespaceError:
                # The namespace is missing, so the table cannot exist either.
                glue.create_namespace(self.database)
            except NoSuchTableError:
                # Namespace exists and the table name is free: nothing to do.
                pass
            else:
                # Both lookups succeeded, so refuse to clobber the existing table.
                raise Exception("Table already exists, please delete or choose another name.")
            return glue

        if scheme == "sqlite":
            # The SQLite catalog database lives in a temporary directory that is
            # kept alive through this attribute.
            self.temp_path = TemporaryDirectory()
            sql = SqlCatalog(
                self.database, uri=f"sqlite:////{self.temp_path.name}/pyiceberg_catalog.db", warehouse=location
            )
            sql.create_namespace(self.database)
            return sql

        raise Exception("Unsupported catalog type, only glue or sqlite are supported.")

    def create_table(self, schema, warehouse_path):
        """Create ``database.table`` in the catalog and return the new table.

        Args:
            schema: Schema passed through to the catalog's ``create_table``.
            warehouse_path: Table location; only passed for non-SQL catalogs,
                with any trailing ``/`` removed.

        Returns:
            Whatever the underlying catalog's ``create_table`` returns.
        """
        identifier = f"{self.database}.{self.table}"
        # isinstance, not ``is``: ``self.catalog`` is an instance, never the
        # SqlCatalog class object itself.
        if isinstance(self.catalog, SqlCatalog):
            # The SQL catalog was constructed with a warehouse, so no explicit
            # location is passed here.
            return self.catalog.create_table(identifier, schema=schema)
        # location required for GlueCatalog
        return self.catalog.create_table(
            identifier,
            schema=schema,
            location=warehouse_path.rstrip("/"),
        )


class IcebergWriter(ParquetWriter):
    """Writer that registers the collected Arrow table as an Iceberg table.

    NOTE(review): ``self.table`` is presumably populated by the inherited
    ``ParquetWriter.write`` -- confirm against that class.
    """

    def __init__(self, output, headers, filename, catalog_uri):
        """Set up the writer and connect to the catalog.

        Args:
            output: Passed through to ``ParquetWriter``.
            headers: Passed through to ``ParquetWriter``.
            filename: Used as the Iceberg warehouse path.
            catalog_uri: Catalog URI handed to ``CatalogManager``.
        """
        super().__init__(output, headers, filename, catalog_uri)
        self.warehouse_path = filename
        # Not read by this class; retained so the attribute keeps existing.
        self.temp_path = TemporaryDirectory()
        self.table: pa.Table = None
        self.catalog: CatalogManager = CatalogManager(catalog_uri, filename)

    def close(self):
        """Create the Iceberg table, write the data, then close the parent.

        For a SQL catalog, the SQLite catalog database is also copied from the
        catalog manager's temporary directory into the warehouse path.
        """
        iceberg_table = self.catalog.create_table(self.table.schema, self.warehouse_path)
        iceberg_table.overwrite(self.table)

        # ``self.catalog`` is the CatalogManager wrapper; the pyiceberg catalog
        # it manages is ``self.catalog.catalog``.
        if isinstance(self.catalog.catalog, SqlCatalog):
            # Imported here so that ``pyarrow.fs`` is guaranteed to be loaded.
            from pyarrow import fs as pa_fs

            # The database file is created inside the CatalogManager's
            # temporary directory, not this writer's.
            source = f"{self.catalog.temp_path.name}/pyiceberg_catalog.db"
            destination = f"{self.warehouse_path.rstrip('/')}/pyiceberg_catalog.db"
            pa_fs.copy_files(source, destination)
        return super().close()
2 changes: 1 addition & 1 deletion faker_cli/writers/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from faker_cli.writer import Writer

class ParquetWriter(Writer):
def __init__(self, output, headers, filename):
def __init__(self, output, headers, filename, catalog_uri):
super().__init__(output, headers)
self.filename = filename
self.table: pa.Table = None
Expand Down
Loading

0 comments on commit 8b50557

Please sign in to comment.