diff --git a/pysyncobj/config.py b/pysyncobj/config.py
index 5729cf9..9c0db50 100644
--- a/pysyncobj/config.py
+++ b/pysyncobj/config.py
@@ -115,8 +115,14 @@ def __init__(self, **kwargs):
         self.fullDumpFile = kwargs.get('fullDumpFile', None)
 
         #: File to store operations journal. Save each record as soon as received.
+        #: If unspecified or None, an in-memory journal is used.
         self.journalFile = kwargs.get('journalFile', None)
 
+        #: Flush the journal whenever something is written to it.
+        #: Enabled by default if a journalFile is used.
+        #: Cannot be enabled if the journalFile is unspecified (i.e. when the journal is not preserved).
+        self.flushJournal = kwargs.get('flushJournal', None)
+
         #: Will try to bind port every bindRetryTime seconds until success.
         self.bindRetryTime = kwargs.get('bindRetryTime', 1.0)
 
@@ -179,6 +185,7 @@ def validate(self):
         assert self.logCompactionMinEntries >= 2
         assert self.logCompactionMinTime > 0
         assert self.logCompactionBatchSize > 0
+        assert self.journalFile is not None or not self.flushJournal, 'flushJournal cannot be enabled without specifying a journal file'
         assert self.bindRetryTime > 0
         assert (self.deserializer is None) == (self.serializer is None)
         if self.serializer is not None:
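The new option is opt-in at the config level. A minimal usage sketch (the addresses and file name are made up for illustration):

    from pysyncobj import SyncObj, SyncObjConf

    # flushJournal defaults to enabled whenever journalFile is set; passing
    # flushJournal=False trades durability for write throughput.
    conf = SyncObjConf(journalFile='/var/lib/myapp/journal.bin', flushJournal=False)
    obj = SyncObj('serverA:4321', ['serverB:4321'], conf)

    # This combination fails validate(): flushJournal requires a journalFile.
    # SyncObjConf(journalFile=None, flushJournal=True).validate()  # AssertionError
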
+ """ + + def __init__(self, nodeId = None, _hash = None): + # Accepts either a node ID or a hash, but the latter is not public API (optimisation because the FileJournal already needs to compute the hash) + if nodeId is None and _hash is None: + raise ValueError('Argument required') + if _hash is not None: + self.__hash = _hash + else: + self.__hash = hashlib.md5(pysyncobj.pickle.dumps(nodeId)).digest() + + def __eq__(self, other): + return self.__hash == hashlib.md5(pysyncobj.pickle.dumps(other)).digest() + + def __ne__(self, other): # Py2 compatibility + return not (self == other) + + def __repr__(self): + return '{}({!r})'.format(type(self).__name__, self.__hash) + + class ResizableFile(object): def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultContent = None): @@ -78,7 +150,7 @@ def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultCont def write(self, offset, values): size = len(values) currSize = self.__mm.size() - if offset + size > self.__mm.size(): + while offset + size > self.__mm.size(): try: self.__mm.resize(int(self.__mm.size() * self.__resizeFactor)) except SystemError: @@ -106,7 +178,7 @@ def flush(self): -JOURNAL_FORMAT_VERSION = 1 +JOURNAL_FORMAT_VERSION = 2 APP_NAME = b'PYSYNCOBJ' APP_VERSION = str.encode(VERSION) @@ -114,20 +186,68 @@ def flush(self): VERSION_SIZE = 8 assert len(APP_NAME) < NAME_SIZE assert len(APP_VERSION) < VERSION_SIZE -FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + 4 -LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4 +CURRENT_TERM_SIZE = 8 +VOTED_FOR_SIZE = 16 +FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE + 4 +LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE -# -# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + LAST_RECORD_OFFSET (4b) + -# record1size + record1 + record1size + record2size + record2 + record2size + ... -# (record1) | (record2) | ... -# +# Journal version 2: +# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + CURRENT_TERM (8b) + VOTED_FOR (16b) + LAST_RECORD_OFFSET (4b) + +# record1size + record1 + record1size + record2size + record2 + record2size + ... +# (record1) | (record2) | ... + +# VOTED_FOR is an MD5 hash of the pickled node ID. +# LAST_RECORD_OFFSET is the offset from the beginning of the journal file at which the last record ends. + +VOTED_FOR_NONE_HASH = hashlib.md5(pysyncobj.pickle.dumps(None)).digest() + +# Version 1 is identical except it has neither CURRENT_TERM nor VOTED_FOR. 
 class FileJournal(Journal):
 
-    def __init__(self, journalFile):
+    def __init__(self, journalFile, flushJournal):
         self.__journalFile = ResizableFile(journalFile, defaultContent=self.__getDefaultHeader())
         self.__journal = []
+
+        # Handle journal format version upgrades
+        version = struct.unpack('

diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py
--- a/pysyncobj/syncobj.py
+++ b/pysyncobj/syncobj.py
-            if message['term'] > self.__raftCurrentTerm:
-                self.__raftCurrentTerm = message['term']
-                self.__votedForNodeId = None
+            if message['term'] > self.__raftLog.currentTerm:
+                self.__raftLog.set_currentTerm_and_votedForNodeId(message['term'], None)
                 self.__setState(_RAFT_STATE.FOLLOWER)
                 self.__raftLeader = None
 
+            if self.__voteBlockTime is not None and time.time() <= self.__voteBlockTime:
+                return
+            self.__voteBlockTime = None
+
             if self.__raftState in (_RAFT_STATE.FOLLOWER, _RAFT_STATE.CANDIDATE):
                 lastLogTerm = message['last_log_term']
                 lastLogIdx = message['last_log_index']
-                if message['term'] >= self.__raftCurrentTerm:
+                if message['term'] >= self.__raftLog.currentTerm:
                     if lastLogTerm < self.__getCurrentLogTerm():
                         return
                     if lastLogTerm == self.__getCurrentLogTerm() and \
                             lastLogIdx < self.__getCurrentLogIndex():
                         return
-                    if self.__votedForNodeId is not None:
-                        return
-                    self.__votedForNodeId = node.id
+                    if self.__raftLog.votedForNodeId is not None:
+                        return
+                    self.__raftLog.votedForNodeId = node.id
 
                     self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout()
                     self.__transport.send(node, {
@@ -738,14 +747,13 @@ def __onMessageReceived(self, node, message):
                         'term': message['term'],
                     })
 
-        if message['type'] == 'append_entries' and message['term'] >= self.__raftCurrentTerm:
+        if message['type'] == 'append_entries' and message['term'] >= self.__raftLog.currentTerm:
             self.__raftElectionDeadline = time.time() + self.__generateRaftTimeout()
             if self.__raftLeader != node:
                 self.__onLeaderChanged()
             self.__raftLeader = node
-            if message['term'] > self.__raftCurrentTerm:
-                self.__raftCurrentTerm = message['term']
-                self.__votedForNodeId = None
+            if message['term'] > self.__raftLog.currentTerm:
+                self.__raftLog.set_currentTerm_and_votedForNodeId(message['term'], None)
                 self.__setState(_RAFT_STATE.FOLLOWER)
             newEntries = message.get('entries', [])
             serialized = message.get('serialized', None)
@@ -832,7 +840,7 @@ def __onMessageReceived(self, node, message):
                     self.__commandsWaitingCommit[idx].append((term, callback))
 
         if self.__raftState == _RAFT_STATE.CANDIDATE:
-            if message['type'] == 'response_vote' and message['term'] == self.__raftCurrentTerm:
+            if message['type'] == 'response_vote' and message['term'] == self.__raftLog.currentTerm:
                 self.__votesCount += 1
 
                 if self.__votesCount > (len(self.__otherNodes) + 1) / 2:
@@ -962,7 +970,7 @@ def _isReady(self):
         return self.isReady()
 
     def _getTerm(self):
-        return self.__raftCurrentTerm
+        return self.__raftLog.currentTerm
 
     def _getRaftLogSize(self):
         return len(self.__raftLog)
@@ -992,7 +1000,7 @@ def __onBecomeLeader(self):
             self.__lastResponseTime[node] = time.time()
 
         # No-op command after leader election.
-        idx, term = self.__getCurrentLogIndex() + 1, self.__raftCurrentTerm
+        idx, term = self.__getCurrentLogIndex() + 1, self.__raftLog.currentTerm
         self.__raftLog.add(_bchr(_COMMAND_TYPE.NO_OP), idx, term)
         self.__noopIDx = idx
         if not self.__conf.appendEntriesUseBatch:
@@ -1050,7 +1058,7 @@ def __sendAppendEntries(self):
                         'type': 'append_entries',
                         'transmission': transmission,
                         'data': currData,
-                        'term': self.__raftCurrentTerm,
+                        'term': self.__raftLog.currentTerm,
                         'commit_index': self.__raftCommitIndex,
                         'prevLogIdx': prevLogIdx,
                         'prevLogTerm': prevLogTerm,
                     }
                 else:
                     message = {
                         'type': 'append_entries',
-                        'term': self.__raftCurrentTerm,
+                        'term': self.__raftLog.currentTerm,
                         'commit_index': self.__raftCommitIndex,
                         'entries': entries,
                         'prevLogIdx': prevLogIdx,
@@ -1070,7 +1078,7 @@
                 transmissionData = self.__serializer.getTransmissionData(node)
                 message = {
                     'type': 'append_entries',
-                    'term': self.__raftCurrentTerm,
+                    'term': self.__raftLog.currentTerm,
                     'commit_index': self.__raftCommitIndex,
                     'serialized': transmissionData,
                 }
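The __voteBlockTime check above is the behavioural core of the change: a node whose journal is not flushed on write may lose a persisted vote in a crash, so after startup it refuses to grant votes until the largest possible election timeout (plus some slack) has passed, which makes a second, conflicting vote in the same term impossible. A sketch of the timing rule the tests below rely on (the 1.1 factor is taken from the test comments; treat it as an assumption about the implementation):

    import time

    def vote_block_deadline(startup_time, raft_max_timeout, slack=1.1):
        # Any vote granted before the crash happened more than one full
        # election timeout before this deadline, so that election has
        # necessarily timed out by the time voting is allowed again.
        return startup_time + slack * raft_max_timeout

    # With the test's o2 configuration (raftMaxTimeout ~= 2.4 s), votes are
    # blocked until roughly 2.64 s after startup.
    deadline = vote_block_deadline(time.time(), 2.4)
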
diff --git a/test_syncobj.py b/test_syncobj.py
index 1b92207..dce04f2 100755
--- a/test_syncobj.py
+++ b/test_syncobj.py
@@ -1,4 +1,7 @@
 from __future__ import print_function
+import hashlib
+import io
+import math
 import os
 import time
 import pytest
@@ -15,6 +18,7 @@
 from pysyncobj import SyncObj, SyncObjConf, replicated, FAIL_REASON, _COMMAND_TYPE, \
     createJournal, HAS_CRYPTO, replicated_sync, Utility, SyncObjException, SyncObjConsumer, _RAFT_STATE
 from pysyncobj.batteries import ReplCounter, ReplList, ReplDict, ReplSet, ReplLockManager, ReplQueue, ReplPriorityQueue
+import pysyncobj.journal
 from pysyncobj.node import TCPNode
 
 from collections import defaultdict
@@ -31,6 +35,8 @@ class TEST_TYPE:
     AUTO_TICK_1 = 5
     WAIT_BIND = 6
     LARGE_COMMAND = 7
+    NOFLUSH_NOVOTE_1 = 8
+    NOFLUSH_NOVOTE_2 = 9
 
 
 class TestObj(SyncObj):
@@ -109,6 +115,15 @@ def __init__(self, selfNodeAddr, otherNodeAddrs,
             cfg.maxBindRetries = 1
             cfg.autoTick = True
 
+        if testType in (TEST_TYPE.NOFLUSH_NOVOTE_1, TEST_TYPE.NOFLUSH_NOVOTE_2):
+            cfg.logCompactionMinTime = 999999
+            cfg.logCompactionMinEntries = 999999
+            cfg.raftMinTimeout = 0.5 if testType == TEST_TYPE.NOFLUSH_NOVOTE_1 else 2.4
+            cfg.raftMaxTimeout = cfg.raftMinTimeout * 1.000001
+            cfg.fullDumpFile = dumpFile
+            cfg.journalFile = journalFile
+            cfg.flushJournal = testType == TEST_TYPE.NOFLUSH_NOVOTE_1
+
         super(TestObj, self).__init__(selfNodeAddr, otherNodeAddrs, cfg, consumers)
         self.__counter = 0
         self.__data = {}
@@ -357,6 +372,92 @@ def test_syncThreeObjectsLeaderFail():
     o2._destroy()
     o3._destroy()
 
+def sync_noflush_novote(journalFile2Enabled):
+    # Test that if flushing is disabled, the node won't vote until the maximum timeout has been exceeded.
+    # o1's timeout is set to 0.5 seconds, so it will call for an election almost immediately.
+    # o2's timeout is set to 2.4 seconds, so it will not call for an election before o1, and it will ignore o1's request_vote messages.
+    # Specifically, o2 is expected to ignore the messages until 1.1 * timeout, i.e. including the one sent by o1 after 2.5 seconds, except for updating its term.
+    # Note that o1 has flushing enabled but o2 doesn't!
+
+    def tick(o1, o2, startTime, totalTickTime, sleepTime):
+        # Tick o1 and o2 until totalTickTime has elapsed since startTime
+        assert time.time() < startTime + totalTickTime  # Make sure that we tick at least once
+        while time.time() < startTime + totalTickTime:
+            o1.doTick()
+            o2.doTick()
+            time.sleep(sleepTime)
+
+    random.seed(42)
+
+    a = [getNextAddr(), getNextAddr()]
+    if journalFile2Enabled:
+        journalFiles = [getNextJournalFile(), getNextJournalFile()]
+    else:
+        journalFiles = [getNextJournalFile()]
+    removeFiles(journalFiles)
+
+    # Make sure that o1 already has a log; this means that it will never accept a vote request from o2
+    o1 = TestObj(a[0], [], TEST_TYPE.NOFLUSH_NOVOTE_1, journalFile = journalFiles[0])
+    doTicks([o1], 10, stopFunc=lambda: o1.isReady())
+    o1.addValue(42)
+    doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 42)
+    o1._destroy()
+
+    states = defaultdict(list)
+
+    start = time.time()
+    o1 = TestObj(a[0], [a[1]], TEST_TYPE.NOFLUSH_NOVOTE_1, journalFile = journalFiles[0], onStateChanged=lambda old, new: states[a[0]].append(new))
+    o2 = TestObj(a[1], [a[0]], TEST_TYPE.NOFLUSH_NOVOTE_2, journalFile = journalFiles[1] if journalFile2Enabled else None, onStateChanged=lambda old, new: states[a[1]].append(new))
+    objs = [o1, o2]
+
+    assert not o1._isReady()
+    assert not o2._isReady()
+
+    tick(o1, o2, start, 2.25, 0.01)
+
+    # Here, o1 has called several elections, but o2 never granted its vote.
+    assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE
+    assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER
+    assert _RAFT_STATE.LEADER not in states[a[0]]
+    assert states[a[1]] == []  # Never had a state change, i.e. it's still the default follower
+
+    tick(o1, o2, start, 2.45, 0.01)
+
+    # We have now surpassed o2's timeout, but the last vote request from o1 was at 2.0, i.e. *before* o2's 1.1 * timeout (= 2.64) has expired.
+    # o2 is expected to have called for an election at 2.4, but o1 would never vote for o2 due to the missing log entry.
+    # o1 converted to a follower due to o2's bigger term.
+    assert o1._SyncObj__raftState == _RAFT_STATE.FOLLOWER
+    assert o2._SyncObj__raftState == _RAFT_STATE.CANDIDATE
+    assert _RAFT_STATE.LEADER not in states[a[0]]
+    assert _RAFT_STATE.LEADER not in states[a[1]]
+
+    tick(o1, o2, start, 2.55, 0.01)
+
+    # While o1 converted to a follower at 2.4 due to o2's vote request, it still called for an election at 2.5 since the term change doesn't affect the election timeout.
+    # Therefore, o2 converted to a follower again at 2.5 due to the bigger term. However, o2 still didn't grant its vote to o1 since 1.1 * timeout (= 2.64) hasn't elapsed yet.
+    assert o1._SyncObj__raftState == _RAFT_STATE.CANDIDATE
+    assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER
+    assert _RAFT_STATE.LEADER not in states[a[0]]
+    assert _RAFT_STATE.LEADER not in states[a[1]]
+
+    tick(o1, o2, start, 3.15, 0.01)
+
+    # o1 called for another election at 3.0, i.e. after o2's vote block timeout, so it should now be elected.
+    assert o1._SyncObj__raftState == _RAFT_STATE.LEADER
+    assert o2._SyncObj__raftState == _RAFT_STATE.FOLLOWER
+    assert _RAFT_STATE.LEADER not in states[a[1]]
+
+    o1._destroy()
+    o2._destroy()
+    removeFiles(journalFiles)
+
+def test_sync_noflush_novote():
+    sync_noflush_novote(True)   # Test with an unflushed journal file
+    sync_noflush_novote(False)  # Test with a memory journal
+
 def test_manyActionsLogCompaction():
     random.seed(42)
@@ -1059,32 +1160,287 @@ def test_journalTest2():
     removeFiles(journalFiles)
 
     removeFiles(journalFiles)
-    journal = createJournal(journalFiles[0])
+    journal = createJournal(journalFiles[0], True)
     journal.add(b'cmd1', 1, 0)
     journal.add(b'cmd2', 2, 0)
     journal.add(b'cmd3', 3, 0)
+    journal.currentTerm = 42
+    journal.votedForNodeId = 'example.org'
     journal._destroy()
 
-    journal = createJournal(journalFiles[0])
+    journal = createJournal(journalFiles[0], True)
     assert len(journal) == 3
     assert journal[0] == (b'cmd1', 1, 0)
     assert journal[-1] == (b'cmd3', 3, 0)
+    assert journal.currentTerm == 42
+    assert journal.votedForNodeId == 'example.org'
     journal.deleteEntriesFrom(2)
+    journal.set_currentTerm_and_votedForNodeId(100, 'other.example.net')
     journal._destroy()
 
-    journal = createJournal(journalFiles[0])
+    journal = createJournal(journalFiles[0], True)
     assert len(journal) == 2
     assert journal[0] == (b'cmd1', 1, 0)
     assert journal[-1] == (b'cmd2', 2, 0)
+    assert journal.currentTerm == 100
+    assert journal.votedForNodeId == 'other.example.net'
     journal.deleteEntriesTo(1)
     journal._destroy()
 
-    journal = createJournal(journalFiles[0])
+    journal = createJournal(journalFiles[0], True)
     assert len(journal) == 1
     assert journal[0] == (b'cmd2', 2, 0)
     journal._destroy()
 
     removeFiles(journalFiles)
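The equality assertions above hold even though a version-2 journal file stores only an MD5 hash of the voted-for node ID: on reload the value presumably comes back as the VotedForNodeIdHashProxy added in this diff, whose __eq__ hashes the pickled operand before comparing. An illustrative round trip:

    from pysyncobj.journal import VotedForNodeIdHashProxy

    proxy = VotedForNodeIdHashProxy('example.org')
    assert proxy == 'example.org'         # MD5 of the pickled operand matches the stored hash
    assert proxy != 'other.example.net'   # different hash
    assert proxy is not None              # identity checks against None keep working
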
+
+def test_journal_flushing():
+    # Patch ResizableFile to simulate a file which isn't flushed without calling flush explicitly; the read method will always return the expected data, but it won't be written to disk.
+    # This is done because testing actual flushing is very difficult from this level. However, it achieves the same effect and tests whether the FileJournal itself handles flushing correctly.
+    # Note that this does *not* test whether ResizableFile flushing is correct.
+
+    class TestResizableFile(object):
+        # A class that provides the same API as pysyncobj.journal.ResizableFile but never writes data to disk unless it's explicitly flushed.
+        # To achieve this, it simply uses an in-memory buffer that is written to disk upon flush. This is to avoid having nearly identical code to ResizableFile in here.
+
+        def __init__(self, filename, defaultContent = None, **kwargs):
+            initialData = defaultContent
+            if os.path.exists(filename):
+                with open(filename, 'rb') as fp:
+                    initialData = fp.read()
+            self._buffer = io.BytesIO(initialData)
+            self._filename = filename
+
+        def write(self, offset, data):
+            self._buffer.seek(offset)
+            self._buffer.write(data)
+
+        def read(self, offset, size):
+            self._buffer.seek(offset)
+            return self._buffer.read(size)
+
+        def flush(self):
+            with open(self._filename, 'wb') as fp:
+                fp.write(self._buffer.getvalue())
+
+        def _destroy(self):
+            self.flush()
+            self._buffer.close()
+
+    origResizableFile = pysyncobj.journal.ResizableFile
+    try:
+        pysyncobj.journal.ResizableFile = TestResizableFile
+
+        journalFiles = [getNextJournalFile()]
+
+        def run_test(flushJournal, useDestroy):
+            for mode in ('add', 'clear', 'deleteEntriesFrom', 'deleteEntriesTo', 'currentTerm', 'votedForNodeId', 'currentTerm and votedForNodeId'):
+                removeFiles(journalFiles)
+                journal = createJournal(journalFiles[0], flushJournal)
+                assert len(journal) == 0
+                journal.add(b'cmd1', 1, 0)
+                journal.add(b'cmd2', 2, 0)
+                journal.add(b'cmd3', 3, 0)
+                expectedCurrentTermNotWorking = 13
+                journal.currentTerm = 13
+                expectedVotedForNodeIdNotWorking = 'bad.example.org'
+                journal.votedForNodeId = 'bad.example.org'
+                journal.flush()
+                expectedNotWorking = [(b'cmd1', 1, 0), (b'cmd2', 2, 0), (b'cmd3', 3, 0)]
+
+                # Default values for when something else is being tested
+                expectedCurrentTermWorking = expectedCurrentTermNotWorking
+                expectedVotedForNodeIdWorking = expectedVotedForNodeIdNotWorking
+                expectedWorking = expectedNotWorking
+
+                if mode == 'add':
+                    journal.add(b'cmd4', 4, 0)
+                    expectedWorking = [(b'cmd1', 1, 0), (b'cmd2', 2, 0), (b'cmd3', 3, 0), (b'cmd4', 4, 0)]
+                elif mode == 'clear':
+                    journal.clear()
+                    expectedWorking = []
+                elif mode == 'deleteEntriesFrom':
+                    journal.deleteEntriesFrom(2)
+                    expectedWorking = [(b'cmd1', 1, 0), (b'cmd2', 2, 0)]
+                elif mode == 'deleteEntriesTo':
+                    journal.deleteEntriesTo(2)
+                    expectedWorking = [(b'cmd3', 3, 0)]
+                elif mode == 'currentTerm':
+                    journal.currentTerm = 42
+                    expectedCurrentTermWorking = 42
+                elif mode == 'votedForNodeId':
+                    journal.votedForNodeId = 'good.example.org'
+                    expectedVotedForNodeIdWorking = 'good.example.org'
+                elif mode == 'currentTerm and votedForNodeId':
+                    journal.set_currentTerm_and_votedForNodeId(42, 'good.example.org')
+                    expectedCurrentTermWorking = 42
+                    expectedVotedForNodeIdWorking = 'good.example.org'
+
+                if useDestroy:
+                    journal._destroy()
+                    del journal
+
+                journal = createJournal(journalFiles[0], flushJournal)
+                if flushJournal or useDestroy:
+                    assert journal[:] == expectedWorking
+                    assert journal.currentTerm == expectedCurrentTermWorking
+                    assert journal.votedForNodeId == expectedVotedForNodeIdWorking
+                else:
+                    assert journal[:] == expectedNotWorking
+                    assert journal.currentTerm == expectedCurrentTermNotWorking
+                    assert journal.votedForNodeId == expectedVotedForNodeIdNotWorking
+
+        # Verify that, when journal flushing is disabled, values written after the last flush without using _destroy (which implicitly flushes) are not preserved.
+        run_test(False, False)
+
+        # Verify that using _destroy causes a flush
+        run_test(False, True)
+
+        # Verify that journal flushing without _destroy preserves everything
+        run_test(True, False)
+
+        # Verify that journal flushing with _destroy works as well
+        run_test(True, True)
+    finally:
+        pysyncobj.journal.ResizableFile = origResizableFile
+
+    removeFiles(journalFiles)
+
+
+def test_journal_upgrade_version_1_to_2():
+    def data_to_entry(index, term, data):
+        d = struct.pack('
?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89\x8a\x8b\x8c\x8d\x8e\x8f\x90\x91\x92\x93\x94\x95\x96\x97\x98\x99\x9a\x9b\x9c\x9d\x9e\x9f\xa0\xa1\xa2\xa3\xa4\xa5\xa6\xa7\xa8\xa9\xaa\xab\xac\xad\xae\xaf\xb0\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba\xbb\xbc\xbd\xbe\xbf\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc\xcd\xce\xcf\xd0\xd1\xd2\xd3\xd4\xd5\xd6\xd7\xd8\xd9\xda\xdb\xdc\xdd\xde\xdf\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xfb\xfc\xfd\xfe\xff'),
+            [(bytes(bytearray.fromhex(''.join('{:02x}'.format(i) for i in range(256)))), 1, 1)]  # bytes(range(256)) on Py3
+        ),
+
+        # Empty data should work as well
+        (
+            'empty data',
+            data_to_entry(1, 1, b''),
+            [(b'', 1, 1)]
+        ),
+
+        # A few entries
+        (
+            'a few entries',
+            data_to_entry(1, 1, b'\x01') + data_to_entry(2, 2, b'\x01') + data_to_entry(3, 4, b'\x01'),
+            [(b'\x01', 1, 1), (b'\x01', 2, 2), (b'\x01', 3, 4)]
+        ),
+    ]
+
+    # Generate larger journals
+    # 400 B: some arbitrary length which fits well within a single block
+    # 1000 B: should expand to exactly 1024 B in the new format, i.e. no resizing of the file necessary
+    # 1020/3/4 B: almost or entirely full journal which can't fit the new header fields anymore, so it needs to be expanded on migration
+    # 1025 B: a journal which already has been expanded
+    # pi MiB: some large journal
+    for length in (400, 1000, 1020, 1023, 1024, 1025, int(math.pi * 1048576)):  # length including the header...
+        for smallRecords in (True, False):
+            entries = []
+            expectedEntries = []
+            deltaLength = length - len(baseHeader) - 4  # 4: last record offset
+
+            # Each record has an overhead of 8 bytes (length of the record at beginning and end) and also stores an index and a term with 8 bytes each.
+            if smallRecords:
+                # Small records hold only 1 B of data, i.e. are 25 B in total; so we create deltaLength // 25 - 1 such records and then fill up with one possibly slightly larger record (up to 25 bytes of payload)
+                nSmall = deltaLength // 25 - 1
+                for i in range(nSmall):
+                    entries.append(data_to_entry(i, i, b'\x01'))
+                    expectedEntries.append((b'\x01', i, i))
+                remainingData = b'\x00' * (deltaLength - nSmall * 25 - 24)
+                entries.append(data_to_entry(nSmall, nSmall, remainingData))
+                expectedEntries.append((remainingData, nSmall, nSmall))
+            else:
+                # Just one huge record to fit the length
+                data = b'\x00' * (deltaLength - 24)
+                entries.append(data_to_entry(1, 1, data))
+                expectedEntries.append((data, 1, 1))
+
+            journals.append(('length={} small={}'.format(length, smallRecords), b''.join(entries), expectedEntries))
+
+    journalFile = getNextJournalFile()
+
+    for name, entryBytes, expectedEntries in journals:
+        print(name)
+        removeFiles([journalFile])
+        with open(journalFile, 'wb') as fp:
+            size = 0
+            fp.write(baseHeader)
+            size += len(baseHeader)
+            fp.write(struct.pack('
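A quick sanity check of the record-size arithmetic used when generating these journals (an illustrative sketch assuming the framing described in the comments above: a 4-byte length before and after each record, plus an 8-byte index and an 8-byte term):

    def record_size(payload_len):
        # 4 (leading size) + 8 (index) + 8 (term) + payload + 4 (trailing size)
        return 4 + 8 + 8 + payload_len + 4

    assert record_size(1) == 25  # matches "small records ... are 25 B in total"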