Skip to content

Commit

Permalink
Give more detail when we cannot process a non-JSON streamed line (#1186
Browse files Browse the repository at this point in the history
…) (#1258)

Pack the line information into job_explanation for technical reasons

Limit line length in these error messages to print to 1000 characters

Update tests to check for more error reporting
  • Loading branch information
AlanCoding authored Jun 13, 2023
1 parent 0393c39 commit c311e81
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 11 deletions.
9 changes: 7 additions & 2 deletions ansible_runner/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,13 @@ def run(self):
try:
line = self._input.readline()
data = json.loads(line)
except (json.decoder.JSONDecodeError, IOError):
self.status_callback({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from worker stream.'})
except (json.decoder.JSONDecodeError, IOError) as exc:
self.status_callback({
'status': 'error',
'job_explanation': (
f'Failed to JSON parse a line from worker stream. Error: {exc} Line with invalid JSON data: {line[:1000]}'
)
})
break

if 'status' in data:
Expand Down
31 changes: 22 additions & 9 deletions test/integration/test_transmit_worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,15 @@ def test_garbage_private_dir_worker(tmp_path):
_output=outgoing_buffer,
private_data_dir=worker_dir,
)
sent = outgoing_buffer.getvalue()
assert b'"status": "error"' in sent
outgoing_buffer.seek(0)
sent = outgoing_buffer.readline()
data = json.loads(sent)
assert data['status'] == 'error'
assert data['job_explanation'] == 'Failed to extract private data directory on worker.'
assert data['result_traceback']


def test_unparsable_private_dir_worker(tmp_path):
def test_unparsable_line_worker(tmp_path):
worker_dir = tmp_path / 'for_worker'
worker_dir.mkdir()
incoming_buffer = io.BytesIO(b'')
Expand All @@ -476,18 +480,27 @@ def test_unparsable_private_dir_worker(tmp_path):
_output=outgoing_buffer,
private_data_dir=worker_dir,
)
sent = outgoing_buffer.getvalue()
assert b'"status": "error"' in sent
outgoing_buffer.seek(0)
sent = outgoing_buffer.readline()
data = json.loads(sent)
assert data['status'] == 'error'
assert data['job_explanation'] == 'Failed to JSON parse a line from transmit stream.'


def test_unparsable_private_dir_processor(tmp_path):
def test_unparsable_really_big_line_processor(tmp_path):
process_dir = tmp_path / 'for_process'
process_dir.mkdir()
incoming_buffer = io.BytesIO(b'')
incoming_buffer = io.BytesIO(bytes(f'not-json-data with extra garbage:{"f"*10000}', encoding='utf-8'))

def status_receiver(status_data, runner_config):
assert status_data['status'] == 'error'
assert 'Failed to JSON parse a line from worker stream.' in status_data['job_explanation']
assert 'not-json-data with extra garbage:ffffffffff' in status_data['job_explanation']
assert len(status_data['job_explanation']) < 2000

processor = run(
run(
streamer='process',
_input=incoming_buffer,
private_data_dir=process_dir,
status_handler=status_receiver
)
assert processor.status == 'error'

0 comments on commit c311e81

Please sign in to comment.