How to make a processor
4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the latter.

Processors are bits of code that produce a dataset. Typically, their input is another dataset. As such they can be used to analyse data; for example, a processor can take a CSV file containing posts as input, count how many posts occur per month, and produce another CSV file with the number of posts per month (one month per row) as output.
4CAT has an API that can do most of the scaffolding around this for you so processors can be quite lightweight and mostly focus on the analysis while 4CAT's back-end takes care of the scheduling, determining where the output should go, et cetera.
This is a minimal, annotated example of a 4CAT processor:
```python
"""
A minimal example 4CAT processor
"""
from backend.abstract.processor import BasicProcessor


class ExampleProcessor(BasicProcessor):
    """
    Example Processor
    """
    type = "example-processor"  # job type ID
    category = "Examples"  # category
    title = "A simple example"  # title displayed in UI
    description = "This doesn't do much"  # description displayed in UI
    extension = "csv"  # extension of result file, used internally and in UI

    input = "csv:body"
    output = "csv:value"

    def process(self):
        """
        Saves a CSV file with one column ("value") and one row with a value
        ("Hello world") and marks the dataset as finished.
        """
        data = [{"value": "Hello world!"}]
        self.write_csv_items_and_finish(data)
```
Processor settings and metadata are stored as class properties. The following properties are available and can be set:
- `type` (string): A unique identifier for this processor type. Used internally to queue jobs, store results, et cetera. Should be a string without any spaces in it.
- `category` (string): A category for this processor, used in the 4CAT web interface to group processors thematically. Displayed as-is in the web interface.
- `title` (string): Processor title, displayed as-is in the web interface.
- `description` (string): Description, displayed in the web interface. Markdown in the description will be parsed.
- `extension` (string): Extension for files produced by this processor, e.g. `csv` or `ndjson`.
- `input` (string): Simple schema for the input files required by this processor, in the format `type[:fields]`. For example, `csv:column1,column2` means the processor requires a CSV file with at least the columns `column1` and `column2` as input. The field definition is optional, i.e. just `csv` is also valid.
- `output` (string): Simple schema for the files produced by this processor.
- `options` (dictionary): A dictionary of configurable parameters for this processor. See the section on processor options for more details.
- `interrupted` (bool): `False` by default. This may be set to `True` by 4CAT's scheduler. Once it is `True`, the processor should return as soon as possible, even if processing is not complete yet. If processing has not finished yet, the result so far (if any) should be discarded and the dataset status updated to reflect that it has been interrupted. Since abort procedures differ per processor, this is the processor's responsibility. The simplest way of addressing this is raising a `ProcessorInterruptedException` (found in `backend.lib.exceptions`); this will gracefully stop the processor and clean up so it may be attempted again later.
The full API available to processors is as follows. All of these are members of the processor object, i.e. they are accessed via `self.property` within the processor code. While other methods or classes may be available within processors, relying on them is discouraged and unsupported; when possible, use only the API documented here.
- `process() -> void`: This method should contain the processor's logic. Other methods may be defined, but this one will be called when an analysis with this processor is queued.
- `iterate_items(pathlib.Path source_file) -> Generator`: Yields one item from the dataset's data file per call and raises a `ProcessorInterruptedException` if the processor has been interrupted while iterating. Can be used on datasets that are CSV or NDJSON files. If the `map_item()` method (see below) is defined, the result of that method called with the item as an argument will be yielded instead.
- `iterate_archive_contents(pathlib.Path source_file) -> Generator`: Yields `Path` objects for each file in the given archive. Files are temporarily uncompressed so they can be worked with, and the temporary file is deleted afterwards.
- `write_csv_items_and_finish(list data) -> void`: Writes a list of dictionaries as a CSV file as the result of this processor, and finishes processing. Raises a `ProcessorInterruptedException` if the processor has been interrupted while writing.
- `write_archive_and_finish(list|Path files) -> void`: Compresses all files in the list `files`, or all files in the folder `files`, into a zip archive and saves that as the dataset result. Files are deleted after adding them, and if `files` is a folder the folder is deleted too. Useful when processor output cannot be conveniently saved into one file.
- `map_item(item) -> dict`: Optional. If defined, any item yielded by `iterate_items()` (see above) will be passed through this method first, and the result will be yielded instead. This is especially useful for datasets stored as NDJSON, as items may be stored as nested objects with arbitrary depth in that format; `map_item()` can be used to reduce them to 'flat' dictionaries before processing.
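To sketch how these pieces fit together, the posts-per-month example from the introduction could be implemented as follows. This is a minimal illustration of the analysis logic only: in a real processor it would sit inside `process()`, with items coming from `self.iterate_items(self.source_file)` and the result passed to `self.write_csv_items_and_finish()`; the `timestamp` column name is an assumption for illustration.

```python
from collections import Counter
from datetime import datetime, timezone


def count_posts_per_month(items):
    """
    Count how many posts occur per month, given an iterable of item
    dictionaries. In a real 4CAT processor, `items` would come from
    self.iterate_items(self.source_file).
    """
    counts = Counter()
    for item in items:
        # Assumes each item has a "timestamp" column containing a UNIX
        # timestamp; the column name is hypothetical.
        moment = datetime.fromtimestamp(int(item["timestamp"]), tz=timezone.utc)
        counts[moment.strftime("%Y-%m")] += 1

    # One dictionary per month, suitable for write_csv_items_and_finish()
    return [{"month": month, "posts": count} for month, count in sorted(counts.items())]
```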
- `dataset` (`DataSet`): The dataset produced by this processor.
  - `dataset.finish(int num_items_in_result) -> void`: Manually mark the dataset as finished. Either this method or `write_csv_items_and_finish()` should always be called at the end of `process()`. If `num_items_in_result` is 0 or less, no download link is displayed for the result in the web interface. If it is 0, "No results" is additionally displayed in the interface.
  - `dataset.get_result_path() -> pathlib.Path`: Returns a `Path` object referencing the location the processor result should be saved to.
  - `dataset.update_status(str status, bool is_final=False) -> void`: Updates the dataset status. This can be used to indicate processor progress to the user through the web interface. Note that this updates the database, which is relatively expensive, so you should not call it too often (for example, not every iteration of a loop, but only every 500 iterations). If `is_final` is set, subsequent calls to this method have no effect.
  - `dataset.get_staging_area() -> pathlib.Path`: Returns a `Path` object referencing a newly created folder that can be used as a staging area; files can be stored here while the processor is active. After the processor finishes, files here may be deleted and cannot be relied on, so you should take care of this within the processor: delete the path (e.g. with `shutil.rmtree()`) after it is no longer needed, or use the `write_archive_and_finish()` method with the staging area `Path` as an argument, which will also delete it.
- `source_file` (`pathlib.Path`): The file to be processed as input.
- `parent` (`DataSet`): The dataset used as input by this processor.
- `parameters` (`dict`): Options set by the user for this processor.
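The `interrupted` flag and `ProcessorInterruptedException` described above can be combined in a processing loop along the following lines. This is a sketch of the pattern only: the exception class is stubbed here for illustration (in 4CAT it lives in `backend.lib.exceptions`), and `is_interrupted` stands in for checking `self.interrupted`.

```python
class ProcessorInterruptedException(Exception):
    """Stub of 4CAT's exception from backend.lib.exceptions, for illustration."""


def process_items(items, is_interrupted=lambda: False):
    """
    Process items one by one, aborting gracefully when interrupted.
    In a real processor, is_interrupted() corresponds to checking
    self.interrupted, which 4CAT's scheduler may set to True.
    """
    results = []
    for item in items:
        # Check the interruption flag before each item and bail out cleanly,
        # discarding the partial result as the documentation requires
        if is_interrupted():
            raise ProcessorInterruptedException("Interrupted while processing items")
        results.append(item)
    return results
```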
It may be useful to offer users options through which they can configure a processor, e.g. to set the language to use for text analysis or the resolution of a visualisation. This is possible with the `options` property of the processor class. This property, if set, should be a dictionary, with each item having its option identifier as a key and the option definition as a value, e.g.:
```python
from backend.lib.helpers import UserInput
from backend.abstract.processor import BasicProcessor


class SomeProcessor(BasicProcessor):
    options = {
        "graph-title": {
            "type": UserInput.OPTION_TEXT,
            "help": "Graph title",
            "default": "Default title"
        },
        "graph-height": {
            "type": UserInput.OPTION_TEXT,
            "help": "Graph height",
            "default": 25,
            "min": 0,
            "max": 100
        }
    }
```
This would, in the web interface, show these two options to a user. The value set for these options, when running the processor, may then be accessed as follows:
```python
def process(self):
    height = self.parameters.get("graph-height", 10)
    # or, to use the default set before...
    height = self.parameters.get("graph-height", self.options["graph-height"]["default"])
```
Currently, four types of options are available:
- `OPTION_TEXT`, shown as a simple text field. Optionally, `min` and `max` may be set for this option to parse the value as a (clamped) integer.
- `OPTION_TOGGLE`, a checkbox. Will be either `True` or `False`.
- `OPTION_CHOICE`, a select box. Select options are to be provided via `options` in the option definition, containing a dictionary with possible values as keys and labels for those values as values.
- `OPTION_MULTI`, a list of several checkboxes. Requires an `options` value like `OPTION_CHOICE`, but the parsed value of the option will be a list of selected values rather than a single value.
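The clamping behaviour of `OPTION_TEXT` with `min` and `max` set can be sketched as follows. This is an illustration of the rule described above, not 4CAT's own parsing code; the function name and fallback-to-default behaviour are assumptions.

```python
def parse_text_option(definition, value):
    """
    Illustrates how an OPTION_TEXT value is treated when `min` and/or `max`
    are set in its definition: the submitted value is parsed as an integer
    and clamped to the allowed range. Falls back to the option's default
    when the value is not a valid integer (an assumption for this sketch).
    """
    if "min" in definition or "max" in definition:
        try:
            value = int(value)
        except (TypeError, ValueError):
            return definition.get("default")
        if "min" in definition:
            value = max(definition["min"], value)
        if "max" in definition:
            value = min(definition["max"], value)
    return value
```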
Besides these type-specific properties, the following may be set for all options:

- `help`: The label for the option as displayed in the web interface (required).
- `default`: The default value for this option (required).
- `tooltip`: Further information on the option, shown via a tooltip anchor displayed next to the option in the interface (optional).
By default, processor parameters will be shown in the web tool's interface. Parameters whose names start with `api_` will not be shown in the interface, so you can use this to hide sensitive information, e.g. API keys. Note that such parameters are still stored elsewhere as plain text, and you should not rely on this alone to ensure the security of any data therein.
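The hiding of `api_`-prefixed parameters amounts to a simple filter, sketched below. This illustrates the rule just described and is not 4CAT's own code; the function name is hypothetical.

```python
def visible_parameters(parameters):
    """
    Return only the parameters that should be shown in the web interface:
    anything whose key starts with "api_" is hidden. Note that hidden
    parameters are still stored in plain text server-side.
    """
    return {key: value for key, value in parameters.items()
            if not key.startswith("api_")}
```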
🐈🐈🐈🐈