Issue #23 - Memory Dump for Large RDB Files #24

Open · wants to merge 2 commits into master
2 changes: 1 addition & 1 deletion rdbtools/__init__.py
@@ -2,7 +2,7 @@
from rdbtools.callbacks import JSONCallback, DiffCallback
from rdbtools.memprofiler import MemoryCallback, PrintAllKeys, StatsAggregator

__version__ = '0.1.5'
__version__ = '0.1.3'
VERSION = tuple(map(int, __version__.split('.')))

__all__ = [
66 changes: 49 additions & 17 deletions rdbtools/cli/rdb.py
@@ -2,6 +2,8 @@
import os
import sys
from optparse import OptionParser
import timeit
import time
from rdbtools import RdbParser, JSONCallback, DiffCallback, MemoryCallback, PrintAllKeys

VALID_TYPES = ("hash", "set", "string", "list", "sortedset")
@@ -19,10 +21,31 @@ def main():
parser.add_option("-n", "--db", dest="dbs", action="append",
help="Database Number. Multiple databases can be provided. If not specified, all databases will be included.")
parser.add_option("-k", "--key", dest="keys", default=None,
help="Keys to export. This can be a regular expression")
help="Keys to export. This can be a regular expression,"
"When picking this mode, we will dump a deep copy of the key")
parser.add_option("-t", "--type", dest="types", action="append",
help="""Data types to include. Possible values are string, hash, set, sortedset, list. Multiple typees can be provided.
If not specified, all data types will be returned""")

parser.add_option("-p", "--pos", dest="pos",
help="""Position in RDB file to skip to, after generated an memory index file this can be used to speed up.
The data starts reading at 9 bytes""",
default = 9)

parser.add_option("-m", "--max", dest="max",
help=""" Read maximum number of keys, to limit search.""",
default = 2e31)


parser.add_option("-v", "--verbose", dest="verbose",
help="""If true dump a deep copy of the data structure""",
action = "store_true",
default = False)

parser.add_option("-q", "--quick", dest="quick",
help="""If true dump a deep copy of the data structure""",
action = "store_true",
default = False)

(options, args) = parser.parse_args()

@@ -49,35 +72,44 @@ def main():
raise Exception('Invalid type provided - %s. Expected one of %s' % (x, (", ".join(VALID_TYPES))))
else:
filters['types'].append(x)


filters['pos'] = int(options.pos)        # optparse gives strings; normalize
filters['max'] = int(float(options.max)) # also accepts "2e31"-style values

# TODO : Fix this ugly if-else code
callback = None
if options.output:
with open(options.output, "wb") as f:
if 'diff' == options.command:
callback = DiffCallback(f)
elif 'json' == options.command:
callback = JSONCallback(f)
elif 'memory' == options.command:
reporter = PrintAllKeys(f)
callback = MemoryCallback(reporter, 64)
else:
raise Exception('Invalid Command %s' % options.output)
parser = RdbParser(callback)
parser.parse(dump_file)
f = open(options.output, "wb")
Review comment: This file doesn't look like it's being closed after use.

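A minimal sketch of one way to address this, reusing only names already present in this diff; the try/finally also covers the parse call, so the handle is closed even if parsing raises:

    f = open(options.output, "wb")
    try:
        if 'diff' == options.command:
            callback = DiffCallback(f)
        elif 'json' == options.command:
            callback = JSONCallback(f)
        elif 'memory' == options.command:
            callback = MemoryCallback(PrintAllKeys(f), 64, options.verbose)
        else:
            raise Exception('Invalid Command %s' % options.command)
        parser = RdbParser(callback, filters=filters, quick=options.quick)
        parser.parse(dump_file)
    finally:
        f.close()  # close the output file even on error
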
if 'diff' == options.command:
callback = DiffCallback(f)
elif 'json' == options.command:
callback = JSONCallback(f)
elif 'memory' == options.command:
reporter = PrintAllKeys(f)
callback = MemoryCallback(reporter, 64, options.verbose)
else:
raise Exception('Invalid Command %s' % options.command)
else:
if 'diff' == options.command:
callback = DiffCallback(sys.stdout)
elif 'json' == options.command:
callback = JSONCallback(sys.stdout)
elif 'memory' == options.command:
reporter = PrintAllKeys(sys.stdout)
callback = MemoryCallback(reporter, 64)
callback = MemoryCallback(reporter, 64, options.verbose)
else:
raise Exception('Invalid Command %s' % options.command)

parser = RdbParser(callback, filters=filters)
parser.parse(dump_file)

start = time.clock()
parser = RdbParser(callback, filters=filters, quick=options.quick)
parser.parse(dump_file)
end = time.clock()

print "time=%s seconds" % (end-start)

if __name__ == '__main__':
main()


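The matching rdbtools/parser.py changes (seeking to filters['pos'], honoring filters['max'], the quick flag, and the new sizeE callback arguments) are not part of this diff. As a hedged usage sketch of the API surface this PR exposes, assuming this branch is installed; the file name and key regex below are placeholders:

    # Usage sketch against this PR's branch; filters mirrors the dict
    # built in rdbtools/cli/rdb.py.
    import sys
    from rdbtools import RdbParser, MemoryCallback, PrintAllKeys

    filters = {'keys': 'user:1234',  # key regex, as with -k
               'pos': 9,             # byte offset where RDB data begins (-p)
               'max': 1}             # stop after one key (-m)

    callback = MemoryCallback(PrintAllKeys(sys.stdout), 64, True)  # verbose dump
    parser = RdbParser(callback, filters=filters, quick=True)
    parser.parse('dump.rdb')
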
82 changes: 63 additions & 19 deletions rdbtools/memprofiler.py
@@ -1,6 +1,9 @@
from collections import namedtuple
from os import mkdir
import os
import random
import json
import shutil

from rdbtools.parser import RdbCallback
from rdbtools.callbacks import encode_key
@@ -9,7 +12,7 @@
ZSKIPLIST_P=0.25
REDIS_SHARED_INTEGERS = 10000

MemoryRecord = namedtuple('MemoryRecord', ['database', 'type', 'key', 'bytes', 'encoding','size', 'len_largest_element'])
MemoryRecord = namedtuple('MemoryRecord', ['database', 'type', 'key', 'file_pos', 'bytes', 'compressed_size', 'encoding','size', 'len_largest_element'])

class StatsAggregator():
def __init__(self, key_groupings = None):
@@ -70,24 +73,28 @@ def get_json(self):
class PrintAllKeys():
def __init__(self, out):
self._out = out
self._out.write("%s,%s,%s,%s,%s,%s,%s\n" % ("database", "type", "key",
"size_in_bytes", "encoding", "num_elements", "len_largest_element"))
self._out.write("%s,%s,%s,%s,%s,%s,%s,%s,%s\n" % ("database", "type", "key", "file_pos",
"size_in_bytes", "compressed_size", "encoding", "num_elements", "len_largest_element"))

def next_record(self, record) :
self._out.write("%d,%s,%s,%d,%s,%d,%d\n" % (record.database, record.type, encode_key(record.key),
record.bytes, record.encoding, record.size, record.len_largest_element))
self._out.write("%d,%s,%s,%d,%d,%d,%s,%d,%d\n" % (record.database, record.type, encode_key(record.key), record.file_pos,
record.bytes, record.compressed_size, record.encoding, record.size, record.len_largest_element))

class MemoryCallback(RdbCallback):
'''Calculates the memory used if this rdb file were loaded into RAM
The memory usage is approximate, and based on heuristics.
'''
def __init__(self, stream, architecture):
def __init__(self, stream, architecture, verbose_dump=False):
self._stream = stream
self._dbnum = 0
self._current_size = 0
self._compressed_size = 0
self._current_encoding = None
self._current_length = 0
self._len_largest_element = 0
self._current_pos = 0
self._verbose_dump = verbose_dump
self._current_key = None

if architecture == 64 or architecture == '64':
self._pointer_size = 8
@@ -105,19 +112,28 @@ def end_database(self, db_number):

def end_rdb(self):
pass


def start_object(self, file_pos):
"""
Called once a new top-level object has been found; used to record state
common to all types, such as the file position.
"""
self._current_pos = file_pos


def set(self, key, value, expiry, info):
def set(self, key, value, expiry, info, sizeE):
self._current_encoding = info['encoding']
size = self.sizeof_string(key) + self.sizeof_string(value) + self.top_level_object_overhead()
size += 2*self.robj_overhead()
size += self.key_expiry_overhead(expiry)

length = element_length(value)
record = MemoryRecord(self._dbnum, "string", key, size, self._current_encoding, length, length)
record = MemoryRecord(self._dbnum, "string", key, self._current_pos, size, sizeE, self._current_encoding, length, length)
self._stream.next_record(record)
self.end_key()

def start_hash(self, key, length, expiry, info):
self._current_key = key
self._current_encoding = info['encoding']
self._current_length = length
size = self.sizeof_string(key)
@@ -132,8 +148,14 @@ def start_hash(self, key, length, expiry, info):
else:
raise Exception('start_hash', 'Could not find encoding or sizeof_value in info object %s' % info)
self._current_size = size

if self._verbose_dump:
keyc = self._current_key.replace(":","-")
if os.path.isdir(keyc):
shutil.rmtree(keyc)
mkdir(keyc, 0777)

def hset(self, key, field, value):
def hset(self, key, field, value, sizeE):
if(element_length(field) > self._len_largest_element) :
self._len_largest_element = element_length(field)
if(element_length(value) > self._len_largest_element) :
@@ -144,31 +166,39 @@ def hset(self, key, field, value):
self._current_size += self.sizeof_string(value)
self._current_size += self.hashtable_entry_overhead()
self._current_size += 2*self.robj_overhead()
self._compressed_size += sizeE

if self._verbose_dump:
keyc = self._current_key.replace(":","-")
with open("%s/%s" % (keyc, field), "wb") as f:
f.write(value)

def end_hash(self, key):
record = MemoryRecord(self._dbnum, "hash", key, self._current_size, self._current_encoding, self._current_length, self._len_largest_element)
record = MemoryRecord(self._dbnum, "hash", key, self._current_pos, self._current_size, self._compressed_size, self._current_encoding, self._current_length, self._len_largest_element)
self._stream.next_record(record)
self.end_key()

def start_set(self, key, cardinality, expiry, info):
# A set is exactly like a hashmap
self.start_hash(key, cardinality, expiry, info)

def sadd(self, key, member):
def sadd(self, key, member, sizeE):
if(element_length(member) > self._len_largest_element) :
self._len_largest_element = element_length(member)

if self._current_encoding == 'hashtable':
self._current_size += self.sizeof_string(member)
self._current_size += self.hashtable_entry_overhead()
self._current_size += self.robj_overhead()
self._compressed_size += sizeE

def end_set(self, key):
record = MemoryRecord(self._dbnum, "set", key, self._current_size, self._current_encoding, self._current_length, self._len_largest_element)
record = MemoryRecord(self._dbnum, "set", key, self._current_pos, self._current_size,self._compressed_size, self._current_encoding, self._current_length, self._len_largest_element)
self._stream.next_record(record)
self.end_key()

def start_list(self, key, length, expiry, info):
self._current_key = key
self._current_length = length
self._current_encoding = info['encoding']
size = self.sizeof_string(key)
@@ -183,18 +213,29 @@ def start_list(self, key, length, expiry, info):
else:
raise Exception('start_list', 'Could not find encoding or sizeof_value in info object %s' % info)
self._current_size = size

if self._verbose_dump:
if os.path.isdir(self._current_key):
shutil.rmtree(self._current_key)
mkdir(self._current_key, 0777)

def rpush(self, key, value) :
if(element_length(value) > self._len_largest_element) :
def rpush(self, key, value, index, sizeE) :
if element_length(value) > self._len_largest_element:
self._len_largest_element = element_length(value)

if self._current_encoding == 'linkedlist':
self._current_size += self.sizeof_string(value)
self._current_size += self.linkedlist_entry_overhead()
self._current_size += self.robj_overhead()
self._compressed_size += sizeE


if self._verbose_dump:
with open("%s/%s" % (self._current_key, index), "wb") as f:
f.write(value)

def end_list(self, key):
record = MemoryRecord(self._dbnum, "list", key, self._current_size, self._current_encoding, self._current_length, self._len_largest_element)
record = MemoryRecord(self._dbnum, "list", key, self._current_pos, self._current_size, self._compressed_size, self._current_encoding, self._current_length, self._len_largest_element)
self._stream.next_record(record)
self.end_key()

@@ -214,7 +255,7 @@ def start_sorted_set(self, key, length, expiry, info):
raise Exception('start_sorted_set', 'Could not find encoding or sizeof_value in info object %s' % info)
self._current_size = size

def zadd(self, key, score, member):
def zadd(self, key, score, member, sizeE):
if(element_length(member) > self._len_largest_element):
self._len_largest_element = element_length(member)

Expand All @@ -223,16 +264,19 @@ def zadd(self, key, score, member):
self._current_size += self.sizeof_string(member)
self._current_size += 2*self.robj_overhead()
self._current_size += self.skiplist_entry_overhead()
self._compressed_size += sizeE

def end_sorted_set(self, key):
record = MemoryRecord(self._dbnum, "sortedset", key, self._current_size, self._current_encoding, self._current_length, self._len_largest_element)
record = MemoryRecord(self._dbnum, "sortedset", key, self._current_pos, self._current_size, self._compressed_size , self._current_encoding, self._current_length, self._len_largest_element)
self._stream.next_record(record)
self.end_key()


def end_key(self):
self._current_encoding = None
self._current_size = 0
self._len_largest_element = 0
self._compressed_size = 0

def sizeof_string(self, string):
# See struct sdshdr over here https://github.com/antirez/redis/blob/unstable/src/sds.h
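Because file_pos is now recorded per key, a memory report can double as an index for the new -p/--pos flag: one full pass builds the CSV, later passes seek straight to a key. A sketch of that lookup follows; the CSV and RDB file names and the key are placeholders, and the rdb entry point with its -c flag is assumed from upstream:

    import csv
    import subprocess

    def file_pos_for_key(index_csv, wanted_key):
        # Scan the CSV written by PrintAllKeys for one key's byte offset.
        with open(index_csv) as f:
            for row in csv.DictReader(f):
                if row['key'] == wanted_key:
                    return int(row['file_pos'])
        return None

    pos = file_pos_for_key('memory_index.csv', 'user:1234')
    if pos is not None:
        # Second pass: jump to the recorded offset and dump just that key,
        # using the -p, -m and -v flags added in this PR.
        subprocess.call(['rdb', '-c', 'memory', '-p', str(pos), '-m', '1',
                         '-k', 'user:1234', '-v', 'dump.rdb'])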