Commit 3aff412
Fix process logger bug for empty column names.
Also added some docs about a processing issue on mac that requires a
special env var to be set in order to make the process forking work.
naddeoa committed Oct 19, 2023
1 parent 1d1adf0 commit 3aff412
Showing 2 changed files with 43 additions and 1 deletion.
@@ -111,6 +111,45 @@ def create_writers(self, dataset_id: str) -> List[Writer]:


class ProcessRollingLogger(ProcessActor[MessageType], DataLogger[Dict[str, ProcessLoggerStatus]]):
"""
Log data asynchronously using a separate process.
The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. This means
it logs data over time and periodically uploads it in the background, using a separate process so that it doesn't
block the main one.
```python
logger = ProcessRollingLogger(
    aggregate_by=TimeGranularity.Day,
    write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5),
)

logger.start()
logger.log(data_frame)
```
This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs.
MAC USERS: You'll run into issues running this on Python>=3.8 because Python will use spawn instead of fork.
You should be able to get around it by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
in the environment that the process logger runs in, but it has to be set before the Python process starts; you
can't set it from within Python (using os.environ won't work).
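For example, a minimal sketch of a launcher that sets the variable for a child Python process before it starts
(`my_logging_script.py` is a hypothetical stand-in for whatever script runs the ProcessRollingLogger):

```python
# Sketch: set OBJC_DISABLE_INITIALIZE_FORK_SAFETY in the child's environment
# before the child starts. my_logging_script.py is a hypothetical script that
# creates and runs the ProcessRollingLogger.
import os
import subprocess

env = {**os.environ, "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"}
subprocess.run(["python", "my_logging_script.py"], env=env, check=True)
```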
Args:
    aggregate_by: The time granularity to aggregate data by. This determines how the time bucketing is done. For
        the Hour type, the logger will end up pooling data into profiles by the hour.
    write_schedule: The schedule to use for writing data. This is used to determine when to upload data.
    schema: The DatasetSchema to use for whylogs under the hood.
    sync_enabled: Whether to enable synchronous logging. If this is enabled then you can pass sync=True to the
        log call. Without this you can't use the sync flag.
    queue_config: Lets you change various polling and timeout parameters.
    thread_queue_config: Same as queue_config, but for the wrapped ThreadRollingLoggers.
    writer_factory: The writer factory to use for creating writers.
    queue_type: The type of queue used to manage multiprocessing. By default, faster_fifo is used because it's
        a lot faster than the default multiprocessing queue, but you can use the built-in mp.Queue by setting
        this to QueueType.MP.
"""

def __init__(
self,
aggregate_by: TimeGranularity = TimeGranularity.Hour,
@@ -406,7 +445,7 @@ class PipeSignaler(th.Thread):
- The parent and child process each have references to the pipes and they each need to close their references,
which means close_child has to be called from the child process and close has to be called from the parent.
Calling close_child in the main processing code will have the right effect.
- The process actor does message batching, so multiple ids may be signaled even though a single batch was processed,
because that batch could have contained multiple messages.
- The signaler uses Events under the hood to know when to stop working. They can be th.Events even though this
is being used in a multiprocessing environment because nothing the child does can affect them. Keep in mind
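As a minimal sketch of the ownership pattern those notes describe (using multiprocessing.Pipe directly, not the
PipeSignaler API), each process closes its own copies of the pipe ends, mirroring close_child in the child and
close in the parent:

```python
# Sketch of the parent/child pipe-ownership pattern; not the PipeSignaler API.
import multiprocessing as mp


def child_main(parent_conn, child_conn) -> None:
    parent_conn.close()  # child closes its copy of the parent's end
    child_conn.send("message-id-1")  # signal that a message id was processed
    child_conn.close()  # analogous to close_child()


if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=child_main, args=(parent_conn, child_conn))
    proc.start()
    child_conn.close()  # parent closes its copy of the child's end
    print(parent_conn.recv())  # -> "message-id-1"
    proc.join()
    parent_conn.close()  # analogous to close() in the parent
```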
@@ -7,6 +7,9 @@ def encode_strings(col_names: List[str]) -> int:
Encode a list of strings as a number by hashing each one and then adding them together.
This is useful for generating group keys from string lists where order doesn't matter; it
has a reasonably low collision rate and avoids having to sort everything.

Args:
    col_names: list of column names to encode.
"""
# Use sha1 because it's fast. This isn't used for anything related to security.
hashes = [int.from_bytes(sha1(it.encode("utf-8")).digest(), "big") for it in col_names]
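Filling in the truncated body as a sketch: the final return line is inferred from the docstring's "adding them
together", since the diff cuts off after the hashing line.

```python
from hashlib import sha1
from typing import List


def encode_strings(col_names: List[str]) -> int:
    """Sketch of the full function; the return line is inferred, not shown in the diff."""
    # Use sha1 because it's fast. This isn't used for anything related to security.
    hashes = [int.from_bytes(sha1(it.encode("utf-8")).digest(), "big") for it in col_names]
    return sum(hashes)


# Per the docstring, order doesn't matter:
assert encode_strings(["a", "b"]) == encode_strings(["b", "a"])
```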
