Program for creating data pipelines triggered by file creation events.
0.1.0-beta.4
To utilise this package, it should be installed within a dedicated Conda environment. You can create this environment using the following command:
conda create --name <environment_name> python=3.10
To activate the conda environment use:
conda activate <environment_name>
Alternatively, use virtualenv
to setup and activate the environment:
python -m venv <environment_name>
source <envionment_name>/bin/activate
- Clone the repository:
git clone git@github.com:NOC-OI/dpyepline.git
- Navigate to the package directory:
After cloning the repository, navigate to the root directory of the package.
- Install in editable mode:
To install dpypeline
in editable mode, execute the following comman from the root directory:
pip install -e .
This command will install the library in editable mode, allowing you to make changes to the code if needed.
- Alternative installation methods:
- Install from the GitHub repository directly:
pip install git+https://github.com/NOC-OI/dpypeline.git@main#egg=dpypeline
- Install from the PyPI repository:
pip install dpypeline
Run tests using pytest
in the main directory:
pip install pytest
pytest
Examples of Python scripts explaining how to use this package can be found in the examples directory.
The CLI provided by this package allows you to execute data pipelines defined in YAML files; however, it offers less flexibility compared to using the Python scripts. To run the dpypeline CLI, type, e.g., the following command:
dpypeline -i <input_file> > output 2> errors
-h
or--help
: show an help message-i INPUT_FILE
or--input INPUT_FILE
: Filepath to the pipeline YAML file (by defaultpipelien.yaml
)-v
or--version
: show dpypeline's version umber
There are a few environment variables that need to be set so that the application can run correctly:
CACHE_DIR
: Path to the cache directory.
In the thread-based pipeline, Akita
enqueues events into an in-memory queue. These events are subsequently consumed by ConsumerSerial
, which generates jobs for sequential execution within the ThreadPipeline
(an alias for BasicPipeline
).
In the parallel pipeline, Akita
enqueues events into an in-memory queue. These events are then consumed by ConsumerParallel
, which generates futures that are executed concurrently by multiple Dask workers.