Skip to content

Commit

Permalink
Merge pull request #301 from NOAA-OWP/issue292
Browse files Browse the repository at this point in the history
Issue292
  • Loading branch information
james-d-brown authored Sep 3, 2024
2 parents b1715d0 + 296021e commit f6b283e
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 43 deletions.
6 changes: 4 additions & 2 deletions wres-reading/src/wres/reading/wrds/ahps/WrdsAhpsReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,11 @@ private Map<String, String> createWrdsAhpsUrlParameters( Pair<Instant, Instant>
}

urlParameters.put( timeTag,
"[" + dateRange.getLeft().toString()
"[" + dateRange.getLeft()
.toString()
+ ","
+ dateRange.getRight().toString()
+ dateRange.getRight()
.toString()
+ "]" );

return Collections.unmodifiableMap( urlParameters );
Expand Down
46 changes: 46 additions & 0 deletions wres-reading/src/wres/reading/wrds/nwm/DateTimeDeserializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package wres.reading.wrds.nwm;

import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;

import wres.reading.ReaderUtilities;

/**
* Custom deserializer for a datetime string in the ISO8601 "basic" format with optional minutes and seconds. For
* example: 20240830T12Z, 20240830T1200Z and 20240830T120000Z are all acceptable.
*
* @author James Brown
*/
public class DateTimeDeserializer extends JsonDeserializer<Instant>
{
@Override
public Instant deserialize( JsonParser jp, DeserializationContext context )
throws IOException
{
JsonNode node = jp.getCodec()
.readTree( jp );

String time;

// Parse the instant.
if ( node.isTextual() )
{
time = node.asText();
}
else
{
throw new IOException( "Could not find a datetime field in the document, which is not allowed." );
}

// Lenient formatting in the "basic" ISO8601 format, hours and seconds are optional
DateTimeFormatter formatter = DateTimeFormatter.ofPattern( "yyyyMMdd'T'HH[mm[ss]]'Z'" )
.withZone( ReaderUtilities.UTC );
return formatter.parse( time, Instant::from );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import java.io.IOException;
import java.io.Serial;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
Expand All @@ -15,13 +13,17 @@

/**
* Custom deserializer to allow for handling a null value in a NWM data point.
* @author Hank.Herr
* @author Hank Herr
* @author James Brown
*/
public class NwmDataPointDeserializer extends StdDeserializer<NwmDataPoint>
{
@Serial
private static final long serialVersionUID = 5616289115474402095L;

/** Deserializer for a datetime {@link Instant}. **/
private static final DateTimeDeserializer INSTANT_DESERIALIZER = new DateTimeDeserializer();

/**
* Creates an instance.
*
Expand All @@ -43,21 +45,25 @@ public NwmDataPointDeserializer()
@Override
public NwmDataPoint deserialize( JsonParser jp, DeserializationContext ctxt ) throws IOException
{
JsonNode node = jp.getCodec().readTree( jp );
JsonNode node = jp.getCodec()
.readTree( jp );

JsonNode timeNode = node.get( "time" );
JsonParser parser = timeNode.traverse();
parser.setCodec( jp.getCodec() );

//Parse the instant.
DateTimeFormatter formatter = new DateTimeFormatterBuilder()
.appendPattern( "uuuuMMdd'T'HHX" )
.toFormatter();
Instant instant = formatter.parse( node.get( "time" ).asText(), Instant::from );
// Parse the instant.
Instant instant = INSTANT_DESERIALIZER.deserialize( parser, ctxt );

//Parse the value. Note that if the value is null, the node will not be
//null. Rather, isNull will return true. So there is not need to check
//explicitly for null.
// Parse the value. Note that if the value is null, the node will not be
// null. Rather, isNull will return true. So there is no need to check
// explicitly for null.
double value = MissingValues.DOUBLE;
if ( !node.get( "value" ).isNull() )
if ( !node.get( "value" )
.isNull() )
{
value = Double.parseDouble( node.get( "value" ).asText() );
value = node.get( "value" )
.asDouble();
}

return new NwmDataPoint( instant, value );
Expand Down
2 changes: 2 additions & 0 deletions wres-reading/src/wres/reading/wrds/nwm/NwmForecast.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.commons.lang3.builder.ToStringBuilder;

/**
Expand All @@ -28,6 +29,7 @@ public class NwmForecast
public NwmForecast( @JsonProperty( "reference_time" )
@JsonFormat( shape = JsonFormat.Shape.STRING,
pattern = "uuuuMMdd'T'HHX" )
@JsonDeserialize( using = DateTimeDeserializer.class )
Instant referenceDatetime,
@JsonProperty( "features" )
List<NwmFeature> nwmFeatures )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private Supplier<TimeSeriesTuple> getTimeSeriesSupplier( DataSource dataSource,
// Create a supplier that returns a time-series once complete
return () -> {

// Read all of the time-series eagerly on first use: this will still delay any read until a terminal stream
// Read the time-series eagerly on first use: this will still delay any read until a terminal stream
// operation pulls from the supplier (which is why we use a reference holder and do not request the
// time-series outside of this lambda), but it will then acquire all the time-series eagerly, i.e., now
if ( Objects.isNull( timeSeriesTuples.get() ) )
Expand Down
54 changes: 38 additions & 16 deletions wres-reading/src/wres/reading/wrds/nwm/WrdsNwmReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAdjusters;
Expand Down Expand Up @@ -256,10 +255,14 @@ private Stream<TimeSeriesTuple> read( DataSource dataSource, EvaluationDeclarati
// The chunked features
List<List<String>> featureBlocks = ListUtils.partition( features, this.getFeatureChunkSize() );

LOGGER.debug( "Will request data for these feature chunks: {}.", featureBlocks );

// Date ranges
Set<Pair<Instant, Instant>> dateRanges = WrdsNwmReader.getWeekRanges( declaration, dataSource );

// Combine the features and date ranges to form the chunk boundaries
LOGGER.debug( "Will request data for these datetime chunks: {}.", dateRanges );

// Combine the features and date ranges to form the overall chunk boundaries
Set<Pair<List<String>, Pair<Instant, Instant>>> chunks = new HashSet<>();
for ( List<String> nextFeatures : featureBlocks )
{
Expand Down Expand Up @@ -495,7 +498,7 @@ private static Set<Pair<Instant, Instant>> getWeekRanges( EvaluationDeclaration
ZonedDateTime latest = dates.maximum()
.atZone( ReaderUtilities.UTC );

LOGGER.debug( "Given {} parsed {} for latest.",
LOGGER.debug( "Given {} calculated {} for latest.",
dates.maximum(),
latest );

Expand Down Expand Up @@ -614,8 +617,12 @@ private URI getUriForChunk( URI baseUri,
use );
}

return ReaderUtilities.getUriWithParameters( uriWithLocation,
wrdsParameters );
URI uri = ReaderUtilities.getUriWithParameters( uriWithLocation,
wrdsParameters );

LOGGER.debug( "Will request time-series data with this URI: {}.", uri );

return uri;
}

/**
Expand Down Expand Up @@ -650,29 +657,44 @@ private Map<String, String> createWrdsNwmUrlParameters( Pair<Instant, Instant> r
// This will override the parameter added above.
urlParameters.putAll( additionalParameters );

String leftWrdsFormattedDate = WrdsNwmReader.iso8601TruncatedToHourFromInstant( range.getLeft() );
String rightWrdsFormattedDate = WrdsNwmReader.iso8601TruncatedToHourFromInstant( range.getRight() );
Pair<String, String> wrdsFormattedDates = WrdsNwmReader.toBasicISO8601String( range.getLeft(),
range.getRight() );
urlParameters.put( "reference_time",
"(" + leftWrdsFormattedDate
"(" + wrdsFormattedDates.getLeft()
+ ","
+ rightWrdsFormattedDate
+ wrdsFormattedDates.getRight()
+ "]" );
urlParameters.put( "validTime", "all" );

return Collections.unmodifiableMap( urlParameters );
}

/**
* The WRDS NWM API uses a concise ISO-8601 format rather than RFC3339 instant.
* @param instant the instance
* The WRDS NWM API uses the basic ISO-8601 format for the date range.
* @param left the instant
* @param right the right instant
* @return the ISO-8601 instant string
*/
private static String iso8601TruncatedToHourFromInstant( Instant instant )
private static Pair<String, String> toBasicISO8601String( Instant left, Instant right )
{
String dateFormat = "uuuuMMdd'T'HHX";
String dateFormat = "yyyyMMdd'T'HH'Z'";

ZonedDateTime zonedLeft = left.atZone( ReaderUtilities.UTC );
ZonedDateTime zonedRight = right.atZone( ReaderUtilities.UTC );

if ( zonedLeft.getMinute() > 0
|| zonedLeft.getSecond() > 0
|| zonedRight.getMinute() > 0
|| zonedRight.getSecond() > 0 )
{
dateFormat = "yyyyMMdd'T'HHmmss'Z'";
}

DateTimeFormatter formatter = DateTimeFormatter.ofPattern( dateFormat )
.withZone( ZoneId.of( "UTC" ) );
return formatter.format( instant );
.withZone( ReaderUtilities.UTC );
String leftString = formatter.format( left );
String rightString = formatter.format( right );

return Pair.of( leftString, rightString );
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -134,7 +133,7 @@ void testReadObservationsFromStreamResultsInOneTimeSeries() throws IOException
Stream<TimeSeriesTuple> tupleStream = reader.read( this.fakeSource, inputStream ) )
{
List<TimeSeries<Double>> actual = tupleStream.map( TimeSeriesTuple::getSingleValuedTimeSeries )
.collect( Collectors.toList() );
.toList();

TimeSeriesMetadata metadata = TimeSeriesMetadata.of( Map.of( ReferenceTimeType.T0,
Instant.parse( "2020-01-12T00:00:00Z" ) ),
Expand Down
93 changes: 85 additions & 8 deletions wres-reading/test/wres/reading/wrds/nwm/WrdsNwmReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,20 +384,17 @@ void testReadReturnsThreeChunkedForecastTimeSeries()
Parameters parametersOne = new Parameters( new Parameter( "proj", "UNKNOWN_PROJECT_USING_WRES" ),
new Parameter( "reference_time",
"(20220102T00Z,20220109T00Z]" ),
new Parameter( "forecast_type", "deterministic" ),
new Parameter( "validTime", "all" ) );
new Parameter( "forecast_type", "deterministic" ) );

Parameters parametersTwo = new Parameters( new Parameter( "proj", "UNKNOWN_PROJECT_USING_WRES" ),
new Parameter( "reference_time",
"(20220109T00Z,20220116T00Z]" ),
new Parameter( "forecast_type", "deterministic" ),
new Parameter( "validTime", "all" ) );
new Parameter( "forecast_type", "deterministic" ) );

Parameters parametersThree = new Parameters( new Parameter( "proj", "UNKNOWN_PROJECT_USING_WRES" ),
new Parameter( "reference_time",
"(20220116T00Z,20220123T00Z]" ),
new Parameter( "forecast_type", "deterministic" ),
new Parameter( "validTime", "all" ) );
new Parameter( "forecast_type", "deterministic" ) );

this.mockServer.when( HttpRequest.request()
.withPath( FORECAST_PATH )
Expand Down Expand Up @@ -502,7 +499,6 @@ void testReadReturnsThreeChunkedForecastTimeSeries()
VerificationTimes.exactly( 1 ) );
}


/**
* Tests for an expected exception and not an unexpected one. See #109238.
*/
Expand Down Expand Up @@ -574,4 +570,85 @@ void testReadDoesNotThrowClassCastExceptionWhenChunkingFeatures()
} );
}

}
@Test
void testReadRequestsDateRangeWithMinutesAndSeconds()
{
this.mockServer.when( HttpRequest.request()
.withPath( ANALYSIS_PATH )
.withMethod( GET ) )
.respond( HttpResponse.response( ANALYSIS_RESPONSE ) );

URI fakeUri = URI.create( "http://localhost:"
+ this.mockServer.getLocalPort()
+ "/api/v1/nwm/ops/analysis_assim/"
+ ANALYSIS_PARAMS );

Source fakeDeclarationSource = SourceBuilder.builder()
.uri( fakeUri )
.sourceInterface( SourceInterface.WRDS_NWM )
.build();

Dataset dataset = DatasetBuilder.builder()
.sources( List.of( fakeDeclarationSource ) )
.variable( VariableBuilder.builder()
.name( "streamflow" )
.build() )
.build();

DataSource fakeSource = DataSource.of( DataSource.DataDisposition.JSON_WRDS_NWM,
fakeDeclarationSource,
dataset,
Collections.emptyList(),
fakeUri,
DatasetOrientation.LEFT,
null );

SystemSettings systemSettings = Mockito.mock( SystemSettings.class );
Mockito.when( systemSettings.getMaximumWebClientThreads() )
.thenReturn( 6 );
Mockito.when( systemSettings.getPoolObjectLifespan() )
.thenReturn( 30_000 );

// Create a declaration with a short period of valid dates to create one chunked request where the first and
// last datetimes both contain minutes and seconds
Instant minimum = Instant.parse( "2024-09-01T00:13:00Z" );
Instant maximum = Instant.parse( "2024-09-03T00:27:59Z" );
TimeInterval interval = TimeIntervalBuilder.builder()
.minimum( minimum )
.maximum( maximum )
.build();
Set<GeometryTuple> geometries
= Set.of( GeometryTuple.newBuilder()
.setLeft( Geometry.newBuilder()
.setName( Integer.toString( NWM_FEATURE_ID ) ) )
.build() );
Features features = FeaturesBuilder.builder()
.geometries( geometries )
.build();
EvaluationDeclaration declaration = EvaluationDeclarationBuilder.builder()
.validDates( interval )
.features( features )
.build();

WrdsNwmReader reader = WrdsNwmReader.of( declaration, systemSettings );

Parameters parametersOne = new Parameters( new Parameter( "proj", "UNKNOWN_PROJECT_USING_WRES" ),
new Parameter( "reference_time",
"(20240901T000000Z,20240903T002759Z]" ),
new Parameter( "forecast_type", "deterministic" ) );

try ( Stream<TimeSeriesTuple> tupleStream = reader.read( fakeSource ) )
{
List<TimeSeries<Double>> actual = tupleStream.map( TimeSeriesTuple::getSingleValuedTimeSeries )
.toList();

assertEquals( 1, actual.size() );

// One request made with parameters one
this.mockServer.verify( request().withMethod( GET )
.withPath( ANALYSIS_PATH )
.withQueryStringParameters( parametersOne ),
VerificationTimes.exactly( 1 ) );
}
}
}

0 comments on commit f6b283e

Please sign in to comment.