How to make a processor

Stijn Peeters edited this page Jun 11, 2021 · 29 revisions

Processors

4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the latter.

Processors are bits of code that produce a dataset. Typically, their input is another dataset. As such they can be used to analyse data; for example, a processor can take a CSV file containing posts as input, count how many posts occur per month, and produce another CSV file with the number of posts per month (one month per row) as output. Processors always produce the following things:

  • A set of metadata for the Dataset the processor will produce. This is stored in 4CAT's PostgreSQL database. The database record is created when the processor's job is first queued, and is updated by the processor as it runs.
  • A result file, which may have an arbitrary format. This file contains whatever the processor produces, e.g. a list of frequencies, an image wall or a zip archive containing word embedding models.
  • A log file, with the same file name as the result file but with a '.log' extension. This documents any output from the processor while it was producing the result file.

4CAT has an API that does most of the scaffolding around this for you. Processors can therefore be quite lightweight and focus mostly on the analysis, while 4CAT's back-end takes care of scheduling, determining where the output should go, et cetera.
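
The posts-per-month example mentioned above can be sketched in plain Python. This is a minimal sketch that runs outside 4CAT, not an actual processor: the function name, the timestamp column and the sample data are all assumptions made for illustration. In a real processor, this logic would live in process(), read its rows via iterate_items() and save its output with write_csv_items_and_finish().

```python
import csv
import io
from collections import Counter


def count_posts_per_month(rows):
    """Count posts per month for rows that have a 'timestamp' column."""
    counts = Counter()
    for row in rows:
        # Assumes timestamps look like "2021-06-11 14:30:00"; the first
        # seven characters are then the year and month ("2021-06")
        counts[row["timestamp"][:7]] += 1
    # One dictionary per month, sorted chronologically
    return [{"month": month, "posts": n} for month, n in sorted(counts.items())]


# Hypothetical input; a real processor would read its parent dataset instead
source = io.StringIO(
    "timestamp,body\n"
    "2021-05-30 09:00:00,first post\n"
    "2021-06-01 10:00:00,second post\n"
    "2021-06-11 14:30:00,third post\n"
)
result = count_posts_per_month(csv.DictReader(source))
```

The result is a list of dictionaries, one per month, which is the shape write_csv_items_and_finish() expects.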

Example

This is a minimal, annotated example of a 4CAT processor:

"""
A minimal example 4CAT processor
"""
from backend.abstract.processor import BasicProcessor

class ExampleProcessor(BasicProcessor):
	"""
	Example Processor
	"""
	type = "example-processor"  # job type ID
	category = "Examples" # category
	title = "A simple example"  # title displayed in UI
	description = "This doesn't do much"  # description displayed in UI
	extension = "csv"  # extension of result file, used internally and in UI

	input = "csv:body"
	output = "csv:value"

	def process(self):
		"""
		Saves a CSV file with one column ("value") and one row with a value ("Hello
		world!") and marks the dataset as finished.
		"""
		# write_csv_items_and_finish() expects a list of dictionaries, one per row
		data = [{"value": "Hello world!"}]
		self.write_csv_items_and_finish(data)

Processor properties

Processor settings and metadata are stored as class properties. The following properties are available and can be set:

  • type (string) - A unique identifier for this processor type. Used internally to queue jobs, store results, et cetera. Should be a string without any spaces in it.
  • category (string) - A category for this processor, used in the 4CAT web interface to group processors thematically. Displayed as-is in the web interface.
  • title (string) - Processor title, displayed as-is in the web interface.
  • description (string) - Description, displayed in the web interface. Markdown in the description will be parsed.
  • extension (string) - Extension for files produced by this processor, e.g. csv or ndjson.
  • input (string) - Simple schema for the input files required by this processor, in the format type[:fields]. For example, csv:column1,column2 means the processor requires a CSV file with at least the columns column1 and column2 as input. The field definition is optional, i.e. just csv is also valid.
  • output (string) - Simple schema for the files produced by this processor.
  • options (dictionary) - A dictionary of configurable parameters for this processor. See the section on 'processor options' for more details.
  • interrupted (bool) - False by default. This may be set to True by 4CAT's scheduler. Once it is True, the processor should return as soon as possible, even if processing is not complete yet. If processing has not finished yet, the result so far (if any) should be discarded and the dataset status updated to reflect that it has been interrupted. Since abort procedures are different per processor, this is the processor's responsibility. The simplest way of addressing this is raising a ProcessorInterruptedException (found in backend.lib.exceptions); this will gracefully stop the processor and clean up so it may be attempted again later.
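
The interruption check described for the interrupted property could look roughly like the sketch below. It does not import 4CAT: the exception class is a local stand-in for the one in backend.lib.exceptions, and InterruptibleWorker is a hypothetical class that only mimics the relevant part of a processor.

```python
class ProcessorInterruptedException(Exception):
    """Local stand-in for the exception of the same name in backend.lib.exceptions."""


class InterruptibleWorker:
    """Hypothetical stand-in for a processor; not a 4CAT class."""

    def __init__(self):
        self.interrupted = False  # in 4CAT, the scheduler may set this to True
        self.processed = []

    def process(self, items):
        for item in items:
            # Check the flag on every iteration so the loop stops promptly
            if self.interrupted:
                raise ProcessorInterruptedException("Interrupted while processing")
            self.processed.append(item.upper())


worker = InterruptibleWorker()
worker.process(["a", "b"])  # runs to completion, the flag is still False
```

Checking the flag once per loop iteration is cheap, and it keeps the delay between an interrupt request and the processor actually stopping short.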

Processor API

The full API available to processors is as follows. All of these are members of the processor object, i.e. they are accessed via self.property within the processor code. While other methods or classes may be available within processors, relying on them is discouraged and unsupported; when possible, use only the API documented here.

Methods

  • process() -> void: This method should contain the processor's logic. Other methods may be defined, but this one will be called when an analysis with this processor is queued.
  • iterate_items(pathlib.Path source_file) -> Generator: This yields one item from the dataset's data file per call and raises a ProcessorInterruptedException if the processor has been interrupted while iterating. Can be used on datasets that are CSV or NDJSON files. If possible, you should always use this method instead of interacting with the source file directly. If the map_item() method (see below) is defined, the result of that method called with the item as an argument will be yielded instead.
  • iterate_archive_contents(Path source_file) -> Generator: This yields Path objects for each file in the given archive. Files are temporarily uncompressed so they can be worked with, and then the temporary file is deleted afterwards.
  • write_csv_items_and_finish(list data) -> void: Writes a list of dictionaries as a CSV as the result of this processor, and finishes processing. Raises a ProcessorInterruptedException if the processor has been interrupted while writing.
  • write_archive_and_finish(list|Path files) -> void: Compresses all files in list files, or all files in folder files, into a zip archive and saves that as the dataset result. Files are deleted after adding them, and if files is a folder the folder is deleted too. Useful when processor output cannot be conveniently saved into one file.
  • map_item(item) -> dict: Optional. If defined, any item yielded by iterate_items() (see above) will be passed through this method first, and the result of this method will be passed instead. This is especially useful for datasets that are stored as NDJSON, as items may be stored as nested objects with arbitrary depth in that format, and map_item() can be used to reduce them to 'flat' dictionaries before processing.
  • is_compatible_with(DataSet) -> bool: Optional. If defined, this method should be decorated as a @classmethod and take cls as its first argument. For every dataset, this method is then used to determine if the processor can run on that dataset. If the processor does not define this method, it will be available for any 'top-level' dataset (i.e. any dataset created via 'Create Dataset').
  • get_options(DataSet) -> dict: Optional. If defined, this method should be decorated as a @classmethod and take cls as its first argument. The method should then return a processor options definition (see below) that is compatible with the given dataset. If this method is not defined, the (static) options attribute of the processor class is used instead.
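
To illustrate what map_item() is meant to do, here is a minimal sketch that flattens a nested item of the kind an NDJSON file might contain. The item structure and the field names are invented for this example; in a processor, the function would be defined as a method on the processor class.

```python
def map_item(item):
    """Reduce a nested item to a flat dictionary with a fixed set of keys."""
    return {
        "id": item["id"],
        "author": item["author"]["name"],
        "body": item["content"]["text"],
        # Join list values so the result fits in a single CSV cell
        "tags": ",".join(item["content"].get("tags", [])),
    }


# Hypothetical nested item, as it might appear on one line of an NDJSON file
nested = {
    "id": "123",
    "author": {"name": "alice", "followers": 10},
    "content": {"text": "Hello world!", "tags": ["greeting", "test"]},
}
flat = map_item(nested)
```

Because every item is reduced to the same flat set of keys, code that consumes iterate_items() does not need to know how deeply the original data was nested.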

Properties

  • dataset (DataSet): The dataset produced by this processor
    • dataset.finish(int num_items_in_result) -> void: Manually mark the dataset as finished. Either this method or one of the write_csv_items_and_finish() and write_archive_and_finish() methods should always be called at the end of process(). If num_items_in_result is 0 or less, no download link is displayed for the result in the web interface. If it is 0, "No results" is additionally displayed in the interface.
    • dataset.get_result_path() -> pathlib.Path: Returns a Path object referencing the location the processor result should be saved to.
    • dataset.update_status(str status, bool is_final=False) -> void: Updates the dataset status to the given status message. This can be used to indicate processor progress to the user through the web interface. Note that this updates the database, which is relatively expensive, so you should not call it too often (for example, not on every iteration of a loop, but only every 500 iterations). If is_final is set, subsequent calls to this method have no effect.
    • dataset.get_staging_area() -> pathlib.Path: Returns a Path object referencing a newly created folder that can be used as a staging area; files can be stored here while the processor is active. After the processor finishes, files here may be deleted and cannot be relied on. You should nevertheless take care of this within the processor, and delete the path (e.g. with shutil.rmtree()) after it is no longer needed, or use the processor.write_archive_and_finish() method with the staging area Path as an argument, which will also delete it.
  • parent (DataSet): The Dataset used as input by this processor
  • source_file (pathlib.Path): The file to be processed as input, i.e. the data file corresponding to the parent Dataset
  • parameters (dict): Options set by the user for this processor
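
The advice to update the dataset status only every 500 iterations can be sketched as follows. FakeDataset is a hypothetical stand-in that records messages instead of writing to a database, and the assumption that update_status() takes a status message as its first argument is made for this sketch.

```python
class FakeDataset:
    """Hypothetical stand-in that records status updates instead of using a database."""

    def __init__(self):
        self.statuses = []

    def update_status(self, status, is_final=False):
        self.statuses.append(status)


def process_items(items, dataset, interval=500):
    """Loop over items, reporting progress only once per `interval` items."""
    processed = 0
    for item in items:
        processed += 1
        # Only trigger the (expensive) status update every `interval` items
        if processed % interval == 0:
            dataset.update_status("Processed %i items" % processed)
    dataset.update_status("Finished; processed %i items" % processed, is_final=True)
    return processed


dataset = FakeDataset()
total = process_items(range(1200), dataset)
```

For 1,200 items this results in three status updates rather than 1,200: two progress messages and one final message.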

Processor options

It may be useful to offer a user options through which they can configure a processor, e.g. to set the language to use for text analysis or the resolution of a visualisation. This is possible with the options property of the processor class. This property, if set, should be a dictionary, each item having its option identifier as a key and the option definition as a value, e.g.:

from backend.lib.helpers import UserInput
from backend.abstract.processor import BasicProcessor

class SomeProcessor(BasicProcessor):
    options = {
        "graph-title": {
            "type": UserInput.OPTION_TEXT,
            "help": "Graph title",
            "default": "Default title"
        },
        "graph-height": {
            "type": UserInput.OPTION_TEXT,
            "help": "Graph height",
            "default": 25,
            "min": 0,
            "max": 100
        }
    }

This would, in the web interface, show these two options to a user. The value set for these options, when running the processor, may then be accessed as follows:

    def process(self):
        height = self.parameters.get("graph-height", 10)
        # or, to use the default set before...
        height = self.parameters.get("graph-height", self.options["graph-height"]["default"])

Currently, four types of options are available:

  • OPTION_TEXT, shown as a simple text field. Optionally, min and max may be set for this option to parse the value as a (clamped) integer.
  • OPTION_TOGGLE, a checkbox. Will be either true or false.
  • OPTION_CHOICE, a select box. Select options are to be provided via options in the option definition, containing a dictionary with possible values as keys and labels for those values as values.
  • OPTION_MULTI, a list of several checkboxes. Requires an options value like OPTION_CHOICE, but the parsed value of the option will be a list of selected values rather than a single value.
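
As an illustration of what parsing a value as a clamped integer means for OPTION_TEXT, consider the sketch below. This is not 4CAT's own parsing code; the function name and the fallback to the default for non-numeric input are assumptions made for this example.

```python
def parse_clamped_int(value, default, minimum=None, maximum=None):
    """Parse user input as an integer and force it into [minimum, maximum]."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        # Assumption: input that is not a number falls back to the default
        return default
    if minimum is not None:
        number = max(number, minimum)
    if maximum is not None:
        number = min(number, maximum)
    return number


# Values as they might be submitted for the "graph-height" option above
too_high = parse_clamped_int("250", default=25, minimum=0, maximum=100)
too_low = parse_clamped_int("-5", default=25, minimum=0, maximum=100)
in_range = parse_clamped_int("40", default=25, minimum=0, maximum=100)
invalid = parse_clamped_int("tall", default=25, minimum=0, maximum=100)
```

Values outside the allowed range are moved to the nearest bound rather than rejected, so the processor always receives a usable number.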

Next to these type-specific option properties, the following properties may be set for all options:

  • help: The label for the option as displayed in the web interface (required)
  • default: The default value for this option (required)
  • tooltip: Further information on the option, shown via a tooltip anchor displayed next to the option in the interface (optional)

The values set for these options will be stored in the metadata of the created Dataset, and can be retrieved with Dataset.parameters. By default, dataset parameters will be shown in the web interface. Parameters starting with api_ will not be shown in the interface, so you can use this prefix to hide sensitive information, e.g. API keys. Note that the parameter is still stored elsewhere as plain text, and you should not rely on this alone to ensure the security of any data therein.
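
The effect of the api_ prefix can be pictured with the sketch below. It is only an illustration of the filtering rule, not the code 4CAT's interface uses; the function name and the parameter values are hypothetical.

```python
def visible_parameters(parameters):
    """Return only the parameters that would be shown in the interface."""
    return {
        key: value
        for key, value in parameters.items()
        if not key.startswith("api_")
    }


# Hypothetical parameters as stored for a dataset
parameters = {"graph-title": "Posts per month", "api_key": "not-a-real-key"}
shown = visible_parameters(parameters)
```

The full parameters dictionary still contains the hidden value, which is why the prefix should not be treated as a security measure on its own.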