Skip to content

Commit

Permalink
Merge pull request #86 from legend-exp/patch_partitioninfo
Browse files Browse the repository at this point in the history
Allow `partitioninfo` to filter categories
  • Loading branch information
fhagemann authored Dec 16, 2024
2 parents 0d3faa7 + 94df5c8 commit a678b6b
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 44 deletions.
98 changes: 57 additions & 41 deletions src/dataprod_config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,66 +102,82 @@ end
export pydataprod_parameters


const _cached_partitioninfo = LRU{Tuple{UInt, Symbol}, IdDict{DataPartition, Table}}(maxsize = 300)

function _get_partitions(data::LegendData, label::Symbol)
get!(_cached_partitioninfo, (objectid(data), label)) do
parts = pydataprod_config(data).partitions.default
parts_label = get(pydataprod_config(data).partitions, label, PropDict())
for k in keys(parts_label)
parts[k] = parts_label[k]
end
# type for live time
rinfo_type = typeof(first(runinfo(data)))
result::IdDict{
DataPartition,
Table{rinfo_type}
} = IdDict([
let
periods_and_runs = [
let period = DataPeriod(string(p))
filter(row -> row.run in Vector{DataRun}(rs), runinfo(data, period))
end
for (p,rs) in part
]
flat_pr = vcat(periods_and_runs...)::Table{rinfo_type}
DataPartition(pidx)::DataPartition => sort(Table(flat_pr))
const _cached_partitioninfo = LRU{Tuple{UInt, Symbol, Symbol}, IdDict{DataPartition, Table}}(maxsize = 300)

function _get_partitions(data::LegendData, label::Symbol; category::DataCategoryLike=:all)
let cat = Symbol(DataCategory(category))
get!(_cached_partitioninfo, (objectid(data), label, cat)) do
parts = pydataprod_config(data).partitions.default
parts_label = get(pydataprod_config(data).partitions, label, PropDict())
for k in keys(parts_label)
parts[k] = parts_label[k]
end
for (pidx, part) in parts
])
# type for live time
rinfo_type = typeof(first(runinfo(data)))
result::IdDict{
DataPartition,
Table{rinfo_type}
} = IdDict([
let
periods_and_runs = [
let period = DataPeriod(string(p))
filter(row -> row.run in Vector{DataRun}(rs), runinfo(data, period))
end
for (p,rs) in part
]
flat_pr = vcat(periods_and_runs...)::Table{rinfo_type}
tab = if cat == :all
Table(flat_pr)
else
Table(filter(row -> getproperty(row, cat).is_analysis_run, Table(flat_pr)))
end
DataPartition(pidx)::DataPartition => sort(tab)
end
for (pidx, part) in parts
])

IdDict{DataPartition, typeof(Table(result[first(keys(result))]))}(keys(result) .=> Table.(values(result)))
IdDict{DataPartition, typeof(Table(result[first(keys(result))]))}(keys(result) .=> Table.(values(result)))
end
end
end

