An operation is the building block of RedisGears functions. Different operation types can be used to achieve a variety of results to meet various data processing needs.
Operations can accept zero or more arguments that control their behavior. Depending on the operation's type, these arguments may be language-native data types or function callbacks.
The following sections describe the different operations.
Operation | Description | Type |
---|---|---|
Map | Maps 1:1 | Local |
FlatMap | Maps 1:N | Local |
ForEach | Does something for each record | Local |
Filter | Filters records | Local |
Accumulate | Maps N:1 | Local |
LocalGroupBy | Groups records by key | Local |
Limit | Limits the number of records | Local |
Collect | Shuffles all records to one engine | Global |
Repartition | Shuffles records between all engines | Global |
GroupBy | Groups records by key | Sugar |
BatchGroupBy | Groups records by key | Sugar |
Sort | Sorts records | Sugar |
Distinct | Makes distinct records | Sugar |
Aggregate | Aggregates records | Sugar |
AggregateBy | Aggregates records by key | Sugar |
Count | Counts records | Sugar |
CountBy | Counts records by key | Sugar |
Avg | Computes the average | Sugar |
The local Map operation performs the one-to-one (1:1) mapping of records.
It requires one mapper callback.
!!! info "Common uses"
    * Transform the data's shape
    * Typecasting and value formatting
    * Splitting, joining and similar string manipulations
    * Removing and/or adding data from/to the record
Python API
class GearsBuilder.map(f)
Arguments
- f: the mapper function callback
Examples
{{ include('operations/map.py') }}
The local FlatMap operation performs one-to-many (1:N) mapping of records.
It requires one expander callback that maps a single input record to one or more output records.
FlatMap is nearly identical to the Map operation in purpose and use. Unlike regular mapping, however, when FlatMap returns a list, each element in the list is turned into a separate output record.
!!! info "Common uses"
    * Deconstruction of nested, multi-part, or otherwise overly-complicated records
Python API
class GearsBuilder.flatmap(f)
Arguments
- f: the expander function callback
Examples
{{ include('operations/flatmap.py') }}
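Since the included example file isn't shown here, the expansion semantics can be sketched in plain Python (no RedisGears required); the `flat_map` helper below is illustrative, not part of the API:

```python
# Plain-Python sketch of FlatMap semantics: when the expander returns a
# list, every element becomes a separate output record.
def flat_map(expander, records):
    out = []
    for r in records:
        result = expander(r)
        # A list is splatted into individual records; anything else
        # passes through as a single record
        if isinstance(result, list):
            out.extend(result)
        else:
            out.append(result)
    return out

words = flat_map(lambda r: r.split(' '), ['hello world', 'foo'])
# words == ['hello', 'world', 'foo']
```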
The local ForEach operation performs one-to-the-same (1=1) mapping.
It requires one processor callback to perform some work that's related to the input record.
Its output record is a copy of the input, which means anything the callback returns is discarded.
!!! info "Common uses"
    * Non-transforming, record-related logic
Python API
class GearsBuilder.foreach(f)
Arguments
- f: the processor function callback that will be used on each record
Examples
{{ include('operations/foreach.py') }}
The local Filter operation performs one-to-zero-or-one (1:(0|1)) filtering of records.
It requires a filterer function callback.
An input record that yields a falsehood will be discarded and only truthful ones will be output.
!!! info "Common uses"
    * Filtering records
Python API
class GearsBuilder.filter(f)
Arguments
- f: the filtering function callback
Examples
{{ include('operations/filter.py') }}
The local Accumulate operation performs many-to-one mapping (N:1) of records.
It requires one accumulator callback.
Once input records are exhausted its output is a single record consisting of the accumulator's value.
!!! info "Common uses"
    * Aggregating records
Python API
class GearsBuilder.accumulate(f)
Arguments
- f: an accumulator function callback
Examples
{{ include('operations/accumulate.py') }}
The local LocalGroupBy operation performs many-to-less mapping (N:M) of records.
The operation requires two callbacks: an extractor and a reducer.
The output records consist of the grouping key and its respective accumulator's value.
!!! info "Common uses"
    * Grouping records by key
Python API
class GearsBuilder.localgroupby(e, r)
Arguments
- e: a key extractor function callback
- r: a reducer function callback
Examples
{{ include('operations/localgroupby.py') }}
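Since the included example isn't shown here, the per-key reduction can be sketched in plain Python; `local_group_by` and the record shapes below are illustrative assumptions, not the RedisGears API:

```python
# Plain-Python sketch of LocalGroupBy semantics: the extractor picks the
# grouping key and the reducer folds each record into that key's accumulator.
def local_group_by(extractor, reducer, records):
    groups = {}
    for r in records:
        k = extractor(r)
        # The reducer receives the key, the accumulator so far
        # (None on the first call for a key), and the record
        groups[k] = reducer(k, groups.get(k), r)
    # Output records consist of the key and its accumulator's final value
    return [{'key': k, 'value': v} for k, v in groups.items()]

records = [{'key': 'a'}, {'key': 'b'}, {'key': 'a'}]
counts = local_group_by(lambda r: r['key'],
                        lambda k, a, r: (a or 0) + 1,
                        records)
# counts == [{'key': 'a', 'value': 2}, {'key': 'b', 'value': 1}]
```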
The local Limit operation limits the number of records.
It accepts two numeric arguments: a starting position in the input records "array" and a maximal number of output records.
!!! info "Common uses"
    * Returning the first results
    * Batch paging on static data
Python API
class GearsBuilder.limit(length, start=0)
Arguments
- length: the maximal length of the output records list
- start: a 0-based index of the input record to start from
Examples
{{ include('operations/limit.py') }}
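The slicing behavior, including the argument order (length first, then the optional start), can be sketched in plain Python; the `limit` helper is illustrative only:

```python
# Plain-Python sketch of Limit semantics: skip `start` input records,
# then output at most `length` records.
def limit(records, length, start=0):
    return records[start:start + length]

page = limit(list(range(10)), 3, 2)
# page == [2, 3, 4]
```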
The global Collect operation collects the result records from all of the shards to the originating one.
It has no arguments.
!!! info "Common uses"
    * Final steps of distributed executions
Python API
class GearsBuilder.collect()
Examples
{{ include('operations/collect.py') }}
The global Repartition operation repartitions the records by shuffling them between shards.
It accepts a single key extractor function callback. The extracted key is used for computing the record's new placement in the cluster (i.e. hash slot). The operation then moves the record from its original shard to the new one.
!!! info "Common uses"
    * Remapping of records to engines
    * JOIN-like operations
Python API
class GearsBuilder.repartition(f)
Arguments
- f: a key extractor function callback
Examples
{{ include('operations/repartition-001.py') }}
{{ include('operations/repartition-002.py') }}
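The placement logic can be illustrated with a toy simulation. Note the hash below is a stand-in: the real engine maps the extracted key to a Redis cluster hash slot (CRC16 mod 16384), not a character-sum modulo:

```python
# Illustrative sketch of Repartition semantics over simulated shards
# (lists of records). The toy hash is NOT the real slot mapping.
def repartition(extractor, shards):
    new_shards = [[] for _ in shards]
    for shard in shards:
        for r in shard:
            # The extracted key alone decides the record's new placement
            slot = sum(ord(c) for c in extractor(r)) % len(shards)
            new_shards[slot].append(r)
    return new_shards

shards = repartition(lambda r: r, [['apple', 'banana'], ['apple']])
# All records with the same key now live on the same shard
```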
The sugar Aggregate operation performs many-to-one mapping (N:1) of records.
Aggregate provides an alternative to the local accumulate operation as it takes the partitioning of data into consideration. Furthermore, because records are aggregated locally before collection, its performance is usually superior.
It requires a zero value and two accumulator callbacks for computing the local and global aggregates.
The operation is made of these steps:
- The local accumulator is executed locally and initialized with the zero value
- A global collect moves all records to the originating engine
- The global accumulator is executed locally by the originating engine
Its output is a single record consisting of the accumulator's global value.
Python API
class GearsBuilder.aggregate(z, l, g)
Arguments
- z: the aggregate's zero value
- l: a local accumulator function callback
- g: a global accumulator function callback
Examples
{{ include('operations/aggregate.py') }}
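The steps above can be simulated in plain Python; `aggregate` and the representation of shards as plain lists are illustrative assumptions, not the RedisGears API:

```python
# Plain-Python sketch of Aggregate semantics over simulated shards:
# each shard folds its records locally from the zero value, the local
# results are collected, and a global accumulator combines them.
def aggregate(zero, local_acc, global_acc, shards):
    # Step 1: local accumulation on every shard, seeded with the zero value
    local_values = []
    for shard in shards:
        a = zero
        for r in shard:
            a = local_acc(a, r)
        local_values.append(a)
    # Step 2: collect to the originator; step 3: global accumulation there
    g = zero
    for a in local_values:
        g = global_acc(g, a)
    return g

total = aggregate(0,
                  lambda a, r: a + r,   # local: sum records
                  lambda a, v: a + v,   # global: sum the shard sums
                  [[1, 2], [3, 4]])
# total == 10
```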
The sugar AggregateBy operation performs many-to-less mapping (N:M) of records.
It is similar to the Aggregate operation but aggregates per key. It requires an extractor callback, a zero value, and two reducer callbacks for computing the local and global aggregates.
The operation is made of these steps:
- The groups are extracted using the extractor
- The local reducer is executed locally and initialized with the zero value
- A global repartition operation that uses the extractor
- The global reducer is executed on each shard once it is repartitioned with its relevant keys
The output is a list of records, one for each key. Each output record consists of the grouping key and its respective reducer's value.
Python API
class GearsBuilder.aggregateby(e, z, l, g)
Arguments
- e: a key extractor function callback
- z: the aggregate's zero value
- l: a local reducer function callback
- g: a global reducer function callback
Examples
{{ include('operations/aggregateby.py') }}
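The per-key, two-phase reduction can be simulated in plain Python; `aggregate_by` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of AggregateBy semantics: per-key local reduction
# on each shard, repartition by key, then per-key global reduction.
def aggregate_by(extractor, zero, local_red, global_red, shards):
    # Local per-key reduction on every shard, seeded with the zero value
    local_results = []
    for shard in shards:
        groups = {}
        for r in shard:
            k = extractor(r)
            groups[k] = local_red(k, groups.get(k, zero), r)
        local_results.append(groups)
    # Repartition: all local results for a key meet on one shard,
    # where the global reducer combines them
    final = {}
    for groups in local_results:
        for k, v in groups.items():
            final[k] = global_red(k, final.get(k, zero), v)
    return final

counts = aggregate_by(lambda r: r, 0,
                      lambda k, a, r: a + 1,   # local: count per key
                      lambda k, a, v: a + v,   # global: sum local counts
                      [['a', 'b', 'a'], ['b']])
# counts == {'a': 2, 'b': 2}
```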
The sugar GroupBy operation performs a many-to-less (N:M) grouping of records. It is similar to AggregateBy but uses only a global reducer. It can be used in cases where locally reducing the data isn't possible.
The operation requires two callbacks: an extractor and a reducer.
The operation is made of these steps:
- A global repartition operation that uses the extractor
- The reducer is locally invoked
Output is a locally-reduced list of records, one for each key. The output records consist of the grouping key and its respective accumulator's value.
Python API
class GearsBuilder.groupby(e, r)
Arguments
- e: a key extractor function callback
- r: a reducer function callback
Examples
{{ include('operations/groupby.py') }}
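The two steps above can be simulated in plain Python; `group_by` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of GroupBy semantics: records are first moved so
# that each key lives in one place, then a single reducer pass runs there.
def group_by(extractor, reducer, shards):
    # Repartition: gather every record for a key together
    partitions = {}
    for shard in shards:
        for r in shard:
            partitions.setdefault(extractor(r), []).append(r)
    # Single (global) reduction per key; no local pre-reduction happens,
    # which is why AggregateBy is usually preferred when it applies
    out = {}
    for k, records in partitions.items():
        a = None
        for r in records:
            a = reducer(k, a, r)
        out[k] = a
    return out

counts = group_by(lambda r: r, lambda k, a, r: (a or 0) + 1,
                  [['x', 'y'], ['x']])
# counts == {'x': 2, 'y': 1}
```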
The sugar BatchGroupBy operation performs a many-to-less (N:M) grouping of records.
!!! important "Prefer the GroupBy Operation"
    Instead of using BatchGroupBy, prefer using the GroupBy operation as it is more efficient and performant. Only use BatchGroupBy when the reducer's logic requires the full list of records for each input key.
The operation requires two callbacks: an extractor and a batch reducer.
The operation is made of these steps:
- A global repartition operation that uses the extractor
- A local localgroupby operation that uses the batch reducer
Once finished, the operation locally outputs a record for each key and its respective accumulator value.
!!! warning "Increased memory consumption"
    Using this operation may cause a substantial increase in memory usage during runtime.
Python API
class GearsBuilder.batchgroupby(e, r)
Arguments
- e: a key extractor function callback
- r: a batch reducer function callback
Examples
{{ include('operations/batchgroupby.py') }}
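The batch behavior can be simulated in plain Python; `batch_group_by` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of BatchGroupBy semantics: after repartitioning,
# the batch reducer receives the full list of records for each key,
# which is why memory usage can grow substantially.
def batch_group_by(extractor, batch_reducer, shards):
    partitions = {}
    for shard in shards:
        for r in shard:
            partitions.setdefault(extractor(r), []).append(r)
    return {k: batch_reducer(k, l) for k, l in partitions.items()}

sizes = batch_group_by(lambda r: r, lambda k, l: len(l),
                       [['a', 'b'], ['a']])
# sizes == {'a': 2, 'b': 1}
```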
The sugar Sort operation sorts the records.
It accepts a single Boolean argument that determines the order.
The operation is made of the following steps:
- A global aggregate operation collects and combines all records
- A local sort is performed on the list
- The list is flatmapped to records
!!! warning "Increased memory consumption"
    Using this operation may cause an increase in memory usage during runtime due to the list being copied during the sorting operation.
Python API
class GearsBuilder.sort(reverse=True)
Arguments
- reverse: when True, sorts in descending order
Examples
{{ include('operations/sort.py') }}
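The three steps above can be simulated in plain Python; `sort_records` and the shard lists are illustrative assumptions (note the API's default of descending order):

```python
# Plain-Python sketch of Sort semantics: all records are combined into
# one list (the aggregate and collect steps), sorted locally, and the
# sorted list is flat-mapped back into individual records.
def sort_records(shards, reverse=True):
    combined = []
    for shard in shards:            # aggregate/collect: build one list
        combined.extend(shard)
    combined.sort(reverse=reverse)  # local sort (the copy costs memory)
    return combined                 # flatmap: list elements become records

ordered = sort_records([[3, 1], [2]])
# ordered == [3, 2, 1]
```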
The sugar Distinct operation returns distinct records.
It requires no arguments.
The operation is made of the following steps:
- An aggregate operation locally reduces the records to sets that are then collected and unioned globally
- A local flatmap operation turns the set into records
Python API
class GearsBuilder.distinct()
Examples
{{ include('operations/distinct.py') }}
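The set-based steps can be simulated in plain Python; `distinct` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of Distinct semantics: each shard reduces its
# records to a set, the sets are unioned globally, and the union is
# flat-mapped back into records.
def distinct(shards):
    union = set()
    for shard in shards:
        union |= set(shard)   # local set build, then global union
    return list(union)        # flatmap: set elements become records

vals = distinct([['a', 'b'], ['b', 'c']])
# sorted(vals) == ['a', 'b', 'c']
```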
The sugar Count operation counts the records.
It requires no arguments.
The operation is made of an aggregate operation that uses local counting and global summing accumulators.
Python API
class GearsBuilder.count()
Examples
{{ include('operations/count.py') }}
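The composition of counting and summing accumulators can be simulated in plain Python; `count` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of Count semantics: an aggregate with a local
# counting accumulator and a global summing accumulator.
def count(shards):
    local_counts = [sum(1 for _ in shard) for shard in shards]  # local count
    return sum(local_counts)                                    # global sum

total = count([[1, 2], [3]])
# total == 3
```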
The sugar CountBy operation counts the records grouped by key.
It accepts an optional extractor function callback.
The operation is made of an aggregateby operation that uses local counting and global summing accumulators.
Python API
class GearsBuilder.countby(extractor=lambda x: x)
Arguments
- extractor: an optional key extractor function callback
Examples
{{ include('operations/countby.py') }}
The sugar Avg operation returns the arithmetic average of records.
It accepts an optional value extractor function callback.
The operation is made of the following steps:
- An aggregate operation locally reduces the records to tuples of sum and count that are globally combined
- A local map operation calculates the average from the global tuple
Python API
class GearsBuilder.avg(extractor=lambda x: float(x))
Arguments
- extractor: an optional value extractor function callback
Examples
{{ include('operations/avg.py') }}
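The sum-and-count composition can be simulated in plain Python; `avg` and the shard lists are illustrative assumptions:

```python
# Plain-Python sketch of Avg semantics: each shard reduces to a
# (sum, count) tuple, the tuples are combined globally, and a final
# map computes sum / count.
def avg(shards, extractor=float):
    total, n = 0.0, 0
    for shard in shards:
        # local reduction to a (sum, count) tuple
        s = sum(extractor(r) for r in shard)
        total, n = total + s, n + len(shard)  # global combination
    return total / n                          # final map: the average

result = avg([[1, 2], [3]])
# result == 2.0
```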
The Local execution of an operation is carried out by the RedisGears engine that's deployed in either stand-alone or cluster mode. When used alone, there's a single engine executing all operations on all data locally.
When clustered, the operation is distributed to all shards. Each shard's engine executes the operation locally as well. Shards' engines, however, can only process the data they are partitioned with by the cluster.
Global operations are only relevant in the context of a clustered RedisGears environment. These are the Collect and Repartition operations that shuffle records between shards.
A Sugar operation is a utility operation. These are implemented internally with basic operations and the relevant callbacks.
A Callback is used for calling a function in the language used by the API.
An Extractor is a callback that receives an input record as an argument. It returns a value extracted from the record. The returned value should be a native string.
Python
```python
# Lambda function form
lambda r: str(...)

# Function form
def extractorFunction(r):
    ...
    return ...
```
Arguments
- r: the input record
Examples
```python
# These extractors expect dict() records having a 'key' key (e.g. KeysReader)
def keyExtractor(r):
    ''' Just extracts the key '''
    return str(r['key'])

def reverseExtractor(r):
    ''' Reverses the extracted key '''
    return str(r['key'])[::-1]

# This extractor expects dict() records having a string 'value' key
# (e.g. KeysReader with Redis Strings)
def floatExtractor(r):
    ''' Makes the value float '''
    return float(r['value'])
```
A Mapper is a callback that receives an input record as an argument. It must return an output record.
Python
```python
# Lambda function form
lambda r: ...

# Function form
def mapperFunction(r):
    ...
    return o
```
Arguments
- r: the input record
Return
- o: an output record
Examples
```python
# This mapper expects dict() records having a 'key' key (e.g. KeysReader)
def keyOnlyMapper(r):
    ''' Maps a record to its key only '''
    return str(r['key'])
```
An Expander is a callback that receives an input record. It must return one or more output records.
Python
```python
# Lambda function form
lambda r: list(...)

# Function form
def expanderFunction(r):
    ...
    return list(i)
```
Arguments
- r: the input record
Return
- i: an iterable of output records
Examples
```python
# This expander expects KeysReader records of Redis Hashes
def hashExploder(r):
    ''' Splats a record's dict() 'value' into its keys '''
    # Prefix each exploded key with the original in curly brackets and a colon
    # for clustering safety, i.e.: {hashkeyname}:fieldname
    pre = '{' + r['key'] + '}:'
    l = [{ 'key': f'{pre}{x[0]}', 'value': x[1] } for x in r['value'].items()]
    return l
```
A Processor is a callback that receives an input record. It shouldn't return anything.
Python
```python
# Lambda function form
lambda r: ...

# Function form
def processorFunction(r):
    ...
```
Arguments
- r: the input record
Examples
```python
def logProcessor(r):
    ''' Logs each record '''
    log(str(r))
```
A Filterer is a callback that receives an input record. It must return a Boolean value.
Python
```python
# Lambda function form
lambda r: bool(...)

# Function form
def filtererFunction(r):
    ...
    return bool(b)
```
Arguments
- r: the input record
Return
- b: a Boolean value
Examples
```python
def dictRecordFilter(r):
    ''' Filters out non-dict records (e.g. Redis' Strings won't pass) '''
    return type(r) is dict
```
An Accumulator is a callback that receives an input record and a variable that's also called an accumulator. It aggregates inputs into the accumulator variable, which stores the state between the function's invocations. The function must return the accumulator's updated value after each call.
Python
```python
# Lambda function form
lambda a, r: ...

# Function form
def accumulatorFunction(a, r):
    ...
    return u
```
Arguments
- a: the accumulator's value from previous calls
- r: the input record
Return
- u: the accumulator's updated value
Examples
```python
# This accumulator expects nothing
def countingAccumulator(a, r):
    ''' Counts records '''
    # a's initial value is None so set it to a zero value if so
    a = a if a else 0
    # increment it by one
    a = a + 1
    return a
```
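A hypothetical driver loop (not part of the API) shows how the engine feeds records to an accumulator, starting from None and carrying the state between calls:

```python
def countingAccumulator(a, r):
    ''' Counts records '''
    a = a if a else 0
    return a + 1

# Hypothetical driver loop: how the engine folds records through an
# accumulator; `a` starts as None and carries state between calls
def run_accumulate(accumulator, records):
    a = None
    for r in records:
        a = accumulator(a, r)
    return a

n = run_accumulate(countingAccumulator, ['x', 'y', 'z'])
# n == 3
```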
A Reducer is a callback function that receives a key, an input record, and a variable that's called an accumulator. It performs similarly to the accumulator callback, with the difference being that it maintains an accumulator per reduced key.
Python
```python
# Lambda function form
lambda k, a, r: ...

# Function form
def reducerFunction(k, a, r):
    ...
    return u
```
Arguments
- k: the key
- a: the accumulator's value from previous calls
- r: the input record
Return
- u: the accumulator's updated value
Examples
```python
def keyCountingReducer(k, a, r):
    ''' Counts records for each key '''
    # a's initial value is None so set it to a zero value if so
    a = a if a else 0
    # increment it by one
    a = a + 1
    return a
```
A Batch Reducer is a callback function that receives a key and a list of input records. It performs similarly to the reducer callback, with the difference being that it is input with a list of records instead of a single one. It is expected to return an accumulator value for these records.
Python
```python
# Lambda function form
lambda k, l: ...

# Function form
def batchReducerFunction(k, l):
    ...
    return a
```
Arguments
- k: the key
- l: the list of input records
Return
- a: the accumulator's value
Examples
```python
def batchKeyCountingReducer(k, l):
    ''' Counts records for each key '''
    a = len(l)
    return a
```