dataflow_advanced
A dataflow has
- stages that describe the origin of records and actions taken on records
- inputs and outputs as defined by the source and sink stages it contains
- configuration
- a source originates data, for example `file_source`, `http_listener`, or `console_source`.
- a sink consumes data, typically placing it outside of the dataflow's immediate concern. The `file_sink`, `cube_sink`, and `console_sink` are all sinks.
- simple transform processors:
  - alter or augment the record in-place
  - transform the record into one or many other records
- plumbing processors (aka topological processors): `many_to_many` delivers every record from its inputs to all of its outputs; a `switch` delivers each input record to exactly one of its outputs.
- encapsulating processors: `retriable`.
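The difference between the two plumbing processors can be sketched in plain Ruby. `ToyManyToMany` and `ToySwitch` below are hypothetical stand-ins written for illustration, not Wukong's classes, and plain arrays play the role of outputs.

```ruby
# Hypothetical stand-ins for the plumbing stages; not Wukong's implementation.
class ToyManyToMany
  def initialize(outputs)
    @outputs = outputs
  end

  # Deliver the record to every output.
  def process(record)
    @outputs.each { |out| out << record }
  end
end

class ToySwitch
  # The chooser block maps a record to the index of the one output that receives it.
  def initialize(outputs, &chooser)
    @outputs = outputs
    @chooser = chooser
  end

  # Deliver the record to exactly one output.
  def process(record)
    @outputs[@chooser.call(record)] << record
  end
end

copy_a, copy_b = [], []
evens, odds    = [], []
fanout = ToyManyToMany.new([copy_a, copy_b])
router = ToySwitch.new([evens, odds]) { |n| n.even? ? 0 : 1 }

(1..4).each do |n|
  fanout.process(n)
  router.process(n)
end
```

After the loop, both `copy_a` and `copy_b` hold all four records, while `evens` and `odds` split them between themselves.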
Here is a simple dataflow; it translates plain-text documents into Pig Latin:
Wukong.dataflow(:pig_latinizer) do
  doc "Translates documents into pig latin"
  input  :raw_text,   file_source
  output :latin_text, file_sink
  input(:raw_text) > pig_latinizer > :latin_text
end
For example, the sample document
But in a last word to the wise of these days let it be
said that of all who give gifts these two were the wisest.
Of all who give and receive gifts, such as they are wisest.
Everywhere they are wisest. They are the magi.
will be embiggened to read
Utbay inway away astlay ordway otay ethay iseway ofway esethay aysday etlay itway ebay
aidsay atthay ofway allway owhay ivegay iftsgay esethay otway ereway ethay isestway.
Ofway allway owhay ivegay andway eceiveray iftsgay, uchsay asway eythay areway isestway.
Everywhereway eythay areway isestway. Eythay areway ethay agimay.
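The translation rule itself is not spelled out here, but the sample output is consistent with a simple one: a word starting with a vowel gets "way" appended, any other word has its leading consonant cluster moved to the end followed by "ay", and an initial capital is preserved. A minimal sketch under that assumption follows; the method names `pig_latinize_word` and `pig_latinize` are made up for illustration and are not the processor's actual code.

```ruby
# Hypothetical helper: translate a single alphabetic word.
def pig_latinize_word(word)
  # Vowel-initial words just gain "way".
  return word + "way" if word =~ /\A[aeiou]/i

  # Otherwise split into the leading consonant cluster and the remainder.
  cluster, rest = word.match(/\A([^aeiou]+)(.*)\z/i).captures
  latin = rest + cluster.downcase + "ay"

  # Keep the capitalization of the original first letter.
  word[0] =~ /[A-Z]/ ? latin[0].upcase + latin[1..-1] : latin
end

# Hypothetical helper: translate every run of letters, leaving punctuation and spacing alone.
def pig_latinize(text)
  text.gsub(/[A-Za-z]+/) { |word| pig_latinize_word(word) }
end

puts pig_latinize("Everywhere they are wisest. They are the magi.")
```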
You can refer to a stage either by the stage object itself or by a symbol naming it:
require 'date' # for Date.today

Wukong.dataflow(:gotta_make_the_donuts) do
  input  :dough_circles, dough_hopper
  output :donut_box,     box(:capacity => 12)
  # a stage object held in a local variable
  flavor = Date.today.monday? ? glazer(:raspberry) : glazer(:plain)
  input(:dough_circles) >
    frier(:top_frier) >
    flipper >
    frier(:btm_frier) >
    cooling(:pre_glazer) >
    flavor >
    cooling(:ready) >
    :donut_box
end
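One way a `>` chain can accept both stage objects and symbols is for `>` to look symbols up in a registry and return the right-hand stage, so the chain keeps reading left to right. The sketch below is a toy under that assumption; `ToyStage` and its registry hash are hypothetical and say nothing about how Wukong actually wires stages together.

```ruby
# Hypothetical stage class, for illustration only.
class ToyStage
  attr_reader :name, :downstream

  def initialize(name, registry, &block)
    @name     = name
    @registry = registry
    @block    = block || lambda { |record| record }
    registry[name] = self # make the stage reachable by its symbol
  end

  # Connect this stage to the next one; accept a stage object or a symbol naming one.
  # Returns the right-hand stage so that `a > b > c` links a to b and b to c.
  def >(other)
    other = @registry.fetch(other) if other.is_a?(Symbol)
    @downstream = other
    other
  end

  # Apply this stage, then hand the result downstream if there is one.
  def call(record)
    result = @block.call(record)
    downstream ? downstream.call(result) : result
  end
end

registry = {}
shout   = ToyStage.new(:shout, registry)   { |text| text.upcase }
exclaim = ToyStage.new(:exclaim, registry) { |text| text + "!" }
sink    = ToyStage.new(:sink, registry)

# :exclaim is referenced by symbol, sink by object; both forms link the same way.
shout > :exclaim > sink

result = shout.call("donuts")
```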
Defining a processor creates a dataflow method that calls the class's `.make` method. Here's its original definition:
class Wukong::Processor
  # Class-level factory: fill in a unique name if none is given, then build via .receive
  def self.make(name=nil, attrs={})
    name = uniqed_name if name.nil?
    self.receive(attrs.merge(:name => name))
  end
end
Calling `register_processor` from your class endows dataflows with a correspondingly-named method; in this example, registering the `PigLatinizer` class constructs the `pig_latinizer` method.
class PigLatinizer
  register_processor
end
# in a dataflow
input > pig_latinizer > output
You can override the name of the created method by passing it to `register_processor`:
class Wukong::Widget::RegexpFilter
  # Take the regexp as a leading positional argument and pass it along as an attribute
  def self.make(regexp, name=nil, attrs={})
    super(name, attrs.merge(:regexp => regexp))
  end
  register_processor(:re)
end
# in a dataflow
re(/duck/)                 # RegexpFilter initialized with `:regexp => /duck/` and a generated name
re(/duck/, :duck_selector) # RegexpFilter initialized with `:regexp => /duck/`, named `:duck_selector`
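To make the registration mechanism concrete, here is a self-contained toy that mimics it. Everything in it (`ToyDataflow`, `ToyProcessor`, the snake-casing in `handle`, the counter behind `uniqed_name`) is an assumption made for illustration; only the shape of `.make`, `.receive`, and `register_processor` follows the text above.

```ruby
# Hypothetical dataflow class that receives the generated methods.
class ToyDataflow
end

# Hypothetical processor base class, for illustration only.
class ToyProcessor
  attr_reader :name, :attrs

  def initialize(attrs)
    @name  = attrs[:name]
    @attrs = attrs
  end

  def self.receive(attrs)
    new(attrs)
  end

  def self.make(name = nil, attrs = {})
    name = uniqed_name if name.nil?
    receive(attrs.merge(:name => name))
  end

  # Snake-cased class name, e.g. PigLatinizer -> "pig_latinizer".
  def self.handle
    self.name.split('::').last.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
  end

  # Generated names are the handle plus a per-class counter.
  def self.uniqed_name
    @count = (@count || 0) + 1
    :"#{handle}_#{@count}"
  end

  # Define a dataflow method that forwards its arguments to this class's .make.
  def self.register_processor(method_name = nil)
    klass = self
    ToyDataflow.send(:define_method, method_name || handle) { |*args| klass.make(*args) }
  end
end

class PigLatinizer < ToyProcessor
  register_processor
end

class RegexpFilter < ToyProcessor
  def self.make(regexp, name = nil, attrs = {})
    super(name, attrs.merge(:regexp => regexp))
  end
  register_processor(:re)
end

flow      = ToyDataflow.new
latinizer = flow.pig_latinizer
anonymous = flow.re(/duck/)
selector  = flow.re(/duck/, :duck_selector)
```

Here `latinizer.name` and `anonymous.name` are generated, while `selector.name` is `:duck_selector`; both filters carry `:regexp => /duck/` in their attributes.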
Typically, `.make` just calls `.receive` in turn.
# as a local variable -- this is the same thing, but seems worth saying:
# make([optional positional args], [name (or anonymous name will be applied)], [attrs])
re(/hi/)
re(/hi/, nil, :whatever => val)
re(/hi/, :bob, {})
Wukong.dataflow(:pig_latinizer) do
  doc "Translates documents into pig latin, saves original in log file, translated into MySQL database"
  input  :raw_text,      http_listener(:port => 8300)
  output :original_text, file_sink
  output :latin_text,    mysql_sink
  input(:raw_text) > many_to_many([
    :to_json      > :original_text,
    pig_latinizer > :latin_text
  ])
end
You can replace a dataflow source at run time:
wukong run thumbnailer.rb --mode=local --source.=
(Of course, the input you swap in must meet the expectations of its downstream stages. If a downstream stage depends on knowing which file a record came from, you may have to set a reasonable value explicitly.)
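As an illustration of that caveat, the plain-Ruby sketch below has a downstream step that insists on a `:filename` and a small adapter that supplies one for a swapped-in source. The method names, the hash-shaped records, and the `"(console)"` placeholder are all hypothetical; none of this is Wukong API.

```ruby
# Hypothetical downstream stage: it expects each record to carry a :filename.
def label_with_origin(records)
  records.map { |rec| "#{rec.fetch(:filename)}: #{rec[:text]}" }
end

# Hypothetical adapter for a swapped-in source that yields bare strings:
# wrap each line and set an explicit, reasonable :filename.
def adapt_source(lines, filename)
  lines.map { |line| { :filename => filename, :text => line } }
end

file_records    = [{ :filename => "a.txt", :text => "from a file" }]
console_records = adapt_source(["typed by hand"], "(console)")

labels = label_with_origin(file_records + console_records)
```

Without the adapter, `rec.fetch(:filename)` would raise a `KeyError` on the swapped-in records, which is the kind of failure the note above warns about.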