"""
partitioninfo(data::LegendData, ch::ChannelId)
partitioninfo(data::LegendData, ch::ChannelId, part::DataPartitionLike)
partitioninfo(data::LegendData, ch::ChannelId, period::DataPeriodLike)
partitioninfo(data::LegendData, ch::ChannelId)::IdDict{DataPartition, Table}
partitioninfo(data::LegendData, ch::ChannelId, part::DataPartitionLike; category::DataCategoryLike=:all)
partitioninfo(data::LegendData, ch::ChannelId, period::DataPeriodLike; category::DataCategoryLike=:all)
partitioninfo(data, ch, period::DataPeriodLike, run::DataRunLike; category::DataCategoryLike=:all)
Return cross-period data partitions.
# Arguments
- `data::LegendData`: The LegendData object containing the data.
- `ch::ChannelId`: The channel identifier.
Return cross-period data partitions.
# Returns
- `IdDict{DataPartition, Table}`: A dictionary mapping data partitions to tables.
"""
function partitioninfo end
export partitioninfo
function partitioninfo(data::LegendData, ch::ChannelId)
_get_partitions(data, Symbol(ChannelId(ch)))
function partitioninfo(data::LegendData, ch::ChannelId; kwargs...)
_get_partitions(data, Symbol(ChannelId(ch)); kwargs...)
end
function partitioninfo(data::LegendData, det::DetectorIdLike)
function partitioninfo(data::LegendData, det::DetectorIdLike; kwargs...)
ch = channelinfo(data, first(filter(!ismissing, runinfo(data).cal.startkey)), det).channel
partitioninfo(data, ch)
partitioninfo(data, ch; kwargs...)
end
partitioninfo(data, ch, part::DataPartition) = partitioninfo(data, ch)[part]
partitioninfo(data, ch, period::DataPeriod) = sort(Vector{DataPartition}([p for (p, pinfo) in partitioninfo(data, ch) if period in pinfo.period]))
function partitioninfo(data, ch, p::Union{Symbol, AbstractString})
partitioninfo(data, ch, part::DataPartition; kwargs...) = partitioninfo(data, ch; kwargs...)[part]
partitioninfo(data, ch, period::DataPeriod; kwargs...) = sort(Vector{DataPartition}([p for (p, pinfo) in partitioninfo(data, ch; kwargs...) if period in pinfo.period]))
function partitioninfo(data, ch, p::Union{Symbol, AbstractString}; kwargs...)
if _can_convert_to(DataPartition, p)
partitioninfo(data, ch, DataPartition(p))
partitioninfo(data, ch, DataPartition(p); kwargs...)
elseif _can_convert_to(DataPeriod, p)
partitioninfo(data, ch, DataPeriod(p))
partitioninfo(data, ch, DataPeriod(p); kwargs...)
else
throw(ArgumentError("Invalid specification \"$p\". Must be of type DataPartition or DataPeriod"))
end
end
partitioninfo(data, ch, period::DataPeriodLike, run::DataRunLike) = sort(Vector{DataPartition}([p for (p, pinfo) in partitioninfo(data, ch) if any(map(row -> row.period == DataPeriod(period) && row.run == DataRun(run), pinfo))]))
partitioninfo(data, ch, period::DataPeriodLike, run::DataRunLike; kwargs...) = sort(Vector{DataPartition}([p for (p, pinfo) in partitioninfo(data, ch; kwargs...) if any(map(row -> row.period == DataPeriod(period) && row.run == DataRun(run), pinfo))]))



Expand Down
2 changes: 1 addition & 1 deletion src/exposure.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ end
function get_exposure(data::LegendData, det::DetectorIdLike, part::DataPartition; is_analysis_run::Bool=true, cat::DataCategoryLike=:phy)
part_dict = partitioninfo(data, det)
if haskey(part_dict, part)
rinfo = partitioninfo(data, det, part)
rinfo = partitioninfo(data, det, part; category=cat)
return _get_exposure(data, det, rinfo, is_analysis_run, cat)
end

Expand Down
2 changes: 1 addition & 1 deletion src/utils/management_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ function get_partition_channelinfo(data::LegendData, chinfo::Table, period::Data
# get partition information for given period
period = DataPeriod(period)
# get partition information for given period and channels
parts = partitioninfo.(data, chinfo.channel, period)
parts = partitioninfo.(data, chinfo.channel, period)
t = StructArray(merge((partition = parts, ), columns(chinfo)))
if unfold_partitions
t_unfold = t |> filterby(@pf length($partition) > 1)
Expand Down
2 changes: 1 addition & 1 deletion src/utils/pars_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ function get_partitionvalidity(data::LegendData, ch::ChannelIdLike, det::Detecto
# unpack
ch, det, part = ChannelId(ch), DetectorId(det), DataPartition(part)
# get partition validity
partinfo = partitioninfo(data, ch, part)
partinfo = partitioninfo(data, ch, part; category=cat)
Vector{@NamedTuple{period::DataPeriod, run::DataRun, filekey::FileKey, validity::String}}([(period = pinf.period, run = pinf.run, filekey = start_filekey(data, (pinf.period, pinf.run, cat)), validity = "$det/$(part).json") for pinf in partinfo])
end
export get_partitionvalidity
Expand Down

0 comments on commit a678b6b

Please sign in to comment.