Skip to content

Commit

Permalink
Convert the Logical to Physical map to a visitor (#43)
Browse files Browse the repository at this point in the history
* Convert the Logical to Physical map to a visitor

I noticed that the FixedType was missing.

* Comments, thanks Ryan!
  • Loading branch information
Fokko committed Oct 7, 2023
1 parent 6e9b716 commit ce85358
Showing 1 changed file with 65 additions and 22 deletions.
87 changes: 65 additions & 22 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1099,21 +1099,70 @@ def map_value_partner(self, partner_map: Optional[pa.Array]) -> Optional[pa.Arra
return partner_map.items if isinstance(partner_map, pa.MapArray) else None


_PRIMITIVE_TO_PHYSICAL = {
BooleanType(): "BOOLEAN",
IntegerType(): "INT32",
LongType(): "INT64",
FloatType(): "FLOAT",
DoubleType(): "DOUBLE",
DateType(): "INT32",
TimeType(): "INT64",
TimestampType(): "INT64",
TimestamptzType(): "INT64",
StringType(): "BYTE_ARRAY",
UUIDType(): "FIXED_LEN_BYTE_ARRAY",
BinaryType(): "BYTE_ARRAY",
}
_PHYSICAL_TYPES = set(_PRIMITIVE_TO_PHYSICAL.values()).union({"INT96"})
def _primitive_to_phyisical(iceberg_type: PrimitiveType) -> str:
return visit(iceberg_type, _PRIMITIVE_TO_PHYISCAL_TYPE_VISITOR)


class PrimitiveToPhysicalType(SchemaVisitorPerPrimitiveType[str]):
def schema(self, schema: Schema, struct_result: str) -> str:
raise ValueError(f"Expected primitive-type, got: {schema}")

def struct(self, struct: StructType, field_results: List[str]) -> str:
raise ValueError(f"Expected primitive-type, got: {struct}")

def field(self, field: NestedField, field_result: str) -> str:
raise ValueError(f"Expected primitive-type, got: {field}")

def list(self, list_type: ListType, element_result: str) -> str:
raise ValueError(f"Expected primitive-type, got: {list_type}")

def map(self, map_type: MapType, key_result: str, value_result: str) -> str:
raise ValueError(f"Expected primitive-type, got: {map_type}")

def visit_fixed(self, fixed_type: FixedType) -> str:
return "BYTE_ARRAY"

def visit_decimal(self, decimal_type: DecimalType) -> str:
return "FIXED_LEN_BYTE_ARRAY"

def visit_boolean(self, boolean_type: BooleanType) -> str:
return "BOOLEAN"

def visit_integer(self, integer_type: IntegerType) -> str:
return "INT32"

def visit_long(self, long_type: LongType) -> str:
return "INT64"

def visit_float(self, float_type: FloatType) -> str:
return "FLOAT"

def visit_double(self, double_type: DoubleType) -> str:
return "DOUBLE"

def visit_date(self, date_type: DateType) -> str:
return "INT32"

def visit_time(self, time_type: TimeType) -> str:
return "INT64"

def visit_timestamp(self, timestamp_type: TimestampType) -> str:
return "INT64"

def visit_timestamptz(self, timestamptz_type: TimestamptzType) -> str:
return "INT64"

def visit_string(self, string_type: StringType) -> str:
return "BYTE_ARRAY"

def visit_uuid(self, uuid_type: UUIDType) -> str:
return "FIXED_LEN_BYTE_ARRAY"

def visit_binary(self, binary_type: BinaryType) -> str:
return "BYTE_ARRAY"


_PRIMITIVE_TO_PHYISCAL_TYPE_VISITOR = PrimitiveToPhysicalType()


class StatsAggregator:
Expand All @@ -1126,13 +1175,7 @@ def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc
self.current_max = None
self.trunc_length = trunc_length

if physical_type_string not in _PHYSICAL_TYPES:
raise ValueError(f"Unknown physical type {physical_type_string}")

if physical_type_string == "INT96":
raise NotImplementedError("Statistics not implemented for INT96 physical type")

expected_physical_type = _PRIMITIVE_TO_PHYSICAL[iceberg_type]
expected_physical_type = _primitive_to_phyisical(iceberg_type)
if expected_physical_type != physical_type_string:
raise ValueError(
f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
Expand Down

0 comments on commit ce85358

Please sign in to comment.