Skip to content

Commit

Permalink
Merge pull request #412 from NOAA-OWP/issue397
Browse files Browse the repository at this point in the history
Do not use covariate datasets to filter unless they are explicitly de…
  • Loading branch information
james-d-brown authored Feb 11, 2025
2 parents 2bc211b + 4486f14 commit 888f5d4
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 71 deletions.
24 changes: 18 additions & 6 deletions src/wres/pipeline/pooling/EventsGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -315,11 +317,14 @@ private Set<TimeWindowOuter> doEventDetection( EventDetectionDetails details )
timeWindow = TimeWindowSlicer.adjustTimeWindowForTimeScale( adjustedOuter, details.desiredTimeScale() );
}

Set<Feature> features = this.getFeatures( featureGroup.getFeatures(), featureGetter );

LOGGER.info( "Getting time-series data to perform event detection for the following features: {}", features );

switch ( details.dataset() )
{
case OBSERVED ->
{
Set<Feature> features = this.getFeatures( featureGroup.getFeatures(), featureGetter );
Stream<TimeSeries<Double>> series = eventRetriever.getLeftRetriever( features, timeWindow )
.get();

Expand All @@ -336,7 +341,6 @@ private Set<TimeWindowOuter> doEventDetection( EventDetectionDetails details )
}
case PREDICTED ->
{
Set<Feature> features = this.getFeatures( featureGroup.getFeatures(), featureGetter );
Stream<TimeSeries<Double>> series = eventRetriever.getRightRetriever( features, timeWindow )
.get();

Expand All @@ -353,7 +357,6 @@ private Set<TimeWindowOuter> doEventDetection( EventDetectionDetails details )
}
case BASELINE ->
{
Set<Feature> features = this.getFeatures( featureGroup.getFeatures(), featureGetter );
Stream<TimeSeries<Double>> series = eventRetriever.getBaselineRetriever( features, timeWindow )
.get();

Expand All @@ -370,22 +373,31 @@ private Set<TimeWindowOuter> doEventDetection( EventDetectionDetails details )
}
case COVARIATES ->
{
Set<Feature> features = this.getFeatures( featureGroup.getFeatures(), featureGetter );
Stream<TimeSeries<Double>> series = eventRetriever.getCovariateRetriever( features,
details.covariateName(),
timeWindow )
.get();

// Count the number of time-series
AtomicLong seriesCount = new AtomicLong();
UnaryOperator<TimeSeries<Double>> counter = s ->
{
seriesCount.incrementAndGet();
return s;
};
Set<TimeWindowOuter> innerEvents =
series.flatMap( s -> this.doEventDetection( s,
series.map( counter )
.flatMap( s -> this.doEventDetection( s,
this.getAdjustedDetails( details, s.getMetadata()
.getUnit() ),
this.covariateUpscaler() )
.stream() )
.collect( Collectors.toSet() );
LOGGER.info( "Detected {} events in the {} dataset for feature group {} with variable name, '{}'.",
LOGGER.info( "Detected {} events in the {} dataset containing {} time-series for feature group {} "
+ "with variable name, '{}'.",
innerEvents.size(),
EventDetectionDataset.COVARIATES,
seriesCount.get(),
featureGroup.getName(),
details.covariateName() );
events.addAll( innerEvents );
Expand Down
62 changes: 35 additions & 27 deletions src/wres/pipeline/pooling/PoolFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import wres.config.yaml.components.CovariateDataset;
import wres.config.yaml.components.CovariatePurpose;
import wres.config.yaml.components.CrossPair;
import wres.config.yaml.DeclarationUtilities;
import wres.config.yaml.components.DataType;
Expand Down Expand Up @@ -574,9 +575,9 @@ private List<SupplierWithPoolRequest<Pool<TimeSeries<Pair<Double, Double>>>>> ge
TimeSeriesUpscaler<Double> baselineUpscaler = this.getBaselineSingleValuedUpscaler();
TimeSeriesUpscaler<Double> covariateUpscaler = this.getCovariateSingleValuedUpscaler();

Set<Covariate<Double>> covariates = this.getCovariates( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );
Set<Covariate<Double>> covariateFilters = this.getCovariateFilters( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );

// Create a feature-specific baseline generator function (e.g., persistence), if required
Function<Set<Feature>, BaselineGenerator<Double>> baselineGenerator = null;
Expand Down Expand Up @@ -706,7 +707,7 @@ else if ( method == GeneratedBaselines.CLIMATOLOGY )
.setBaselineMissingFilter( missingFilter )
.setLeftUpscaler( leftUpscaler )
.setRightUpscaler( rightUpscaler )
.setCovariates( covariates )
.setCovariateFilters( covariateFilters )
.setBaselineUpscaler( baselineUpscaler )
.setLeftTimeShift( leftTimeShift )
.setRightTimeShift( rightTimeShift )
Expand Down Expand Up @@ -774,9 +775,9 @@ private List<SupplierWithPoolRequest<Pool<TimeSeries<Pair<Double, Ensemble>>>>>
TimeSeriesUpscaler<Ensemble> baselineUpscaler = this.getBaselineEnsembleUpscaler();
TimeSeriesUpscaler<Double> covariateUpscaler = this.getCovariateSingleValuedUpscaler();

Set<Covariate<Double>> covariates = this.getCovariates( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );
Set<Covariate<Double>> covariateFilters = this.getCovariateFilters( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );

// Left transformer, which is a composition of several transformations
Map<GeometryTuple, Offset> offsets = project.getOffsets();
Expand Down Expand Up @@ -886,7 +887,7 @@ private List<SupplierWithPoolRequest<Pool<TimeSeries<Pair<Double, Ensemble>>>>>
.setLeftUpscaler( leftUpscaler )
.setRightUpscaler( rightUpscaler )
.setBaselineUpscaler( baselineUpscaler )
.setCovariates( covariates )
.setCovariateFilters( covariateFilters )
.setLeftFilter( singleValuedFilter )
.setRightFilter( ensembleFilter )
.setBaselineFilter( ensembleFilter )
Expand Down Expand Up @@ -957,9 +958,9 @@ private List<SupplierWithPoolRequest<Pool<TimeSeries<Pair<Double, Ensemble>>>>>
TimeSeriesUpscaler<Ensemble> baselineUpscaler = this.getBaselineEnsembleUpscaler();
TimeSeriesUpscaler<Double> covariateUpscaler = this.getCovariateSingleValuedUpscaler();

Set<Covariate<Double>> covariates = this.getCovariates( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );
Set<Covariate<Double>> covariateFilters = this.getCovariateFilters( declaration,
covariateUpscaler, this.getProject()
.getDesiredTimeScale() );

// Left transformer, which is a composition of several transformations
Map<GeometryTuple, Offset> offsets = project.getOffsets();
Expand Down Expand Up @@ -1072,7 +1073,7 @@ private List<SupplierWithPoolRequest<Pool<TimeSeries<Pair<Double, Ensemble>>>>>
.setLeftUpscaler( leftUpscaler )
.setRightUpscaler( rightUpscaler )
.setBaselineUpscaler( baselineUpscaler )
.setCovariates( covariates )
.setCovariateFilters( covariateFilters )
.setLeftFilter( singleValuedFilter )
.setRightFilter( ensembleFilter )
.setBaselineFilter( ensembleFilter )
Expand Down Expand Up @@ -2538,15 +2539,22 @@ private Duration getPairFrequency( EvaluationDeclaration declaration )
* @param declaration the declaration
* @param upscaler an upscaler for each covariate dataset by variable name
* @param desiredTimeScale the desired timescale
* @return the covariate upscalers
* @return the covariate filters
*/
private Set<Covariate<Double>> getCovariates( EvaluationDeclaration declaration,
TimeSeriesUpscaler<Double> upscaler,
TimeScaleOuter desiredTimeScale )
private Set<Covariate<Double>> getCovariateFilters( EvaluationDeclaration declaration,
TimeSeriesUpscaler<Double> upscaler,
TimeScaleOuter desiredTimeScale )
{
Set<Covariate<Double>> covariates = new HashSet<>();

for ( CovariateDataset covariateDataset : declaration.covariates() )
// Covariates that are intended for filtering:
List<CovariateDataset> covariateFilters = declaration.covariates()
.stream()
.filter( c -> c.purposes()
.contains( CovariatePurpose.FILTER ) )
.toList();

for ( CovariateDataset nextFilter : covariateFilters )
{
TimeScaleOuter covariateTimeScale = null;
if ( ( Objects.nonNull( desiredTimeScale ) ) )
Expand All @@ -2556,27 +2564,27 @@ private Set<Covariate<Double>> getCovariates( EvaluationDeclaration declaration,
.toBuilder();

// Set the intended scale function, if declared
if ( Objects.nonNull( covariateDataset.rescaleFunction() ) )
if ( Objects.nonNull( nextFilter.rescaleFunction() ) )
{
covariateScale.setFunction( covariateDataset.rescaleFunction() );
covariateScale.setFunction( nextFilter.rescaleFunction() );
}

covariateTimeScale = TimeScaleOuter.of( covariateScale.build() );

LOGGER.debug( "Adjusted the covariate timescale function for variable {} from {} to {}.",
covariateDataset.dataset()
.variable()
.name(),
nextFilter.dataset()
.variable()
.name(),
desiredTimeScale.getFunction(),
covariateTimeScale.getFunction() );
}

// Create a filter
Predicate<Double> filter = d -> ( Objects.isNull( covariateDataset.minimum() )
|| d >= covariateDataset.minimum() )
&& ( Objects.isNull( covariateDataset.maximum() )
|| d <= covariateDataset.maximum() );
Covariate<Double> covariate = new Covariate<>( covariateDataset, filter, covariateTimeScale, upscaler );
Predicate<Double> filter = d -> ( Objects.isNull( nextFilter.minimum() )
|| d >= nextFilter.minimum() )
&& ( Objects.isNull( nextFilter.maximum() )
|| d <= nextFilter.maximum() );
Covariate<Double> covariate = new Covariate<>( nextFilter, filter, covariateTimeScale, upscaler );
covariates.add( covariate );
}

Expand Down
16 changes: 16 additions & 0 deletions src/wres/pipeline/pooling/PoolProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import wres.config.yaml.components.DatasetOrientation;
import wres.config.yaml.components.SamplingUncertainty;
import wres.datamodel.bootstrap.BootstrapUtilities;
import wres.datamodel.pools.PoolSlicer;
import wres.metrics.FunctionFactory;
import wres.metrics.ScalarSummaryStatisticFunction;
import wres.metrics.SummaryStatisticsCalculator;
Expand Down Expand Up @@ -470,6 +471,21 @@ private List<Statistics> createStatistics( Pool<TimeSeries<Pair<L, R>>> pool,
SamplingUncertainty samplingUncertainty,
Function<Pool<TimeSeries<Pair<L, R>>>, Pair<Long, Duration>> blockSizeEstimator )
{
// Short-circuit when there are no pairs: GitHub issue #397
int poolEventCount = PoolSlicer.getEventCount( pool );

if ( poolEventCount == 0 )
{
LOGGER.debug( "Skipping the generation of statistics for a pool with no pairs: {}", pool.getMetadata() );
return List.of();
}
else
{
LOGGER.debug( "Creating statistics for a pool that contains {} time-series events: {}",
poolEventCount,
pool.getMetadata() );
}

// Compute the statistics
Function<Pool<TimeSeries<Pair<L, R>>>, List<StatisticsStore>> processor =
this.getStatisticsProcessingTask( this.metricProcessors,
Expand Down
Loading

0 comments on commit 888f5d4

Please sign in to comment.