Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for supplying kafka-reassign-partitions.sh the --throttle argument #81

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/tools/assigner/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def main():

for i, batch in enumerate(batches):
log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch)))
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run)
batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run, args.throttle)

run_plugins_at_step(plugins, 'before_ple')

Expand Down
1 change: 1 addition & 0 deletions kafka/tools/assigner/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def set_up_arguments(action_map, sizer_map, plugins):
aparser.add_argument('-g', '--generate', help="Generate partition reassignment file", action='store_true')
aparser.add_argument('-e', '--execute', help="Execute partition reassignment", action='store_true')
aparser.add_argument('-m', '--moves', help="Max number of moves per step", required=False, default=10, type=int)
aparser.add_argument('-r', '--throttle', help="Max B/s per broker to use for reassignment", required=False, default=None)
aparser.add_argument('-x', '--exclude-topics', help="Comma-separated list of topics to skip when performing actions", action=CSVAction, default=[])
aparser.add_argument('--sizer', help="Select module to use to get partition sizes", required=False, default='ssh', choices=sizer_map.keys())
aparser.add_argument('-p', '--property', help="Property of the form 'key=value' to be passed to modules (i.e. sizer)", required=False, default=[],
Expand Down
21 changes: 14 additions & 7 deletions kafka/tools/assigner/models/reassignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,30 @@ def dict_for_reassignment(self):
reassignment['partitions'].append(partition.dict_for_reassignment())
return reassignment

def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True):
def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True, throttle=None):
for plugin in plugins:
plugin.before_execute_batch(num)
if not dry_run:
self._execute(num, total, zookeeper, tools_path)
self._execute(num, total, zookeeper, tools_path, throttle)
for plugin in plugins:
plugin.after_execute_batch(num)

def _execute(self, num, total, zookeeper, tools_path):
def _execute(self, num, total, zookeeper, tools_path, throttle):
with NamedTemporaryFile(mode='w') as assignfile:
json.dump(self.dict_for_reassignment(), assignfile)
assignfile.flush()
FNULL = open(os.devnull, 'w')
proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute',
'--zookeeper', zookeeper,
'--reassignment-json-file', assignfile.name],
stdout=FNULL, stderr=FNULL)
if throttle is not None:
proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute',
'--zookeeper', zookeeper,
'--reassignment-json-file', assignfile.name,
'--throttle', throttle],
stdout=FNULL, stderr=FNULL)
else:
proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute',
'--zookeeper', zookeeper,
'--reassignment-json-file', assignfile.name],
stdout=FNULL, stderr=FNULL)
proc.wait()

# Wait until finished
Expand Down
9 changes: 7 additions & 2 deletions tests/tools/assigner/models/test_reassignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ def test_reassignment_repr(self):
@patch.object(Reassignment, '_execute')
def test_reassignment_execute_real(self, mock_exec):
self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False)
mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools')
mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', None)

@patch.object(Reassignment, '_execute')
def test_reassignment_execute_throttle(self, mock_exec):
self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False, throttle='1000')
mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', '1000')

@patch.object(Reassignment, '_execute')
def test_reassignment_execute_dryrun(self, mock_exec):
Expand All @@ -55,7 +60,7 @@ def test_reassignment_internal_execute(self, mock_check, mock_popen):
mock_popen.set_default()
mock_check.side_effect = [10, 5, 0]

self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools')
self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools', None)

compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY],
stderr=ANY, stdout=ANY),
Expand Down
1 change: 1 addition & 0 deletions tests/tools/assigner/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def test_main(self, mock_plugins, mock_sizes):
tools_path='/path/to/tools',
property=['datadir=/path/to/data'],
moves=10,
throttle=1000,
execute=False,
exclude_topics=[],
generate=False,
Expand Down