Fix persistence and add automatic journal flushing #98

Open
wants to merge 9 commits into master
7 changes: 7 additions & 0 deletions pysyncobj/config.py
@@ -115,8 +115,14 @@ def __init__(self, **kwargs):
self.fullDumpFile = kwargs.get('fullDumpFile', None)

#: File to store the operations journal. Each record is saved as soon as it is received.
#: If unspecified or None, an in-memory journal is used.
self.journalFile = kwargs.get('journalFile', None)

#: Flush the journal whenever something is written to it.
#: Enabled by default if a journalFile is used.
#: Cannot be enabled if the journalFile is unspecified (i.e. when the journal is not preserved).
self.flushJournal = kwargs.get('flushJournal', None)

#: Will try to bind port every bindRetryTime seconds until success.
self.bindRetryTime = kwargs.get('bindRetryTime', 1.0)

@@ -179,6 +185,7 @@ def validate(self):
assert self.logCompactionMinEntries >= 2
assert self.logCompactionMinTime > 0
assert self.logCompactionBatchSize > 0
assert self.journalFile is not None or not self.flushJournal, 'flushJournal cannot be enabled without specifying a journal file'
assert self.bindRetryTime > 0
assert (self.deserializer is None) == (self.serializer is None)
if self.serializer is not None:
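
A minimal usage sketch for the two options above (the file name and addresses are illustrative; `SyncObjConf` and `SyncObj` are the existing pysyncobj classes):

from pysyncobj import SyncObj, SyncObjConf

conf = SyncObjConf(
    journalFile='raft.journal',  # persist the operation log to this file
    flushJournal=True,           # flush after every write; also the default when journalFile is set
)
counter = SyncObj('localhost:4321', ['localhost:4322', 'localhost:4323'], conf=conf)
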
209 changes: 191 additions & 18 deletions pysyncobj/journal.py
@@ -1,12 +1,40 @@
import hashlib
import os
import mmap
import pysyncobj.pickle
import struct

from .atomic_replace import atomicReplace
from .version import VERSION
from .pickle import to_bytes

class Journal(object):

@property
def currentTerm(self):
raise NotImplementedError

@currentTerm.setter
def currentTerm(self, term):
raise NotImplementedError

@property
def votedForNodeId(self):
raise NotImplementedError

@votedForNodeId.setter
def votedForNodeId(self, nodeId):
raise NotImplementedError

def set_currentTerm_and_votedForNodeId(self, term, nodeId):
"""
Convenience method since the two are often modified at the same time.

Subclasses may override this with a more efficient implementation than setting the two attributes individually.
"""

self.currentTerm = term
self.votedForNodeId = nodeId

def add(self, command, idx, term):
raise NotImplementedError

@@ -34,6 +62,24 @@ class MemoryJournal(Journal):
def __init__(self):
self.__journal = []
self.__bytesSize = 0
self.__currentTerm = 0
self.__votedForNodeId = None

@property
def currentTerm(self):
return self.__currentTerm

@currentTerm.setter
def currentTerm(self, term):
self.__currentTerm = term

@property
def votedForNodeId(self):
return self.__votedForNodeId

@votedForNodeId.setter
def votedForNodeId(self, nodeId):
self.__votedForNodeId = nodeId

def add(self, command, idx, term):
self.__journal.append((command, idx, term))
@@ -57,6 +103,32 @@ def _destroy(self):
pass


class VotedForNodeIdHashProxy(object):
"""
A proxy for the voted-for node ID that stores only its hash.

This object only supports equality tests (two values compare equal if the MD5 hash of the pickled other operand matches the stored hash) and identity checks against None ('is None').
"""

def __init__(self, nodeId = None, _hash = None):
# Accepts either a node ID or a hash, but the latter is not public API (optimisation because the FileJournal already needs to compute the hash)
if nodeId is None and _hash is None:
raise ValueError('Argument required')
if _hash is not None:
self.__hash = _hash
else:
self.__hash = hashlib.md5(pysyncobj.pickle.dumps(nodeId)).digest()

def __eq__(self, other):
return self.__hash == hashlib.md5(pysyncobj.pickle.dumps(other)).digest()

def __ne__(self, other): # Py2 compatibility
return not (self == other)

def __repr__(self):
return '{}({!r})'.format(type(self).__name__, self.__hash)
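
A short sketch of the intended use (the node IDs are illustrative):

proxy = VotedForNodeIdHashProxy('node-a:4321')
assert proxy == 'node-a:4321'  # equal iff the MD5 of the pickled operand matches
assert proxy != 'node-b:4321'
assert proxy is not None       # identity check against None still works
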


class ResizableFile(object):

def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultContent = None):
@@ -78,7 +150,7 @@ def __init__(self, fileName, initialSize = 1024, resizeFactor = 2.0, defaultContent = None):
def write(self, offset, values):
size = len(values)
currSize = self.__mm.size()
-if offset + size > self.__mm.size():
+while offset + size > self.__mm.size():
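# One resize by resizeFactor may still be too small for a large write, hence the loop.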
try:
self.__mm.resize(int(self.__mm.size() * self.__resizeFactor))
except SystemError:
@@ -106,28 +178,76 @@ def flush(self):



-JOURNAL_FORMAT_VERSION = 1
+JOURNAL_FORMAT_VERSION = 2
APP_NAME = b'PYSYNCOBJ'
APP_VERSION = str.encode(VERSION)

NAME_SIZE = 24
VERSION_SIZE = 8
assert len(APP_NAME) < NAME_SIZE
assert len(APP_VERSION) < VERSION_SIZE
-FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + 4
-LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4
+CURRENT_TERM_SIZE = 8
+VOTED_FOR_SIZE = 16
+FIRST_RECORD_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE + 4
+LAST_RECORD_OFFSET_OFFSET = NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE + VOTED_FOR_SIZE

#
-# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + LAST_RECORD_OFFSET (4b) +
-# record1size + record1 + record1size + record2size + record2 + record2size + ...
-# (record1) | (record2) | ...
#
+# Journal version 2:
+# APP_NAME (24b) + APP_VERSION (8b) + FORMAT_VERSION (4b) + CURRENT_TERM (8b) + VOTED_FOR (16b) + LAST_RECORD_OFFSET (4b) +
+# record1size + record1 + record1size + record2size + record2 + record2size + ...
+# (record1) | (record2) | ...

+# VOTED_FOR is an MD5 hash of the pickled node ID.
+# LAST_RECORD_OFFSET is the offset from the beginning of the journal file at which the last record ends.

VOTED_FOR_NONE_HASH = hashlib.md5(pysyncobj.pickle.dumps(None)).digest()

# Version 1 is identical except it has neither CURRENT_TERM nor VOTED_FOR.
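
For reference, a minimal sketch of decoding the version-2 header at the offsets implied by the constants above (function and variable names are illustrative, not part of this PR):

import struct

def read_header_v2(data):
    app_name = data[0:24].rstrip(b'\0')                       # APP_NAME (24b)
    app_version = data[24:32].rstrip(b'\0')                   # APP_VERSION (8b)
    format_version = struct.unpack('<I', data[32:36])[0]      # FORMAT_VERSION (4b)
    current_term = struct.unpack('<Q', data[36:44])[0]        # CURRENT_TERM (8b)
    voted_for_hash = data[44:60]                              # VOTED_FOR (16b, MD5 digest)
    last_record_offset = struct.unpack('<I', data[60:64])[0]  # LAST_RECORD_OFFSET (4b)
    return format_version, current_term, voted_for_hash, last_record_offset
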

class FileJournal(Journal):

-def __init__(self, journalFile):
+def __init__(self, journalFile, flushJournal):
self.__journalFile = ResizableFile(journalFile, defaultContent=self.__getDefaultHeader())
self.__journal = []

# Handle journal format version upgrades
version = struct.unpack('<I', self.__journalFile.read(32, 4))[0]
if version == 1:
# Header size increased by 24 bytes, so everything needs to be moved...
tmpFile = journalFile + '.tmp'
if os.path.exists(tmpFile):
raise RuntimeError('Migration of journal file failed: {} already exists'.format(tmpFile))
oldJournalFile = self.__journalFile # Just for readability
newJournalFile = ResizableFile(tmpFile, defaultContent=self.__getDefaultHeader())
oldFirstRecordOffset = NAME_SIZE + VERSION_SIZE + 4 + 4
oldLastRecordOffset = struct.unpack('<I', oldJournalFile.read(NAME_SIZE + VERSION_SIZE + 4, 4))[0]
oldCurrentOffset = oldFirstRecordOffset
deltaOffset = 24 # delta in record offsets between old and new format
while oldCurrentOffset < oldLastRecordOffset:
# Copy data in chunks of 4 MB (plus possibly a smaller chunk at the end)
d = oldJournalFile.read(oldCurrentOffset, min(4000000, oldLastRecordOffset - oldCurrentOffset))
if not d:
# Reached EOF
break
newJournalFile.write(oldCurrentOffset + deltaOffset, d)
oldCurrentOffset += len(d)
newJournalFile.write(LAST_RECORD_OFFSET_OFFSET, struct.pack('<I', oldLastRecordOffset + deltaOffset))
newJournalFile.flush()

del oldJournalFile # Delete reference
self.__journalFile._destroy()
newJournalFile._destroy()
atomicReplace(tmpFile, journalFile)
self.__journalFile = ResizableFile(journalFile, defaultContent=self.__getDefaultHeader())
elif version == JOURNAL_FORMAT_VERSION:
# Nothing to do
pass
else:
raise RuntimeError('Unknown journal file version encountered: {} (expected <= {})'.format(version, JOURNAL_FORMAT_VERSION))
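
The version check above reads the 4-byte format version that immediately follows the name and version fields; a standalone sketch (illustrative helper, not part of this PR):

def journal_format_version(path):
    with open(path, 'rb') as f:
        f.seek(NAME_SIZE + VERSION_SIZE)  # 24 + 8 = offset 32
        return struct.unpack('<I', f.read(4))[0]
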

self.__currentTerm = struct.unpack('<Q', self.__journalFile.read(NAME_SIZE + VERSION_SIZE + 4, CURRENT_TERM_SIZE))[0]
self.__votedForNodeIdHash = self.__journalFile.read(NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE, VOTED_FOR_SIZE)
self.__votedForNodeIdProxy = VotedForNodeIdHashProxy(_hash = self.__votedForNodeIdHash) if self.__votedForNodeIdHash != VOTED_FOR_NONE_HASH else None

currentOffset = FIRST_RECORD_OFFSET
lastRecordOffset = self.__getLastRecordOffset()
while currentOffset < lastRecordOffset:
@@ -138,32 +258,78 @@ def __init__(self, journalFile):
self.__journal.append((command, idx, term))
currentOffset += nextRecordSize + 8
self.__currentOffset = currentOffset
self.__flushJournal = flushJournal
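
The load loop above walks records that are framed by their size on both ends; a standalone sketch of that framing (names are illustrative):

def iter_records(data, first_record_offset, last_record_offset):
    # Layout per record: <4-byte size> <8-byte idx> <8-byte term> <pickled command> <4-byte size>
    offset = first_record_offset
    while offset < last_record_offset:
        size = struct.unpack('<I', data[offset:offset + 4])[0]
        idx, term = struct.unpack('<QQ', data[offset + 4:offset + 20])
        command = data[offset + 20:offset + 4 + size]
        yield command, idx, term
        offset += size + 8  # record payload plus size prefix and suffix
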

def __getDefaultHeader(self):
appName = APP_NAME + b'\0' * (NAME_SIZE - len(APP_NAME))
appVersion = APP_VERSION + b'\0' * (VERSION_SIZE - len(APP_VERSION))
-header = appName + appVersion + struct.pack('<II', JOURNAL_FORMAT_VERSION, FIRST_RECORD_OFFSET)
+header = (appName + appVersion + struct.pack('<I', JOURNAL_FORMAT_VERSION) +
+struct.pack('<Q', 0) + # default term = 0
+VOTED_FOR_NONE_HASH + # default votedForNodeId = None
+struct.pack('<I', FIRST_RECORD_OFFSET))
return header

def __getLastRecordOffset(self):
return struct.unpack('<I', self.__journalFile.read(LAST_RECORD_OFFSET_OFFSET, 4))[0]

def __setLastRecordOffset(self, offset):
self.__journalFile.write(LAST_RECORD_OFFSET_OFFSET, struct.pack('<I', offset))

-def add(self, command, idx, term):
+# No auto-flushing needed here because it's called in the methods below.

@property
def currentTerm(self):
return self.__currentTerm

@currentTerm.setter
def currentTerm(self, term):
self.__set_currentTerm(term)

@property
def votedForNodeId(self):
return self.__votedForNodeIdProxy

@votedForNodeId.setter
def votedForNodeId(self, nodeId):
self.__set_votedForNodeId(nodeId)

def __set_currentTerm(self, term, flush = True):
self.__journalFile.write(NAME_SIZE + VERSION_SIZE + 4, struct.pack('<Q', term))
if flush and self.__flushJournal:
self.flush()
self.__currentTerm = term

def __set_votedForNodeId(self, nodeId, flush = True):
self.__votedForNodeIdHash = hashlib.md5(pysyncobj.pickle.dumps(nodeId)).digest()
self.__journalFile.write(NAME_SIZE + VERSION_SIZE + 4 + CURRENT_TERM_SIZE, self.__votedForNodeIdHash)
if flush and self.__flushJournal:
self.flush()
if self.__votedForNodeIdHash != VOTED_FOR_NONE_HASH:
self.__votedForNodeIdProxy = VotedForNodeIdHashProxy(_hash = self.__votedForNodeIdHash)
else:
self.__votedForNodeIdProxy = None

def set_currentTerm_and_votedForNodeId(self, term, nodeId):
# Only flush once
self.__set_currentTerm(term, flush = False)
self.__set_votedForNodeId(nodeId, flush = True)

def add(self, command, idx, term, _doFlush = True):
self.__journal.append((command, idx, term))
-cmdData = struct.pack('<QQ', idx, term) + to_bytes(command)
+cmdData = struct.pack('<QQ', idx, term) + pysyncobj.pickle.to_bytes(command)
cmdLenData = struct.pack('<I', len(cmdData))
cmdData = cmdLenData + cmdData + cmdLenData
self.__journalFile.write(self.__currentOffset, cmdData)
self.__currentOffset += len(cmdData)
self.__setLastRecordOffset(self.__currentOffset)
if _doFlush and self.__flushJournal:
self.flush()

def clear(self):
self.__journal = []
self.__setLastRecordOffset(FIRST_RECORD_OFFSET)
self.__currentOffset = FIRST_RECORD_OFFSET
if self.__flushJournal:
self.flush()

def __getitem__(self, idx):
return self.__journal[idx]
Expand All @@ -184,20 +350,27 @@ def deleteEntriesFrom(self, entryFrom):
self.__setLastRecordOffset(currentOffset)
self.__currentOffset = currentOffset
self.__setLastRecordOffset(currentOffset)
if self.__flushJournal:
self.flush()

def deleteEntriesTo(self, entryTo):
journal = self.__journal[entryTo:]
self.clear()
for entry in journal:
-self.add(*entry)
+self.add(*entry, _doFlush = False)
if self.__flushJournal:
self.flush()

def _destroy(self):
self.__journalFile._destroy()

def flush(self):
self.__journalFile.flush()

-def createJournal(journalFile = None):
+def createJournal(journalFile, flushJournal):
if flushJournal is None:
flushJournal = journalFile is not None
if journalFile is None:
assert not flushJournal
return MemoryJournal()
-return FileJournal(journalFile)
+return FileJournal(journalFile, flushJournal)
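
Behaviour sketch of the updated factory, given the defaulting above (file name illustrative):

journal = createJournal(None, None)             # in-memory journal; flushJournal defaults to False
journal = createJournal('raft.journal', None)   # file journal; flushing enabled by default
journal = createJournal('raft.journal', False)  # file journal without automatic flushing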