diff --git a/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py b/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py index 514a54963f..b17d84f3d0 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py +++ b/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py @@ -111,6 +111,45 @@ def create_writers(self, dataset_id: str) -> List[Writer]: class ProcessRollingLogger(ProcessActor[MessageType], DataLogger[Dict[str, ProcessLoggerStatus]]): + """ + Log data asynchronously using a separate process. + + The ProcessRollingLogger is a rolling logger that manages a separate process to do the actual logging. This means + it logs data over time and periodically uploads it in the background, using a separate process so that it doesn't + block the main one. + + ```python + logger = ProcessRollingLogger( + aggregate_by=TimeGranularity.Day, + write_schedule=Schedule(cadence=TimeGranularity.Minute, interval=5), + ) + + logger.start() + + logger.log(data_frame) + ``` + + This class mostly wraps and manages several ThreadRollingLoggers that do the real logging with whylogs. + + MAC USERS: You'll run into issues running this on Python>=3.8 because Python will use spawn instead of fork. + You should be able to get around it by setting the environment variable OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES + in the environment that the process logger runs in, but you can't set it in Python (no using os.environ). + + Args: + aggregate_by: The time granularity to aggregate data by. This determines how the time bucketing is done. For + the Hour type, the logger will end up pooling data into profiles by the hour. + write_schedule: The schedule to use for writing data. This is used to determine when to upload data. + schema: The DatasetSchema to use for whylogs under the hood. + sync_enabled: Whether to enable synchronous logging. If this is enabled then you can pass log(sync=True) to the + log call. Without this you can't use the sync flag. + queue_config: Let's you change various polling and timeout parameters. + thread_queue_config: Same as queue_config, but for the wrapped ThreadRollingLoggers. + writer_factory: The writer factory to use for creating writers. + queue_type: The type of queue to to manage multiprocessing. By default, faster_fifo is used because it's + a lot faster than the default multiprocessing queue, but you can use the built in mp.Queue by setting + this to QueueType.MP. + """ + def __init__( self, aggregate_by: TimeGranularity = TimeGranularity.Hour, @@ -406,7 +445,7 @@ class PipeSignaler(th.Thread): - The parent and child process each have references to the pipes and they each need to close their references, which means close_child has to be called from the child process and close has to be called from the parent. Calling close_child in the main processing code will have right effect. - - The process actor does message batching so multiple ids mmay be signaled even though a single batch was processed + - The process actor does message batching so multiple ids may be signaled even though a single batch was processed because that batch could have contained multiple messages. - The signaler uses Events under the hood to know when to stop working. They can be th.Events even though this is being used in a multiprocessing environment because nothing the child does can affect them. Keep in mind diff --git a/python/whylogs/api/logger/experimental/logger/actor/string_util.py b/python/whylogs/api/logger/experimental/logger/actor/string_util.py index 0eb5544be4..4d73639fcb 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/string_util.py +++ b/python/whylogs/api/logger/experimental/logger/actor/string_util.py @@ -7,6 +7,9 @@ def encode_strings(col_names: List[str]) -> int: Encode a list of strings as a number by hashing each one and then adding them together. This is useful for generating group keys based on string lists that don't care about order and have reasonably low collision rate without having to sort everything. + + Args: + col_names: list of column names to encode. """ # Use sha1 because its fast. This isn't used for anything related to security. hashes = [int.from_bytes(sha1(it.encode("utf-8")).digest(), "big") for it in col_names]