Skip to content

Commit b8c049b

Browse files
authored
Support Arrow format of result sets in QueryService (#721)
1 parent 478a375 commit b8c049b

27 files changed

+1003
-334
lines changed

examples/arrow/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# YDB Python SDK Example: arrow
2+
3+
Example code demonstrating how to get query execution results in Arrow format.
4+
5+
## Requirements
6+
7+
- `pyarrow` version 5.0.0 or higher
8+
- `EnableArrowResultSetFormat` feature flag enabled on the YDB server
9+
10+
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/arrow/basic_example.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import ydb
2+
import pyarrow as pa
3+
4+
5+
def main():
6+
driver_config = ydb.DriverConfig(
7+
endpoint="grpc://localhost:2136",
8+
database="/local",
9+
# credentials=ydb.credentials_from_env_variables(),
10+
# root_certificates=ydb.load_ydb_root_certificate(),
11+
)
12+
13+
try:
14+
driver = ydb.Driver(driver_config)
15+
driver.wait(timeout=5)
16+
except TimeoutError:
17+
raise RuntimeError("Connect failed to YDB")
18+
19+
pool = ydb.QuerySessionPool(driver)
20+
21+
query = """
22+
SELECT * FROM example ORDER BY key LIMIT 100;
23+
"""
24+
25+
format_settings = ydb.ArrowFormatSettings(
26+
compression_codec=ydb.ArrowCompressionCodec(ydb.ArrowCompressionCodecType.ZSTD, 10)
27+
)
28+
29+
result = pool.execute_with_retries(
30+
query,
31+
result_set_format=ydb.QueryResultSetFormat.ARROW,
32+
arrow_format_settings=format_settings,
33+
)
34+
35+
for result_set in result:
36+
schema: pa.Schema = pa.ipc.read_schema(pa.py_buffer(result_set.arrow_format_meta.schema))
37+
batch: pa.RecordBatch = pa.ipc.read_record_batch(pa.py_buffer(result_set.data), schema)
38+
print(f"Record batch with {batch.num_rows} rows and {batch.num_columns} columns")
39+
40+
41+
if __name__ == "__main__":
42+
main()

ydb/_grpc/grpcwrapper/ydb_query.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import typing
33
from typing import Optional
44

5-
65
# Workaround for good IDE and universal for runtime
76
if typing.TYPE_CHECKING:
87
from ..v4.protos import ydb_query_pb2
@@ -168,15 +167,24 @@ class ExecuteQueryRequest(IToProto):
168167
exec_mode: int
169168
parameters: dict
170169
stats_mode: int
170+
schema_inclusion_mode: int
171+
result_set_format: int
172+
arrow_format_settings: Optional[public_types.ArrowFormatSettings]
171173

172174
def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
173175
tx_control = self.tx_control.to_proto() if self.tx_control is not None else self.tx_control
176+
arrow_format_settings = (
177+
self.arrow_format_settings.to_proto() if self.arrow_format_settings is not None else None
178+
)
174179
return ydb_query_pb2.ExecuteQueryRequest(
175180
session_id=self.session_id,
176181
tx_control=tx_control,
177182
query_content=self.query_content.to_proto(),
178183
exec_mode=self.exec_mode,
179184
stats_mode=self.stats_mode,
185+
schema_inclusion_mode=self.schema_inclusion_mode,
186+
result_set_format=self.result_set_format,
187+
arrow_format_settings=arrow_format_settings,
180188
concurrent_result_sets=self.concurrent_result_sets,
181189
parameters=convert.query_parameters_to_pb(self.parameters),
182190
)

ydb/_grpc/grpcwrapper/ydb_query_public_types.py

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import abc
2+
import enum
23
import typing
34

4-
from .common_utils import IToProto
5+
from .common_utils import IFromProto, IToProto
56

67
# Workaround for good IDE and universal for runtime
78
if typing.TYPE_CHECKING:
8-
from ..v4.protos import ydb_query_pb2
9+
from ..v4.protos import ydb_query_pb2, ydb_formats_pb2
910
else:
10-
from ..common.protos import ydb_query_pb2
11+
from ..common.protos import ydb_query_pb2, ydb_formats_pb2
1112

1213

1314
class BaseQueryTxMode(IToProto):
@@ -93,3 +94,51 @@ def name(self):
9394

9495
def to_proto(self) -> ydb_query_pb2.StaleModeSettings:
9596
return ydb_query_pb2.StaleModeSettings()
97+
98+
99+
class ArrowCompressionCodecType(enum.IntEnum):
100+
UNSPECIFIED = 0
101+
NONE = 1
102+
ZSTD = 2
103+
LZ4_FRAME = 3
104+
105+
106+
class ArrowCompressionCodec(IToProto):
107+
"""Compression codec for Arrow format result sets."""
108+
109+
def __init__(
110+
self, codec_type: typing.Optional[ArrowCompressionCodecType] = None, level: typing.Optional[int] = None
111+
):
112+
self.type = codec_type if codec_type is not None else ArrowCompressionCodecType.UNSPECIFIED
113+
self.level = level
114+
115+
def to_proto(self):
116+
return ydb_formats_pb2.ArrowFormatSettings.CompressionCodec(type=self.type, level=self.level)
117+
118+
119+
class ArrowFormatSettings(IToProto):
120+
"""Settings for Arrow format result sets."""
121+
122+
def __init__(self, compression_codec: typing.Optional[ArrowCompressionCodec] = None):
123+
self.compression_codec = compression_codec
124+
125+
def to_proto(self):
126+
settings = ydb_formats_pb2.ArrowFormatSettings()
127+
if self.compression_codec is not None:
128+
codec_proto = self.compression_codec.to_proto()
129+
settings.compression_codec.CopyFrom(codec_proto)
130+
return settings
131+
132+
133+
class ArrowFormatMeta(IFromProto):
134+
"""Metadata for Arrow format result sets containing the schema."""
135+
136+
def __init__(self, schema: bytes):
137+
self.schema = schema
138+
139+
@classmethod
140+
def from_proto(cls, proto_message):
141+
return cls(schema=proto_message.schema)
142+
143+
def __repr__(self):
144+
return f"ArrowFormatMeta(schema_size={len(self.schema)} bytes)"

ydb/_grpc/v3/protos/ydb_formats_pb2.py

Lines changed: 175 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)