Skip to content

Commit

Permalink
Add user defined process see #19
Browse files Browse the repository at this point in the history
  • Loading branch information
danlooo committed Jul 28, 2023
1 parent e8dd4cc commit 8d666b2
Showing 1 changed file with 73 additions and 1 deletion.
74 changes: 73 additions & 1 deletion src/ProcessGraph.jl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Base.getindex(g::ProcessGraph, i) = Base.getindex(g.data, i)
Base.length(g::ProcessGraph) = Base.length(g.data)

struct Reducer <: AbstractProcessGraph
process_graph::OrderedDict
process_graph::Union{OrderedDict,AbstractProcessGraph,Dict}
end

"""
Expand All @@ -62,6 +62,78 @@ function Reducer(process::String="mean")
return Reducer(process_graph)
end

function action(n::Union{Expr,Core.ReturnNode})
typeof(n) == Core.ReturnNode && return :noop
try
# BUG: can not disinglush between array in function argument and new ones inside the function
n.args[2].args[1].name == :getindex && return :create
catch
end
n.head == :(=) && return :assign
return :call
end

openeo_processes = Dict(
:+ => "add",
:- => "subtract",
:* => "multiply",
:/ => "divide"
)

"""
Create a ProcessGraph using a user-defined Function
This is useful e.g. to create reducers to combine values of different bands in a customized way.
"""
function ProcessGraph(func::Function, types::Tuple{DataType}=(Any,))
lowered = code_lowered(func, types)[1]
actions = action.(lowered.code)

idx = 0 # openEO arrays starts with 0
array_processes = Dict()
for (step, action) in zip(lowered.code, actions)
if action == :create
p = ProcessNode("array_element", Dict(
:data => Dict(:from_parameter => "data"),
:index => idx
))
idx += 1
push!(array_processes, step.args[1].id => p)
end
end

processes = Vector{ProcessNode}()
for (step, action) in zip(lowered.code, actions)
if action == :create
push!(processes, array_processes[step.args[1].id])
elseif action == :call
process_id = openeo_processes[step.args[1].name]
arguments = Dict(
:x => array_processes[step.args[2].id],
:y => array_processes[step.args[3].id]
)
process = ProcessNode(process_id, arguments)
push!(processes, process)
elseif action == :assign
step = step.args[2].args
process_id = openeo_processes[step[1].name]
arguments = Dict(
:x => processes[step[2].id],
:y => processes[step[3].id]
)
process = ProcessNode(process_id, arguments)
push!(processes, process)
end
end
processes |> last |> ProcessGraph
end

function Reducer(func::Function, types::Tuple{DataType}=(Any,))
process_graph = ProcessGraph(func, types)
return Reducer(process_graph)
end


"""
Process and download data synchronously
"""
Expand Down

0 comments on commit 8d666b2

Please sign in to comment.