Skip to content

Commit 16e418a

Browse files
authored
[CIVIS-10251] Add civis.utils.job_logs() function (#509)
1 parent d515389 commit 16e418a

File tree

7 files changed

+396
-51
lines changed

7 files changed

+396
-51
lines changed

CHANGELOG.md

+14-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1818

1919
### Security
2020

21+
## 2.5.0 - 2025-02-24
22+
23+
### Added
24+
25+
- Added `civis.utils.job_logs()` function to return a generator of log messages for a job run (#509)
26+
27+
### Changed
28+
29+
- Revised the CLI commands `civis jobs follow-log` and `civis jobs follow-run-log` to not skip log messages for running jobs (#509)
30+
2131
## 2.4.3 - 2025-01-13
2232

2333
### Fixed
@@ -114,7 +124,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
114124
top-level and nested response objects. (#493)
115125

116126
### Security
117-
- Bumped the minimum required version of `requests` to the latest v2.32.3,
127+
- Bumped the minimum required version of `requests` to the latest v2.32.3,
118128
due to a security vulnerability for < v2.32.0
119129
([CVE-2024-35195](https://nvd.nist.gov/vuln/detail/CVE-2024-35195)). (#488)
120130

@@ -181,7 +191,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
181191

182192
### Changed
183193
- Updated references from 'master' to 'main' (#460)
184-
- Clarified the usage example for `civis.io.civis_to_multifile_csv`. Updated
194+
- Clarified the usage example for `civis.io.civis_to_multifile_csv`. Updated
185195
CircleCI config so dev-requirements is only used when needed. (#452)
186196
- Removed unneeded `time.sleep` calls and `pytest.mark` calls and mocked `time.sleep` calls to optimize tests. (#453)
187197
- Refactored tests to remove dependency on the vcr library. (#456)
@@ -227,7 +237,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
227237
### Changed
228238
- Added a warning message when using `civis.io.file_to_civis` with file size of 0 bytes (#451)
229239
- Specified that `civis.io.civis_file_to_table` can handle compressed files (#450)
230-
- Explicitly stated CSV-like civis file format requirement in
240+
- Explicitly stated CSV-like civis file format requirement in
231241
`civis.io.civis_file_to_table`'s docstring (#445)
232242
- Called out the fact that `joblib.Parallel`'s `pre_dispatch` defaults to `"2*n_jobs"`
233243
in the Sphinx docs (#443)
@@ -247,7 +257,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
247257
has `VARCHAR` (#439)
248258
- Updated info about MacOS shell configuration file to be `~/.zshrc` (#444)
249259
- Fixed the Sphinx docs to show details of multi-word API endpoints (#442)
250-
- Dropped the buggy/unnecessary `_get_headers` in `civis.io.read_civis_sql` (#415)
260+
- Dropped the buggy/unnecessary `_get_headers` in `civis.io.read_civis_sql` (#415)
251261
- Clarified the `table_columns` parameter in `civis.io.*` functions (#434)
252262
- Warned about the `retry_total` parameter of `civis.APIClient` being inactive and deprecated (#431)
253263
- Converted `assert` statements in non-test code into proper error handling (#430, #435)

docs/utils.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ and templates on the Civis Platform.
1010
:toctree: generated
1111

1212
run_job
13-
run_template
13+
run_template
14+
job_logs

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "civis"
7-
version = "2.4.3"
7+
version = "2.5.0"
88
description = "Civis API Python Client"
99
readme = "README.rst"
1010
requires-python = ">= 3.10"

src/civis/cli/_cli_commands.py

+4-41
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@
44
Additional commands to add to the CLI beyond the OpenAPI spec.
55
"""
66
import functools
7-
import operator
87
import os
98
import sys
10-
import time
119

1210
import click
1311
import requests
1412
import webbrowser
1513

1614
import civis
1715
from civis.io import file_to_civis, civis_to_file
16+
from civis.utils import job_logs
1817

1918

2019
# From http://patorjk.com/software/taag/#p=display&f=3D%20Diagonal&t=CIVIS
@@ -41,11 +40,8 @@
4140
until the run is done and then exit. If the run is already finished, it
4241
will output all the logs from that run and then exit.
4342
44-
NOTE: This command could miss some log entries from a currently-running
45-
job. It does not re-fetch logs that might have been saved out of order, to
46-
preserve the chronological order of the logs and without duplication.
43+
NOTE: Log entries may appear our of order, particularly at the end of a run.
4744
"""
48-
_FOLLOW_POLL_INTERVAL_SEC = 3
4945

5046

5147
@click.command("upload")
@@ -236,41 +232,8 @@ def jobs_follow_run_log(id, run_id):
236232

237233

238234
def _jobs_follow_run_log(id, run_id):
239-
client = civis.APIClient(return_type="raw")
240-
local_max_log_id = 0
241-
continue_polling = True
242-
243-
while continue_polling:
244-
# This call gets all available log messages since last_id up to
245-
# the page size, ordered by log ID. We leave it to Platform to decide
246-
# the best page size.
247-
response = client.jobs.list_runs_logs(id, run_id, last_id=local_max_log_id)
248-
if "civis-max-id" in response.headers:
249-
remote_max_log_id = int(response.headers["civis-max-id"])
250-
else:
251-
# Platform hasn't seen any logs at all yet
252-
remote_max_log_id = None
253-
logs = response.json()
254-
if logs:
255-
local_max_log_id = max(log["id"] for log in logs)
256-
logs.sort(key=operator.itemgetter("createdAt", "id"))
257-
for log in logs:
258-
print(" ".join((log["createdAt"], log["message"].rstrip())))
259-
# if output is a pipe, write the buffered output immediately:
260-
sys.stdout.flush()
261-
262-
log_finished = response.headers["civis-cache-control"] != "no-store"
263-
if remote_max_log_id is None:
264-
remote_has_more_logs_to_get_now = False
265-
elif local_max_log_id == remote_max_log_id:
266-
remote_has_more_logs_to_get_now = False
267-
if log_finished:
268-
continue_polling = False
269-
else:
270-
remote_has_more_logs_to_get_now = True
271-
272-
if continue_polling and not remote_has_more_logs_to_get_now:
273-
time.sleep(_FOLLOW_POLL_INTERVAL_SEC)
235+
for log in job_logs(id, run_id):
236+
print(" ".join((log["createdAt"], log["message"].rstrip())), flush=True)
274237

275238

276239
@click.command("download")

src/civis/utils/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from civis.utils._jobs import run_job, run_template
1+
from civis.utils._jobs import run_job, run_template, job_logs
22

3-
__all__ = ["run_job", "run_template"]
3+
__all__ = ["run_job", "run_template", "job_logs"]

src/civis/utils/_jobs.py

+149
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
import logging
2+
import operator
3+
import time
4+
from datetime import datetime
25

36
from civis import APIClient
47
from civis.futures import CivisFuture
58

69
log = logging.getLogger(__name__)
710

11+
_FOLLOW_POLL_INTERVAL_SEC = 5
12+
_LOG_REFETCH_CUTOFF_SECONDS = 300
13+
_LOG_REFETCH_COUNT = 100
14+
_LOGS_PER_QUERY = 250
15+
816

917
def run_job(job_id, client=None, polling_interval=None):
1018
"""Run a job.
@@ -96,3 +104,144 @@ def run_template(id, arguments, JSONValue=False, client=None):
96104
else:
97105
file_ids = {o.name: o.object_id for o in outputs}
98106
return file_ids
107+
108+
109+
def _timestamp_from_iso_str(s):
110+
"""Return an integer POSIX timestamp for a given ISO date string.
111+
112+
Note: Until Python 3.11, datetime.fromisoformat doesn't work
113+
with the format returned by Civis Platform.
114+
"""
115+
try:
116+
return datetime.fromisoformat(s).timestamp()
117+
except ValueError:
118+
try:
119+
# This is the format that Civis Platform returns.
120+
return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S.%f%z").timestamp()
121+
except ValueError:
122+
# Another format, just in case.
123+
return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S%z").timestamp()
124+
125+
126+
def _compute_effective_max_log_id(logs):
127+
"""Find a max log ID use in order to avoid missing late messages.
128+
129+
The order of log IDs may not be consistent with "created at" times
130+
since log entries are created by Civis Platform as well as the code
131+
for the job itself. This function looks through recent logs
132+
and finds a maximum ID that is at least as old as a set cutoff period,
133+
so that messages with lower IDs that show up a bit late won't be skipped.
134+
With this, it is still theoretically possible but extremely unlikely
135+
for some late log messages to be skipped in the job_logs function.
136+
"""
137+
if not logs:
138+
return 0
139+
140+
sorted_logs = sorted(logs, key=operator.itemgetter("id"))
141+
142+
max_created_at_timestamp = _timestamp_from_iso_str(sorted_logs[-1]["createdAt"])
143+
cutoff = time.time() - _LOG_REFETCH_CUTOFF_SECONDS
144+
if max_created_at_timestamp < cutoff:
145+
return sorted_logs[-1]["id"]
146+
elif len(sorted_logs) >= _LOG_REFETCH_COUNT:
147+
return sorted_logs[-_LOG_REFETCH_COUNT]["id"]
148+
149+
return 0
150+
151+
152+
def _job_finished_past_timeout(job_id, run_id, finished_timeout, raw_client):
153+
"""Return true if the run finished more than so many seconds ago."""
154+
if finished_timeout is None:
155+
return False
156+
157+
run = raw_client.jobs.get_runs(job_id, run_id)
158+
finished_at = run.json()["finishedAt"]
159+
if finished_at is None:
160+
return False
161+
finished_at_ts = _timestamp_from_iso_str(finished_at)
162+
result = finished_at_ts < time.time() - finished_timeout
163+
return result
164+
165+
166+
def job_logs(job_id, run_id=None, finished_timeout=None):
167+
"""Return a generator of log message dictionaries for a given run.
168+
169+
Parameters
170+
----------
171+
job_id : int
172+
The ID of the job to retrieve log message for.
173+
run_id : int or None
174+
The ID of the run to retrieve log messages for.
175+
If None, the ID for the most recent run will be used.
176+
finished_timeout: int or None
177+
If not None, then this function will return once the run has
178+
been finished for the specified number of seconds.
179+
If None, then this function will wait until the API says there
180+
will be no more new log messages, which may take a few minutes.
181+
A timeout of 30-60 seconds is usually enough to retrieve all
182+
log messages.
183+
184+
Yields
185+
------
186+
dict
187+
A log message dictionary with "message", "createdAt" and other attributes
188+
provided by the job logs endpoint. Note that this will block execution
189+
until the job has stopped and all log messages are retrieved.
190+
"""
191+
# The return_type for the client is "raw" in order to check
192+
# the "civis-cache-control" and "civis-max-id" headers when
193+
# list_runs_logs returns an empty list of new messages.
194+
# Caching of the endpoint information in
195+
# civis.resources.generate_classes_maybe_cached avoids extra API calls.
196+
raw_client = APIClient(return_type="raw")
197+
198+
if run_id is None:
199+
run_id = raw_client.jobs.list_runs(
200+
job_id, limit=1, order="id", order_dir="desc"
201+
).json()[0]["id"]
202+
203+
local_max_log_id = 0
204+
continue_polling = True
205+
206+
known_log_ids = set()
207+
208+
while continue_polling:
209+
# This call gets a limited number of log messages since last_id,
210+
# ordered by log ID.
211+
response = raw_client.jobs.list_runs_logs(
212+
job_id,
213+
run_id,
214+
last_id=local_max_log_id,
215+
limit=_LOGS_PER_QUERY,
216+
)
217+
if "civis-max-id" in response.headers:
218+
remote_max_log_id = int(response.headers["civis-max-id"])
219+
else:
220+
# Platform hasn't seen any logs at all yet
221+
remote_max_log_id = None
222+
logs = response.json()
223+
if logs:
224+
local_max_log_id = max(log["id"] for log in logs)
225+
logs.sort(key=operator.itemgetter("createdAt", "id"))
226+
for log in logs:
227+
if log["id"] in known_log_ids:
228+
continue
229+
known_log_ids.add(log["id"])
230+
yield log
231+
232+
log_finished = response.headers["civis-cache-control"] != "no-store"
233+
234+
if remote_max_log_id is None:
235+
remote_has_more_logs_to_get_now = False
236+
elif local_max_log_id == remote_max_log_id:
237+
remote_has_more_logs_to_get_now = False
238+
local_max_log_id = _compute_effective_max_log_id(logs)
239+
if log_finished or _job_finished_past_timeout(
240+
job_id, run_id, finished_timeout, raw_client
241+
):
242+
continue_polling = False
243+
else:
244+
remote_has_more_logs_to_get_now = True
245+
246+
if continue_polling and not remote_has_more_logs_to_get_now:
247+
time.sleep(_FOLLOW_POLL_INTERVAL_SEC)

0 commit comments

Comments
 (0)