Skip to content
This repository was archived by the owner on Mar 24, 2021. It is now read-only.

[WIP] Support RecordBatch #844

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
first implementation of RecordBatch.decode
emmettbutler committed Aug 9, 2018

Verified

This commit was signed with the committer’s verified signature.
tisonkun tison
commit 32f5237b4f0462aa75c911612afb89a1bb2c16e2
43 changes: 33 additions & 10 deletions pykafka/protocol/message.py
Original file line number Diff line number Diff line change
@@ -408,16 +408,20 @@ class RecordBatch(MessageSet):
FirstSequence => int32
Records => [Record]
"""
def __init__(self, records=None, compression_type=CompressionType.NONE,
broker_version='0.9.0'):
super(RecordBatch, self).__init__(messages=records,
def __init__(self, messages=None, compression_type=CompressionType.NONE,
broker_version='0.9.0', first_offset=-1, last_offset_delta=-1,
first_timestamp=-1, max_timestamp=-1, protocol_version=None):
super(RecordBatch, self).__init__(messages=messages,
broker_version=broker_version,
compression_type=compression_type)
self.protocol_version = msg_protocol_version(broker_version)
self.first_offset = -1
self.last_offset_delta = -1
self.first_timestamp = -1
self.max_timestamp = -1
if protocol_version is not None:
self.protocol_version = protocol_version
else:
self.protocol_version = msg_protocol_version(broker_version)
self.first_offset = first_offset
self.last_offset_delta = last_offset_delta
self.first_timestamp = first_timestamp
self.max_timestamp = max_timestamp

def __len__(self):
if self.compression_type == CompressionType.NONE:
@@ -453,7 +457,25 @@ def _get_compressed(self):

@classmethod
def decode(cls, buff, partition_id=-1):
pass
offset = 0
fmt = '!qiiBihiqqqhii'
(first_offset, _, _, protocol_version, _, attr, last_offset_delta,
first_timestamp, max_timestamp, _, _, _,
records_count) = struct_helpers.unpack_from(fmt, buff, offset)
offset += struct.calcsize(fmt)

messages = []
while offset < len(buff):
size = struct.unpack_from('V', buff, offset)
message = Record.decode(buff[offset:offset + size],
partition_id=partition_id)
messages.append(message)
offset += size

return RecordBatch(messages=messages, first_offset=first_offset,
protocol_version=protocol_version, compression_type=attr,
last_offset_delta=last_offset_delta,
first_timestamp=first_timestamp, max_timestamp=max_timestamp)

def pack_into(self, buff, offset):
if self.compression_type == CompressionType.NONE:
@@ -475,7 +497,8 @@ def pack_into(self, buff, offset):
struct.pack_into(fmt, buff, offset + 4, *args)
offset += struct.calcsize(fmt) + 4
for record in records:
record.pack_into(buff, offset)
record.pack_into(buff, offset, base_timestamp=self.first_timestamp,
base_offset=self.first_offset)
offset += len(record)
end_offset = offset